In [0]:
spark

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from pyspark.sql.window import Window

In [0]:
display(dbutils.fs.ls("/FileStore/tables/"))

path,name,size
dbfs:/FileStore/tables/Most_Recent_Cohorts_Institution.csv,Most_Recent_Cohorts_Institution.csv,218920168
dbfs:/FileStore/tables/UniversityData/,UniversityData/,0
dbfs:/FileStore/tables/UniversityMasterData/,UniversityMasterData/,0
dbfs:/FileStore/tables/nyc_tripdata-1.csv,nyc_tripdata-1.csv,33920901
dbfs:/FileStore/tables/nyc_tripdata.csv,nyc_tripdata.csv,33920901
dbfs:/FileStore/tables/taxi_zone_lookup-1.csv,taxi_zone_lookup-1.csv,12322
dbfs:/FileStore/tables/taxi_zone_lookup.csv,taxi_zone_lookup.csv,12322
dbfs:/FileStore/tables/weatherDataFinal.csv,weatherDataFinal.csv,46036292
dbfs:/FileStore/tables/weather_data_snow-1.csv,weather_data_snow-1.csv,9768394
dbfs:/FileStore/tables/weather_data_snow.csv,weather_data_snow.csv,17600943


In [0]:
# Read the file in a dataframe
dfHourlyWeather = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/weatherDataFinal.csv")

## Setting up the List for Snow and Rain weather ID's based on values from https://openweathermap.org/weather-conditions
snow_id = [600,601,602,611,612,613,615,616,620,621,622]
rain_id = [200,201,202,232,300,301,302,310,311,312,313,314,321,500,501,502,503,504,511,520,521,522,531]

# Validate by Filtering data where weather description is snow
#dfHourlyWeather = dfHourlyWeather.filter(dfHourlyWeather.weatherId.isin(snow_id)).withColumn("didSnow",when(dfHourlyWeather.weatherId.isin(snow_id),lit("1")).otherwise(lit("0"))).show()


## Add new Columns for Each Hour to check if if Rained/Snowed and if it was cloudy 
dfHourlyWeather = dfHourlyWeather.withColumn("didSnow",when(dfHourlyWeather.weatherId.isin(snow_id),lit("1")).otherwise(lit("0"))) \
                  .withColumn("didRain",when(dfHourlyWeather.weatherId.isin(rain_id),lit("1")).otherwise(lit("0"))) \
                  .withColumn("IsSunny",when((dfHourlyWeather.isCloudyPerc <= 20), lit("1")).otherwise(lit("0")))\
                  .withColumn("tempFareinheit",((col("temp")-273.15)*1.8)+32)
                  ##.select("collegeId","city","zipCode","dateTime","tempFareinheit","snow1h","snow1h","rain1h","didSnow","didRain","IsSunny")  



## Create a Daily Summary Dataframe using the Hourly data and computer columns

partitionColumns= ["unitId","city","zipCode","dateTime"]
orderColumns = ["unitId","city","zipCode","dateTime"]
windowSpecAgg  = Window.partitionBy(partitionColumns).orderBy(orderColumns)

dfDailySummary = dfHourlyWeather.groupBy("unitId","city","zipCode","dateTime")\
                                 .agg(sum("IsSunny").alias("totalSunHours"),sum("didSnow").alias("totalSnowHours"),sum("didRain").alias("totalRainHours"))\

display(dfDailySummary.orderBy(desc("dateTime")).limit(5))
#display(dfDailySummary.filter(dfDailySummary.totalSnowHours >0 ))       

unitId,city,zipCode,dateTime,totalSunHours,totalSnowHours,totalRainHours
159382,Alexandria,71302-9121,2021-04-21T00:00:00.000+0000,17.0,0.0,0.0
232681,Fredericksburg,22401-5300,2021-04-21T00:00:00.000+0000,13.0,0.0,0.0
171571,Rochester Hills,48309-4401,2021-04-21T00:00:00.000+0000,1.0,0.0,0.0
155937,Sterling,67579,2021-04-21T00:00:00.000+0000,14.0,0.0,0.0
195164,Saint Bonaventure,14778-9800,2021-04-21T00:00:00.000+0000,0.0,17.0,0.0
