In [None]:
# Display the SparkContext `sc`. It is not created anywhere in this notebook, so it is
# presumably provided by the PySpark kernel — confirm on a fresh kernel.
sc

In [None]:

# Display the SparkSession `spark`. Like `sc`, it is not created in this notebook and is
# presumably provided by the PySpark kernel.
spark

In [None]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [None]:
# Custom schema for the ATM file. The file is read without a header row, so these
# fields are applied by position and must stay in the file's column order.
# Every field is declared nullable.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in [
        ("year", IntegerType),
        ("month", StringType),
        ("day", IntegerType),
        ("weekday", StringType),
        ("hour", IntegerType),
        ("atm_status", StringType),
        ("atm_id", StringType),
        ("atm_manufacturer", StringType),
        ("atm_location", StringType),
        ("atm_streetname", StringType),
        ("atm_street_number", IntegerType),
        ("atm_zipcode", IntegerType),
        ("atm_lat", DoubleType),
        ("atm_lon", DoubleType),
        ("currency", StringType),
        ("card_type", StringType),
        ("transaction_amount", IntegerType),
        ("service", StringType),
        ("message_code", StringType),
        ("message_text", StringType),
        ("weather_lat", DoubleType),
        ("weather_lon", DoubleType),
        ("weather_city_id", IntegerType),
        ("weather_city_name", StringType),
        ("temp", DoubleType),
        ("pressure", IntegerType),
        ("humidity", IntegerType),
        ("wind_speed", IntegerType),
        ("wind_deg", IntegerType),
        ("rain_3h", DoubleType),
        ("clouds_all", IntegerType),
        ("weather_id", IntegerType),
        ("weather_main", StringType),
        ("weather_description", StringType),
    ]
])

In [None]:
# Load the raw pipe-delimited ATM file (no header row) using the explicit schema above.
# The relative path is resolved against the cluster's default filesystem
# (HDFS according to the original note — confirm for your cluster).
df = (
    spark.read
    .schema(schema)
    .format("csv")
    .options(header="false", sep="|")
    .load("data/ATM.csv")
)

df.show(5)

In [None]:
# Sanity check: total number of records loaded from the source file.
df.count()

In [None]:
# Number of partitions the loaded DataFrame is split into.
df.rdd.getNumPartitions()

In [None]:
# Confirm the loaded columns and types match the explicit schema defined above.
df.printSchema()

## Creating Location dimension

In [None]:
# Location dimension source rows: one row per distinct combination of the six
# ATM location attributes (every dimension here is built from de-duplicated rows).
df_location = df.select(
    [
        'atm_location', 'atm_streetname', 'atm_street_number',
        'atm_zipcode', 'atm_lat', 'atm_lon',
    ]
).dropDuplicates()


In [None]:
# Preview five distinct location rows.
df_location.show(5)

In [None]:
# Number of distinct locations, i.e. how many location_id values the key-generation step below will assign.
df_location.count()

In [None]:
# Same preview as above; select('*') added nothing, so show the DataFrame directly.
df_location.show(5)

In [None]:
# Create the surrogate primary key 'location_id' with row_number().
# The window orders by ALL six location attributes, not just atm_location. df_location
# is not cached, so it is recomputed by every later use: the ATM lookup join, the fact
# join and the S3 write. row_number() over a non-unique ordering may number tied rows
# differently on each recomputation, which would make location_id disagree between the
# ATM dimension, the fact table and the written location dimension. The rows are
# distinct on these six columns, so this ordering is total.
# Window.partitionBy() with no columns numbers rows globally (single partition), which
# is acceptable for a small dimension table.

window_spec = Window.partitionBy().orderBy(
    'atm_location', 'atm_streetname', 'atm_street_number',
    'atm_zipcode', 'atm_lat', 'atm_lon',
)
df_location = df_location.select(row_number().over(window_spec).alias('location_id'), '*')


In [None]:
# Preview the location dimension with its new location_id key.
df_location.show(10)

In [None]:
# Row count after adding location_id; adding a window column does not change it, so it should match the distinct-location count above.
df_location.count()

In [None]:
# location_id is the first column, followed by the six location attributes.
df_location.printSchema()

## Creating ATM dimension

In [None]:
# ATM dimension source rows: each ATM's id and manufacturer, plus the six location
# attributes. The location attributes are used only to look up location_id from the
# location dimension in a later join.
df_atm = df.select(
    [
        'atm_id', 'atm_manufacturer',
        'atm_location', 'atm_streetname', 'atm_street_number',
        'atm_zipcode', 'atm_lat', 'atm_lon',
    ]
).dropDuplicates()
df_atm.count()


In [None]:
# Preview the ATM rows before the location_id lookup.
df_atm.show(5)

In [None]:
# Register both DataFrames as temp views so the next cell can join them in SQL and
# give each ATM a location_id foreign key pointing at the location dimension.
df_location.createOrReplaceTempView('loc')
df_atm.createOrReplaceTempView('atm')

In [None]:
# Look up each ATM's location_id from the location dimension by matching all six
# location attributes. LEFT JOIN keeps every ATM even when no location row matches.
# `<=>` (null-safe equality) is used instead of `=`. Every schema column is nullable,
# and with `=` an ATM whose location has any NULL attribute (e.g. a missing street
# number) would get a NULL location_id, even though the location dimension contains a
# row for exactly that combination. The location rows are distinct on these six
# columns, so each ATM row matches at most one location.

df_atm = spark.sql("""
    SELECT atm.atm_id, atm.atm_manufacturer, loc.location_id
    FROM atm
    LEFT JOIN loc
      ON  atm.atm_location      <=> loc.atm_location
      AND atm.atm_streetname    <=> loc.atm_streetname
      AND atm.atm_street_number <=> loc.atm_street_number
      AND atm.atm_zipcode       <=> loc.atm_zipcode
      AND atm.atm_lat           <=> loc.atm_lat
      AND atm.atm_lon           <=> loc.atm_lon
""")

df_atm.show(10)

In [None]:
# Row count after the location lookup; location rows are distinct, so each ATM matches at most one and the count should equal the pre-join count above.
df_atm.count()

In [None]:
# The source atm_id is the ATM's business identifier, so it is kept as atm_number, and
# atm_id becomes the surrogate primary key generated with row_number().
# atm_manufacturer and location_id are tie-breakers in the ordering, so the numbering
# stays deterministic even if one atm_number appears in more than one row. df_atm is
# not cached and is recomputed for the fact join and for the S3 write; row_number()
# over tied rows could otherwise assign different ids on each recomputation.

window_spec = Window.partitionBy().orderBy('atm_id', 'atm_manufacturer', 'location_id')
df_atm = df_atm.select(row_number().over(window_spec).alias('atm_id'),
                       col('atm_id').alias('atm_number'),
                       'atm_manufacturer', 'location_id')


In [None]:
# Preview the ATM dimension: surrogate atm_id, source atm_number, manufacturer, location_id.
df_atm.show(10)

In [None]:
# Confirm the ATM dimension's final columns and types.
df_atm.printSchema()

## Creating Date dimension

In [None]:
# Date dimension source rows: one row per distinct (year, month, day, hour, weekday).
df_date = df.select(["year", "month", "day", "hour", "weekday"]).dropDuplicates()
df_date.show(5)


In [None]:
# Number of distinct date/hour combinations in the data.
df_date.count()


In [None]:
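# On Spark 3.0 and later, the default datetime parser rejects some inputs that the
# legacy (Spark 2.x) parser accepted, e.g. a one-digit day or hour parsed with 'dd'/'HH'.
# If the timestamp conversion in the next cell raises an upgrade error or yields nulls,
# uncomment the line below to switch back to the legacy parser.

# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")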

In [None]:
# Build the date dimension. year/month/day/hour are parsed into a real timestamp
# (full_date_time), and the surrogate primary key date_id is generated with row_number().
# - Columns are referenced with col(...) on df_date itself. The previous df[...]
#   references pointed at a different DataFrame and only resolved through shared lineage.
# - The pattern uses single-letter 'd' and 'H', which accept both 1- and 2-digit values.
#   'dd'/'HH' require two digits under the Spark 3 parser, but casting an int day/hour
#   to string yields e.g. "5", not "05".
# - date_id follows chronological order. Ordering by the month *name* sorted months
#   alphabetically (April before January). The raw columns stay as tie-breakers so the
#   numbering remains deterministic even if a timestamp fails to parse (NULL).

df_date = df_date.withColumn(
    'full_date_time',
    to_timestamp(
        concat_ws(
            " ",
            col("year").cast("string"),
            col("month"),
            col("day").cast("string"),
            col("hour").cast("string"),
        ),
        "yyyy MMMM d H",
    ),
)

window_spec = Window.partitionBy().orderBy('full_date_time', 'year', 'month', 'day', 'hour')

df_date = df_date.select(row_number().over(window_spec).alias('date_id'),
                         'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')


In [None]:
# Preview the date dimension with date_id and the parsed full_date_time.
df_date.show(5)

In [None]:
# Check that full_date_time came out as a timestamp column alongside the original date parts.
df_date.printSchema()

## Creating Card type dimension

In [None]:
# Card type dimension source rows: the distinct card types seen in the data.
df_card = df.select(['card_type']).dropDuplicates()
df_card.show(5)


In [None]:
# Surrogate key card_type_id numbers the card types in sorted order. card_type is
# unique after the de-duplication above, so the ordering is total.
df_card = (
    df_card
    .withColumn('card_type_id', row_number().over(Window.orderBy('card_type')))
    .select('card_type_id', 'card_type')
)

df_card.show(5)

In [None]:
# Number of distinct card types.
df_card.count()

In [None]:
# Confirm the card dimension has card_type_id followed by card_type.
df_card.printSchema()

## Creating fact table

In [None]:
# (Re)register every dimension and the raw data as temp views for the SQL joins below.
# 'atm' must be re-registered because df_atm was reassigned after it was first
# registered; it now carries the surrogate atm_id and the atm_number column.
df.createOrReplaceTempView('fact')
df_card.createOrReplaceTempView('card')
df_date.createOrReplaceTempView('date')
df_atm.createOrReplaceTempView('atm')
df_location.createOrReplaceTempView('loc')

In [None]:
# Attach card_type_id from the card type dimension. LEFT JOIN keeps every transaction.
# `<=>` (null-safe equality) also links transactions with a NULL card_type to the
# dimension row built from those NULL values, instead of leaving card_type_id NULL.
# The card dimension is distinct on card_type, so no transaction matches twice.

df_fact = spark.sql("""
    SELECT card.card_type_id, fact.*
    FROM fact
    LEFT JOIN card
      ON fact.card_type <=> card.card_type
""")
df_fact.show(5)

In [None]:
# Re-register 'fact' so it points at the card-join result (now including card_type_id);
# the next cell's SQL join with the date dimension reads from this view.

df_fact.createOrReplaceTempView('fact')

In [None]:
# Attach date_id from the date dimension by matching the hour-level date attributes.
# LEFT JOIN keeps every transaction. `<=>` (null-safe equality) links rows with a NULL
# date attribute to the matching dimension row instead of giving them a NULL date_id.
# The date dimension is distinct on these five columns, so no transaction matches twice.

df_fact = spark.sql("""
    SELECT date.date_id, fact.*
    FROM fact
    LEFT JOIN date
      ON  fact.year    <=> date.year
      AND fact.month   <=> date.month
      AND fact.day     <=> date.day
      AND fact.hour    <=> date.hour
      AND fact.weekday <=> date.weekday
""")

df_fact.show(5)

In [None]:
# Re-register 'fact' with date_id included; the next cell's location join reads this view.
df_fact.createOrReplaceTempView('fact')

In [None]:
# Attach the location dimension key, stored in the fact table as weather_loc_id, by
# matching all six ATM location attributes. LEFT JOIN keeps every transaction.
# `<=>` (null-safe equality) links rows whose location has a NULL attribute to the
# matching dimension row instead of giving them a NULL key. The location dimension is
# distinct on these six columns, so no transaction matches twice.

df_fact = spark.sql("""
    SELECT loc.location_id AS weather_loc_id, fact.*
    FROM fact
    LEFT JOIN loc
      ON  fact.atm_location      <=> loc.atm_location
      AND fact.atm_streetname    <=> loc.atm_streetname
      AND fact.atm_street_number <=> loc.atm_street_number
      AND fact.atm_zipcode       <=> loc.atm_zipcode
      AND fact.atm_lat           <=> loc.atm_lat
      AND fact.atm_lon           <=> loc.atm_lon
""")

df_fact.show(5)

In [None]:
# Re-register 'fact' with weather_loc_id included; the next cell's ATM join reads this view.
df_fact.createOrReplaceTempView('fact')

In [None]:
# Attach the ATM dimension's surrogate key. The fact's atm_id is the ATM's business id,
# which the ATM dimension stores as atm_number; the dimension's own atm_id comes back
# as atm_id_dim. Manufacturer and location key must match too. LEFT JOIN keeps every
# transaction and leaves atm_id_dim NULL when no ATM row matches.

df_fact = spark.sql("""
    SELECT atm.atm_id AS atm_id_dim, fact.*
    FROM fact
    LEFT JOIN atm
      ON  fact.atm_id           = atm.atm_number
      AND fact.atm_manufacturer = atm.atm_manufacturer
      AND fact.weather_loc_id   = atm.location_id
""")

df_fact.show(5)


In [None]:
# Re-register 'fact' with atm_id_dim included (the next cell works on df_fact directly, so this view is only for ad-hoc SQL).
df_fact.createOrReplaceTempView('fact')

In [None]:
# Keep only the fact-table columns (foreign keys plus transaction/weather attributes)
# and add the surrogate primary key trans_id.
# The first four sort keys are the original ordering. The remaining output columns are
# appended as tie-breakers because many transactions share the same date/ATM/location/
# card keys, and row_number() over tied rows is not deterministic across recomputations
# (df_fact is recomputed by show(), count() and the S3 write). With every output column
# in the ordering, only fully identical rows can still tie, and swapping their ids does
# not change the result.

fact_detail_cols = ['atm_status', 'currency', 'service', 'transaction_amount',
                    'message_code', 'message_text', 'rain_3h', 'clouds_all',
                    'weather_id', 'weather_main', 'weather_description']

trans_window = Window.partitionBy().orderBy('date_id', 'atm_id_dim', 'weather_loc_id',
                                            'card_type_id', *fact_detail_cols)

df_fact = df_fact.select(row_number().over(trans_window).alias('trans_id'),
                         col('atm_id_dim').alias('atm_id'),
                         'weather_loc_id', 'date_id', 'card_type_id', *fact_detail_cols)

df_fact.show(5)


In [None]:
# Register the final fact table as the 'fact' view (not queried by any later cell shown here; presumably kept for ad-hoc SQL inspection).
df_fact.createOrReplaceTempView('fact')

In [None]:
# Rows in the final fact table; every join above is a LEFT JOIN, so this is expected to equal df.count() — verify.
df_fact.count()

In [None]:
# Confirm the final fact table's columns and types before writing.
df_fact.printSchema()

## Writing tables in S3

In [None]:
# Persist the ATM dimension to S3 as CSV with a header row, replacing any previous output.
dim_atm_path = "s3://atm-data-model/DIM_ATM/"
df_atm.write.csv(dim_atm_path, mode="overwrite", header=True)

In [None]:
# Persist the date dimension to S3 as CSV with a header row, replacing any previous output.
dim_date_path = "s3://atm-data-model/DIM_DATE/"
df_date.write.csv(dim_date_path, mode="overwrite", header=True)

In [None]:
# Persist the card type dimension to S3 as CSV with a header row, replacing any previous output.
dim_card_path = "s3://atm-data-model/DIM_CARD_TYPE/"
df_card.write.csv(dim_card_path, mode="overwrite", header=True)

In [None]:
# Persist the location dimension to S3 as CSV with a header row, replacing any previous output.
dim_location_path = "s3://atm-data-model/DIM_LOCATION/"
df_location.write.csv(dim_location_path, mode="overwrite", header=True)

In [None]:
# Persist the fact table to S3 as CSV with a header row, replacing any previous output.
fact_path = "s3://atm-data-model/FACT_ATM_TRANS/"
df_fact.write.csv(fact_path, mode="overwrite", header=True)