In [1]:
from google.colab import drive

In [96]:
import os
import zipfile
from zipfile import ZipFile
from os import path
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, to_timestamp, unix_timestamp,mean, to_date, month,  date_format, array, explode, rank
from pyspark.sql import types
from pyspark.sql.window import Window

In [3]:
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!ls "/content/drive/MyDrive/data"

Divvy_Trips_2019_Q4.zip  Divvy_Trips_2020_Q1.zip


In [5]:
def get_file_paths(root_directory, folder_name, file_extension):
    file_paths = []
    file_directory = os.path.join(root_directory, folder_name)
    for root, _, files in os.walk(file_directory):
        for file_name in files:
            if file_extension in file_name:
                file_paths.append(os.path.join(root, file_name))
    return file_paths

In [6]:
def read_zip_files_into_dataframes(zip_file_paths):
    df_list = []
    for file_path in zip_file_paths:
        print(f"Reading for {file_path}")
        zip_file = ZipFile(file_path)
        for text_file in zip_file.infolist():
            if text_file.filename.endswith(
                ".csv"
            ) and not text_file.filename.startswith("__MACOSX"):
                df = pd.read_csv(zip_file.open(text_file.filename))
                # add a column for the year based on the filename
                year = int(path.basename(file_path).split(".")[0].split("_")[-2])
                df['year'] = year
                df_list.append(df)
    return df_list

In [7]:
def write_to_csv(df, file_path):
  # set mode to overwrite
  df.write.mode("overwrite").csv(file_path, header=True)

In [8]:
current_directory = os.getcwd()

In [9]:
current_directory

'/content'

In [10]:
zip_file_paths = get_file_paths(current_directory, r"/content/drive/MyDrive/data", "zip")

In [11]:
zip_file_paths

['/content/drive/MyDrive/data/Divvy_Trips_2019_Q4.zip',
 '/content/drive/MyDrive/data/Divvy_Trips_2020_Q1.zip']

In [12]:
pdf = read_zip_files_into_dataframes(zip_file_paths)

Reading for /content/drive/MyDrive/data/Divvy_Trips_2019_Q4.zip
Reading for /content/drive/MyDrive/data/Divvy_Trips_2020_Q1.zip


In [13]:
df_2019 = pdf[0]

In [14]:
df_2019['birthyear'] = df_2019['birthyear'].astype('Int64')
df_2019["trip_id"] = df_2019["trip_id"].astype(str)

In [15]:
df_2019=df_2019.rename(columns={'trip_id' : "ride_id", "start_time": "started_at", "end_time": "ended_at", "from_station_id": "start_station_id", "from_station_name": "start_station_name", "to_station_id": "end_station_id", "to_station_name": "end_station_name"})

In [16]:
df_2020 = pdf[1]

In [17]:
overlapping_columns = df_2020.columns.intersection(df_2019.columns).tolist()

In [18]:
df_2019_subset = df_2019.loc[:, overlapping_columns]

In [19]:
df_2020_subset = df_2020.loc[:, overlapping_columns]

In [21]:
subset_list = [df_2019_subset, df_2020_subset]

In [22]:
combined_df = pd.concat(subset_list, ignore_index=True)

In [23]:
combined_df['end_station_id'] = combined_df['end_station_id'].astype('Int64')
combined_df['start_station_id'] = combined_df['start_station_id'].astype('Int64')

In [24]:
cols = combined_df.columns.tolist()

In [25]:
combined_df = combined_df[['ride_id','started_at', 'ended_at','start_station_id','start_station_name','end_station_id','end_station_name','year']]

In [26]:

combined_df['started_at']= pd.to_datetime(combined_df['started_at'])
combined_df['ended_at'] = pd.to_datetime(combined_df['ended_at'])

In [27]:
combined_df.fillna({'end_station_id': 0}, inplace=True)

In [28]:
combined_df.dtypes

Unnamed: 0,0
ride_id,object
started_at,datetime64[ns]
ended_at,datetime64[ns]
start_station_id,Int64
start_station_name,object
end_station_id,Int64
end_station_name,object
year,int64


In [29]:
schema = types.StructType([
    types.StructField("ride_id", types.StringType(), True),
    types.StructField("started_at", types.TimestampType(), True),
    types.StructField("ended_at", types.TimestampType(), True),
    types.StructField("start_station_id", types.LongType(), True),
    types.StructField("start_station_name", types.StringType(), True),
    types.StructField("end_station_id", types.LongType(), True),
    types.StructField("end_station_name", types.StringType(), True),
    types.StructField("year", types.LongType(), True),

])

In [30]:
spark = SparkSession.builder.appName("Exercise6").enableHiveSupport().getOrCreate()

In [42]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [31]:
spark.createDataFrame(combined_df).schema

StructType([StructField('ride_id', StringType(), True), StructField('started_at', TimestampType(), True), StructField('ended_at', TimestampType(), True), StructField('start_station_id', LongType(), True), StructField('start_station_name', StringType(), True), StructField('end_station_id', LongType(), True), StructField('end_station_name', StringType(), True), StructField('year', LongType(), True)])

In [32]:
sparkDF = spark.createDataFrame(combined_df, schema = schema)

In [34]:
sparkDF.show()

+--------+-------------------+-------------------+----------------+--------------------+--------------+--------------------+----+
| ride_id|         started_at|           ended_at|start_station_id|  start_station_name|end_station_id|    end_station_name|year|
+--------+-------------------+-------------------+----------------+--------------------+--------------+--------------------+----+
|25223640|2019-10-01 00:01:39|2019-10-01 00:17:20|              20|Sheffield Ave & K...|           309|Leavitt St & Armi...|2019|
|25223641|2019-10-01 00:02:16|2019-10-01 00:06:34|              19|Throop (Loomis) S...|           241| Morgan St & Polk St|2019|
|25223642|2019-10-01 00:04:32|2019-10-01 00:18:43|              84|Milwaukee Ave & G...|           199|Wabash Ave & Gran...|2019|
|25223643|2019-10-01 00:04:32|2019-10-01 00:43:43|             313|Lakeview Ave & Fu...|           290|Kedzie Ave & Palm...|2019|
|25223644|2019-10-01 00:04:34|2019-10-01 00:35:42|             210|Ashland Ave & Div...|  

In [None]:
def output_average_trip_duration(df, output_file_path):
  df_new = df.withColumn('start_time_ts', unix_timestamp('started_at'))\
                      .withColumn('end_time_ts', unix_timestamp('ended_at'))
  df_new = df_new.withColumn('days_diff', col('end_time_ts').cast('long') - col('start_time_ts').cast('long'))
  df_new = df_new.withColumn("day", to_date(col("started_at")))
  df_new = df_new.groupBy("day").agg(mean("days_diff").alias("trip_duration"))
  # select only day and average_trip_duration columns
  df_new = df_new.orderBy("day")
  df_new = df_new.dropna()
  write_to_csv(df_new, output_file_path)


In [None]:
output_average_trip_duration(sparkDF, "/content/drive/MyDrive/reports/report1")
#

In [None]:
def output_trips_per_day(df, output_file_path):
  df_new = df.withColumn("day", to_date(col("started_at")))
  df_new = df_new.groupBy("day").count()
  df_new = df_new.orderBy("day")
  write_to_csv(df_new, output_file_path)

In [None]:
output_trips_per_day(sparkDF, "/content/drive/MyDrive/reports/report2")

In [None]:
def output_most_popular_start_stations_for_each_month(df, output_file_path):
  df_new = df.withColumn("month", month(col("started_at")))
  df_new = df_new.dropna(subset=["start_station_id"])
  df_new = df_new.groupBy("month", "year", "start_station_id", "start_station_name").count()
  df_new = df_new.orderBy("count", "month", "year", ascending=[False, True, True])
  df_new.show()
  write_to_csv(df_new, output_file_path)

In [None]:
output_most_popular_start_stations_for_each_month(sparkDF, "/content/drive/MyDrive/reports/report3")

+-----+----+----------------+--------------------+-----+
|month|year|start_station_id|  start_station_name|count|
+-----+----+----------------+--------------------+-----+
|   10|2019|             192| Canal St & Adams St| 6564|
|   10|2019|              77|Clinton St & Madi...| 5340|
|   10|2019|              91|Clinton St & Wash...| 4921|
|   10|2019|              35|Streeter Dr & Gra...| 4553|
|   10|2019|              76|Lake Shore Dr & M...| 3981|
|   10|2019|             195|Columbus Dr & Ran...| 3925|
|   10|2019|             287|Franklin St & Mon...| 3741|
|   10|2019|             133|Kingsbury St & Ki...| 3497|
|   11|2019|             192| Canal St & Adams St| 3445|
|   10|2019|              81|  Daley Center Plaza| 3387|
|   10|2019|              43|Michigan Ave & Wa...| 3294|
|    1|2020|             192| Canal St & Adams St| 3241|
|    3|2020|             675|               HQ QR| 3210|
|   12|2019|             192| Canal St & Adams St| 2928|
|    1|2020|              77|Cl

In [94]:
def output_top_three_stations_each_day_for_last_two_weeks(df, output_file_path):
  df_new = df.withColumn("day", to_date(col("started_at")))
  df_new = df_new.withColumn("week", date_format(to_date("day", "yyyy-mm-dddd"), "W"))
  df_new = df_new.filter((col("week") == "3") | (col("week") == "4"))
  df_new = df_new.withColumn("station_usage", array(col("start_station_name"), col("end_station_name")))
  df_new = df_new.select("day", explode("station_usage").alias("station_name"))
  df_new = df_new.groupBy("day", "station_name").count()
  df_new = df_new.orderBy("day", "count", ascending=[True, False])
  window = Window.partitionBy("day").orderBy(col("count").desc())
  df_new = df_new.withColumn("rank", rank().over(window))
  df_new = df_new.filter(col("rank") <= 3)
  df_new = df_new.select("day", "station_name", "count")
  write_to_csv(df_new, output_file_path)

In [95]:
output_top_three_stations_each_day_for_last_two_weeks(sparkDF, "/content/drive/MyDrive/reports/report4")