
## Overview

This notebook shows how to load a CSV file into a Spark DataFrame and query it. It was adapted from a Databricks template built around [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html), the Databricks File System used to store data for querying inside Databricks. This version instead runs against a local Spark installation and reads the Central-West weather CSV from the local file system (a `file://` path). It assumes that file already exists at that path.

This notebook is written in **Python**, so the default cell type is Python. In Databricks you can switch an individual cell to another language with the `%LANGUAGE` magic syntax; Python, Scala, SQL, and R are all supported there (a plain local Jupyter kernel only runs Python).

In [158]:
# Initialization: point this kernel at the local Spark install and make the
# PySpark / Py4J archives bundled with it importable.
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.path.join(os.environ["SPARK_HOME"], "python", "lib")

# Interpreters for executors (PYSPARK_PYTHON) and for a driver launched via the
# `pyspark` script (PYSPARK_DRIVER_PYTHON). Use /usr/bin/python2.7 for Python 2.
# NOTE(review): workers pin python3.6 while the driver points at python3 --
# PySpark errors out if their minor versions differ; confirm they match.
os.environ.update(
    PYSPARK_PYTHON="/usr/bin/python3.6",
    PYSPARK_DRIVER_PYTHON="/usr/bin/python3",
)

# Prepend both archives; pyspark.zip ends up first, followed by py4j.
sys.path[:0] = [
    os.path.join(os.environ["PYLIB"], "pyspark.zip"),
    os.path.join(os.environ["PYLIB"], "py4j-0.10.7-src.zip"),
]

# Extra Spark packages pulled in at session start (spark-xml + spark-avro).
# Alternatives used previously: spark-avro_2.11:2.4.0, or either package alone.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages {} pyspark-shell".format(",".join([
    "com.databricks:spark-xml_2.11:0.6.0",
    "org.apache.spark:spark-avro_2.11:2.4.3",
]))

In [159]:
# Entry point (Spark 2.x): a single Hive-enabled SparkSession, with its
# SparkContext exposed as `sc`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Spark SQL basic example")
    .enableHiveSupport()
    # To run on YARN, add .master("yarn") before .getOrCreate().
    .getOrCreate()
)

sc = spark.sparkContext

In [160]:

from pyspark.sql.types import (
    DateType,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit column layout for the regional weather CSV, in file order.
# Every column is declared nullable.
# NOTE(review): the "*_Relative_Humid_Temperature_*" names look like relative
# humidity readings despite the "Temperature" suffix -- confirm against the source.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Index", IntegerType()),
        ("Date", DateType()),
        ("Time", StringType()),
        ("Total_Precipitation", DoubleType()),
        ("Atmospheric_Pressure", DoubleType()),
        ("Maximum_Atmospheric_Pressure", DoubleType()),
        ("Minimum_Atmospheric_Pressure", DoubleType()),
        ("Solar_Radiation", DoubleType()),
        ("Air_Temperature", DoubleType()),
        ("Dew_Point_Temperature", DoubleType()),
        ("Maximum_Temperature_For_The_Last_Hour", DoubleType()),
        ("Minimum_Temperature_For_The_Last_Hour", DoubleType()),
        ("Maximum_Air_Pressure_Dew_Point_Temperature_For_The_Last_Hour", DoubleType()),
        ("Minimum_Air_Pressure_Dew_Point_Temperature_For_The_Last_Hour", DoubleType()),
        ("Maximum_Relative_Humid_Temperature_For_The_Last_Hour", DoubleType()),
        ("Minimum_Relative_Humid_Temperature_For_The_Last_Hour", DoubleType()),
        ("Air_Relative_Humid_Temperature_For_The_Last_Hour", DoubleType()),
        ("Wind_Direction_Radius_Degree", DoubleType()),
        ("Wind_Gust_In_Metres_Per_Second", DoubleType()),
        ("Wind_speed_In_Metres_Per_Second", DoubleType()),
        ("Region", StringType()),
        ("State", StringType()),
        ("Station_Name", StringType()),
        ("Station_Code", StringType()),
        ("Latitude", DoubleType()),
        ("Longitude", DoubleType()),
        ("Elevation", DoubleType()),
    ]
])

In [161]:
# Source CSV, read from the local filesystem (file:// scheme). The first line is
# treated as a header, and column types come from `schema` rather than inference.
file_location = "file:///home/talentum/shared/CDAC_Project_Weather_Anaysis/central_west.csv"
df1 = spark.read.schema(schema).option("header", True).csv(file_location)

In [162]:
# Peek at the first five raw rows.
df1.show(n=5)

+------+----------+-----+-------------------+--------------------+----------------------------+----------------------------+---------------+---------------+---------------------+-------------------------------------+-------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------------------------------------------------+----------------------------------------------------+------------------------------------------------+----------------------------+------------------------------+-------------------------------+------+-----+------------------+------------+------------+---------+---------+
| Index|      Date| Time|Total_Precipitation|Atmospheric_Pressure|Maximum_Atmospheric_Pressure|Minimum_Atmospheric_Pressure|Solar_Radiation|Air_Temperature|Dew_Point_Temperature|Maximum_Temperature_For_The_Last_Hour|Minimum_Temperature_For_The_Last_Hour|Maximum_Air_Pressure_Dew_Point_Temperature_F

In [166]:
# Spot-check one day: compare the Date column against an ISO-formatted literal.
df1.where(df1["Date"] == "2000-09-10").show(5)

+-----+----------+-----+-------------------+--------------------+----------------------------+----------------------------+---------------+---------------+---------------------+-------------------------------------+-------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------------------------------------------------+----------------------------------------------------+------------------------------------------------+----------------------------+------------------------------+-------------------------------+------+-----+------------+------------+------------+------------+---------+
|Index|      Date| Time|Total_Precipitation|Atmospheric_Pressure|Maximum_Atmospheric_Pressure|Minimum_Atmospheric_Pressure|Solar_Radiation|Air_Temperature|Dew_Point_Temperature|Maximum_Temperature_For_The_Last_Hour|Minimum_Temperature_For_The_Last_Hour|Maximum_Air_Pressure_Dew_Point_Temperature_For_Th

In [167]:
# Restrict the analysis to a single weather station.
# Other stations tried: 'MANAUS', 'PORTO ALEGRE', 'ECOLOGIA AGRICOLA'.
df2 = df1.where(df1["Station_Name"] == "BRASILIA")


In [131]:
# Summary statistics for every column of the station subset.
# NOTE(review): the recorded run was interrupted (KeyboardInterrupt). Nothing here
# caches df1/df2, so this rescans the full CSV; consider describing fewer columns.
df2.describe().show(n=20, truncate=True)

KeyboardInterrupt: 

In [168]:
# Keep the timestamp, the measurements used downstream, and the station
# metadata; Index and the remaining raw sensor columns are dropped.
df3 = df2.select([
    "Date",
    "Time",
    "Total_Precipitation",
    "Atmospheric_Pressure",
    "Solar_Radiation",
    "Maximum_Temperature_For_The_Last_Hour",
    "Minimum_Temperature_For_The_Last_Hour",
    "Maximum_Relative_Humid_Temperature_For_The_Last_Hour",
    "Minimum_Relative_Humid_Temperature_For_The_Last_Hour",
    "Wind_speed_In_Metres_Per_Second",
    "Region",
    "State",
    "Station_Name",
    "Station_Code",
    "Latitude",
    "Longitude",
    "Elevation",
])

df3.show()

+----------+-----+-------------------+--------------------+---------------+-------------------------------------+-------------------------------------+----------------------------------------------------+----------------------------------------------------+-------------------------------+------+-----+------------+------------+------------+------------+---------+
|      Date| Time|Total_Precipitation|Atmospheric_Pressure|Solar_Radiation|Maximum_Temperature_For_The_Last_Hour|Minimum_Temperature_For_The_Last_Hour|Maximum_Relative_Humid_Temperature_For_The_Last_Hour|Minimum_Relative_Humid_Temperature_For_The_Last_Hour|Wind_speed_In_Metres_Per_Second|Region|State|Station_Name|Station_Code|    Latitude|   Longitude|Elevation|
+----------+-----+-------------------+--------------------+---------------+-------------------------------------+-------------------------------------+----------------------------------------------------+----------------------------------------------------+-------------

In [169]:
# Keep only rows whose measurements fall inside plausible ranges.
# BETWEEN is inclusive on both ends; the temperature bounds are exclusive.
# A row with a null in any checked column is also dropped, because the
# predicate then never evaluates to true.
df4 = df3.filter(" AND ".join([
    "Total_Precipitation >= 0",
    "Atmospheric_Pressure BETWEEN 100 AND 1050",
    "Solar_Radiation >= 0",
    "Maximum_Temperature_For_The_Last_Hour > -273 AND Maximum_Temperature_For_The_Last_Hour < 135",
    "Minimum_Temperature_For_The_Last_Hour > -273 AND Minimum_Temperature_For_The_Last_Hour < 135",
    "Maximum_Relative_Humid_Temperature_For_The_Last_Hour BETWEEN 0 AND 100",
    "Minimum_Relative_Humid_Temperature_For_The_Last_Hour BETWEEN 0 AND 100",
    "Wind_speed_In_Metres_Per_Second BETWEEN 0 AND 150",
]))

df4.count()

91120

In [80]:
# Summary statistics after the plausibility filter.
df4.describe().show(n=20, truncate=True)

+-------+-----+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------+----------------------------------------------------+----------------------------------------------------+-------------------------------+------+-----+------------+------------+--------------------+--------------------+-----------------+
|summary| Time|Total_Precipitation|Atmospheric_Pressure|   Solar_Radiation|Maximum_Temperature_For_The_Last_Hour|Minimum_Temperature_For_The_Last_Hour|Maximum_Relative_Humid_Temperature_For_The_Last_Hour|Minimum_Relative_Humid_Temperature_For_The_Last_Hour|Wind_speed_In_Metres_Per_Second|Region|State|Station_Name|Station_Code|            Latitude|           Longitude|        Elevation|
+-------+-----+-------------------+--------------------+------------------+-------------------------------------+-------------------------------------+----------------------------------------------------+------------------

In [81]:
# Number of distinct days with at least one valid reading (6182 in the recorded run).
# The code was previously commented out while its output was kept, so a fresh
# Restart & Run All could not reproduce the figure.
df4.select(df4.Date).distinct().count()

6182

In [170]:
from pyspark.sql.functions import col, count

# Number of cleaned readings recorded on each calendar day.
grouped_df = df4.groupBy(df4["Date"]).agg(count("*").alias("Count"))


In [171]:
# Days with fewer than 6 valid readings; these are removed from df4 further
# down with a left-anti join.
df5 = grouped_df.where(grouped_df["Count"] < 6)
df5.show(n=5)

+----------+-----+
|      Date|Count|
+----------+-----+
|2005-01-16|    3|
|2005-06-06|    2|
|2007-11-15|    4|
|2004-07-12|    2|
|2004-07-27|    2|
+----------+-----+
only showing top 5 rows



In [91]:
# Number of days below the 6-reading threshold.
df5.count()

129

In [90]:
# NOTE(review): abandoned attempt, superseded by the left-anti join in the next cell.
# `isin(values_to_filter["Date"])` passes a Column from a different DataFrame rather
# than a list of values, so it would not filter by df5's dates as intended (likely
# an analysis error). Consider deleting this cell.
# values_to_filter = df5.select("Date")

# # Remove rows from df1 that match column values in df2
# result_df = df4.filter(~col("Date").isin(values_to_filter["Date"]))

# # Show the resulting DataFrame
# result_df.show()

# #values_to_filter.show()

0

In [172]:
from pyspark.sql.functions import col

# Remove every reading whose Date appears in df5 (days with < 6 valid readings).
# A left-anti join keeps only the df4 rows that have no matching Date on the right.
values_to_filter = df5.select(df5["Date"])
result_df = df4.join(values_to_filter, on="Date", how="left_anti")

# Preview the cleaned data.
result_df.show(n=5)

+----------+-----+-------------------+--------------------+---------------+-------------------------------------+-------------------------------------+----------------------------------------------------+----------------------------------------------------+-------------------------------+------+-----+------------+------------+------------+------------+---------+
|      Date| Time|Total_Precipitation|Atmospheric_Pressure|Solar_Radiation|Maximum_Temperature_For_The_Last_Hour|Minimum_Temperature_For_The_Last_Hour|Maximum_Relative_Humid_Temperature_For_The_Last_Hour|Minimum_Relative_Humid_Temperature_For_The_Last_Hour|Wind_speed_In_Metres_Per_Second|Region|State|Station_Name|Station_Code|    Latitude|   Longitude|Elevation|
+----------+-----+-------------------+--------------------+---------------+-------------------------------------+-------------------------------------+----------------------------------------------------+----------------------------------------------------+-------------

In [173]:
# Rows remaining after incomplete days were removed.
result_df.count()

89974

In [94]:

from pyspark.sql.functions import col, count

# Sanity check: every day left in result_df should have >= 6 readings, so this
# count should equal the number of distinct days in result_df.
grouped_df = result_df.groupBy(result_df["Date"]).agg(count("*").alias("Count"))

df6 = grouped_df.where("Count >= 6")
df6.count()

6053

In [95]:
# First 20 per-day counts (row order is arbitrary; no orderBy is applied).
df6.show(n=20, truncate=True)

+----------+-----+
|      Date|Count|
+----------+-----+
|2000-07-03|   11|
|2000-12-26|   13|
|2001-05-16|   12|
|2002-06-20|   10|
|2002-12-06|    6|
|2002-12-25|   10|
|2006-05-17|   12|
|2007-04-20|   12|
|2007-11-15|   13|
|2007-11-23|   13|
|2010-08-11|   12|
|2012-04-17|   12|
|2012-10-06|   13|
|2013-01-22|   13|
|2013-03-26|   12|
|2013-05-21|   13|
|2013-09-09|   12|
|2014-09-26|   13|
|2014-11-12|   13|
|2015-03-09|   16|
+----------+-----+
only showing top 20 rows



In [89]:
# Column names and types after the projection and the plausibility filter.
df4.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- Total_Precipitation: double (nullable = true)
 |-- Atmospheric_Pressure: double (nullable = true)
 |-- Solar_Radiation: double (nullable = true)
 |-- Maximum_Temperature_For_The_Last_Hour: double (nullable = true)
 |-- Minimum_Temperature_For_The_Last_Hour: double (nullable = true)
 |-- Maximum_Relative_Humid_Temperature_For_The_Last_Hour: double (nullable = true)
 |-- Minimum_Relative_Humid_Temperature_For_The_Last_Hour: double (nullable = true)
 |-- Wind_speed_In_Metres_Per_Second: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Station_Name: string (nullable = true)
 |-- Station_Code: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Elevation: double (nullable = true)



In [174]:
# Collapse to one partition so Spark emits a single part-*.csv file. The output
# is still a *directory* named output_csv_path (holding that part file plus any
# commit marker files such as _SUCCESS), not a standalone .csv file.
# NOTE(review): this relative path resolves against Spark's default filesystem
# and working directory -- confirm whether it lands locally or on HDFS.
output_csv_path = "final_weather_data_centralwest"
df_transformed_single_partition = result_df.coalesce(1)

df_transformed_single_partition.write.csv(
    output_csv_path,
    mode="overwrite",
    header=True,
    compression="none",
)