In [96]:
# Read Files/bronze/dict.csv as a Spark DataFrame, using the first row as the header.
# `spark` is not defined in this notebook; presumably the session injected by the runtime.
df_dict = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/bronze/dict.csv")
)

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 98, Finished, Available, Finished)

In [107]:
import pandas as pd

# Read the rider CSV through Spark (first row is the header) and convert the
# result to a pandas DataFrame in one chain.
RIDER_CSV_PATH = "Files/bronze/rider.csv"

df_rider = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(RIDER_CSV_PATH)
    .toPandas()
)

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 109, Finished, Available, Finished)

In [98]:
# Reshape from wide to long: keep 'Date' as the identifier and turn every other
# column into a ('tipo', 'valor') pair, where 'tipo' holds the original header.
id_column = 'Date'
value_vars = [name for name in df_rider.columns if name != id_column]

df_rider_melted = df_rider.melt(
    id_vars=[id_column],
    value_vars=value_vars,
    var_name='tipo',
    value_name='valor',
)

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 100, Finished, Available, Finished)

In [99]:
# Split each original header ("<origem>: <categoria>") into its two parts.
# n=1 limits the split to the first ': ' so the result always has at most two
# columns; without it, a header containing a second ': ' would produce three
# columns and the two-column assignment below would raise.
df_rider_melted[['origem', 'categoria']] = df_rider_melted['tipo'].str.split(
    ': ', n=1, expand=True
)

# Pivot back so each 'categoria' becomes its own column, with one row per
# (Date, origem). 'first' simply picks the value for each cell; it is used
# because the values are not numeric at this point, so a numeric aggregation
# would not apply.
df_rider_final = df_rider_melted.pivot_table(
    index=['Date', 'origem'],
    columns='categoria',
    values='valor',
    aggfunc='first'
).reset_index()

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 101, Finished, Available, Finished)

In [100]:
# Expose 'origem' under the name 'Category' and clear the name attached to the
# columns axis so the frame has plain, unnamed column labels.
renamed_frame = df_rider_final.rename(columns={'origem': 'Category'})
renamed_frame.columns.name = None
df_rider_final = renamed_frame

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 102, Finished, Available, Finished)

In [101]:
# Build the typed frame `df` from `df_rider_final` WITHOUT mutating the input.
# The previous version aliased `df_rider_final` and renamed a column in place,
# so running the cell a second time raised KeyError on the already-renamed
# '% of Comparable Pre-Pandemic Day' column. Deriving a new frame keeps the
# cell re-runnable.
df = (
    df_rider_final
    .astype({
        'Date': 'datetime64[ns]',
        # Nullable integer dtype: keeps missing values instead of failing on them.
        '% of Comparable Pre-Pandemic Day': 'Int64',
        'Total Estimated Ridership': 'Int64',
        'Total Scheduled Trips': 'Int64',
        'Total Traffic': 'Int64',
    })
    .rename(columns={'% of Comparable Pre-Pandemic Day': 'Pre Pandemic Day'})
)

# Convert the integer percentage into a fraction (e.g. 97 -> 0.97).
# `df` is a fresh object created above, so this assignment does not touch
# `df_rider_final`.
df['Pre Pandemic Day'] = df['Pre Pandemic Day'] / 100

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 103, Finished, Available, Finished)

In [102]:
from pyspark.sql.functions import *

# Category dimension: one row per distinct 'Category' value, numbered from 1
# in order of first appearance, then converted to a Spark DataFrame.
category_lookup = df[["Category"]].drop_duplicates().reset_index(drop=True)
category_lookup = category_lookup.assign(
    idCategory=range(1, len(category_lookup) + 1)
)
df_category = spark.createDataFrame(category_lookup)

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 104, Finished, Available, Finished)

In [103]:
# concat, lit and quarter are imported explicitly here: previously they were
# only available through a wildcard import executed in another cell.
from pyspark.sql.functions import (
    col,
    concat,
    date_format,
    dayofmonth,
    dayofweek,
    lit,
    month,
    quarter,
    when,
    year,
)

# Convert the typed pandas frame into a Spark DataFrame.
# NOTE(review): this rebinds `df_rider`, which held a pandas DataFrame earlier
# in the notebook; cells that expect the pandas version cannot be re-run after
# this one without reloading the CSV first.
df_rider = spark.createDataFrame(df)

# Column expressions shared by the two semester labels below.
is_first_half = quarter("Date") <= 2      # quarters 1-2 -> first semester
year_text = year("Date").cast("string")

# Calendar dimension: one row per distinct Date plus derived date attributes.
df_date = (
    df_rider
    .dropDuplicates(["Date"])
    .select(
        col("Date"),
        dayofmonth("Date").alias("Day"),
        month("Date").alias("Month"),
        year("Date").alias("Year"),
        dayofweek("Date").alias("DayWeek"),  # Spark numbering: 1 = Sunday ... 7 = Saturday
        date_format(col("Date"), "yyyyMMdd").alias("OrderDate"),
        date_format(col("Date"), "EEEE").alias("DayWeekName"),
        date_format(col("Date"), "MMMM").alias("MonthName"),
        date_format(col("Date"), "MMMM.yyyy").alias("MonthYear"),
        date_format(col("Date"), "yyyy.MM").alias("YearMonth"),
        # e.g. "1° Sem. 2023"
        concat(
            when(is_first_half, lit("1° Sem. ")).otherwise(lit("2° Sem. ")),
            year_text,
        ).alias("Semester"),
        # e.g. "2023.01 Sem"
        concat(
            year_text,
            when(is_first_half, lit(".01 Sem")).otherwise(lit(".02 Sem")),
        ).alias("YearSemester"),
    )
    .orderBy("Date")
)

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 105, Finished, Available, Finished)

In [104]:
# Attach the category key to each rider row (left join on 'Category') and keep
# only the date, the key and the measure columns.
fact_columns = [
    'Date',
    'idCategory',
    'Total Estimated Ridership',
    'Total Scheduled Trips',
    'Total Traffic',
    'Pre Pandemic Day',
]
df_rider_2 = df_rider.join(df_category, on=['Category'], how='left').select(*fact_columns)

# Rename the measure columns to names without spaces.
column_renames = {
    'Total Estimated Ridership': 'TotalEstimatedRidership',
    'Total Scheduled Trips': 'TotalScheduledTrips',
    'Total Traffic': 'TotalTraffic',
    'Pre Pandemic Day': 'PrePandemicDay',
}
df_rider = df_rider_2
for old_name, new_name in column_renames.items():
    df_rider = df_rider.withColumnRenamed(old_name, new_name)

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 106, Finished, Available, Finished)

In [105]:
# Save each DataFrame as a Delta table, overwriting any existing table of the
# same name. The order matches the original sequence of writes.
tables_to_write = [
    ("dict", df_dict),
    ("fact_rider", df_rider),
    ("dim_calendar", df_date),
    ("dim_category", df_category),
]

for table_name, frame in tables_to_write:
    frame.write.mode("overwrite").format("delta").saveAsTable(table_name)

StatementMeta(, c938a5b5-8031-4398-be8e-b28c239b1823, 107, Finished, Available, Finished)