# Gold Stage ! Ster Schema  + Calendar Table Range

In [0]:
from pyspark.sql.functions import min, max, col
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType, TimestampType, FloatType
from pyspark.sql.functions import udf
import pandas as pd
import numpy as np

## LOAD Silver Data

In [0]:
df_silver_rider = spark.table("default.silver_riders")

In [0]:
df_silver_station = spark.table("default.silver_stations")

In [0]:
df_silver_trip = spark.table("default.silver_trips")

In [0]:
df_silver_payments = spark.table("default.silver_payments")

# Calendar Table DIM DF

## Range of Calendar Table

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# Get min and max for our Calendar Table


# Print the minimum and maximum values of the "date" column in the "df_silver_payments" DataFrame
print(df_silver_payments.select(min("date"), max("date")).collect())

# Print the minimum value of the "start_at" column and the maximum value of the "ended_at" column in the "df_silver_trip" DataFrame
print(df_silver_trip.select(min("started_at"), max("ended_at")).collect())



[Row(min(date)=datetime.date(2013, 2, 1), max(date)=datetime.date(2022, 2, 1))]
[Row(min(started_at)=datetime.datetime(2022, 1, 1, 0, 0), max(ended_at)=datetime.datetime(2022, 12, 31, 23, 0, 0, 58000))]


In [0]:
#function to Produce the Dimension Calendar Table
def dimension_datetime_frame(start='2013-01-01', end='2013-12-31'):
    """ Return a ready  Dimension Calendar Table frame with precision of seconds"""
    df = pd.DataFrame({"DateTime": pd.date_range(start=start, end=end, freq="S")})
    df["second"] = df.DateTime.dt.second
    df["minute"] = df.DateTime.dt.minute
    df["hour"] = df.DateTime.dt.hour
    df["day"] = df.DateTime.dt.day
    df["dayofweek"] = df.DateTime.dt.dayofweek
    df["is_weekend"]= df.DateTime.dt.dayofweek > 4
    df["month"] = df.DateTime.dt.month
    df["Quarter"] = df.DateTime.dt.quarter
    df["Year"] = df.DateTime.dt.year
    return df

In [0]:
calendar_df = dimension_datetime_frame()

In [0]:
calendar_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 31449601 entries, 0 to 31449600
Data columns (total 10 columns):
 #   Column      Dtype         
---  ------      -----         
 0   DateTime    datetime64[ns]
 1   second      int64         
 2   minute      int64         
 3   hour        int64         
 4   day         int64         
 5   dayofweek   int64         
 6   is_weekend  bool          
 7   month       int64         
 8   Quarter     int64         
 9   Year        int64         
dtypes: bool(1), datetime64[ns](1), int64(8)
memory usage: 2.1 GB


In [0]:
calendar_df = spark.createDataFrame(calendar_df)

In [0]:
calendar_df.printSchema()

root
 |-- DateTime: timestamp (nullable = true)
 |-- second: long (nullable = true)
 |-- minute: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- day: long (nullable = true)
 |-- dayofweek: long (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- month: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Year: long (nullable = true)



### Write to Tables

In [0]:
calendar_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_dimDate")

## Riders dim DF

In [0]:
# Define the schema for the riders dimension table
schema = StructType([
    StructField("rider_key", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("birthday", DateType(), True),
    StructField("account_start_date", TimestampType(), True),
    StructField("account_end_date", TimestampType(), True),
    StructField("is_member", BooleanType(), True)
])

# Create an empty DataFrame with the defined schema
df_dim_riders = spark.createDataFrame([], schema)



### Concat

In [0]:
df_dim_riders = df_dim_riders.union(df_silver_rider.select(
    col('rider_id').alias('rider_key'), 
    col('first').alias('first_name'), 
    col('last').alias('last_name'), 
    col('address'), 
    col('birthday'), 
    col('account_start_date').alias('account_start_date'), 
    col('account_end_date').alias('account_end_date'), 
    col('is_member')
))


In [0]:
df_dim_riders.display()
df_dim_riders.printSchema()

rider_key,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Kimberly,Williams,1200 Alyssa Squares,1988-03-28,2019-04-23T00:00:00.000+0000,,True
1001,Anthony,Erickson,397 Diana Ferry,1976-12-04,2019-11-01T00:00:00.000+0000,2020-09-01T00:00:00.000+0000,True
1002,Jessica,Roach,644 Brittany Row Apt. 097,1998-03-28,2022-02-04T00:00:00.000+0000,,True
1003,Andrew,Ryan,996 Dickerson Turnpike,1999-03-05,2019-08-26T00:00:00.000+0000,,False
1004,Ian,Peters,7009 Nathan Expressway,1969-06-25,2019-09-14T00:00:00.000+0000,,True
1005,Michael,Gillespie,224 Washington Mills Apt. 467,1974-09-28,2020-03-24T00:00:00.000+0000,,False
1006,Ryan,Peters,1137 Angela Locks,2003-07-10,2020-11-27T00:00:00.000+0000,2021-06-01T00:00:00.000+0000,True
1007,Crystal,Sanchez,979 Phillips Ways,1987-10-15,2016-12-11T00:00:00.000+0000,,False
1008,David,Hicks,7691 Evans Court,1986-07-12,2021-03-28T00:00:00.000+0000,2021-04-01T00:00:00.000+0000,True
1009,Daniel,Hicks,9922 Jim Crest Apt. 319,1981-02-14,2020-06-12T00:00:00.000+0000,2021-02-01T00:00:00.000+0000,True


root
 |-- rider_key: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: timestamp (nullable = true)
 |-- account_end_date: timestamp (nullable = true)
 |-- is_member: boolean (nullable = true)



In [0]:
# Set the rider_key column as the DataFrame index
df_dim_riders = df_dim_riders.withColumn("rider_key", df_dim_riders["rider_key"].cast(IntegerType()))
df_dim_riders = df_dim_riders.orderBy('rider_key')
df_dim_riders = df_dim_riders.dropDuplicates(['rider_key'])
df_dim_riders = df_dim_riders.repartition('rider_key')

In [0]:
df_dim_riders.display()
df_dim_riders.printSchema()

rider_key,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1025,Christopher,Sanders,5182 Michelle Place Apt. 142,2004-08-23,2020-05-01T00:00:00.000+0000,,False
1005,Michael,Gillespie,224 Washington Mills Apt. 467,1974-09-28,2020-03-24T00:00:00.000+0000,,False
1016,Misty,Scott,72226 Casey Square,1991-10-30,2022-02-02T00:00:00.000+0000,,True
1031,Warren,Lynn,47822 Darrell Green,1986-11-23,2018-08-16T00:00:00.000+0000,,True
1030,Deborah,Adams,2722 Kaitlin Street Suite 253,2000-07-21,2021-02-09T00:00:00.000+0000,,True
1034,David,Hall,9001 Carrie Alley,1988-07-14,2020-03-08T00:00:00.000+0000,,False
1019,David,Thompson,00348 Brandi Parks Suite 405,1997-07-01,2021-07-10T00:00:00.000+0000,2021-08-01T00:00:00.000+0000,False
1046,Patrick,Carrillo,351 Bill Curve,2005-03-05,2020-07-08T00:00:00.000+0000,,True
1008,David,Hicks,7691 Evans Court,1986-07-12,2021-03-28T00:00:00.000+0000,2021-04-01T00:00:00.000+0000,True
1047,Julian,Castro,2639 Ronald Summit Apt. 385,1988-11-04,2021-02-23T00:00:00.000+0000,2021-09-01T00:00:00.000+0000,True


root
 |-- rider_key: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: timestamp (nullable = true)
 |-- account_end_date: timestamp (nullable = true)
 |-- is_member: boolean (nullable = true)



#### Write To Table

In [0]:
df_dim_riders.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_dimRiders")

## Station dim DF

In [0]:
# Define the schema for the riders dimension table
schema = StructType([
    StructField("station_key", StringType(), True),
    StructField("station_name", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True)
])

# Create an empty DataFrame with the defined schema
df_dim_station = spark.createDataFrame([], schema)

### Concatenate df_silver_station_batch with df_dim_station

In [0]:
df_dim_station = data_dim_station.union(df_silver_station)

In [0]:
df_dim_station.display()
df_dim_station.printSchema()

station_key,station_name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


root
 |-- station_key: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [0]:
# Set the station_key column as the DataFrame index
df_dim_station = df_dim_station.orderBy('station_key')
df_dim_station = df_dim_station.dropDuplicates(['station_key'])
df_dim_station = df_dim_station.repartition('station_key')

### Write To Table

In [0]:
df_dim_station.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_dimStations")

## Trip Fact DF

In [0]:
# Define the schema for the riders dimension table
schema = StructType([
    StructField("trip_key", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("start_date_id", TimestampType(), True),
    StructField("ended_date_id", TimestampType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("rider_id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("trip_duration", TimestampType(), True)
])

# Create an empty DataFrame with the defined schema
df_fact_trip = spark.createDataFrame([], schema)

### Merge the trip, dimDate (twice), and dimRiders data frames based on the required columns

In [0]:
df_fact_trip_batch = df_silver_trip \
    .join(calendar_df.withColumnRenamed("DateTime", "start_date_time"), 
          df_silver_trip.started_at == col("start_date_time"), "inner") \
    .join(calendar_df.withColumnRenamed("DateTime", "end_date_time"), 
          df_silver_trip.ended_at == col("end_date_time"), "inner") \
    .join(df_dim_riders, df_silver_trip.rider_id == df_dim_riders.rider_key, "inner") \
    .select(df_silver_trip.trip_id, df_silver_trip.rideable_type, "start_date_time", "end_date_time", 
            df_silver_trip.start_station_id, df_silver_trip.end_station_id, df_silver_trip.rider_id, 
            df_dim_riders.birthday)


### Create Age Column

In [0]:
# Method 1 , wrong ! gives wrong data for trips under 1 year, ### FIX ME
"""
df_fact_trip_batch['age'] = (df_fact_trip_batch['start_at'] - df_fact_trip_batch['birthday']).astype('<m8[Y]')
"""
# Method 2
# Define a UDF to calculate age from birth year
def calculate_age(birthday, start_at):
    anniversary_date = datetime(start_at.year, birthday.month, birthday.day)
    age = start_at.year - birthday.year - ((start_at < anniversary_date))
    return age

# Register the UDF
calculate_age_udf = udf(lambda birthday, start_at: calculate_age(birthday, start_at), IntegerType())

# Apply the UDF to the dob column and store the result in a new column called age
df_fact_trip_batch = df_fact_trip_batch.withColumn('age', calculate_age_udf(df_fact_trip_batch.birthday, df_fact_trip_batch.start_date_time))

### Create Trip Duration Column

In [0]:
df_fact_trip_batch = df_fact_trip_batch.withColumn("trip_duration", (col("end_date_time") - col("start_date_time")))

### Rename the 'DateTime' columns to 'start_date_id' and 'ended_date_id', and select the required columns:

In [0]:
df_fact_trip_batch = df_fact_trip_batch.withColumnRenamed('start_date_time', 'start_date_id')
df_fact_trip_batch = df_fact_trip_batch.withColumnRenamed('end_date_time', 'ended_date_id')
df_fact_trip_batch = df_fact_trip_batch.withColumnRenamed('trip_id', 'trip_key')
df_fact_trip_batch = df_fact_trip_batch.withColumn("trip_duration", to_timestamp("trip_duration", "yyyy-MM-dd HH:mm:ss"))
df_fact_trip_batch = df_fact_trip_batch[['trip_key', 'rideable_type', 'start_date_id', 'ended_date_id', 'start_station_id', 'end_station_id', 'rider_id', 'age', 'trip_duration']]

In [0]:
df_fact_trip_batch.display()
df_fact_trip_batch.printSchema()

trip_key,rideable_type,start_date_id,ended_date_id,start_station_id,end_station_id,rider_id,age,trip_duration


root
 |-- trip_key: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- start_date_id: timestamp (nullable = true)
 |-- ended_date_id: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- rider_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- trip_duration: timestamp (nullable = true)



### Concat

In [0]:
df_fact_trip = df_fact_trip.union(df_fact_trip_batch)

In [0]:
# Set the station_key column as the DataFrame index
df_fact_trip = df_fact_trip.orderBy('trip_key')
df_fact_trip = df_fact_trip.dropDuplicates(['trip_key'])
df_fact_trip = df_fact_trip.repartition('trip_key')

### Write To Table

In [0]:
df_fact_trip.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_factTrips")

## Payments Fact DF

In [0]:
# Define the schema for the riders dimension table
schema = StructType([
    StructField("payment_id", IntegerType(), True),
    StructField("date_id", TimestampType(), True),
    StructField("rider_id", IntegerType(), True),
    StructField("amount", IntegerType(), True)
])

# Create an empty DataFrame with the defined schema
df_fact_payments = spark.createDataFrame([], schema)

### Merge df_silver_payments with calendar_df

In [0]:
df_fact_payments_batch = df_silver_payments \
    .join(calendar_df, df_silver_payments.date == calendar_df.DateTime, "inner") \
    .select('payment_id', 'DateTime', 'rider_id', 'amount' )

In [0]:
df_fact_payments_batch.display()
df_fact_payments_batch.printSchema()

payment_id,DateTime,rider_id,amount
71226,2013-02-01T00:00:00.000+0000,3958,7.24
221075,2013-03-01T00:00:00.000+0000,9517,9.0
163370,2013-03-01T00:00:00.000+0000,7263,9.0
162760,2013-03-01T00:00:00.000+0000,7243,9.0
128879,2013-03-01T00:00:00.000+0000,6050,9.0
105007,2013-03-01T00:00:00.000+0000,5168,9.0
77091,2013-03-01T00:00:00.000+0000,4160,9.0
72973,2013-03-01T00:00:00.000+0000,4023,9.0
71227,2013-03-01T00:00:00.000+0000,3958,7.24
47127,2013-03-01T00:00:00.000+0000,3066,9.0


root
 |-- payment_id: integer (nullable = true)
 |-- DateTime: timestamp (nullable = true)
 |-- rider_id: integer (nullable = true)
 |-- amount: decimal(10,2) (nullable = true)



In [0]:
df_fact_payments_batch = df_fact_payments_batch.withColumnRenamed('DateTime', 'date_id')
multiply_by_100 = udf(lambda x: int(x * 100))
df_fact_payments_batch = df_fact_payments_batch.withColumn("amount", multiply_by_100("amount"))
df_fact_payments_batch = df_fact_payments_batch.withColumn("amount", col("amount").cast("integer"))

### Concatenate df_fact_payments_batch with df_fact_payments

In [0]:
df_fact_payments = df_fact_payments.union(df_fact_payments_batch)

In [0]:
# Set the station_key column as the DataFrame index
df_fact_payments = df_fact_payments.orderBy('payment_id')
df_fact_payments = df_fact_payments.dropDuplicates(['payment_id'])
df_fact_payments = df_fact_payments.repartition('payment_id')

### Write To Table

In [0]:
df_fact_payments.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_factPayments")