## Azure Databricks: Transform the data using Spark

By Selman Karaosmanoglu

In [0]:
from pyspark.sql import DataFrame

In [0]:
# User-defined functions
def info(df:DataFrame):
    display(df.pandas_api().info())
    display(df.pandas_api().head(2))
    display(df.printSchema())

### Payment Fact

In [0]:
# Read
df_payment_bronze = spark.table('payment_bronze')

In [0]:
info(df_payment_bronze)

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 1946607 entries, 0 to 1946606
Data columns (total 4 columns):
 #   Column      Non-Null Count    Dtype  
---  ------      --------------    -----  
 0   payment_id  1946607 non-null  int32  
 1   date        1946607 non-null  object 
 2   amount      1946607 non-null  float64
 3   rider_id    1946607 non-null  int32  
dtypes: float64(1), int32(2), object(1)

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000


root
 |-- payment_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: double (nullable = true)
 |-- rider_id: integer (nullable = true)



In [0]:
# Rename column
df_payment_bronze = df_payment_bronze.withColumnRenamed("date", "payment_date")

In [0]:
# Create a Spark table with Delta format and overwrite mode
df_payment_bronze.write.format("delta").mode("overwrite").saveAsTable("fact_payment")

In [0]:
df_payment_gold = spark.table("fact_payment")

In [0]:
info(df_payment_gold)

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 1946607 entries, 0 to 1946606
Data columns (total 4 columns):
 #   Column        Non-Null Count    Dtype  
---  ------        --------------    -----  
 0   payment_id    1946607 non-null  int32  
 1   payment_date  1946607 non-null  object 
 2   amount        1946607 non-null  float64
 3   rider_id      1946607 non-null  int32  
dtypes: float64(1), int32(2), object(1)

payment_id,payment_date,amount,rider_id
1064462,2020-06-01,9.0,42106
1064463,2020-07-01,9.0,42106


root
 |-- payment_id: integer (nullable = true)
 |-- payment_date: date (nullable = true)
 |-- amount: double (nullable = true)
 |-- rider_id: integer (nullable = true)



### Rider Dimension

In [0]:
# Read
df_rider_bronze = spark.table('rider_bronze')

In [0]:
info(df_rider_bronze)

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 75000 entries, 0 to 74999
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   rider_id            75000 non-null  int32 
 1   first_name          75000 non-null  object
 2   last_name           75000 non-null  object
 3   address             75000 non-null  object
 4   birthdate           75000 non-null  object
 5   account_start_date  75000 non-null  object
 6   account_end_date    14954 non-null  object
 7   is_member           75000 non-null  bool  
dtypes: bool(1), int32(1), object(6)

rider_id,first_name,last_name,address,birthdate,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True


root
 |-- rider_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- account_start_date: date (nullable = true)
 |-- account_end_date: date (nullable = true)
 |-- is_member: boolean (nullable = true)



In [0]:
df_rider_gold = df_rider_bronze.withColumnRenamed("birthdate", "birthday")

In [0]:
# Create a Spark table with Delta format and overwrite mode
df_rider_gold.write.format("delta").mode("overwrite").saveAsTable("dim_rider")

In [0]:
info(spark.table("dim_rider"))

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 75000 entries, 0 to 74999
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   rider_id            75000 non-null  int32 
 1   first_name          75000 non-null  object
 2   last_name           75000 non-null  object
 3   address             75000 non-null  object
 4   birthday            75000 non-null  object
 5   account_start_date  75000 non-null  object
 6   account_end_date    14954 non-null  object
 7   is_member           75000 non-null  bool  
dtypes: bool(1), int32(1), object(6)

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True


root
 |-- rider_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: date (nullable = true)
 |-- account_end_date: date (nullable = true)
 |-- is_member: boolean (nullable = true)



### Station Dimension

In [0]:
df_station_bronze = spark.table('station_bronze')

In [0]:
# Create dim_station
df_station_bronze.write.format("delta").mode("overwrite").saveAsTable("dim_station")

In [0]:
info(spark.table("dim_station"))

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 838 entries, 0 to 837
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   station_id  838 non-null    object 
 1   name        838 non-null    object 
 2   latitude    838 non-null    float64
 3   longitude   838 non-null    float64
dtypes: float64(2), object(2)

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668


root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



### Trip Fact

In [0]:
# Read
df_trip_bronze = spark.table('trip_bronze')

In [0]:
info(df_trip_bronze)

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 4584921 entries, 0 to 4584920
Data columns (total 7 columns):
 #   Column            Non-Null Count    Dtype         
---  ------            --------------    -----         
 0   trip_id           4584921 non-null  object        
 1   rideable_type     4584921 non-null  object        
 2   started_at        4584921 non-null  datetime64[ns]
 3   ended_at          4584921 non-null  datetime64[ns]
 4   start_station_id  4584921 non-null  object        
 5   end_station_id    4584921 non-null  object        
 6   rider_id          4584921 non-null  int32         
dtypes: datetime64[ns](2), int32(1), object(4)

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
F6F309843C09CAAC,classic_bike,2021-09-02T18:36:00Z,2021-09-02T18:48:43Z,TA1306000026,TA1307000041,70456
BD496FA19316E89C,classic_bike,2021-09-02T17:23:28Z,2021-09-02T17:32:43Z,TA1306000026,TA1309000019,24732


root
 |-- trip_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- rider_id: integer (nullable = true)



In [0]:
query = """
SELECT
  t.trip_id,
  t.rider_id,
  t.rideable_type,
  t.started_at,
  t.ended_at,
  t.start_station_id,
  t.end_station_id,
  CAST(FLOOR(DATEDIFF(SECOND, t.started_at, t.ended_at) / 60) AS INT) AS duration,
  CAST(FLOOR(MONTHS_BETWEEN(t.started_at, r.birthday) / 12) AS INT) AS rider_age
FROM
  trip_bronze t
JOIN
  dim_rider r
ON
  t.rider_id = r.rider_id;
"""
df_trip_gold = spark.sql(query)

In [0]:
# Create fact_trip
df_trip_gold.write.format("delta").mode("overwrite").saveAsTable("fact_trip")

In [0]:
info(spark.table('fact_trip'))

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 4584921 entries, 0 to 4584920
Data columns (total 9 columns):
 #   Column            Non-Null Count    Dtype         
---  ------            --------------    -----         
 0   trip_id           4584921 non-null  object        
 1   rider_id          4584921 non-null  int32         
 2   rideable_type     4584921 non-null  object        
 3   started_at        4584921 non-null  datetime64[ns]
 4   ended_at          4584921 non-null  datetime64[ns]
 5   start_station_id  4584921 non-null  object        
 6   end_station_id    4584921 non-null  object        
 7   duration          4584921 non-null  int32         
 8   rider_age         4584921 non-null  int32         
dtypes: datetime64[ns](2), int32(3), object(4)

trip_id,rider_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,duration,rider_age
222BB8E5059252D7,34062,classic_bike,2021-06-13T09:48:47Z,2021-06-13T10:07:23Z,KA1503000064,13021,18,30
1826E16CB5486018,5342,classic_bike,2021-06-21T22:59:13Z,2021-06-21T23:04:29Z,TA1306000010,13021,5,26


root
 |-- trip_id: string (nullable = true)
 |-- rider_id: integer (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- rider_age: integer (nullable = true)



### Date Dimension

In [0]:
query="""
SELECT UUID() AS time_id,
  started_at AS date,
  dayofweek(started_at) day_of_week,
  DATEPART('DAY', started_at) day_of_month,
  DATEPART('WEEK', started_at) week_of_year,
  DATEPART('QUARTER', started_at) quarter,
  DATEPART('MONTH', started_at) month,
  DATEPART('YEAR', started_at) year
FROM trip_bronze
WHERE started_at IS NOT NULL
UNION
SELECT UUID() AS time_id,
  ended_at AS date,
  dayofweek(ended_at) day_of_week,
  DATEPART('DAY', ended_at) day_of_month,
  DATEPART('WEEK', ended_at) week_of_year,
  DATEPART('QUARTER', ended_at) quarter,
  DATEPART('MONTH', ended_at) month,
  DATEPART('YEAR', ended_at) year
FROM trip_bronze
WHERE ended_at IS NOT NULL
UNION
SELECT UUID() AS time_id,
  birthdate AS date,
  dayofweek(birthdate) day_of_week,
  DATEPART('DAY', birthdate) day_of_month,
  DATEPART('WEEK', birthdate) week_of_year,
  DATEPART('QUARTER', birthdate) quarter,
  DATEPART('MONTH', birthdate) month,
  DATEPART('YEAR', birthdate) year
FROM rider_bronze
WHERE birthdate IS NOT NULL
UNION
SELECT UUID() AS time_id,
  account_start_date AS date,
  dayofweek(account_start_date) day_of_week,
  DATEPART('DAY', account_start_date) day_of_month,
  DATEPART('WEEK', account_start_date) week_of_year,
  DATEPART('QUARTER', account_start_date) quarter,
  DATEPART('MONTH', account_start_date) month,
  DATEPART('YEAR', account_start_date) year
FROM rider_bronze
WHERE account_start_date IS NOT NULL
UNION
SELECT UUID() AS time_id,
  account_end_date AS date,
  dayofweek(account_end_date) day_of_week,
  DATEPART('DAY', account_end_date) day_of_month,
  DATEPART('WEEK', account_end_date) week_of_year,
  DATEPART('QUARTER', account_end_date) quarter,
  DATEPART('MONTH', account_end_date) month,
  DATEPART('YEAR', account_end_date) year
FROM rider_bronze
WHERE account_end_date IS NOT NULL
UNION
SELECT UUID() AS time_id,
  date,
  dayofweek(date) day_of_week,
  DATEPART('DAY', date) day_of_month,
  DATEPART('WEEK', date) week_of_year,
  DATEPART('QUARTER', date) quarter,
  DATEPART('MONTH', date) month,
  DATEPART('YEAR', date) year
FROM payment_bronze
WHERE date IS NOT NULL
"""

In [0]:
df_dim_date = spark.sql(query)

In [0]:
# Create dim_date
df_dim_date.write.format("delta").mode("overwrite").saveAsTable("dim_date")

In [0]:
info(spark.table('dim_date'))

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 11281403 entries, 0 to 11281402
Data columns (total 8 columns):
 #   Column        Non-Null Count     Dtype         
---  ------        --------------     -----         
 0   time_id       11281403 non-null  object        
 1   date          11281403 non-null  datetime64[ns]
 2   day_of_week   11281403 non-null  int32         
 3   day_of_month  11281403 non-null  int32         
 4   week_of_year  11281403 non-null  int32         
 5   quarter       11281403 non-null  int32         
 6   month         11281403 non-null  int32         
 7   year          11281403 non-null  int32         
dtypes: datetime64[ns](1), int32(6), object(1)

time_id,date,day_of_week,day_of_month,week_of_year,quarter,month,year
9b7a9ebc-195a-4485-ad4c-83502953f95f,2021-06-09T23:07:47Z,4,9,23,2,6,2021
4990e300-2fc4-467d-b40a-c7d9c3d54ed9,2021-06-12T18:40:57Z,7,12,23,2,6,2021


root
 |-- time_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)

