# Using Azure Open Datasets in Synapse - Enrich NYC Green Taxi Data with Holiday and Weather

In [1]:
from azureml.opendatasets import NycTlcGreen

from datetime import datetime
from dateutil import parser
# Date window shared by all three datasets in this notebook (taxi trips, public
# holidays, NOAA weather): 2018-06-01 through 2018-06-12.
start_date = parser.parse('2018-06-01')
end_date = parser.parse('2018-06-12')

# Pull the NYC green-taxi trips for that window from Azure Open Datasets and
# materialize them as a Spark DataFrame.
nyc_tlc = NycTlcGreen(start_date=start_date,
                      end_date=end_date)
nyc_tlc_df = nyc_tlc.to_spark_dataframe()

# Perform exploratory data analysis

In [4]:
# Schema of the dataset
nyc_tlc_df.printSchema()

# Display 5 rows
nyc_tlc_df.show(5, truncate = False)

# Display the number of records in the dataframe.
# Only a cell's last expression is rendered automatically, so a bare count() here
# would run a Spark job and discard the result; print it explicitly.
print(nyc_tlc_df.count())

# Statistical properties of a field
nyc_tlc_df.describe(["fareAmount"]).show()

# Remove unused columns from nyc green taxi data.
# To remove any column from a PySpark dataframe you can also use drop, e.g. df = df.drop("col_name").
# Names must match the schema exactly: the select() below silently keeps any column
# whose name is not an exact match, so a stray space means the column is never removed.
columns_to_remove = ["lpepDropoffDatetime", "puLocationId", "doLocationId",
                     "pickupLatitude", "dropoffLongitude", "dropoffLatitude", "rateCodeID",
                     "storeAndFwdFlag", "paymentType", "fareAmount", "extra", "mtaTax",
                     "improvementSurcharge", "tollsAmount", "ehailFee", "tripType"
                    ]

# NOTE: this frame is built from the raw taxi data, so it has no derived
# 'datetime' / 'country_code' columns.
nyc_tlc_df_clean = nyc_tlc_df.select([column for column in nyc_tlc_df.columns if column not in columns_to_remove])

# Display 5 rows
nyc_tlc_df_clean.show(5)

# Find unique values of a categorical column.
# distinct() is lazy; show() triggers the job and prints the values.
nyc_tlc_df_clean.select('vendorID').distinct().show()

# Count the missing (null) values of a PySpark DataFrame column.
nyc_tlc_df_clean.filter(nyc_tlc_df_clean['pickupLongitude'].isNull()).count()

# Transformation of the Spark DataFrame

In [5]:
# Extract the calendar date, month, day of month, day of week and hour of day from the
# pickup datetime, and add a static country code column used to join the holiday data.

import pyspark.sql.functions as f

pickup_col = f.col('lpepPickupDatetime')

nyc_tlc_df_expand = (
    nyc_tlc_df
    .withColumn('datetime', f.to_date(pickup_col))
    .withColumn('month_num', f.month(pickup_col))
    .withColumn('day_of_month', f.dayofmonth(pickup_col))
    .withColumn('day_of_week', f.dayofweek(pickup_col))
    .withColumn('hour_of_day', f.hour(pickup_col))
    .withColumn('country_code', f.lit('US'))
)

# Remove the columns listed in `columns_to_remove` (defined in the exploration cell).
# The cleaned frame must be derived from nyc_tlc_df_expand, not the raw nyc_tlc_df,
# so that it carries the 'datetime' and 'country_code' keys the holiday join uses.
nyc_tlc_df_clean = nyc_tlc_df_expand.select(
    [column for column in nyc_tlc_df_expand.columns if column not in columns_to_remove]
)

nyc_tlc_df_clean.show(5)

Remove some of the columns that we won't need for modeling or additional feature building.




## Enrich with holiday data

In [6]:
from azureml.opendatasets import PublicHolidays

# Load public holidays for the same date window used for the taxi trips.
hol = PublicHolidays(start_date=start_date,
                     end_date=end_date)
hol_df = hol.to_spark_dataframe()

# Preview the first 5 rows without truncating long cell values
hol_df.show(n=5, truncate=False)

Rename the countryRegionCode and date columns to match the respective field names from the taxi data, and also normalize the time so it can be used as a key. 

In [7]:
# Align the holiday data with the taxi data's join keys: rename the country column
# to country_code and derive a date-only 'datetime' column from 'date'.
hol_df_clean = (
    hol_df
    .withColumnRenamed('countryRegionCode', 'country_code')
    .withColumn('datetime', f.to_date('date'))
)

hol_df_clean.show(n=5)

Next, join the holiday data with the taxi data by performing a left-join. This will preserve all records from taxi data, but add in holiday data where it exists for the corresponding datetime and country_code, which in this case is always "US". Preview the data to verify that they were merged correctly.

In [8]:
# enrich taxi data with holiday data
# Enrich the taxi data with holiday data. The left join keeps every taxi record;
# holiday columns stay null where no holiday matches the date/country key.
holiday_join_keys = ['datetime', 'country_code']
nyc_taxi_holiday_df = nyc_tlc_df_clean.join(hol_df_clean, on=holiday_join_keys, how='left')

nyc_taxi_holiday_df.show(n=5)

In [9]:
# Create a temp table and filter out non empty holiday rows

# Register the joined frame as a temp view, then use SQL to preview only the rows
# that fall on a holiday (i.e. keep rows whose holidayName is populated).
nyc_taxi_holiday_df.createOrReplaceTempView("nyc_taxi_holiday_df")
holiday_rows = spark.sql("SELECT * from nyc_taxi_holiday_df WHERE holidayName is NOT NULL ")
holiday_rows.show(5, truncate=False)

## Enrich with weather data

Now we append NOAA surface weather data to the taxi and holiday data. Use a similar approach to fetch the [NOAA weather history data](https://azure.microsoft.com/en-us/services/open-datasets/catalog/noaa-integrated-surface-data/) from Azure Open Datasets. 

In [10]:
from azureml.opendatasets import NoaaIsdWeather

# NOAA Integrated Surface Data for the same date window as the taxi trips,
# loaded as a Spark DataFrame (keyword arguments, matching the other loaders).
isd = NoaaIsdWeather(start_date=start_date, end_date=end_date)
isd_df = isd.to_spark_dataframe()

In [11]:
# Preview the raw weather records without truncating long values
isd_df.show(n=5, truncate=False)

In [12]:
# Filter out weather info for new york city, remove the recording with null temperature 

# Keep only readings from stations inside a bounding box around New York City and
# drop readings with a null temperature. Bounds are numeric literals so the comparison
# does not depend on implicit string-to-number casting.
# NOTE(review): assumes latitude/longitude are numeric (double) columns — confirm via isd_df.printSchema().
NYC_LAT_MIN, NYC_LAT_MAX = 40.53, 40.88
NYC_LON_MIN, NYC_LON_MAX = -74.09, -73.72

# between() is inclusive on both ends, matching the original >= / <= pairs.
weather_df = (
    isd_df
    .filter(isd_df.latitude.between(NYC_LAT_MIN, NYC_LAT_MAX))
    .filter(isd_df.longitude.between(NYC_LON_MIN, NYC_LON_MAX))
    .filter(isd_df.temperature.isNotNull())
    # Free up the name 'datetime' for the date-only join key derived later.
    .withColumnRenamed('datetime', 'datetime_full')
)
                         

In [13]:
# Remove unused columns

# Drop station identifiers and coordinates, then derive a date-only 'datetime' key
# from the full timestamp so the readings can be aggregated per day.
columns_to_remove_weather = ["usaf", "wban", "longitude", "latitude"]
weather_columns_kept = [name for name in weather_df.columns if name not in columns_to_remove_weather]

weather_df_clean = (
    weather_df
    .select(weather_columns_kept)
    .withColumn('datetime', f.to_date('datetime_full'))
)

weather_df_clean.show(n=5, truncate=False)

Next, group the weather data so that you have daily aggregated weather values.


In [14]:
# Enrich weather data with aggregation statistics

# Per-day aggregates: mean snow depth and temperature, max precipitation time and depth.
aggregations = {
    "snowDepth": "mean",
    "precipTime": "max",
    "temperature": "mean",
    "precipDepth": "max",
}
weather_df_grouped = weather_df_clean.groupBy("datetime").agg(aggregations)

In [15]:
# Preview the daily weather aggregates
weather_df_grouped.show(n=5)

In [16]:
# Rename columns

# Replace the auto-generated aggregate column names (e.g. "avg(snowDepth)") with
# identifier-friendly names, applied in the same order as listed.
weather_column_renames = {
    'avg(snowDepth)': 'avg_snowDepth',
    'avg(temperature)': 'avg_temperature',
    'max(precipTime)': 'max_precipTime',
    'max(precipDepth)': 'max_precipDepth',
}
for old_name, new_name in weather_column_renames.items():
    weather_df_grouped = weather_df_grouped.withColumnRenamed(old_name, new_name)

Merge the taxi and holiday data you prepared with the new weather data. This time you only need the datetime key, and again perform a left-join of the data. Run the describe() function on the new dataframe to see summary statistics for each field.

In [17]:
# enrich taxi data with weather
# Enrich taxi + holiday data with the daily weather aggregates. Only the date key is
# needed here; the left join keeps every taxi record even on days with no weather row.
nyc_taxi_holiday_weather_df = nyc_taxi_holiday_df.join(weather_df_grouped, 'datetime', 'left')

# Cache the enriched frame, since the following show/describe/filter cells reuse it.
nyc_taxi_holiday_weather_df.cache()

In [18]:
# Preview the fully enriched taxi data
nyc_taxi_holiday_weather_df.show(n=5)

In [19]:
# Run the describe() function on the new dataframe to see summary statistics for each field.

summary_stats_df = nyc_taxi_holiday_weather_df.describe()
display(summary_stats_df)

The summary statistics show that the totalAmount field has negative values, which don't make sense in this context.



In [20]:
# Remove invalid rows with less than 0 taxi fare or tip
# Remove invalid rows: a negative tip or a non-positive total amount makes no sense
# for a trip record. Zero tips are valid and are kept — filtering on `tipAmount > 0`
# would silently discard every untipped trip, not just the invalid negative ones.
final_df = (
    nyc_taxi_holiday_weather_df
    .filter(nyc_taxi_holiday_weather_df.tipAmount >= 0)
    .filter(nyc_taxi_holiday_weather_df.totalAmount > 0)
)