In [0]:
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.cm as cm
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set_style("ticks")
import datetime
from datetime import timedelta

In [0]:
display(
  dbutils.fs.ls('/FileStore/tables/')
)

path,name,size,modificationTime
dbfs:/FileStore/tables/201501_citibike_tripdata.csv,201501_citibike_tripdata.csv,44482247,1648437439000
dbfs:/FileStore/tables/201502_citibike_tripdata.csv,201502_citibike_tripdata.csv,30661770,1648437411000
dbfs:/FileStore/tables/201503_citibike_tripdata.csv,201503_citibike_tripdata.csv,53293646,1648437517000
dbfs:/FileStore/tables/201504_citibike_tripdata.csv,201504_citibike_tripdata.csv,125863090,1648437683000
dbfs:/FileStore/tables/201505_citibike_tripdata.csv,201505_citibike_tripdata.csv,185377057,1648437879000
dbfs:/FileStore/tables/201506_citibike_tripdata.csv,201506_citibike_tripdata.csv,146640768,1648437972000
dbfs:/FileStore/tables/201507_citibike_tripdata.csv,201507_citibike_tripdata.csv,209217721,1648441498000
dbfs:/FileStore/tables/201508_citibike_tripdata.csv,201508_citibike_tripdata.csv,227336662,1648441623000
dbfs:/FileStore/tables/201509_citibike_tripdata.csv,201509_citibike_tripdata.csv,249559343,1648443201000
dbfs:/FileStore/tables/201510_citibike_tripdata.csv,201510_citibike_tripdata.csv,237827334,1648443255000


In [0]:
display(
  dbutils.fs.ls('/FileStore/tables/nyctaxi')
)

path,name,size,modificationTime
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_01.csv,yellow_tripdata_2019_01.csv,687088084,1647823206000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_02.csv,yellow_tripdata_2019_02.csv,649882828,1648183383000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_03.csv,yellow_tripdata_2019_03.csv,726201566,1648184576000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_04.csv,yellow_tripdata_2019_04.csv,689207122,1648184865000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_05.csv,yellow_tripdata_2019_05.csv,701538890,1648186032000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_06.csv,yellow_tripdata_2019_06.csv,643492154,1648239561000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_07.csv,yellow_tripdata_2019_07.csv,584387609,1648239450000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_08.csv,yellow_tripdata_2019_08.csv,562386202,1648179279000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_09.csv,yellow_tripdata_2019_09.csv,608973500,1648182571000
dbfs:/FileStore/tables/nyctaxi/yellow_tripdata_2019_10.csv,yellow_tripdata_2019_10.csv,669168416,1648180729000


In [0]:
def nyc_citibike_data_reader(year):
    
    """
    function to read citibike data for the given year
    inputs: year: (string) the year the data will be read 
                           (between 2013 to 2021)
    output: dataframe: (pandas) of the NYC citibike date 
    """
    
    # File location and type
    file_location = "/FileStore/tables/" + year + "*"
    file_type = "csv"

    # CSV options
    infer_schema = "true"
    first_row_is_header = "true"
    delimiter = ","

    # The applied options are for CSV files. For other file types, these will be ignored.
    df = spark.read.format(file_type) \
      .option("inferSchema", infer_schema) \
      .option("header", first_row_is_header) \
      .option("sep", delimiter) \
      .load(file_location)

    return  df.toPandas()

In [0]:
def date_preprocess(df, freq="H"):
    
    '''
    Function to process a dataframe of ride data by doing the following steps:
    - Converts start time to dataype datetime and set it as the index
    - Group the data by 1D frequency, include start station id, latitude, longitude
    - Aggregate data by count
    - Rename "count" column "ride_count"
    - Reset the index then move starttime back to the index (removes multi-index layers)
    
    input: 
         df: dataframe (pandas) of the NYC citibike date 
         freq: (string) the desired frequency of aggregated data
                  "H": for hourly frequency 
                  "D": for daily frequency

    output: 
         df_grouped: dataframe (pandas) aggregated data of the input NYC citibike date 
    '''
    
    df['tripduration'] = df['tripduration'].astype('int')
    df_dur_lim  = [np.percentile(df['tripduration'], 1), np.percentile(df['tripduration'], 99)]
    df= df[df['tripduration'].between(df_dur_lim[0],df_dur_lim[1])]

    df['starttime'] = pd.to_datetime(df['starttime'])
    
    df = df.set_index('starttime')
    
    grouper = df.groupby([pd.Grouper(freq=freq), 'start station id', 'start station latitude', 'start station longitude'])
    
    df_grouped = pd.DataFrame(grouper['start station id'].count())
    
    df_grouped = df_grouped.rename(columns={'start station id': 'ride_count'})
    
    df_grouped = df_grouped.reset_index().set_index('starttime')
    
    return df_grouped


#### A sample raw citibike data

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import coalesce, to_timestamp

# schema for the raw trip history data
trip_schema = StructType([
  StructField('trip_duration', StringType()),
  StructField('start_time', StringType()),
  StructField('stop_time',  StringType()),
  StructField('start_station_id', StringType()),
  StructField('start_station_name', StringType()),
  StructField('start_station_latitude', StringType()),
  StructField('start_station_longitude', StringType()),
  StructField('end_station_id', StringType()),
  StructField('end_station_name', StringType()),
  StructField('end_station_latitude', StringType()),
  StructField('end_station_longitude', StringType()),
  StructField('bike_id', StringType()),
  StructField('user_type', StringType()),
  StructField('birth_year', StringType()),
  StructField('user_gender', StringType()),
  ])

# read the raw trip history data to dataframe
raw = spark.read.csv(
  '/FileStore/tables/', 
  header=True,  
  schema=trip_schema
  )

raw = raw.drop_duplicates()
bike_df = raw.toPandas()

In [0]:
bike_df

Unnamed: 0,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
0,320,2019-01-01 00:01:47.401,2019-01-01 00:07:07.581,3160,Central Park West & W 76 St,40.778968,-73.973747,3283,W 89 St & Columbus Ave,40.788221,-73.970416,15839,Subscriber,1971,1
1,316,2019-01-01 00:04:43.736,2019-01-01 00:10:00.608,519,Pershing Square North,40.751873,-73.977706,518,E 39 St & 2 Ave,40.747804,-73.973442,32723,Subscriber,1964,1
2,591,2019-01-01 00:06:03.997,2019-01-01 00:15:55.438,3171,Amsterdam Ave & W 82 St,40.785247,-73.976673,3154,E 77 St & 3 Ave,40.773142,-73.958562,27451,Subscriber,1987,1
3,2719,2019-01-01 00:07:03.545,2019-01-01 00:52:22.650,504,1 Ave & E 16 St,40.732219,-73.981656,3709,W 15 St & 6 Ave,40.738046,-73.996430,21579,Subscriber,1990,1
4,303,2019-01-01 00:07:35.945,2019-01-01 00:12:39.502,229,Great Jones St,40.727434,-73.993790,503,E 20 St & Park Ave,40.738274,-73.987520,35379,Subscriber,1979,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
13580301,232,2019-06-30 23:59:32.892,2019-07-01 00:03:25.631,3113,Greenpoint Ave & Manhattan Ave,40.730260,-73.953940,3107,Bedford Ave & Nassau Ave,40.723117,-73.952123,25779,Subscriber,1972,1
13580302,477,2019-06-30 23:59:33.699,2019-07-01 00:07:31.044,445,E 10 St & Avenue A,40.727408,-73.981420,3737,Stanton St & Norfolk St,40.720747,-73.986274,39522,Subscriber,1981,2
13580303,300,2019-06-30 23:59:47.405,2019-07-01 00:04:48.185,3307,West End Ave & W 94 St,40.794165,-73.974124,3320,Central Park West & W 100 St,40.794067,-73.962868,26730,Subscriber,1981,1
13580304,906,2019-06-30 23:59:51.598,2019-07-01 00:14:58.321,402,Broadway & E 22 St,40.740343,-73.989551,400,Pitt St & Stanton St,40.719261,-73.981780,38286,Subscriber,1996,1


#### A sample procedure to process raw citibike data for one year

In [0]:
# reading data for one year 
df_2015 = nyc_citibike_data_reader("2015")

In [0]:
# preprocessing data for one year
df_2015_pp = date_preprocess(df_2015, freq="H")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['starttime'] = pd.to_datetime(df['starttime'])


Note: Due to limitatin of free community version of Databricks environment, this same preporcess was repeated for each of the years from 2013 to 2022 out of Databricks environment and the clean data was imported to the other notebooks.

In [0]:
bike_stations = bike_df[['start station id', 'start station longitude','start station latitude']].drop_duplicates()
bike_stations = bike_stations[bike_stations['start station id'] != "NULL"]
bike_stations

Unnamed: 0,start station id,start station longitude,start station latitude
0,3160,-73.973747,40.778968
1,519,-73.977706,40.751873
2,3171,-73.976673,40.785247
3,504,-73.981656,40.732219
4,229,-73.993790,40.727434
...,...,...,...
19878860,3305,-73.950000,40.782000
19884451,3142,-73.962000,40.761000
19886264,3110,-73.953000,40.728000
19920600,3854,-73.935000,40.716000


In [0]:
# counting number of usages for each bike station
bike_df_gb = bike_df[['start station id', 'starttime']].groupby(by='start station id').count()
bike_df_gb = bike_df_gb.sort_values(by='starttime', ascending=False)

# getting top 200 active bike stations
bike_topstations = pd.DataFrame()
bike_topstations['station id'] = bike_df_gb.head(n=50).index  
len(bike_topstations)

In [0]:
bike_df_filtered = bike_df[bike_df['start station id'].isin(bike_topstations['station id'])]
len(bike_df_filtered)

Out[16]: 4269606

In [0]:
delta_bike = bike_df_filtered["stoptime"] - bike_df_filtered["starttime"]
bike_df_filtered['trip_duration'] = delta_bike.apply(lambda x: x/np.timedelta64(1,'m'))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  bike_df_filtered['trip_duration'] = delta_bike.apply(lambda x: x/np.timedelta64(1,'m'))


In [0]:
bike_df_filtered['age'] = [2019 - x for x in bike_df_filtered['birth year']]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  bike_df_filtered['age'] = [2019 - x for x in bike_df_filtered['birth year']]


****bike stations visualizations****

In [0]:
# from google.cloud import bigquery
from IPython.display import display
from IPython.display import HTML, Image
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import folium
from folium import Marker
from folium.plugins import MarkerCluster
import math

In [0]:
# !--upgrade pip
# !pip install folium

In [0]:
# Create the map
Map = folium.Map(location=[bike_stations['start station latitude'].mean(),bike_stations['start station longitude'].mean()], 
                 tiles='OpenStreetMap', 
                 zoom_start=11)

# Add points to the map
mc = MarkerCluster()
# mc = folium.FeatureGroup(name="My Map")

for _, row in bike_stations.iterrows():
    if not math.isnan(row['start station latitude']) and not math.isnan(row['start station longitude']):
        mc.add_child(Marker([row['start station latitude'], row['start station longitude']]))
Map.add_child(mc)

# Display the map
Map

***Taxi Data***

In [0]:
# File location and type
file_location = "/FileStore/tables/nyctaxi/yellow_tripdata_2019_01.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
taxi = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# display(taxi)
taxi_df = taxi.toPandas()
taxi_df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14.0,0.5,0.5,1.00,0.0,0.3,16.30,
2,2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.00,1,N,236,236,1,4.5,0.5,0.5,0.00,0.0,0.3,5.80,
3,2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.00,1,N,193,193,2,3.5,0.5,0.5,0.00,0.0,0.3,7.55,
4,2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.00,2,N,193,193,2,52.0,0.0,0.5,0.00,0.0,0.3,55.55,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7667787,2,2019-01-31 23:57:36,2019-02-01 00:18:39,1,4.79,1,N,263,4,1,18.0,0.5,0.5,3.86,0.0,0.3,23.16,0.0
7667788,2,2019-01-31 23:32:03,2019-01-31 23:33:11,1,0.00,1,N,193,193,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667789,2,2019-01-31 23:36:36,2019-01-31 23:36:40,1,0.00,1,N,264,264,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667790,2,2019-01-31 23:14:53,2019-01-31 23:15:20,1,0.00,1,N,264,7,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0


In [0]:
delta = taxi_df["tpep_dropoff_datetime"] - taxi_df["tpep_pickup_datetime"]
taxi_df['trip_duration'] = delta.apply(lambda x: x/np.timedelta64(1,'s')) # extracing trip duration in seconds

In [0]:
# File location and type
file_location = "/FileStore/tables/weather/1001_2019_01_01_to_2019_12_31.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
weather_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(weather_df)

name,datetime,temp,feelslike,dew,humidity,precip,precipprob,preciptype,snow,snowdepth,windgust,windspeed,winddir,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,severerisk,conditions,icon,stations
1001,2019-01-01T00:00:00.000+0000,32.1,22.1,32.1,100.0,,,,,,,13.9,190.0,,100.0,6.2,,,,,Overcast,cloudy,"EVRA,26422599999"
1001,2019-01-01T01:00:00.000+0000,33.9,24.0,32.1,93.03,,,,,,,15.0,190.0,,100.0,6.2,,,,,Overcast,cloudy,"EVRA,26422599999"
1001,2019-01-01T02:00:00.000+0000,33.9,23.3,32.0,92.93,,,,,,22.4,17.2,190.0,1012.6,97.3,6.2,,,,,Overcast,cloudy,"EVRA,26326099999,26422599999,26425099999,26424099999"
1001,2019-01-01T03:00:00.000+0000,33.9,23.0,33.9,100.0,,,,,,,18.1,190.0,,97.9,6.2,,,,,Overcast,cloudy,"EVRA,26422599999"
1001,2019-01-01T04:00:00.000+0000,33.9,23.2,33.9,100.0,,,,,,,17.4,192.0,,100.0,5.2,,,,,Overcast,cloudy,"EVRA,26422599999"
1001,2019-01-01T05:00:00.000+0000,33.9,23.3,33.8,99.69,,,,,,33.6,17.1,190.0,1006.0,96.8,6.0,,,,,Overcast,cloudy,"EVRA,26326099999,26429099999,26422599999,26425099999,26424099999"
1001,2019-01-01T06:00:00.000+0000,35.7,25.9,35.7,100.0,,,,,,,16.1,200.0,,100.0,6.2,,,,,Overcast,cloudy,"EVRA,26422599999"
1001,2019-01-01T07:00:00.000+0000,37.5,27.3,37.1,98.76,,,,,,,19.3,208.0,,100.0,6.2,,,,,Overcast,cloudy,"EVRA,26422599999"
1001,2019-01-01T08:00:00.000+0000,37.5,27.6,37.4,99.8,,,,,,26.8,18.3,210.0,999.3,96.8,5.3,,,,,Overcast,cloudy,"EVRA,26326099999,26429099999,26422599999,26425099999,26424099999"
1001,2019-01-01T09:00:00.000+0000,39.3,30.2,39.3,100.0,,,,,,,17.2,200.0,,97.9,2.9,,,,,Overcast,cloudy,"EVRA,26422599999"


In [0]:
weather_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- temp: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- dew: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- precipprob: string (nullable = true)
 |-- preciptype: string (nullable = true)
 |-- snow: double (nullable = true)
 |-- snowdepth: double (nullable = true)
 |-- windgust: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- winddir: integer (nullable = true)
 |-- sealevelpressure: double (nullable = true)
 |-- cloudcover: double (nullable = true)
 |-- visibility: double (nullable = true)
 |-- solarradiation: integer (nullable = true)
 |-- solarenergy: double (nullable = true)
 |-- uvindex: integer (nullable = true)
 |-- severerisk: string (nullable = true)
 |-- conditions: string (nullable = true)
 |-- icon: string (nullable = true)
 |-- stations: string (nullable = true)

