Creating gold database in mounted ADLS container bikesharedata

In [0]:
spark.sql('create database if not exists gold_db location "/mnt/bikesharestorage/bikesharedata/gold"')

Table payments_fact

In [0]:
spark.sql("""select payment_id, 
rider_id, 
amount 
from bronze_db.payments_bronze""").write.format('delta').mode('overwrite').saveAsTable('gold_db.payments_fact')

In [0]:
spark.sql('select * from gold_db.payments_fact').show()

Out[9]: DataFrame[payment_id: int, rider_id: int, amount: float]

Table stations_dimn

In [0]:
spark.sql("""select station_id, 
name, 
latitude, 
longitude 
from bronze_db.stations_bronze""").write.format('delta').mode('overwrite').saveAsTable('gold_db.stations_dimn')

In [0]:
spark.sql('select * from gold_db.stations_dimn').show()

Out[15]: DataFrame[station_id: string, name: string, latitude: float, longitude: float]

Table trips_fact

In [0]:
spark.sql("""
select t.trip_id,
 s.station_id, 
 t.start_station_id, 
 t.end_station_id, 
 t.rider_id,
(minute(t.ended_at) - minute(t.started_at) + 60) + (((hour(t.ended_at) - hour(t.started_at))- 1)*60) as trip_duration_in_mins,
datediff(year, r.birthday, t.started_at) as rider_age_at_trip_start
from bronze_db.trips_bronze t
join bronze_db.stations_bronze s
on s.station_id = t.start_station_id
and s.station_id = t.end_station_id
JOIN bronze_db.riders_bronze r
on t.rider_id = r.rider_id
""").write.format('delta').mode('overwrite').saveAsTable('gold_db.trips_fact')

In [0]:
spark.sql('select * from gold_db.trips_fact').show()

Out[20]: DataFrame[trip_id: string, station_id: string, start_station_id: string, end_station_id: string, rider_id: int, trip_duration_in_mins: int, rider_age_at_trip_start: bigint]

Table riders_dimn

In [0]:
spark.sql("""select r.rider_id,
r.first,
r.last, 
r.address, 
r.is_member, 
t.rideable_type 
from bronze_db.riders_bronze r 
JOIN bronze_db.trips_bronze t 
on t.rider_id = r.rider_id""").write.format('delta').mode('overwrite').saveAsTable('gold_db.riders_dimn')

In [0]:
spark.sql('select * from gold_db.riders_dimn').show()

Out[25]: DataFrame[rider_id: int, first: string, last: string, address: string, is_member: boolean, rideable_type: string]

Table pyments_date_dimn

In [0]:
spark.sql("""
select p.payment_id as payment_date_key,
p.payment_date,
year(p.payment_date) as year,
month(p.payment_date) as month,
quarter(p.payment_date) as quarter,
r.account_start_date,
r.account_end_date
FROM bronze_db.payments_bronze p
JOIN bronze_db.riders_bronze r
on p.rider_id = r.rider_id
""").write.format('delta').mode('overwrite').saveAsTable('gold_db.payments_date_dimn')


In [0]:
spark.sql('select * from gold_db.payments_date_dimn').show()

Out[28]: DataFrame[payment_date_key: int, payment_date: date, year: int, month: int, quarter: int, account_start_date: date, account_end_date: date]

Table trips_date_dimn

In [0]:
spark.sql("""
SELECT trip_id as trip_date_key, 
started_at as trip_start_date, 
ended_at as trip_end_date, 
weekday(started_at) as day_of_week, 
hour(started_at) as trip_start_hr, 
minute(started_at) as trip_start_min, 
hour(ended_at) as trip_end_hr, 
minute(ended_at) as trip_end_min 
from bronze_db.trips_bronze
""").write.format('delta').mode('overwrite').saveAsTable('gold_db.trips_date_dimn')

In [0]:

spark.sql('select * from gold_db.trips_date_dimn').show()

Out[32]: DataFrame[trip_date_key: string, trip_start_date: timestamp, trip_end_date: timestamp, day_of_week: int, trip_start_hr: int, trip_start_min: int, trip_end_hr: int, trip_end_min: int]