### Create sample datasets - NYC Yellow Taxi trip records and NOAA ISD Weather - in your data lake

The code in this notebook is based on the sample notebook _"Using Azure Open Datasets in Synapse"_ (PySpark version), which is available in the Knowledge Center of Synapse Studio.

#### NYC Yellow Taxi trips dataset

In [None]:
from azureml.opendatasets import NycTlcYellow

from datetime import datetime
from dateutil import parser
# Date range of the taxi trip records to load; the weather cell reuses
# these two values.
start_date = parser.parse('2009-01-01')
end_date = parser.parse('2018-12-31')

# Load the NYC Yellow Taxi records for that range as a Spark DataFrame.
nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
df = nyc_tlc.to_spark_dataframe()

# TODO: Replace <storage_name> with the name of your Primary ADLS Gen2 storage account
# and <container_name> with the name of the container chosen as the Primary ADLS Gen2 file system
path = "abfss://<container_name>@<storage_name>.dfs.core.windows.net/nycyellowtaxi"

# Write the trips as Parquet, partitioned by the puYear and puMonth columns.
# "overwrite" mode replaces any data already present at the target path.
(
    df.write
    .partitionBy("puYear", "puMonth")
    .mode("overwrite")
    .parquet(path)
)

#### NOAA ISD Weather dataset

In [None]:
from azureml.opendatasets import NoaaIsdWeather
import pyspark.sql.functions as f

# Load the NOAA ISD weather observations for the same date range as the taxi
# data (start_date / end_date are defined in the NYC Yellow Taxi cell).
isd = NoaaIsdWeather(start_date, end_date)
isd_df = isd.to_spark_dataframe()

# Keep only observations inside the latitude/longitude bounding box
# (presumably the New York City area, to match the taxi data) that have a
# temperature reading. Bounds are numeric literals so the comparison does not
# depend on implicit string-to-number casting; between() is inclusive on both
# ends, matching the original >= / <= filters.
# The original 'datetime' column is renamed so that a date-only 'datetime'
# column can be derived from it below.
weather_df = (
    isd_df
    .filter(isd_df.latitude.between(40.53, 40.88))
    .filter(isd_df.longitude.between(-74.09, -73.72))
    .filter(isd_df.temperature.isNotNull())
    .withColumnRenamed('datetime', 'datetime_full')
)

# Drop the station identifiers and coordinates, then add the date-only column.
columns_to_remove_weather = ["usaf", "wban", "longitude", "latitude"]
weather_df_clean = (
    weather_df
    .drop(*columns_to_remove_weather)
    .withColumn('datetime', f.to_date('datetime_full'))
)

# Daily aggregates. Each expression is aliased explicitly instead of renaming
# Spark's auto-generated names (e.g. "avg(snowDepth)") afterwards:
# withColumnRenamed silently does nothing when the source name is missing,
# so a naming mismatch would otherwise go unnoticed.
aggregations = [
    f.avg("snowDepth").alias("avg_snowDepth"),
    f.max("precipTime").alias("max_precipTime"),
    f.avg("temperature").alias("avg_temperature"),
    f.max("precipDepth").alias("max_precipDepth"),
]
weather_df_grouped = (
    weather_df_clean
    .groupby("datetime", "year", "month")
    .agg(*aggregations)
)

# TODO: Replace <storage_name> with the name of your Primary ADLS Gen2 storage account
# and <container_name> with the name of the container chosen as the Primary ADLS Gen2 file system
path_isd = "abfss://<container_name>@<storage_name>.dfs.core.windows.net/isdweather"

# Write the daily weather aggregates as Parquet, partitioned by year and month.
# "overwrite" mode replaces any data already present at the target path.
weather_df_grouped.write.partitionBy("year", "month").mode("overwrite").parquet(path_isd)