In [None]:
#Create PySpark dataframes from the CSV file data from /FileStore/tables/udacity
payments_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
.load("/FileStore/tables/udacity/payments.csv")

riders_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
.load("/FileStore/tables/udacity/riders.csv")

stations_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
.load("/FileStore/tables/udacity/stations.csv")

trips_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
.load("/FileStore/tables/udacity/trips.csv")

payments_df.printSchema()
payments_df.show(2)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

+---+----------+---+----+
|_c0|       _c1|_c2| _c3|
+---+----------+---+----+
|  1|2019-05-01|9.0|1000|
|  2|2019-06-01|9.0|1000|
+---+----------+---+----+
only showing top 2 rows



In [None]:
# Save the imported CSV file on DBFS into the Delta Lake space as a per default Parquet file
payments_df.write.format("delta") \
    .mode("overwrite") \
.save("/delta/bronze_payments")

riders_df.write.format("delta") \
    .mode("overwrite") \
.save("/delta/bronze_riders")

stations_df.write.format("delta") \
    .mode("overwrite") \
.save("/delta/bronze_stations")

trips_df.write.format("delta") \
    .mode("overwrite") \
.save("/delta/bronze_trips")



In [None]:
# Create the BRONZE Delta Lake Table metadata for a database from the Parquet files in the Databricks File System
spark.sql("CREATE TABLE IF NOT EXISTS bronze_payments_tbl USING DELTA LOCATION '/delta/bronze_payments'")
spark.sql("CREATE TABLE IF NOT EXISTS bronze_riders_tbl USING DELTA LOCATION '/delta/bronze_riders'")
spark.sql("CREATE TABLE IF NOT EXISTS bronze_stations_tbl USING DELTA LOCATION '/delta/bronze_stations'")
spark.sql("CREATE TABLE IF NOT EXISTS bronze_trips_tbl USING DELTA LOCATION '/delta/bronze_trips'")


DataFrame[]

In [None]:
#### OPTIONAL ####
# DROP the SILVER tables if not wanted anymore
spark.sql("DROP TABLE IF EXISTS silver_payments_tbl")
spark.sql("DROP TABLE IF EXISTS silver_riders_tbl")
spark.sql("DROP TABLE IF EXISTS silver_stations_tbl")
spark.sql("DROP TABLE IF EXISTS silver_trips_tbl")


DataFrame[]

In [None]:
# List databases in the Catalog
spark.catalog.listDatabases()
spark.catalog.listTables()

[Table(name='bronze_payments_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='bronze_riders_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='bronze_stations_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='bronze_trips_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='gold_station_dims_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False)]

In [None]:
# Read and show Bronze tables SCHEMAs
spark.sql("DESCRIBE TABLE bronze_payments_tbl").show()
spark.sql("DESCRIBE TABLE bronze_riders_tbl").show()
spark.sql("DESCRIBE TABLE bronze_stations_tbl").show()
spark.sql("DESCRIBE TABLE bronze_trips_tbl").show()


# Read and show bronze tables CONTECT
spark.read.table("bronze_payments_tbl").show(3)
spark.read.table("bronze_riders_tbl").show(3)
spark.read.table("bronze_stations_tbl").show(3)
spark.read.table("bronze_trips_tbl").show(3)




+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|     _c0|   string|   NULL|
|     _c1|   string|   NULL|
|     _c2|   string|   NULL|
|     _c3|   string|   NULL|
+--------+---------+-------+

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|     _c0|   string|   NULL|
|     _c1|   string|   NULL|
|     _c2|   string|   NULL|
|     _c3|   string|   NULL|
|     _c4|   string|   NULL|
|     _c5|   string|   NULL|
|     _c6|   string|   NULL|
|     _c7|   string|   NULL|
+--------+---------+-------+

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|     _c0|   string|   NULL|
|     _c1|   string|   NULL|
|     _c2|   string|   NULL|
|     _c3|   string|   NULL|
+--------+---------+-------+

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|     _c0|   string|   NULL|
|     _c1|   string|   NULL|
|     _c2|   string|   NULL|
|     _c3| 

In [None]:
# Exercise to develop a StructType Schema for databases, to make first column values not nulable. 
# Will be used with createDataFrame function below.
# The PySpark DataTypes must be imported
from pyspark.sql.types import * 

# Define the structures for the RDD dataframe Schema 
payments_schema = StructType([ \
    StructField('payment_id', IntegerType(), nullable=False), \
    StructField('date', DateType(), nullable=True), \
    StructField('amount', DecimalType(), nullable=True), \
    StructField('rider_id', IntegerType(), nullable=True) \
]) 

riders_schema = StructType([ \
    StructField('rider_id', IntegerType(), nullable=False), \
    StructField('first', StringType(), nullable=True), \
    StructField('last', StringType(), nullable=True), \
    StructField('address', StringType(), nullable=True), \
    StructField('birthday', DateType(), nullable=True), \
    StructField('account_start_date', DateType(), nullable=True), \
    StructField('account_end_date', DateType(), nullable=True), \
    StructField('is_member', BooleanType(), nullable=True) \
]) 

stations_schema = StructType([ \
    StructField('station_id', StringType(), nullable=False), \
    StructField('name', StringType(), nullable=True), \
    StructField('latitude', FloatType(), nullable=True), \
    StructField('longitude', FloatType(), nullable=True) \
]) 

trips_schema = StructType([ \
    StructField('trip_id', StringType(), nullable=False), \
    StructField('rideable_type', StringType(), nullable=True), \
    StructField('started_at', TimestampNTZType(), nullable=True), \
    StructField('ended_at', TimestampNTZType(), nullable=True), \
    StructField('start_station_id', IntegerType(), nullable=True), \
    StructField('end_station_id', IntegerType(), nullable=True), \
    StructField('rider_id', IntegerType(), nullable=True) \
])



In [None]:
##### Create SILVER PySpark Dataframe #####
#### silver_payments_df ####
# Import required PySpark functions
from pyspark.sql.functions import col
from pyspark.sql.types import *

# Create silver_payments_df PySpark dataframes with data from BRONZE Delta Lake Parquet Databricks File System
silver_payments_df = spark.read.format("delta").load("/delta/bronze_payments")

silver_payments_df.printSchema()

# Rename and CAST Dataframe Columns
silver_payments_df = silver_payments_df.withColumn("payment_id",col("_c0").cast(IntegerType()))
silver_payments_df = silver_payments_df.withColumn("date", col("_c1").cast(DateType()))
silver_payments_df = silver_payments_df.withColumn("amount", col("_c2").cast(DecimalType()))
silver_payments_df = silver_payments_df.withColumn("rider_id", col("_c3").cast(IntegerType()))

# Drop the original columns
silver_payments_df = silver_payments_df.drop("_c0")
silver_payments_df = silver_payments_df.drop("_c1")
silver_payments_df = silver_payments_df.drop("_c2")
silver_payments_df = silver_payments_df.drop("_c3")
                                                   
# Display Results
silver_payments_df.show(3)
silver_payments_df.printSchema()



root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|    539256|2020-08-01|     9|   21826|
|    539257|2020-09-01|     9|   21826|
|    539258|2020-10-01|     9|   21826|
+----------+----------+------+--------+
only showing top 3 rows

root
 |-- payment_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: decimal(10,0) (nullable = true)
 |-- rider_id: integer (nullable = true)



In [None]:
##### Create SILVER PySpark Dataframe #####
#### silver_riders_df ####
# Import required PySpark functions
from pyspark.sql.functions import col

# Create silver_riders_df PySpark dataframes with data from Delta Lake Parquet Databricks File System
silver_riders_df = spark.read.format("delta").load("/delta/bronze_riders")

# Rename and CAST Dataframe Columns
silver_riders_df = silver_riders_df.withColumn("rider_id", col("_c0").cast(IntegerType()))
silver_riders_df = silver_riders_df.withColumn("first", col("_c1").cast(VarcharType(256)))
silver_riders_df = silver_riders_df.withColumn("last", col("_c2").cast(VarcharType(256)))
silver_riders_df = silver_riders_df.withColumn("address", col("_c3").cast(VarcharType(256)))
silver_riders_df = silver_riders_df.withColumn("birthday", col("_c4").cast(DateType()))
silver_riders_df = silver_riders_df.withColumn("account_start_date", col("_c5").cast(DateType()))
silver_riders_df = silver_riders_df.withColumn("account_end_date", col("_c6").cast(DateType()))
silver_riders_df = silver_riders_df.withColumn("is_member", col("_c7").cast(BooleanType()))

# Drop the original columns
silver_riders_df = silver_riders_df.drop("_c0")
silver_riders_df = silver_riders_df.drop("_c1")
silver_riders_df = silver_riders_df.drop("_c2")
silver_riders_df = silver_riders_df.drop("_c3")
silver_riders_df = silver_riders_df.drop("_c4")
silver_riders_df = silver_riders_df.drop("_c5")
silver_riders_df = silver_riders_df.drop("_c6")
silver_riders_df = silver_riders_df.drop("_c7")

# Display Results
silver_riders_df.show(3)
silver_riders_df.printSchema()


      

+--------+-----+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|first|     last|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+-----+---------+--------------------+----------+------------------+----------------+---------+
|   57257| Mark|Mcfarland|   9928 Hunter Ranch|1982-02-01|        2020-12-05|            NULL|    false|
|   57258| Mark|    Davis|20036 Barrett Sum...|1963-07-28|        2017-07-12|            NULL|     true|
|   57259|Bryan|  Manning|    089 Sarah Square|1984-11-05|        2018-08-10|            NULL|     true|
+--------+-----+---------+--------------------+----------+------------------+----------------+---------+
only showing top 3 rows

root
 |-- rider_id: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: date (nullable = tr

In [None]:
##### Create SILVER PySpark Dataframe #####
#### silver_stations_df ####
# Import required PySpark functions
from pyspark.sql.functions import col

# Create silver_stations_df PySpark dataframes with data from Delta Lake Parquet Databricks File System
silver_stations_df = spark.read.format("delta").load("/delta/bronze_stations")

# Rename and CAST Dataframe Columns
silver_stations_df = silver_stations_df.withColumn("station_id", col("_c0").cast(VarcharType(256)))
silver_stations_df = silver_stations_df.withColumn("name", col("_c1").cast(VarcharType(256)))
silver_stations_df = silver_stations_df.withColumn("latitude", col("_c2").cast(FloatType()))
silver_stations_df = silver_stations_df.withColumn("longitude", col("_c3").cast(FloatType()))

# Drop the original columns
silver_stations_df = silver_stations_df.drop("_c0")
silver_stations_df = silver_stations_df.drop("_c1")
silver_stations_df = silver_stations_df.drop("_c2")
silver_stations_df = silver_stations_df.drop("_c3")

# Display Results
silver_stations_df.show(3)
silver_stations_df.printSchema()




+------------+--------------------+---------+----------+
|  station_id|                name| latitude| longitude|
+------------+--------------------+---------+----------+
|         525|Glenwood Ave & To...|  42.0127| -87.66606|
|KA1503000012|  Clark St & Lake St|41.885796|  -87.6311|
|         637|Wood St & Chicago...|41.895634|-87.672066|
+------------+--------------------+---------+----------+
only showing top 3 rows

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)



In [None]:
##### Create SILVER PySpark Dataframe #####
#### silver_trips_df ####
# Import required PySpark functions
from pyspark.sql.functions import col

# Create silver_stations_df PySpark dataframes with data from Delta Lake Parquet Databricks File System
silver_trips_df = spark.read.format("delta").load("/delta/bronze_trips")

# Rename and CAST Dataframe Columns
silver_trips_df = silver_trips_df.withColumn("trip_id", col("_c0").cast(VarcharType(256)))
silver_trips_df = silver_trips_df.withColumn("rideable_type", col("_c1").cast(StringType()))
silver_trips_df = silver_trips_df.withColumn("started_at", col("_c2").cast(TimestampNTZType()))
silver_trips_df = silver_trips_df.withColumn("ended_at", col("_c3").cast(TimestampNTZType()))
silver_trips_df = silver_trips_df.withColumn("start_station_id", col("_c4").cast(IntegerType()))
silver_trips_df = silver_trips_df.withColumn("end_station_id", col("_c5").cast(IntegerType()))
silver_trips_df = silver_trips_df.withColumn("rider_id", col("_c6").cast(IntegerType()))

# Drop the original columns
silver_trips_df = silver_trips_df.drop("_c0")
silver_trips_df = silver_trips_df.drop("_c1")
silver_trips_df = silver_trips_df.drop("_c2")
silver_trips_df = silver_trips_df.drop("_c3")
silver_trips_df = silver_trips_df.drop("_c4")
silver_trips_df = silver_trips_df.drop("_c5")
silver_trips_df = silver_trips_df.drop("_c6")

# Display Results
silver_trips_df.show(3)
silver_trips_df.printSchema()


+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|7E1E50AC37E2DAD3| classic_bike|2021-08-14 14:01:36|2021-08-14 14:34:49|            NULL|         13089|    2644|
|ADFF32195521E952| classic_bike|2021-08-29 16:16:36|2021-08-29 16:24:43|           13288|          NULL|   37747|
|7C59843DB8D13CC7|electric_bike|2021-08-27 11:06:34|2021-08-27 11:12:52|            NULL|          NULL|   63224|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
only showing top 3 rows

root
 |-- trip_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp_ntz (nullable = true)
 |-- ended_at: timestamp_ntz (nullable 

In [None]:
# Use createDataFrame to make first Column Values NOT NULLABLE
silver_payments_df = spark.createDataFrame(silver_payments_df.rdd, payments_schema)
silver_payments_df.printSchema()

silver_riders_df = spark.createDataFrame(silver_riders_df.rdd, riders_schema)
silver_riders_df.printSchema()

silver_stations_df = spark.createDataFrame(silver_stations_df.rdd, stations_schema)
silver_stations_df.printSchema()

silver_trips_df = spark.createDataFrame(silver_trips_df.rdd, trips_schema)
silver_trips_df.printSchema()

root
 |-- payment_id: integer (nullable = false)
 |-- date: date (nullable = true)
 |-- amount: decimal(10,0) (nullable = true)
 |-- rider_id: integer (nullable = true)

root
 |-- rider_id: integer (nullable = false)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: date (nullable = true)
 |-- account_end_date: date (nullable = true)
 |-- is_member: boolean (nullable = true)

root
 |-- station_id: string (nullable = false)
 |-- name: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)

root
 |-- trip_id: string (nullable = false)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp_ntz (nullable = true)
 |-- ended_at: timestamp_ntz (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- rider_id: integer (nullable = true)



In [None]:
##### Create SILVER Tables #####
# Create the SILVER tables in the Delta Lake Lakehouse Structure from dataframes
silver_payments_df.write.format("delta").mode("overwrite").saveAsTable("silver_payments_tbl")
spark.read.table("silver_payments_tbl").printSchema()
spark.read.table("silver_payments_tbl").show(3)

silver_riders_df.write.format("delta").mode("overwrite").saveAsTable("silver_riders_tbl")
spark.read.table("silver_riders_tbl").printSchema()
spark.read.table("silver_riders_tbl").show(3)

silver_stations_df.write.format("delta").mode("overwrite").saveAsTable("silver_stations_tbl")
spark.read.table("silver_stations_tbl").printSchema()
spark.read.table("silver_stations_tbl").show(3)

silver_trips_df.write.format("delta").mode("overwrite").saveAsTable("silver_trips_tbl")
spark.read.table("silver_trips_tbl").printSchema()
spark.read.table("silver_trips_tbl").show(3)



root
 |-- payment_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: decimal(10,0) (nullable = true)
 |-- rider_id: integer (nullable = true)

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|   1574726|2021-02-01|     9|   61831|
|   1574727|2021-03-01|     9|   61831|
|   1574728|2021-04-01|     9|   61831|
+----------+----------+------+--------+
only showing top 3 rows

root
 |-- rider_id: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: date (nullable = true)
 |-- account_end_date: date (nullable = true)
 |-- is_member: boolean (nullable = true)

+--------+-----+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|first|     last|             address|  birthday|account_start_date|account_end

In [None]:
# List databases in the Catalog
spark.catalog.listDatabases()
spark.catalog.listTables()

[Table(name='bronze_payments_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='bronze_riders_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='bronze_stations_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='bronze_trips_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='gold_pament_facts_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='gold_rider_dims_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='gold_station_dims_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Ta

In [None]:
##### Create GOLD Dimension Dataframe #####
# Building Gold date_dims table

gold_date_dims_df = spark.sql("""
SELECT
    DISTINCT t.started_at AS date_id,
    EXTRACT(YEAR FROM t.started_at) AS year,
	EXTRACT(QUARTER FROM t.started_at) AS quarter,
	EXTRACT(MONTH FROM t.started_at) AS month,
	EXTRACT(WEEK FROM t.started_at) AS week,
	EXTRACT(DAY FROM t.started_at) AS day,
	CASE WHEN dayofweek(t.started_at) IN (6, 7) THEN true ELSE false END AS is_weekend,
	EXTRACT(HOUR FROM t.started_at) AS hour
FROM default.silver_trips_tbl AS t

UNION

SELECT 
    DISTINCT t.ended_at AS date_id,
    EXTRACT(YEAR FROM t.ended_at) AS year,
	EXTRACT(QUARTER FROM t.ended_at) AS quarter,
	EXTRACT(MONTH FROM t.ended_at) AS month,
	EXTRACT(WEEK FROM t.ended_at) AS week,
	EXTRACT(DAY FROM t.ended_at) AS day,
	CASE WHEN dayofweek(t.ended_at) IN (6, 7) THEN true ELSE false END AS is_weekend,
	EXTRACT(HOUR FROM t.ended_at) AS hour
FROM default.silver_trips_tbl AS t

UNION

SELECT 
    DISTINCT p.date AS date_id,
    EXTRACT(YEAR FROM p.date) AS year,
	EXTRACT(QUARTER FROM p.date) AS quarter,
	EXTRACT(MONTH FROM p.date) AS month,
	EXTRACT(WEEK FROM p.date) AS week,
	EXTRACT(DAY FROM p.date) AS day,
	CASE WHEN dayofweek(p.date) IN (6, 7) THEN true ELSE false END AS is_weekend,
	NULL AS hour
FROM default.silver_payments_tbl AS p;
""")


gold_date_dims_df.printSchema()
gold_date_dims_df.show(5)

##### Create GOLD Dimension Table #####                   
gold_date_dims_df.write.format("delta").mode("overwrite").saveAsTable("gold_date_dims_tbl")

spark.read.table("gold_date_dims_tbl").printSchema()
spark.read.table("gold_date_dims_tbl").show(3)



root
 |-- date_id: timestamp_ntz (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- is_weekend: boolean (nullable = false)
 |-- hour: integer (nullable = true)

+-------------------+----+-------+-----+----+---+----------+----+
|            date_id|year|quarter|month|week|day|is_weekend|hour|
+-------------------+----+-------+-----+----+---+----------+----+
|2021-02-10 17:44:28|2021|      1|    2|   6| 10|     false|  17|
|2021-02-28 21:09:14|2021|      1|    2|   8| 28|     false|  21|
|2021-02-28 13:56:01|2021|      1|    2|   8| 28|     false|  13|
|2021-02-05 10:45:16|2021|      1|    2|   5|  5|      true|  10|
|2021-02-03 16:32:44|2021|      1|    2|   5|  3|     false|  16|
+-------------------+----+-------+-----+----+---+----------+----+
only showing top 5 rows

root
 |-- date_id: timestamp_ntz (nullable = true)
 |-- year

In [None]:
##### Create GOLD Dimension Dataframe #####
# Building Gold station_dims table

gold_station_dims_df = spark.sql("""SELECT s.station_id, s.name 
FROM default.silver_stations_tbl AS s""")

gold_station_dims_df.printSchema()
gold_station_dims_df.show(5)

##### Create GOLD Dimension Table #####
gold_station_dims_df.write.format("delta").mode("overwrite").saveAsTable("default.gold_station_dims_tbl")

spark.read.table("gold_station_dims_tbl").printSchema()
spark.read.table("gold_station_dims_tbl").show(3)



root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)

+------------+--------------------+
|  station_id|                name|
+------------+--------------------+
|         525|Glenwood Ave & To...|
|KA1503000012|  Clark St & Lake St|
|         637|Wood St & Chicago...|
|       13216|  State St & 33rd St|
|       18003|Fairbanks St & Su...|
+------------+--------------------+
only showing top 5 rows

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)

+------------+--------------------+
|  station_id|                name|
+------------+--------------------+
|         525|Glenwood Ave & To...|
|KA1503000012|  Clark St & Lake St|
|         637|Wood St & Chicago...|
+------------+--------------------+
only showing top 3 rows



In [None]:
##### Create GOLD Dimension Dataframe #####
# Building Gold rider_dims table

gold_rider_dims_df = spark.sql("""
SELECT 
r.rider_id, r.first, r.last, r.address, r.birthday, r.is_member, r.account_start_date, r.account_end_date, 
DATEDIFF(year, r.account_start_date, r.birthday) AS age_at_acc_start
FROM default.silver_riders_tbl AS r
""")

gold_rider_dims_df.printSchema()
gold_rider_dims_df.show()

##### Create GOLD Dimension Table #####
gold_rider_dims_df.write.format("delta").mode("overwrite").saveAsTable("default.gold_rider_dims_tbl")

spark.read.table("gold_rider_dims_tbl").printSchema()
spark.read.table("gold_rider_dims_tbl").show(3)



root
 |-- rider_id: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- is_member: boolean (nullable = true)
 |-- account_start_date: date (nullable = true)
 |-- account_end_date: date (nullable = true)
 |-- age_at_acc_start: long (nullable = true)

+--------+-----------+---------+--------------------+----------+---------+------------------+----------------+----------------+
|rider_id|      first|     last|             address|  birthday|is_member|account_start_date|account_end_date|age_at_acc_start|
+--------+-----------+---------+--------------------+----------+---------+------------------+----------------+----------------+
|    1000|      Diana|    Clark| 1200 Alyssa Squares|1989-02-13|     true|        2019-04-23|            NULL|             -30|
|    1001|   Jennifer|    Smith|     397 Diana Ferry|1976-08-10|     true|        2019-11-01|      2020-09-01

In [None]:
##### Create GOLD Dimension Dataframe #####
# Building Gold trip_facts table

gold_trip_facts_df = spark.sql("""
    SELECT 
    t.trip_id,
	t.started_at AS started_at_date_id,
	t.ended_at AS ended_at_date_id,
	t.start_station_id,
	t.end_station_id, 
	t.rider_id,
	DATEDIFF(YEAR, t.started_at, r.birthday) AS rider_age, 
	DATEDIFF(HOUR, t.ended_at, t.started_at) AS trip_duration
FROM default.silver_trips_tbl AS t 
JOIN default.silver_riders_tbl AS r ON t.rider_id = r.rider_id;
""")

gold_trip_facts_df.printSchema()
gold_trip_facts_df.show(5)

##### Create GOLD Dimension Table #####
gold_trip_facts_df.write.format("delta").mode("overwrite").saveAsTable("default.gold_trip_facts_tbl")

spark.read.table("gold_trip_facts_tbl").printSchema()
spark.read.table("gold_trip_facts_tbl").show(3)


root
 |-- trip_id: string (nullable = true)
 |-- started_at_date_id: timestamp_ntz (nullable = true)
 |-- ended_at_date_id: timestamp_ntz (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- rider_id: integer (nullable = true)
 |-- rider_age: long (nullable = true)
 |-- trip_duration: long (nullable = true)

+----------------+-------------------+-------------------+----------------+--------------+--------+---------+-------------+
|         trip_id| started_at_date_id|   ended_at_date_id|start_station_id|end_station_id|rider_id|rider_age|trip_duration|
+----------------+-------------------+-------------------+----------------+--------------+--------+---------+-------------+
|89E7AA6C29227EFF|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   71934|      -37|            0|
|0FEFDE2603568365|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|   47854|      -38|            0|
|E61

In [None]:
##### Create GOLD Dimension Dataframe #####
# Building Gold payment_facts table

gold_payment_facts_df = spark.sql("""
SELECT 
	p.payment_id,
	p.date AS date_id,
	p.rider_id,
	p.amount AS pay_amount
FROM default.silver_payments_tbl AS p
""")

gold_payment_facts_df.printSchema()
gold_payment_facts_df.show(5)

##### Create GOLD Dimension Table #####
gold_payment_facts_df.write.format("delta").mode("overwrite").saveAsTable("default.gold_payment_facts_tbl")

spark.read.table("gold_payment_facts_tbl").printSchema()
spark.read.table("gold_payment_facts_tbl").show(3)



root
 |-- payment_id: integer (nullable = true)
 |-- date_id: date (nullable = true)
 |-- rider_id: integer (nullable = true)
 |-- pay_amount: decimal(10,0) (nullable = true)

+----------+----------+--------+----------+
|payment_id|   date_id|rider_id|pay_amount|
+----------+----------+--------+----------+
|   1574726|2021-02-01|   61831|         9|
|   1574727|2021-03-01|   61831|         9|
|   1574728|2021-04-01|   61831|         9|
|   1574729|2021-05-01|   61831|         9|
|   1574730|2021-06-01|   61831|         9|
+----------+----------+--------+----------+
only showing top 5 rows

root
 |-- payment_id: integer (nullable = true)
 |-- date_id: date (nullable = true)
 |-- rider_id: integer (nullable = true)
 |-- pay_amount: decimal(10,0) (nullable = true)

+----------+----------+--------+----------+
|payment_id|   date_id|rider_id|pay_amount|
+----------+----------+--------+----------+
|    539256|2020-08-01|   21826|         9|
|    539257|2020-09-01|   21826|         9|
|    53