In [0]:
from pyspark.sql.functions import explode, sequence, to_date

In [0]:
# Show every (key, value) pair of the active Spark configuration.
# NOTE(review): this output may contain cluster settings; clear it before sharing.
active_conf = spark.sparkContext.getConf()
active_conf.getAll()

### Read the uploaded files into DataFrames

In [0]:
# Load each uploaded CSV into its own DataFrame. Only the separator is set:
# no header or schema option is supplied, so the columns get Spark's default
# names and are renamed in a later cell.
payments_df = (
    spark.read.format("csv")
    .option("sep", ",")
    .load("/FileStore/data/payments.csv")
)

trips_df = (
    spark.read.format("csv")
    .option("sep", ",")
    .load("/FileStore/data/trips.csv")
)

riders_df = (
    spark.read.format("csv")
    .option("sep", ",")
    .load("/FileStore/data/riders.csv")
)

stations_df = (
    spark.read.format("csv")
    .option("sep", ",")
    .load("/FileStore/data/stations.csv")
)

### Renaming Columns and displaying the data

In [0]:
# Give every DataFrame meaningful column names (positional rename via toDF)
# and print a sample of each so the mapping can be checked by eye.

# --- trips ---
trips_list = [
    "trip_id",
    "rideable_type",
    "start_at",
    "ended_at",
    "start_station_id",
    "end_station_id",
    "rider_id",
]
trips_df = trips_df.toDF(*trips_list)
trips_df.show()

# --- payments ---
payments_list = [
    "payment_id",
    "date",
    "amount",
    "rider_id",
]
payments_df = payments_df.toDF(*payments_list)
payments_df.show()

# --- stations ---
stations_list = [
    "station_id",
    "name",
    "latitude",
    "longitude",
]
stations_df = stations_df.toDF(*stations_list)
stations_df.show()

# --- riders ---
riders_list = [
    "rider_id",
    "first_name",
    "last_name",
    "address",
    "birthday",
    "account_start_date",
    "account_end_date",
    "is_member",
]
riders_df = riders_df.toDF(*riders_list)
riders_df.show()

### Creation of Bronze Layer. These tables hold the data in the raw format (same as the files)

In [0]:
# Store the raw DataFrames as Delta files.
# The save paths must be exactly the locations the bronze tables are registered
# on below; previously the data was written to relative paths ("payments_data",
# ...) while the tables pointed at '/delta/...', i.e. at locations that were
# never written.
payments_df.write.format("delta").mode("overwrite").save("/delta/payments_data")
trips_df.write.format("delta").mode("overwrite").save("/delta/trips_data")
riders_df.write.format("delta").mode("overwrite").save("/delta/riders_data")
stations_df.write.format("delta").mode("overwrite").save("/delta/stations_data")

# Create the bronze layer tables on top of the Delta files written above.
# IF NOT EXISTS keeps the cell re-runnable: the overwrite above refreshes the
# data in place, so an already registered table does not need to be recreated.
spark.sql("CREATE TABLE IF NOT EXISTS bronze_payments USING DELTA LOCATION '/delta/payments_data'")
spark.sql("CREATE TABLE IF NOT EXISTS bronze_trips USING DELTA LOCATION '/delta/trips_data'")
spark.sql("CREATE TABLE IF NOT EXISTS bronze_riders USING DELTA LOCATION '/delta/riders_data'")
spark.sql("CREATE TABLE IF NOT EXISTS bronze_stations USING DELTA LOCATION '/delta/stations_data'")

## Creation of Silver Layer.

### Creating the Date Dimension Table

In [0]:
# Calendar range: from 1500 days before the earliest trip start up to the
# latest trip end.
# NOTE(review): the 1500-day look-back is presumably there so that payment
# dates older than the first trip still find a calendar row - confirm.
dates = spark.sql("SELECT to_timestamp(max(ended_at)),to_timestamp(date_add(min(start_at), -1500)) FROM bronze_trips")

# Fetch the single result row once; calling first() per value would run the
# aggregate over bronze_trips twice.
date_bounds = dates.first()
beginDate = date_bounds[1]
endDate = date_bounds[0]

# One row per day between the two bounds, exposed as a temp view.
(
  spark.sql(f"select explode(sequence(to_timestamp('{beginDate}'), to_timestamp('{endDate}'), interval 1 day)) as calendarDate")
    .createOrReplaceTempView('dimdates')
)

# Break every calendar day down into its date parts.
# Spark's DAYOFWEEK numbers the week Sunday(1) .. Saturday(7), so the weekend is
# (1, 7); the previous (6, 7) flagged Friday and Saturday instead.
dimDate_df = spark.sql('''SELECT calendarDate as date_key,
                    calendarDate as datetime,
                    date_part('YEAR',calendarDate)  as Year,
                    date_part('QUARTER', calendarDate) as Quarter,
                    date_part('MONTH', calendarDate) as Month,
                    date_part('DAY',calendarDate) as Day,
                    date_part('WEEK',calendarDate) as Week,
                    date_part('HOUR',calendarDate) as Hour,
                    date_part('MINUTE',calendarDate) as Minute,
                    date_part('SECOND',calendarDate) as Second,
                    CASE WHEN date_part('DAYOFWEEK',calendarDate) IN (1,7) THEN 1 ELSE 0 END as is_weekend
                    FROM dimDates
                    ''')
dimDate_df.write.format("delta").mode("overwrite").saveAsTable("silver_dimDate")

### Creating the Rider Dimension Table

In [0]:
# Rider dimension built from bronze_riders: rider_id doubles as the key,
# birthday is parsed into a timestamp and is_member is exposed as ismember.
dim_rider_query = '''SELECT DISTINCT rider_id as rider_key,
                            rider_id as rider_id,
                            first_name as first_name,
                            last_name as last_name,
                            to_timestamp(birthday) as birthday,
                            is_member as ismember
                            FROM bronze_riders
                            '''
dimRider_df = spark.sql(dim_rider_query)

# Persist as a managed Delta table, replacing any previous version.
dim_rider_writer = dimRider_df.write.format("delta").mode("overwrite")
dim_rider_writer.saveAsTable("silver_dimRider")

### Creating the Station Dimension Table

In [0]:
# Station dimension built from bronze_stations: station_id doubles as the key,
# name and coordinates are carried over unchanged.
dim_station_query = '''SELECT DISTINCT station_id as station_key,
                            station_id as station_id,
                            name as name,
                            latitude as latitude,
                            longitude as longitude
                            FROM bronze_stations'''
dimStation_df = spark.sql(dim_station_query)

# Persist as a managed Delta table, replacing any previous version.
dim_station_writer = dimStation_df.write.format("delta").mode("overwrite")
dim_station_writer.saveAsTable("silver_dimStation")

### Creating the Payments Fact Table

In [0]:
# Payment fact: each bronze payment is matched to its calendar day in the date
# dimension and to its rider in the rider dimension. Both joins are inner joins,
# so payments without a matching date or rider are left out.
fact_payment_query = '''SELECT DISTINCT p.payment_id as payment_key,
                                p.payment_id as payment_id,
                                p.amount as amount,
                                d.date_key as date_key,
                                r.rider_key as rider_key
                                FROM bronze_payments p
                                JOIN silver_dimDate d ON p.date = to_date(d.date_key)
                                JOIN silver_dimRider r on p.rider_id = r.rider_key'''
factPayment_df = spark.sql(fact_payment_query)

# Persist as a managed Delta table, replacing any previous version.
fact_payment_writer = factPayment_df.write.format("delta").mode("overwrite")
fact_payment_writer.saveAsTable("silver_factPayment")

### Creating the Trips Fact Table

In [0]:
# Trip fact: one row per trip with keys into the date, rider and station
# dimensions plus two derived measures:
#   duration  - trip length in seconds
#   rider_age - rider's age in whole years at the start of the trip
#               (previously computed with DATEDIFF(MONTH, ...), i.e. in months)
# The date joins are LEFT joins, so a trip outside the calendar range keeps a
# NULL date key; the rider and station joins are inner joins, so trips with an
# unknown rider or station are dropped.
factTrip_df = spark.sql('''SELECT  DISTINCT trip.trip_id as trip_key,
                    trip.trip_id as trip_id,
                    td1.date_key as start_date_key,
                    td2.date_key as end_date_key,
                    rider.rider_key as rider_key,
                    DATEDIFF(second, trip.start_at,trip.ended_at) as duration,
                    st1.station_key as start_station_key,
                    st2.station_key as end_station_key,
                    DATEDIFF(YEAR, rider.birthday,trip.start_at) as rider_age,
                    trip.rideable_type as rideable_type
            FROM bronze_trips trip
            LEFT JOIN silver_dimDate td1 ON date_trunc("Day",td1.datetime) = date_trunc("Day",trip.start_at)
            LEFT JOIN silver_dimDate td2 ON DATE_TRUNC("Day",td2.datetime) = DATE_TRUNC("Day",trip.ended_at)
            JOIN silver_dimRider rider ON trip.rider_id = rider.rider_id
            JOIN silver_dimStation st1 ON trip.start_station_id = st1.station_id
            JOIN silver_dimStation st2 ON trip.end_station_id = st2.station_id''')
factTrip_df.write.format("delta").mode("overwrite").saveAsTable("silver_factTrip")

## Business Queries for which Gold Layer tables are created.

### Analyze how much time is spent per ride Based on time of day
### gold_duration_by_hour

In [0]:
# Average trip duration (seconds) grouped by the hour of the trip's start date.
# Fixes:
#   * the date dimension is keyed by date_key (it has no start_date_key column)
#   * the aggregate is aliased, because Delta rejects column names that contain
#     parentheses such as the default "avg(DURATION)"
# NOTE(review): silver_dimDate is generated in one-day steps, so its Hour column
# is the same for every row and this query yields a single bucket. A real
# time-of-day breakdown needs the hour of the trip's start timestamp, which
# silver_factTrip does not currently carry.
dur_by_hour_df = spark.sql('''SELECT AVG(trip.duration) AS avg_duration, dt.hour
FROM silver_factTrip trip
INNER JOIN silver_dimDate dt
ON trip.start_date_key = dt.date_key
GROUP BY dt.hour''')
dur_by_hour_df.write.format("delta").mode("overwrite").saveAsTable("gold_duration_by_hour")

### Analyze how much time is spent per ride Based on which station is the starting and / or ending station
### gold_duration_by_station

In [0]:
# Average trip duration (seconds) for trips starting at one specific station.
# The aggregate is aliased because Delta rejects column names that contain
# parentheses such as the default "avg(duration)".
# NOTE(review): the station name is hard-coded; drop the WHERE clause to get
# the average for every start station.
dur_by_station_df = spark.sql('''SELECT AVG(trip.duration) AS avg_duration, st.name
FROM silver_factTrip trip
INNER JOIN silver_dimStation st
ON trip.start_station_key = st.station_key
WHERE st.name = 'Glenwood Ave & Touhy Ave'
GROUP BY st.name''')
dur_by_station_df.write.format("delta").mode("overwrite").saveAsTable("gold_duration_by_station")

### Analyze how much time is spent per ride Based on age of the rider at time of the ride
### gold_duration_by_riderage

In [0]:
# Average trip duration (seconds) per rider age at the time of the ride.
# Fixes:
#   * silver_factTrip names the column rider_age; there is no column "age"
#   * the aggregate is aliased, because Delta rejects column names that contain
#     parentheses such as the default "avg(duration)"
dur_by_age_df = spark.sql('''
SELECT AVG(duration) AS avg_duration, rider_age
FROM silver_factTrip
GROUP BY rider_age''')
dur_by_age_df.write.format("delta").mode("overwrite").saveAsTable("gold_duration_by_riderage")

### Analyze how much money is spent per month
### gold_amount_by_month

In [0]:
# Total payment amount per month of 2021.
# Fixes:
#   * the table name was written "silver dimDate" (missing underscore), which
#     is a SQL syntax error
#   * the aggregate is aliased, because Delta rejects column names that contain
#     parentheses such as the default "sum(amount)"
#   * year is compared with a number rather than the string '2021'
# NOTE(review): the year is hard-coded; drop the WHERE clause for all years.
amt_by_mnt_df = spark.sql('''SELECT SUM(p.amount) AS total_amount, dt.month, dt.year, dt.quarter
FROM silver_factPayment p
INNER JOIN silver_dimDate dt
ON p.date_key = dt.date_key
WHERE dt.year = 2021
GROUP BY dt.month, dt.year, dt.quarter''')
amt_by_mnt_df.write.format("delta").mode("overwrite").saveAsTable("gold_amount_by_month")

### Analyze how much money is spent per member, based on the age of the rider at account start
### gold_amount_by_rider

In [0]:
# Total payment amount per rider first name, for riders who were 25 years old
# when their account started.
# Fixes:
#   * the rider join had no ON clause, so every payment was paired with every
#     rider (cartesian product)
#   * silver_dimRider has no "age" column; the age at account start is derived
#     from the rider's birthday and the account_start_date in bronze_riders
#   * the aggregate is aliased, because Delta rejects column names that contain
#     parentheses such as the default "sum(AMOUNT)"
# NOTE(review): assumes bronze_riders holds one row per rider_id - a duplicate
# rider row would double-count that rider's payments. Confirm.
# NOTE(review): grouping by first_name merges riders who share a first name.
amount_by_rider_df = spark.sql('''SELECT SUM(p.amount) AS total_amount, r.first_name
FROM silver_factPayment p
INNER JOIN silver_dimRider r ON p.rider_key = r.rider_key
INNER JOIN bronze_riders b ON b.rider_id = r.rider_id
WHERE DATEDIFF(YEAR, r.birthday, to_timestamp(b.account_start_date)) = 25
GROUP BY r.first_name''')
amount_by_rider_df.write.format("delta").mode("overwrite").saveAsTable("gold_amount_by_rider")

### Analyze how much money is spent per member Based on how many rides the rider averages per month
### gold_rides_per_rider_month

In [0]:
# Number of rides per rider and calendar month (by trip start date).
# Fixes:
#   * the date dimension was aliased "d" but referenced as "dt"
#   * silver_factTrip has no date_key column; the trip start is start_date_key
#   * the aggregate is aliased, because Delta rejects column names that contain
#     parentheses such as the default "count(trip_id)"
#   * year is added to the grouping so the same month of different years is
#     not merged into one bucket
rides_df = spark.sql('''SELECT COUNT(t.trip_id) AS ride_count, r.first_name, r.rider_key, d.year, d.month
FROM silver_factTrip t
INNER JOIN silver_dimRider r ON t.rider_key = r.rider_key
INNER JOIN silver_dimDate d ON t.start_date_key = d.date_key
GROUP BY r.first_name, r.rider_key, d.year, d.month''')
rides_df.write.format("delta").mode("overwrite").saveAsTable("gold_rides_per_rider_month")



### Analyze how much money is spent per member Based on how many minutes the rider spends on a bike per month
### gold_duration_rider

In [0]:

# Minutes each rider spends on a bike per calendar month (by trip start date).
# Fixes:
#   * silver_factTrip has no "minutes" column; duration is stored in seconds,
#     so the total is divided by 60
#   * the non-aggregated columns were selected without a GROUP BY
#   * silver_factTrip has no date_key column; the trip start is start_date_key
#   * the aggregate is aliased, because Delta rejects column names that contain
#     parentheses such as the default "sum(minutes)"
# Year and month are part of the output because the result is "per month".
dur_rider_df = spark.sql('''SELECT SUM(t.duration) / 60 AS total_minutes, r.rider_key, r.first_name, d.year, d.month
FROM silver_factTrip t
INNER JOIN silver_dimDate d ON t.start_date_key = d.date_key
INNER JOIN silver_dimRider r ON t.rider_key = r.rider_key
GROUP BY r.rider_key, r.first_name, d.year, d.month''')
dur_rider_df.write.format("delta").mode("overwrite").saveAsTable("gold_duration_rider")