## Setup

In [1]:
import pyspark
from pyspark.sql import SparkSession, Row 
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [2]:
import numpy as np
import pandas as pd
import time

In [3]:
spark = SparkSession.builder.appName("WeatherDataAnalysis").getOrCreate()

In [4]:
spark

## Load WeatherStations Data

In [5]:
#Describe Schema
weatherStationsSchema = T.StructType([
    T.StructField("State",T.StringType()),
    T.StructField("District",T.StringType()),
    T.StructField("Latitude",T.DoubleType()),
    T.StructField("Longitude",T.DoubleType()),
    T.StructField("StationID",T.StringType()),
])

In [6]:
#Load Data
weatherStations = spark.read.load(path = "Datasets/Output/WeatherStationsId.csv",
                                  format = "csv",header=True,
                                  schema = weatherStationsSchema,)

In [7]:
#Check Schema
weatherStations.printSchema()

root
 |-- State: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- StationID: string (nullable = true)



In [8]:
#check Data
weatherStations.show()

+---------------+--------------+------------------+---------+---------+
|          State|      District|          Latitude|Longitude|StationID|
+---------------+--------------+------------------+---------+---------+
|      TELANGANA|      NALGONDA|  16.3920001983643|   79.375|   164794|
|          BIHAR|CHAMPARAN.WEST|  27.3199996948242|  83.4375|   273834|
|          BIHAR|   MUZAFFARPUR|  25.4466991424561|  85.3125|   254853|
|          BIHAR|    SAMASTIPUR|  25.4466991424561|  85.3125|   254853|
|        GUJARAT|     AHMEDABAD|  22.9487991333008|     72.5|   229725|
|      KARNATAKA|     BAGALKOTE|  16.0797996520996|   75.625|   161756|
|      KARNATAKA|BANGALORE URBA|12.957500457763699|     77.5|   130775|
|         KERALA|        IDUKKI|10.147399902343802|  77.1875|   101772|
|MADHAYA PRADESH|   NARSINGHPUR|  7.02514982223511|   79.375|    70794|
|    MAHARASHTRA|    AHMEDNAGAR|   18.577600479126|  75.3125|   186753|
|    MAHARASHTRA|       YEOTMAL|  20.1387996673584|  78.4375|   

## Load WeatherData

In [9]:
#Describe Schema
weatherDataSchema = T.StructType([
    T.StructField("Date",T.DateType()),
    T.StructField("MaxTemperature",T.DoubleType()),
    T.StructField("MinTemperature",T.DoubleType()),
    T.StructField("Precipitation",T.DoubleType()),
    T.StructField("RelativeHumidity",T.DoubleType()),
    T.StructField("StationId",T.StringType())
])

In [10]:
#Load Data
weatherData = spark.read.load("Datasets/Output/WeatherData.csv",
                             format="csv", schema=weatherDataSchema,
                             header=True)

In [11]:
#check schema
weatherData.printSchema()

root
 |-- Date: date (nullable = true)
 |-- MaxTemperature: double (nullable = true)
 |-- MinTemperature: double (nullable = true)
 |-- Precipitation: double (nullable = true)
 |-- RelativeHumidity: double (nullable = true)
 |-- StationId: string (nullable = true)



In [12]:
#check data
weatherData.show()

+----------+-----------------+--------------+-------------+-----------------+---------+
|      Date|   MaxTemperature|MinTemperature|Precipitation| RelativeHumidity|StationId|
+----------+-----------------+--------------+-------------+-----------------+---------+
|2000-01-01|           -5.726|       -16.477|  0.181961028|0.888501860192614|   323772|
|2000-01-02|-6.11599999999999|       -24.217| 0.0102996864|0.902365760303462|   323772|
|2000-01-03|-5.70600000000002|       -16.677| 0.1235961792|0.842162583304904|   323772|
|2000-01-04|-6.10000000000002|       -17.362|  0.401687496|0.866002168607601|   323772|
|2000-01-05|-7.59899999999999|        -18.76| 0.5081174424|0.817787436720489|   323772|
|2000-01-06|           -5.529|       -16.434|  0.065231316|0.810127597488599|   323772|
|2000-01-07|           -6.815|       -23.009|  3.738789252|0.830508587510486|   323772|
|2000-01-08|-6.96899999999999|       -11.726|  4.318999704| 0.90915788839414|   323772|
|2000-01-09|          -10.712|  

In [13]:
#check number of records
weatherData.count()

6852760

## Checking Districts

In [14]:
#Lets check the number of districts and stations mapped to each of them
tempDF = weatherStations.groupBy("State","District").count()

In [15]:
tempDF.count()

535

In [16]:
#Listing stations with most stations
tempDF.orderBy(F.desc("count")).show()

+--------------+------------+-----+
|         State|    District|count|
+--------------+------------+-----+
|     KARNATAKA|    GULBARGA|   13|
|ANDHRA PRADESH|   ANANTAPUR|   12|
|   MAHARASHTRA|       POONA|   11|
|ANDHRA PRADESH|     NELLORE|   11|
|     KARNATAKA|     BELGAUM|   11|
|     KARNATAKA|KANARA NORTH|   10|
|     KARNATAKA|     SHIMOGA|   10|
|ANDHRA PRADESH|     CHITTOR|   10|
|   MAHARASHTRA|    SHOLAPUR|   10|
|     KARNATAKA|   BAGALKOTE|    9|
|     KARNATAKA|      TUMKUR|    9|
|ANDHRA PRADESH|    CUDDAPAH|    9|
|       GUJARAT|       KUTCH|    9|
|     KARNATAKA|     RAICHUR|    9|
|     KARNATAKA|      HASSAN|    9|
|     TELANGANA|    ADILABAD|    9|
|   MAHARASHTRA|  AHMEDNAGAR|    9|
|ANDHRA PRADESH|     KURNOOL|    8|
|        ORISSA|  MAYURBHANJ|    8|
|       GUJARAT|   AHMEDABAD|    8|
+--------------+------------+-----+
only showing top 20 rows



In [17]:
tempDF.describe().show()

+-------+--------------------+-------------+-----------------+
|summary|               State|     District|            count|
+-------+--------------------+-------------+-----------------+
|  count|                 535|          535|              535|
|   mean|                null|         null|3.633644859813084|
| stddev|                null|         null| 2.14379794964921|
|    min|ANDAMAN AND NICOB...|24 PARGANAS N|                1|
|    max|         WEST BENGAL|      YEOTMAL|               13|
+-------+--------------------+-------------+-----------------+



## Merge data at district level

In [18]:
#UDF for generating list of StationID for each District
@F.pandas_udf(returnType=T.ArrayType(T.StringType()),functionType=F.PandasUDFType.GROUPED_AGG)
def genList(data):
     return list(data)



In [19]:
districtGrouped = (weatherStations
                   .groupBy("State","District")
                   .agg(genList("StationID").alias("StationIDs"))
                  )

In [20]:
districtGrouped.show()

+-----------------+---------------+--------------------+
|            State|       District|          StationIDs|
+-----------------+---------------+--------------------+
|          GUJARAT|      PORBANDAR|     [70794, 214694]|
|        RAJASTHAN| SAWAI MADHOPUR|[264766, 254763, ...|
|        TAMILNADU|     THIRUVARUR|    [105794, 101794]|
|      WEST BENGAL|    COOCH BEHAR|[261884, 261894, ...|
|          GUJARAT|  BULSAR/VALSAD|[205731, 201731, ...|
|      MAHARASHTRA|      NANDURBAR|[211734, 214744, ...|
|        TAMILNADU|     VILLUPURAM|[114784, 114791, ...|
|        TAMILNADU|     COIMBATORE|[105772, 111766, ...|
|    UTTAR PRADESH|KHERI LAKHIMPUR|[276800, 279806, ...|
|            ASSAM|         CACHAR|[245925, 245931, ...|
|       CHATISGARH|           DURG|[205800, 214813, ...|
| HIMACHAL PRADESH|       BILASPUR|[311766, 314763, ...|
|        KARNATAKA|        RAICHUR|[164766, 161772, ...|
|  MADHAYA PRADESH|          SAGAR|[233791, 236794, ...|
|       PUDUCHERRY|     PUDUCHE

For each district we need a summary of the data for every day in the data.<br>
A good approach will be use mean value of each variable across the stations mapped to the district.  
Another approach can be to use the median value of each variable across the stations mapped to the district. This approach will allow us to avoid affect of any possible outliers. 

We will use median value for _MinTemperature_ and _MaxTemperature_ and mean values for _Precipitation_ and _RelativeHumidity_

In [21]:
@F.pandas_udf(T.DoubleType(),F.PandasUDFType.GROUPED_AGG)
def median(data):
    return data.median()

@F.pandas_udf(T.DoubleType(),F.PandasUDFType.GROUPED_AGG)
def mean(data):
    return data.mean()

In [22]:
def generateDailySummary(stationIds):
    summaryData = (weatherData.filter(F.col("StationId").isin(stationIds))
                   .groupBy("Date")
                   .agg(median("MinTemperature"),
                        median("MaxTemperature"),
                        mean("Precipitation"),
                        mean("RelativeHumidity")))
    return summaryData

In [23]:
starttime = time.time()
districtsGroupedPDF = districtGrouped.toPandas()

for index in range(len(districtsGroupedPDF)):
    state,district,stationIds = districtsGroupedPDF.iloc[index]
    #print(state,district,stationIds)
    ( generateDailySummary(stationIds)
        .select("Date",
            F.col("median(MinTemperature)").alias("MinTemperature"),
            F.col("median(MaxTemperature)").alias("MaxTemperature"),
            F.col("mean(Precipitation)").alias("Precipitation"),
            F.col("mean(RelativeHumidity)").alias("RelativeHumidity"))
        .withColumn("State",F.lit(state))
        .withColumn("District",F.lit(district))
        .write.csv(path = "Datasets/Output/WeatherDistrictData.csv",
                 mode="append",header=True))
    if index%25==0:
        print("Completed "+str(index+1)+" runs")
print(time.time()-starttime)

Completed 1 runs
Completed 26 runs
Completed 51 runs
Completed 76 runs
Completed 101 runs
Completed 126 runs
Completed 151 runs
Completed 176 runs
Completed 201 runs
Completed 226 runs
Completed 251 runs
Completed 276 runs
Completed 301 runs
Completed 326 runs
Completed 351 runs
Completed 376 runs
Completed 401 runs
Completed 426 runs
Completed 451 runs
Completed 476 runs
Completed 501 runs
Completed 526 runs
1281.452843427658
