In [0]:
from typing import Callable, List, Union

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
# NOTE: this star import shadows Python built-ins such as min, max, sum and round with their
# Spark column versions; later cells rely on that (e.g. passing sum/avg as aggregate functions).
from pyspark.sql.functions import *
from pyspark.sql import DataFrame, Column
import pandas as pd

In [0]:
# Reuse the active Spark session if there is one, otherwise start one named "bikeshare".
spark = (
    SparkSession.builder
    .appName("bikeshare")
    .getOrCreate()
)

### Loading and Writing Data to Delta Lake

In [0]:
# Drop every table this notebook (re)creates so a full re-run starts from a clean slate.
for table in ['payments', 'trips', 'riders', 'stations', 'trip_dates', 'payment_dates']:
    spark.sql(f"DROP TABLE IF EXISTS {table}")

# Raw CSV sources. No header option is set, so Spark names the columns _c0, _c1, ...
RAW_CSV_PATHS = {
    'payments': '/FileStore/payments.csv',
    'trips': '/FileStore/trips.csv',
    'riders': '/FileStore/riders.csv',
    'stations': '/FileStore/stations.csv',
}

dataframes = {
    name: spark.read.format('csv').option('sep', ',').load(path)
    for name, path in RAW_CSV_PATHS.items()
}

# Per-table handles, kept under the names the rest of the notebook uses.
payment_df = dataframes['payments']
trip_df = dataframes['trips']
rider_df = dataframes['riders']
station_df = dataframes['stations']

# Persist each raw table to Delta, replacing any previous contents.
for name, df in dataframes.items():
    df.write.format('delta').mode('overwrite').saveAsTable(name)


### Display raw data

In [0]:
# Display only the first 10 records of each raw table.
for table_name, table in dataframes.items():
    displayHTML(f"<h3>{table_name}_table</h3>")  # Display the table name as a title
    # Limit on the Spark side so only 10 rows are collected to the driver,
    # instead of converting the whole table to pandas and slicing afterwards.
    df = table.limit(10).toPandas()
    displayHTML(df.to_html())


Unnamed: 0,_c0,_c1,_c2,_c3
0,1,2019-05-01,9.0,1000
1,2,2019-06-01,9.0,1000
2,3,2019-07-01,9.0,1000
3,4,2019-08-01,9.0,1000
4,5,2019-09-01,9.0,1000
5,6,2019-10-01,9.0,1000
6,7,2019-11-01,9.0,1000
7,8,2019-12-01,9.0,1000
8,9,2020-01-01,9.0,1000
9,10,2020-02-01,9.0,1000


Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,89E7AA6C29227EFF,classic_bike,2021-02-12 16:14:56,2021-02-12 16:21:43,525,660,71934
1,0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
2,E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
3,B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
4,83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608
5,BDAA7E3494E8D545,electric_bike,2021-02-24 15:43:33,2021-02-24 15:49:05,18003,KP1705001026,36267
6,A772742351171257,classic_bike,2021-02-01 17:47:42,2021-02-01 17:48:33,KP1705001026,KP1705001026,50104
7,295476889D9B79F8,classic_bike,2021-02-11 18:33:53,2021-02-11 18:35:09,18003,18003,19618
8,362087194BA4CC9A,classic_bike,2021-02-27 15:13:39,2021-02-27 15:36:36,KP1705001026,KP1705001026,16732
9,21630F715038CCB0,classic_bike,2021-02-20 08:59:42,2021-02-20 09:17:04,KP1705001026,KP1705001026,57068


Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7
0,1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1,1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
2,1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
3,1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
4,1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
5,1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
6,1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
7,1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
8,1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
9,1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True


Unnamed: 0,_c0,_c1,_c2,_c3
0,525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
1,KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
2,637,Wood St & Chicago Ave,41.895634,-87.672069
3,13216,State St & 33rd St,41.8347335,-87.6258275
4,18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
5,KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
6,13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
7,KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
8,KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
9,TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


### Formatting the columns to reflect the schema design

In [0]:
def format_columns(table_path: str, column_rename_dict: dict, column_type_dict: dict):
    """
    Rename and re-type the columns of a saved table, then overwrite that table in place.

    Args:
        table_path (str): Name of the table to read and overwrite.
        column_rename_dict (dict): Maps current column names to new names; unmapped columns keep their name.
        column_type_dict (dict): Maps *new* column names to Spark type strings; unmapped columns keep their type.

    Returns:
        DataFrame: The renamed and re-typed DataFrame that was written back.
    """
    source_df = spark.read.table(table_path)

    # One projection per column: cast (looked up by the new name), then alias to the new name.
    projection = []
    for old_name in source_df.columns:
        new_name = column_rename_dict.get(old_name, old_name)
        target_type = column_type_dict.get(new_name, source_df.schema[old_name].dataType)
        projection.append(col(old_name).cast(target_type).alias(new_name))

    formatted_df = source_df.select(projection)

    # overwriteSchema lets Delta accept the new column names and types.
    formatted_df.write.format("delta").mode("overwrite").option("overwriteSchema", True).saveAsTable(table_path)

    return formatted_df



# Per table: (raw column -> schema column name, schema column name -> Spark type)
columns_types = {
    'payments': (
        {'_c0': 'payment_id', '_c1': 'date_id', '_c2': 'amount', '_c3': 'rider_id'},
        # A bare 'decimal' is decimal(10,0) in Spark, which would round amounts to whole units.
        {'payment_id': 'int', 'amount': 'decimal(10,2)', 'date_id': 'date', 'rider_id': 'int'},
    ),
    'trips': (
        {'_c0': 'trip_id', '_c1': 'rideable_type', '_c2': 'started_at', '_c3': 'ended_at',
         '_c4': 'start_station_id', '_c5': 'end_station_id', '_c6': 'rider_id'},
        {'trip_id': 'string', 'rideable_type': 'string', 'started_at': 'timestamp', 'ended_at': 'timestamp',
         'start_station_id': 'string', 'end_station_id': 'string', 'rider_id': 'int'},
    ),
    'riders': (
        {'_c0': 'rider_id', '_c1': 'first', '_c2': 'last', '_c3': 'address', '_c4': 'birthday',
         '_c5': 'account_start_date', '_c6': 'account_end_date', '_c7': 'is_member'},
        {'rider_id': 'int', 'first': 'string', 'last': 'string', 'address': 'string', 'birthday': 'date',
         'account_start_date': 'date', 'account_end_date': 'date', 'is_member': 'boolean'},
    ),
    'stations': (
        {'_c0': 'station_id', '_c1': 'name', '_c2': 'latitude', '_c3': 'longitude'},
        # double, not float: a 32-bit float keeps only ~7 significant digits, too few for
        # coordinates such as -87.66605799999999.
        {'station_id': 'string', 'name': 'string', 'latitude': 'double', 'longitude': 'double'},
    ),
}

# Apply transformations for each table and display results
for table_name, (columns, types) in columns_types.items():
    transformed_df = format_columns(table_name, columns, types)

    # Display the table name as a title
    displayHTML(f"<h3>{table_name}_table</h3>")

    # Pass a DataFrame (not the list of Rows that head() returns) so display() renders a table.
    display(transformed_df.limit(10))


### Adding columns to address business outcomes

In [0]:
# Read tables
dataframes = {
    'trips': spark.read.table('trips'),
    'riders': spark.read.table('riders'),
    'payments': spark.read.table('payments')
}


def _whole_years_between(later: Column, earlier: Column) -> Column:
    """Completed years from `earlier` to `later` (an age), as an int column.

    months_between / 12 respects leap years and whether the anniversary has been reached;
    datediff / 365 overcounts by one year shortly before many birthdays.
    """
    return F.floor(F.months_between(later, earlier) / 12).cast("int")


# Ride duration in seconds as a difference of epoch seconds. This avoids casting the interval
# that timestamp subtraction produces, whose cast-to-long support varies across Spark versions.
# time_id is the ride start truncated to the hour; it is the key into the trip_dates dimension.
dataframes['trips'] = (
    dataframes['trips']
    .withColumn("duration", F.unix_timestamp("ended_at") - F.unix_timestamp("started_at"))
    .withColumn("time_id", F.date_trunc("hour", F.col("started_at")))
)

# Age in completed years when the rider opened the account
dataframes['riders'] = dataframes['riders'].withColumn(
    "age_at_account_start", _whole_years_between(F.col("account_start_date"), F.col("birthday"))
)

# Write updated rider data
dataframes['riders'].write.format("delta").mode("overwrite").option("overwriteSchema", True).saveAsTable('riders')

# Attach the rider's birthday to compute age at ride time. The inner join drops trips whose
# rider_id has no match in riders.
dataframes['trips'] = (
    dataframes['trips']
    .join(dataframes['riders'].select('rider_id', 'birthday'), on='rider_id', how='inner')
    .withColumn("age_at_ride_time", _whole_years_between(F.to_date(F.col("started_at")), F.col("birthday")))
    .select('trip_id', 'duration', 'rideable_type', 'age_at_ride_time', 'started_at', 'ended_at',
            'start_station_id', 'end_station_id', 'time_id', 'rider_id')
)

# Write updated trip data
dataframes['trips'].write.format("delta").mode("overwrite").option("overwriteSchema", True).saveAsTable('trips')

# Write payment data with columns in schema order
dataframes['payments'].select('payment_id', 'amount', 'date_id', 'rider_id').write.format("delta").mode("overwrite").option("overwriteSchema", True).saveAsTable('payments')


### Date Dimensions
Separate date dimension tables are created for payment and trip data because the two have different time granularity:

- The trip date dimension has one row per hour and records the day of week and the time of day (morning, afternoon, evening, night).
- The payment date dimension has one row per day and records the month, quarter, and year, to support spending trends over time.

In [0]:
# Source fact tables for the two date dimensions
payment_df = spark.read.table('payments')
trip_df = spark.read.table('trips')

# Min/max keys as one-row Spark DataFrames. They are cached because they feed both the log
# below and the temp views, which later cells query several times.
payment_bounds = payment_df.agg(F.min('date_id').alias('min_key'), F.max('date_id').alias('max_key')).cache()
trip_bounds = trip_df.agg(F.min('time_id').alias('min_key'), F.max('time_id').alias('max_key')).cache()

payment_min_date, payment_max_date = payment_bounds.first()
trip_min_date, trip_max_date = trip_bounds.first()

# Log date ranges
print(f"Trip Dates: {trip_min_date} to {trip_max_date}")
print(f"Payment Dates: {payment_min_date} to {payment_max_date}")

# Expand each range into one row per day / per hour. The bounds stay inside Spark, so no Python
# datetime is formatted back into SQL text (which could shift hours if the driver's Python time
# zone differs from the Spark session time zone).
payment_bounds.select(
    F.explode(F.sequence('min_key', 'max_key', F.expr('INTERVAL 1 DAY'))).alias('date')
).createOrReplaceTempView('payment_dates_view')

trip_bounds.select(
    F.explode(F.sequence('min_key', 'max_key', F.expr('INTERVAL 1 HOUR'))).alias('time')
).createOrReplaceTempView('trip_dates_view')


### Payment Dates View


In [0]:
%sql SELECT * FROM payment_dates_view ORDER BY date LIMIT 20

### Trip Dates View


In [0]:
%sql SELECT * FROM trip_dates_view ORDER BY time LIMIT 20

In [0]:
# Hourly trip date dimension: one row per hour in trip_dates_view.
# dayofweek() returns 1 = Sunday ... 7 = Saturday; day_name spells the day out so grouped
# results are readable without that lookup. Hours 22:00-04:59 fall into 'night'.
trip_dates_query = """
SELECT
    time AS time_id,
    dayofweek(time) AS day_of_week,
    date_format(time, 'EEEE') AS day_name,
    CASE
        WHEN hour(time) BETWEEN 5 AND 11 THEN 'morning'
        WHEN hour(time) BETWEEN 12 AND 16 THEN 'afternoon'
        WHEN hour(time) BETWEEN 17 AND 21 THEN 'evening'
        ELSE 'night'
    END AS time_of_day
FROM trip_dates_view
ORDER BY time
"""

trip_dates = spark.sql(trip_dates_query)
# overwriteSchema matches the other table writes and lets a re-run replace an older schema.
trip_dates.write.format('delta').mode('overwrite').option('overwriteSchema', True).saveAsTable('trip_dates')


In [0]:
# Daily payment date dimension: calendar attributes for every day in payment_dates_view.
payment_dates_query = """
SELECT
    date AS date_id,
    month(date) AS month,
    quarter(date) AS quarter,
    year(date) AS year
FROM payment_dates_view
ORDER BY date
"""

payment_dates = spark.sql(payment_dates_query)
(
    payment_dates.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('payment_dates')
)


## Business Questions to Address

- Analyze how much time is spent per ride
  * Based on date and time factors such as day of week and time of day
  * Based on which station is the starting and / or ending station
  * Based on age of the rider at time of the ride
  * Based on whether the rider is a member or a casual rider
- Analyze how much money is spent
  * Per month, quarter, year
  * Per member, based on the age of the rider at account start
- EXTRA CREDIT - Analyze how much money is spent per member
  * Based on how many rides the rider averages per month
  * Based on how many minutes the rider spends on a bike per month

In [0]:
# Load the fact and dimension tables, in the same order as the names they are unpacked into.
tables = ['payments', 'trips', 'riders', 'stations', 'trip_dates', 'payment_dates']
(payment_df, trip_df, rider_df, station_df,
 trip_date_df, payment_date_df) = map(spark.read.table, tables)


## Trip Table

In [0]:
def analyze_trip_data(df: DataFrame, group_col: str, agg_func: Callable[[str], Column], alias: str, title: str) -> None:
    """
    Aggregate ride duration by a trip date-dimension attribute and display it under a title.

    The trips are joined to the global `trip_date_df` on 'time_id', so `group_col` can be any
    column of either table (e.g. 'day_of_week' or 'time_of_day').

    Args:
        df (DataFrame): Trip data with 'time_id' and 'duration' columns.
        group_col (str): Column to group by.
        agg_func (Callable[[str], Column]): Spark aggregate function applied to 'duration',
            e.g. avg or sum.
        alias (str): Name of the aggregated column; results are sorted by it, descending.
        title (str): Heading displayed above the results.

    Returns:
        None
    """
    result_df = (
        df.join(trip_date_df, 'time_id')
        .groupBy(group_col)
        .agg(agg_func('duration').alias(alias))
        .orderBy(alias, ascending=False)
    )

    # Display the title and results
    displayHTML(f"<h3>{title}</h3>")
    result_df.show()

# Average and total duration by day of week and by time of day.
# A sum is a total over all rides, so those titles say "Total Ride Duration", not "per Ride".
analyze_trip_data(trip_df, 'day_of_week', avg, 'duration_in_seconds_avg', 'Avg Duration per Ride by Day of Week')
analyze_trip_data(trip_df, 'day_of_week', sum, 'duration_in_seconds_sum', 'Total Ride Duration by Day of Week')
analyze_trip_data(trip_df, 'time_of_day', avg, 'duration_in_seconds_avg', 'Avg Duration per Ride by Time of Day')
analyze_trip_data(trip_df, 'time_of_day', sum, 'duration_in_seconds_sum', 'Total Ride Duration by Time of Day')


In [0]:
def analyze_duration(df: DataFrame, group_col: str, agg_func: Callable[[str], Column], alias: str, title: str) -> None:
    """
    Aggregate ride duration by a column of the trip data itself and display it under a title.

    Args:
        df (DataFrame): Trip data with a 'duration' column.
        group_col (str): Column to group by (e.g. 'start_station_id').
        agg_func (Callable[[str], Column]): Spark aggregate function applied to 'duration',
            e.g. avg or sum.
        alias (str): Name of the aggregated column; results are sorted by it, descending.
        title (str): Heading displayed above the results.

    Returns:
        None
    """
    result_df = (
        df.groupBy(group_col)
        .agg(agg_func('duration').alias(alias))
        .orderBy(alias, ascending=False)
    )

    # Display title and results. show() prints at most 20 rows, i.e. the top 20 groups by `alias`.
    displayHTML(f"<h3>{title}</h3>")
    result_df.show()

# Avg duration per ride and total ride duration by start station
analyze_duration(trip_df, 'start_station_id', avg, 'duration_in_seconds_avg', 'Avg Duration per Ride by Start Station')
analyze_duration(trip_df, 'start_station_id', sum, 'duration_in_seconds_sum', 'Total Ride Duration by Start Station')

# Avg duration per ride and total ride duration by end station
analyze_duration(trip_df, 'end_station_id', avg, 'duration_in_seconds_avg', 'Avg Duration per Ride by End Station')
analyze_duration(trip_df, 'end_station_id', sum, 'duration_in_seconds_sum', 'Total Ride Duration by End Station')


In [0]:
def analyze_duration_by_age(df: DataFrame, group_col: str, agg_func: Callable[[str], Column], alias: str, title: str) -> None:
    """
    Aggregate ride duration by a rider age column and display it under a title.

    Trips are joined to the global `rider_df` on 'rider_id', so `group_col` can be a trip column
    (e.g. 'age_at_ride_time') or a rider column (e.g. 'age_at_account_start').

    Args:
        df (DataFrame): Trip data with 'rider_id' and 'duration' columns.
        group_col (str): Column to group by.
        agg_func (Callable[[str], Column]): Spark aggregate function applied to 'duration',
            e.g. avg or sum.
        alias (str): Name of the aggregated column; results are sorted by it, descending.
        title (str): Heading displayed above the results.

    Returns:
        None
    """
    # Joining on the column name (rather than an equality expression) keeps a single
    # rider_id column, so the joined frame has no ambiguous duplicate.
    result_df = (
        df.join(rider_df, 'rider_id')
        .groupBy(group_col)
        .agg(agg_func('duration').alias(alias))
        .orderBy(alias, ascending=False)
    )

    # Display title and results
    displayHTML(f"<h3>{title}</h3>")
    result_df.show()

# The business question asks about the rider's age at the time of the ride, which the trips
# table carries as age_at_ride_time.
analyze_duration_by_age(trip_df, 'age_at_ride_time', avg, 'duration_in_seconds_avg', 'Avg Duration by Age at Ride Time')
analyze_duration_by_age(trip_df, 'age_at_ride_time', sum, 'duration_in_seconds_sum', 'Total Duration by Age at Ride Time')


In [0]:
def analyze_duration_by_membership(df: DataFrame, group_col: str, agg_func: Callable[[str], Column], alias: str, title: str) -> None:
    """
    Aggregate ride duration by a rider attribute (typically membership) and display it under a title.

    Trips are joined to the global `rider_df` on 'rider_id'.

    NOTE(review): riders.is_member is a single flag per rider, not a per-date status, so a ride
    is classified by the rider's stored flag even if it happened outside the membership period.

    Args:
        df (DataFrame): Trip data with 'rider_id' and 'duration' columns.
        group_col (str): Column to group by (e.g. 'is_member').
        agg_func (Callable[[str], Column]): Spark aggregate function applied to 'duration',
            e.g. avg or sum.
        alias (str): Name of the aggregated column; results are sorted by it, descending.
        title (str): Heading displayed above the results.

    Returns:
        None
    """
    result_df = (
        df.join(rider_df, 'rider_id')
        .groupBy(group_col)
        .agg(agg_func('duration').alias(alias))
        .orderBy(alias, ascending=False)
    )

    # Display title and results
    displayHTML(f"<h3>{title}</h3>")
    result_df.show()

# Avg and total duration by rider membership
analyze_duration_by_membership(trip_df, 'is_member', avg, 'duration_in_seconds_avg', 'Avg Duration by Rider Membership')
analyze_duration_by_membership(trip_df, 'is_member', sum, 'duration_in_seconds_sum', 'Total Duration by Rider Membership')


## Payment Table Queries For Analyzing Payment Data

In [0]:
def analyze_payment_data(df: DataFrame, group_col: Union[str, List[str]], agg_funcs: list, aliases: list, titles: list) -> None:
    """
    Aggregate payment amounts by payment-date attributes and display one result per aggregation.

    Payments are joined to the global `payment_date_df` on 'date_id', so `group_col` can name
    any column of either table.

    Args:
        df (DataFrame): Payment data with 'date_id' and 'amount' columns.
        group_col (str | list[str]): Column, or list of columns, to group by. Pass e.g.
            ['year', 'month'] so the same month in different years is not merged together.
        agg_funcs (list): Spark aggregate functions applied to 'amount' (e.g. sum, avg).
        aliases (list): Names of the aggregated columns, one per function.
        titles (list): Headings, one per function.

    Returns:
        None
    """
    # The join is identical for every aggregation, so build it once.
    payments_with_dates = df.join(payment_date_df, 'date_id')

    for agg_func, alias, title in zip(agg_funcs, aliases, titles):
        result_df = (
            payments_with_dates
            .groupBy(group_col)
            .agg(agg_func('amount').alias(alias))
            .orderBy(alias, ascending=False)
        )

        # Display title and results
        displayHTML(f"<h3>{title}</h3>")
        result_df.show()

# Aggregation functions and their aliases
agg_funcs = [sum, avg]
aliases = ['amount_sum', 'amount_avg']

# Calendar periods include the year, so e.g. January 2020 and January 2021 stay separate.
PAYMENT_PERIODS = {
    'Month': ['year', 'month'],
    'Quarter': ['year', 'quarter'],
    'Year': ['year'],
}

for period_name, period_cols in PAYMENT_PERIODS.items():
    analyze_payment_data(payment_df, period_cols, agg_funcs, aliases,
                         [f'Total Amount by {period_name}', f'Average Amount by {period_name}'])


In [0]:
def analyze_member_payment_data(df: DataFrame, group_col: str, agg_func: Callable[[str], Column], alias: str, title: str) -> None:
    """
    Aggregate payment amounts for member riders only, grouped by a column, and display the result.

    Payments are joined to the global `rider_df` on 'rider_id' and filtered to riders whose
    is_member flag is true (null flags are excluded).

    Args:
        df (DataFrame): Payment data with 'rider_id' and 'amount' columns.
        group_col (str): Column to group by (e.g. 'age_at_account_start').
        agg_func (Callable[[str], Column]): Spark aggregate function applied to 'amount',
            e.g. avg or sum.
        alias (str): Name of the aggregated column; results are sorted by it, descending.
        title (str): Heading displayed above the results.

    Returns:
        None
    """
    result_df = (
        df.join(rider_df, 'rider_id')
        .where(F.col('is_member'))
        .groupBy(group_col)
        .agg(agg_func('amount').alias(alias))
        .orderBy(alias, ascending=False)
    )

    # Display title and results
    displayHTML(f"<h3>{title}</h3>")
    result_df.show()

# Aggregation functions and their aliases
agg_funcs = [avg, sum]
aliases = ['amount_avg', 'amount_sum']

# Analyze spending by members by age at account start
titles = ['Average Amount by Age at Account Start (Members)', 'Total Amount by Age at Account Start (Members)']
for agg_func, alias, title in zip(agg_funcs, aliases, titles):
    analyze_member_payment_data(payment_df, 'age_at_account_start', agg_func, alias, title)


## Extra Credit 

In [0]:
# Spending per member by monthly ride count.
# Rides and payments are each reduced to one row per (rider, calendar month) BEFORE joining.
# Joining raw trips to raw payments would pair every trip with every payment of the rider,
# inflating both the ride count and the amounts.
member_ids = rider_df.where(F.col('is_member')).select('rider_id')

monthly_rides = (
    trip_df
    .withColumn('month_start', F.trunc(F.to_date('time_id'), 'month'))
    .groupBy('rider_id', 'month_start')
    .agg(F.count('trip_id').alias('num_rides'))
)

monthly_spend = (
    payment_df
    .withColumn('month_start', F.trunc('date_id', 'month'))
    .groupBy('rider_id', 'month_start')
    .agg(F.sum('amount').alias('amount_spent'))
)

# Every member-month with a payment; months without rides count as 0 rides.
# Rides in months with no payment are not included.
result_df = (
    monthly_spend
    .join(member_ids, 'rider_id')
    .join(monthly_rides, ['rider_id', 'month_start'], 'left')
    .fillna(0, subset=['num_rides'])
    .orderBy('num_rides', ascending=False)
)

# Display title and results
displayHTML("<h3>Average Spending per Member by Monthly Ride Count</h3>")
result_df.show()


In [0]:
# Spending per member by monthly bike usage (minutes ridden per calendar month).
# Usage and payments are each reduced to one row per (rider, calendar month) before joining,
# so a rider's trips are not multiplied by the number of their payments.
member_ids = rider_df.where(F.col('is_member')).select('rider_id')

monthly_usage = (
    trip_df
    .withColumn('month_start', F.trunc(F.to_date('time_id'), 'month'))
    .groupBy('rider_id', 'month_start')
    .agg((F.sum('duration') / 60).alias('minutes_ridden'))  # duration is in seconds
)

monthly_spend = (
    payment_df
    .withColumn('month_start', F.trunc('date_id', 'month'))
    .groupBy('rider_id', 'month_start')
    .agg(F.sum('amount').alias('amount_spent'))
)

# Every member-month with a payment; months without rides count as 0 minutes.
result_df = (
    monthly_spend
    .join(member_ids, 'rider_id')
    .join(monthly_usage, ['rider_id', 'month_start'], 'left')
    .fillna(0, subset=['minutes_ridden'])
    .orderBy('minutes_ridden', ascending=False)
)

# Display title and results
displayHTML("<h3>Average Spending per Member by Monthly Bike Usage</h3>")
result_df.show()
