This project consists of five tables. Run this notebook to create the STATION and RIDER dimension tables and the PAYMENT and TRIP fact tables. The Date dimension table is created separately by the provided dimDate script.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

1. Creating STATION table

In [0]:

# Schema for stations.csv. The file is read without a header row, so the
# columns below are matched to the CSV by position.
# Each entry is (column name, Spark type, nullable).
# NOTE(review): Spark file sources generally relax user-supplied schemas to
# nullable, so the False flag on StationId may not be enforced — confirm.
# NOTE(review): FloatType is single precision; DoubleType may suit coordinates
# better, but changing it requires overwriteSchema on the existing Delta table.
schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("StationId", StringType(), False),
        ("Name", StringType(), True),
        ("Latitude", FloatType(), True),
        ("Longitude", FloatType(), True),
    )
])

# Load the raw station CSV from DBFS using the explicit schema above.
df_station = (
    spark.read.format("csv")
    .option("header", "false")
    .schema(schema)
    .load("/FileStore/proj4/stations.csv")
)

In [0]:
# Persist the raw station frame as a Delta table at a fixed DBFS path,
# replacing whatever was there on a previous run.
(
    df_station.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "false")  # do not evolve the stored table's schema on write
    .save("/delta/stations")
)

In [0]:
# The gold station dimension is the raw frame as-is; register it in the
# metastore as `dim_stations`, overwriting it on every run.
df_station_gold = df_station
(
    df_station_gold.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_stations")
)

2. Creating RIDER table

In [0]:
# Schema for riders.csv (no header row, so columns are matched by position).
# Each entry is (column name, Spark type, nullable). Date columns are parsed
# with Spark's default CSV date format.
schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("RiderId", IntegerType(), False),
        ("First", StringType(), True),
        ("Last", StringType(), True),
        ("Address", StringType(), True),
        ("Birthday", DateType(), True),
        ("AccountStartDate", DateType(), True),
        ("AccountEndDate", DateType(), True),
        ("IsMember", BooleanType(), True),
    )
])

# Load the raw rider CSV from DBFS using the explicit schema above.
df_rider = (
    spark.read.format("csv")
    .option("header", "false")
    .schema(schema)
    .load("/FileStore/proj4/riders.csv")
)

In [0]:
# Persist the raw rider frame as a Delta table at a fixed DBFS path,
# replacing whatever was there on a previous run.
(
    df_rider.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "false")  # do not evolve the stored table's schema on write
    .save("/delta/riders")
)

In [0]:
# The gold rider dimension is the raw frame as-is; register it in the
# metastore as `dim_riders` (later read back when building fact_trips).
df_riders_gold = df_rider
(
    df_riders_gold.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_riders")
)

3. Creating PAYMENT table

In [0]:
# Schema for payments.csv (no header row, so columns are matched by position).
# Each entry is (column name, Spark type, nullable).
schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("PaymentId", IntegerType(), False),
        ("Date", DateType(), True),
        ("Amount", DoubleType(), True),
        ("RiderId", IntegerType(), True),
    )
])

# Load the raw payment CSV from DBFS using the explicit schema above.
df_payment = (
    spark.read.format("csv")
    .option("header", "false")
    .schema(schema)
    .load("/FileStore/proj4/payments.csv")
)

In [0]:
# Persist the raw payment frame as a Delta table at a fixed DBFS path,
# replacing whatever was there on a previous run.
(
    df_payment.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "false")  # do not evolve the stored table's schema on write
    .save("/delta/payments")
)

In [0]:
# The gold payment fact is the raw frame as-is; register it in the
# metastore as `fact_payments`, overwriting it on every run.
df_payments_gold = df_payment
(
    df_payments_gold.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_payments")
)

4. Creating TRIP table

In [0]:
# Schema for trips.csv (no header row, so columns are matched by position).
# Each entry is (column name, Spark type, nullable). Timestamps are parsed
# with Spark's default CSV timestamp format.
schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("TripId", StringType(), False),
        ("RideableType", StringType(), True),
        ("StartAt", TimestampType(), True),
        ("EndedAt", TimestampType(), True),
        ("StartStationId", StringType(), True),
        ("EndStationId", StringType(), True),
        ("RiderId", IntegerType(), True),
    )
])

# Load the raw trip CSV from DBFS using the explicit schema above.
df_trips = (
    spark.read.format("csv")
    .option("header", "false")
    .schema(schema)
    .load("/FileStore/proj4/trips.csv")
)

In [0]:
# Persist the raw trip frame as a Delta table at a fixed DBFS path,
# replacing whatever was there on a previous run.
(
    df_trips.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "false")  # do not evolve the stored table's schema on write
    .save("/delta/trips")
)

Adding 'Duration' column to the TRIP table

In [0]:
import pyspark.sql.functions as F

# Trip length in whole seconds.
# Subtracting two TimestampType columns directly yields a Spark interval
# (a calendar interval, or DAY TO SECOND in newer Spark), and Spark does not
# allow casting that multi-unit interval to int. Casting each timestamp to
# long (seconds since the epoch) first gives a plain numeric difference.
df_trips_with_duration = (
    df_trips
    .withColumn(
        "Duration",
        (F.col("EndedAt").cast("long") - F.col("StartAt").cast("long")).cast("int"),
    )
    # Keep the result of the reorder so Duration sits next to the timestamps.
    .select(
        "TripId",
        "RideableType",
        "StartAt",
        "EndedAt",
        "Duration",
        "StartStationId",
        "EndStationId",
        "RiderId",
    )
)


Adding the 'RiderAgeAtTimeOfTrip' column (the rider's age when the trip started) to the TRIP table

In [0]:
import pyspark.sql.functions as F

# Gold trip fact: trips joined to the rider dimension so each trip carries the
# rider's age at trip start. The inner join keeps only trips whose RiderId
# exists in dim_riders.
# The raw `df_trips` frame is deliberately not rebound to the enriched frame:
# if it were, re-running the /delta/trips write cell afterwards would try to
# write the extra Duration column there and fail Delta's schema check
# (that write uses mergeSchema=false).
df_riders = spark.table("dim_riders")

df_gold_trips = (
    df_trips_with_duration
    .join(df_riders, on=["RiderId"], how="inner")
    .select(
        "TripId",
        "RideableType",
        "StartAt",
        "EndedAt",
        "Duration",
        "StartStationId",
        "EndStationId",
        "RiderId",
        # Whole years between birth date and trip start: months / 12,
        # truncated by the int cast.
        (F.months_between(df_trips_with_duration["StartAt"], df_riders["Birthday"]) / 12)
        .cast("int")
        .alias("RiderAgeAtTimeOfTrip"),
    )
)

# Register the result in the metastore as `fact_trips`, overwriting it on every run.
(
    df_gold_trips.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_trips")
)

In [0]:
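# Utility commands for deleting the Delta table directories under dbfs:/delta/. Uncomment the desired line(s) and run the cell.
# Note: this removes only these path-based copies; it does not drop the tables registered with saveAsTable (dim_*, fact_*).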


# dbutils.fs.rm("dbfs:/delta/stations",True)
# dbutils.fs.rm("dbfs:/delta/riders",True)
# dbutils.fs.rm("dbfs:/delta/payments",True)
# dbutils.fs.rm("dbfs:/delta/trips",True)
# dbutils.fs.rm("dbfs:/delta/Date",True)