### Import all spark modules and libraries needed

In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col

### create Spark session

In [2]:
spark = SparkSession.builder.getOrCreate()

### Import the csv files and creating a dataframe for the two datasets and infer schema

In [3]:
file_location1 = "weather.20160201.csv"
file_location2 = "weather.20160301.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

In [4]:
WeatherSchema = StructType([
  StructField("ForecastSiteCode",IntegerType()),
  StructField("Time",IntegerType()),
  StructField("Date",TimestampType()),
  StructField("WindDirection",IntegerType()),
  StructField("WindSpeed",IntegerType()),
  StructField("WindGust",IntegerType()),
  StructField("Visibility",IntegerType()),
  StructField("Temperature",DoubleType()),
  StructField("Pressure",IntegerType()),
  StructField("WeatherCode",IntegerType()),
  StructField("SiteName",StringType()),
  StructField("Latitude",DoubleType()),
  StructField("Longitude",DoubleType()),
  StructField("Region",StringType()),
  StructField("COUNTRY",StringType())
])

In [5]:
df1 = spark.read.format(file_type) \
  .schema(WeatherSchema)\
  .option("header", first_row_is_header) \
  .option("mode", "failFast") \
  .option("sep", delimiter) \
  .load(file_location1)

df1.count()

93255

In [6]:
df2 = spark.read.format(file_type) \
  .schema(WeatherSchema)\
  .option("header", first_row_is_header) \
  .option("mode", "failFast") \
  .option("sep", delimiter) \
  .load(file_location2)

df1.count()

93255

### Get the number of partitions in the dataset

In [7]:
print(df1.rdd.getNumPartitions(), df1.rdd.getNumPartitions())

3 3


### Combined the two datasets and write the file in parquet file for optimisation query

In [None]:
weatherDataCombined = df1.union(df2)
weatherDataCombined.write.format("parquet").mode("overwrite").save("weatherDataCombined.parquet")

## create a temporary view table

In [None]:
weather = spark.read.parquet("weatherDataCombined.parquet") 
weather.createOrReplaceTempView('weather')           
spark.catalog.cacheTable("weather")
weather.count()

## Querying the weatherData to solve the solutions

In [10]:
spark.sql("SELECT Region,Date,Temperature FROM weather WHERE Temperature IN (SELECT MAX(Temperature) FROM weather)").show()

+--------------------+-------------------+-----------+
|              Region|               Date|Temperature|
+--------------------+-------------------+-----------+
|Highland & Eilean...|2016-03-17 00:00:00|       15.8|
+--------------------+-------------------+-----------+



In [29]:
maxTemperature = weather.selectExpr("max(Temperature)")
maxTemperature.show()

+----------------+
|max(Temperature)|
+----------------+
|            15.8|
+----------------+



In [30]:
Table1 = weather.select("Date", "Region", "Temperature")
Table1.filter(col("Temperature") == 15.8).show()

+-------------------+--------------------+-----------+
|               Date|              Region|Temperature|
+-------------------+--------------------+-----------+
|2016-03-17 00:00:00|Highland & Eilean...|       15.8|
+-------------------+--------------------+-----------+

