## Project 3: Building an Azure Data Lake for Bike Share Analytics

### Extract Step:

In [0]:
# Ingest payments.csv from FileStore into the bronze layer as a Delta dataset.
#
# The file is read without a header row, so Spark assigns the default column
# names _c0.._cN; with inferSchema disabled every column stays a string.
# Column names and types are applied later when the gold tables are built.
df_payments = (
    spark.read.format("csv")
    .option("inferSchema", "false")
    .option("header", "false")
    .option("sep", ",")
    .load("/FileStore/tables/payments.csv")
)

# Overwrite so this cell can be re-run (e.g. Restart & Run All): the default
# save mode raises an error when the target path already contains data.
df_payments.write.format("delta").mode("overwrite").save("/delta/bronze_payments")

In [0]:
# Ingest riders.csv from FileStore into the bronze layer as a Delta dataset.
#
# The file is read without a header row, so Spark assigns the default column
# names _c0.._cN; with inferSchema disabled every column stays a string.
# Column names and types are applied later when the gold tables are built.
df_riders = (
    spark.read.format("csv")
    .option("inferSchema", "false")
    .option("header", "false")
    .option("sep", ",")
    .load("/FileStore/tables/riders.csv")
)

# Overwrite so this cell can be re-run (e.g. Restart & Run All): the default
# save mode raises an error when the target path already contains data.
df_riders.write.format("delta").mode("overwrite").save("/delta/bronze_riders")

In [0]:
# Ingest stations.csv from FileStore into the bronze layer as a Delta dataset.
#
# The file is read without a header row, so Spark assigns the default column
# names _c0.._cN; with inferSchema disabled every column stays a string.
# Column names and types are applied later when the gold tables are built.
df_stations = (
    spark.read.format("csv")
    .option("inferSchema", "false")
    .option("header", "false")
    .option("sep", ",")
    .load("/FileStore/tables/stations.csv")
)

# Overwrite so this cell can be re-run (e.g. Restart & Run All): the default
# save mode raises an error when the target path already contains data.
df_stations.write.format("delta").mode("overwrite").save("/delta/bronze_stations")

In [0]:
# Ingest trips.csv from FileStore into the bronze layer as a Delta dataset.
#
# The file is read without a header row, so Spark assigns the default column
# names _c0.._cN; with inferSchema disabled every column stays a string.
# Column names and types are applied later when the gold tables are built.
df_trips = (
    spark.read.format("csv")
    .option("inferSchema", "false")
    .option("header", "false")
    .option("sep", ",")
    .load("/FileStore/tables/trips.csv")
)

# Overwrite so this cell can be re-run (e.g. Restart & Run All): the default
# save mode raises an error when the target path already contains data.
df_trips.write.format("delta").mode("overwrite").save("/delta/bronze_trips")

In [0]:
# Ingest dimdates.csv from FileStore into the bronze layer as a Delta dataset.
#
# Unlike the other source files, this one is read with a header row and a
# semicolon separator, so the columns keep the names from the file. With
# inferSchema disabled every column stays a string; types are applied later
# when the gold date dimension is built.
df_dimdates = (
    spark.read.format("csv")
    .option("inferSchema", "false")
    .option("header", "true")
    .option("sep", ";")
    .load("/FileStore/tables/dimdates.csv")
)

# Overwrite so this cell can be re-run (e.g. Restart & Run All): the default
# save mode raises an error when the target path already contains data.
df_dimdates.write.format("delta").mode("overwrite").save("/delta/bronze_dimdates")

### Load Step:

In [0]:
# Register the bronze payments Delta files as a metastore table.
#
# LOCATION points the table at the files already written under
# /delta/bronze_payments. IF NOT EXISTS keeps the cell re-runnable: a plain
# CREATE TABLE fails once the table has been registered.
spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze_payments_deltaTable
    USING DELTA
    LOCATION '/delta/bronze_payments'
""")

DataFrame[]

In [0]:
# Register the bronze riders Delta files as a metastore table.
#
# LOCATION points the table at the files already written under
# /delta/bronze_riders. IF NOT EXISTS keeps the cell re-runnable: a plain
# CREATE TABLE fails once the table has been registered.
spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze_riders_deltaTable
    USING DELTA
    LOCATION '/delta/bronze_riders'
""")

DataFrame[]

In [0]:
# Register the bronze stations Delta files as a metastore table.
#
# LOCATION points the table at the files already written under
# /delta/bronze_stations. IF NOT EXISTS keeps the cell re-runnable: a plain
# CREATE TABLE fails once the table has been registered.
spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze_stations_deltaTable
    USING DELTA
    LOCATION '/delta/bronze_stations'
""")

DataFrame[]

In [0]:
# Register the bronze trips Delta files as a metastore table.
#
# LOCATION points the table at the files already written under
# /delta/bronze_trips. IF NOT EXISTS keeps the cell re-runnable: a plain
# CREATE TABLE fails once the table has been registered.
spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze_trips_deltaTable
    USING DELTA
    LOCATION '/delta/bronze_trips'
""")

DataFrame[]

In [0]:
# Register the bronze date-dimension Delta files as a metastore table.
#
# LOCATION points the table at the files already written under
# /delta/bronze_dimdates (note the table name uses the singular "dimdate").
# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE fails once
# the table has been registered.
spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze_dimdate_deltaTable
    USING DELTA
    LOCATION '/delta/bronze_dimdates'
""")

DataFrame[]

### Transform Step:

In [0]:
# Build the gold date dimension from the bronze date table.
#
# Bronze columns are all strings (schema inference was disabled on ingest),
# so each one is cast to its target type here; column names are unchanged.
# CREATE OR REPLACE makes the cell re-runnable: a plain CREATE TABLE ... AS
# fails when the table already exists.
df = spark.sql("""
    CREATE OR REPLACE TABLE gold_dimDate_deltaTable
    USING DELTA
    AS
    SELECT
        CAST(DateDimId AS BIGINT)           AS DateDimId,
        CAST(DateActual AS DATE)            AS DateActual,
        CAST(DateLongDescription AS STRING) AS DateLongDescription,
        CAST(DayLongName AS STRING)         AS DayLongName,
        CAST(MonthLongName AS STRING)       AS MonthLongName,
        CAST(CalendarDay AS BIGINT)         AS CalendarDay,
        CAST(CalendarWeek AS BIGINT)        AS CalendarWeek,
        CAST(CalendarDayInWeek AS BIGINT)   AS CalendarDayInWeek,
        CAST(CalendarMonth AS BIGINT)       AS CalendarMonth,
        CAST(CalendarQuarter AS BIGINT)     AS CalendarQuarter,
        CAST(CalendarYear AS BIGINT)        AS CalendarYear,
        CAST(FiscalDayInWeek AS BIGINT)     AS FiscalDayInWeek,
        CAST(FiscalMonth AS BIGINT)         AS FiscalMonth,
        CAST(FiscalDayInMonth AS BIGINT)    AS FiscalDayInMonth,
        CAST(FiscalQuarter AS BIGINT)       AS FiscalQuarter,
        CAST(FiscalYear AS BIGINT)          AS FiscalYear
    FROM bronze_dimdate_deltaTable
""")

In [0]:
# Build the gold rider dimension from the bronze riders table.
#
# The bronze table was loaded from a headerless CSV, so its columns are the
# positional names _c0.._c7; this cell gives them business names and types.
# CREATE OR REPLACE makes the cell re-runnable: a plain CREATE TABLE ... AS
# fails when the table already exists.
df = spark.sql("""
    CREATE OR REPLACE TABLE gold_dimRiders_deltaTable
    USING DELTA
    AS
    SELECT
        CAST(_c0 AS BIGINT)  AS RiderId,
        CAST(_c1 AS STRING)  AS First,
        CAST(_c2 AS STRING)  AS Last,
        CAST(_c3 AS STRING)  AS Address,
        CAST(_c4 AS DATE)    AS Birthday,
        CAST(_c5 AS DATE)    AS AccountStartDate,
        CAST(_c6 AS DATE)    AS AccountEndDate,
        CAST(_c7 AS BOOLEAN) AS Member
    FROM bronze_riders_deltaTable
""")

In [0]:
# Build the gold station dimension from the bronze stations table.
#
# The bronze table was loaded from a headerless CSV, so its columns are the
# positional names _c0.._c3; this cell gives them business names and types.
# CREATE OR REPLACE makes the cell re-runnable: a plain CREATE TABLE ... AS
# fails when the table already exists.
# NOTE(review): FLOAT is single precision; confirm that is enough for the
# coordinates, otherwise DOUBLE would be the safer type.
df = spark.sql("""
    CREATE OR REPLACE TABLE gold_dimStations_deltaTable
    USING DELTA
    AS
    SELECT
        CAST(_c0 AS STRING) AS StationId,
        CAST(_c1 AS STRING) AS Name,
        CAST(_c2 AS FLOAT)  AS Latitude,
        CAST(_c3 AS FLOAT)  AS Longitude
    FROM bronze_stations_deltaTable
""")

In [0]:
# Build the gold payments fact table from the bronze payments table.
#
# The bronze table was loaded from a headerless CSV, so its columns are the
# positional names _c0.._c3. Each payment is linked to the date dimension by
# matching its payment date against DateActual. Because the join is an INNER
# JOIN, payments whose date has no row in the date dimension are dropped.
# CREATE OR REPLACE makes the cell re-runnable: a plain CREATE TABLE ... AS
# fails when the table already exists.
# NOTE(review): Amount is stored as FLOAT; confirm whether a DECIMAL type is
# wanted for monetary values.
df = spark.sql("""
    CREATE OR REPLACE TABLE gold_factPayments_deltaTable
    USING DELTA
    AS
    SELECT
        CAST(T1._c0 AS BIGINT)       AS PaymentId,
        CAST(T2.DateDimId AS BIGINT) AS DateDimId,
        CAST(T1._c3 AS BIGINT)       AS RiderId,
        CAST(T1._c1 AS DATE)         AS PaymentDate,
        CAST(T1._c2 AS FLOAT)        AS Amount
    FROM bronze_payments_deltaTable AS T1
    INNER JOIN gold_dimdate_deltatable AS T2
        ON T2.DateActual = CAST(T1._c1 AS DATE)
""")

In [0]:
# Build the gold trips fact table from the bronze trips table.
#
# The bronze table was loaded from a headerless CSV, so its columns are the
# positional names _c0.._c6. Each trip is linked to:
#   * the date dimension, by the calendar date of its start timestamp;
#   * the rider dimension, by rider id (needed for the rider's birthday).
# Both joins are INNER JOINs, so trips without a matching date or rider row
# are dropped.
#
# Derived measures:
#   * TripDurationInMinutes - minutes between the start and end timestamps.
#   * RiderAge              - years between the rider's birthday and the
#                             trip's start date.
# NOTE(review): the three-argument DATEDIFF(unit, start, end) form needs a
# runtime that supports it - confirm against the cluster's Spark version.
#
# CREATE OR REPLACE makes the cell re-runnable: a plain CREATE TABLE ... AS
# fails when the table already exists.
df = spark.sql("""
    CREATE OR REPLACE TABLE gold_factTrips_deltaTable
    USING DELTA
    AS
    SELECT
        CAST(T1._c0 AS STRING)       AS TripId,
        CAST(T2.DateDimId AS BIGINT) AS DateDimId,
        CAST(T1._c6 AS BIGINT)       AS RiderId,
        CAST(T1._c1 AS STRING)       AS RideableType,
        CAST(T1._c4 AS STRING)       AS StartStationId,
        CAST(T1._c5 AS STRING)       AS EndStationId,
        CAST(T1._c2 AS TIMESTAMP)    AS StartedAt,
        CAST(T1._c3 AS TIMESTAMP)    AS EndedAt,
        DATEDIFF(MINUTE, CAST(T1._c2 AS TIMESTAMP), CAST(T1._c3 AS TIMESTAMP)) AS TripDurationInMinutes,
        DATEDIFF(YEAR, T3.Birthday, CAST(T1._c2 AS DATE)) AS RiderAge
    FROM bronze_trips_deltaTable AS T1
    INNER JOIN gold_dimdate_deltatable AS T2
        ON T2.DateActual = CAST(T1._c2 AS DATE)
    INNER JOIN gold_dimriders_deltatable AS T3
        ON T3.RiderId = CAST(T1._c6 AS BIGINT)
""")