## ETL Project: Project Statement
Spar Nord Bank wants to observe withdrawal behavior and the factors it depends on, so that it can manage the refill frequency of each ATM, and the overall effort required, as efficiently as possible. The bank has to refill an ATM when its cash falls below a specific threshold. How often an ATM needs refilling depends on its activity, the area where it is located, the weather, the day of the week, and so on.

We have data from more than 100 ATMs across Denmark. Data is captured for every transaction, including the withdrawn amount, card type, location, date, time, ATM type, and so on.
Spar Nord Bank has built a dimensional-model data store (ATM Data Mart) on this ATM transaction data to understand ATM usage patterns.

In [2]:
# starting Spark Application : Starts a Driver Program, which initiates the spark session
spark

<pyspark.sql.session.SparkSession object at 0x7fd5d5e94b50>

In [4]:
# Import the SQL data types, functions, and Window class needed for the transformations.
# Explicit imports avoid shadowing Python built-ins such as sum() and max().
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import concat, expr, lit, row_number, unix_timestamp
from pyspark.sql.window import Window

In [5]:
# Defining the schema with necessary datatypes for the columns data as defined below  

fileschema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday', StringType(),True),
                        StructField('hour', IntegerType(),True), 
                        StructField('atm_status', StringType(),True),
                        StructField('atm_id', StringType(),True), 
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True),
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', DoubleType(),True),
                        StructField('atm_lon', DoubleType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(),True),
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),True),
                        StructField('weather_lon', DoubleType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True),
                        StructField('temp', DoubleType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', DoubleType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True)
                        ])

In [6]:
# Reading the csv file data located in HDFS storage in the below path
ETL = spark.read.csv('hdfs:/user/root/SRC_ATM_TRANS/part-m-00000',schema = fileschema)

### LOCATION DIMENSION Table Creation Steps

In [8]:
# Creating the Location Dimension Table using the ETL Dataframe
dim_loc = ETL.select(ETL.atm_location.alias('location'),
                    ETL.atm_streetname.alias('streetname'),
                    ETL.atm_street_number.alias('street_number'),
                    ETL.atm_zipcode.alias('zipcode'),
                    ETL.atm_lat.alias('lat'),
                    ETL.atm_lon.alias('lon')).distinct()

In [9]:
# Creating the Primary Key for the DIM_LOCATION Table
DIM_LOCATION = dim_loc.select(row_number().over(Window.orderBy('zipcode')).alias('location_id'),'location','streetname','street_number','zipcode','lat','lon')

In [10]:
# Count of number of records in DIM_LOCATION Table for validation
DIM_LOCATION.count()

109
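
The `row_number()` over `Window.orderBy('zipcode')` step numbers the distinct location rows consecutively from 1. As a rough illustration of that logic outside Spark, here is a plain-Python sketch. The sample rows and the helper name `assign_surrogate_keys` are hypothetical and not part of the notebook. The sketch also breaks ties on the remaining columns, because ordering by zipcode alone does not fix which of several rows sharing a zipcode receives which id; that tie-break is an assumption about what a stable key would need, not something the notebook does.

```python
from typing import Iterable, List, Tuple

# (location, streetname, street_number, zipcode, lat, lon); made-up sample shape
LocationRow = Tuple[str, str, int, int, float, float]


def assign_surrogate_keys(rows: Iterable[LocationRow]) -> List[Tuple[int, LocationRow]]:
    """Deduplicate rows, sort them, and number them from 1."""
    distinct_rows = set(rows)  # plays the role of .distinct()
    # Sort by zipcode first, then by the whole row so ties are broken deterministically.
    ordered = sorted(distinct_rows, key=lambda r: (r[3], r))
    return list(enumerate(ordered, start=1))  # plays the role of row_number()


# Hypothetical rows, not taken from the ATM dataset.
sample = [
    ("Town C", "Main Street", 20, 9000, 57.0, 9.9),
    ("Town B", "Harbour Road", 39, 9990, 57.7, 10.5),
    ("Town C", "Main Street", 20, 9000, 57.0, 9.9),  # exact duplicate
    ("Town A", "Market Square", 1, 9000, 57.1, 9.8),
]
keyed = assign_surrogate_keys(sample)
for location_id, row in keyed:
    print(location_id, row)
```

In Spark, a comparable tie-break could be obtained by passing all six columns to `Window.orderBy`.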

### CARD_TYPE DIMENSION Table Creation Steps

In [11]:
# Creating Card Type Dimension Table from ETL Dataframe
dim_card_type = ETL.select('card_type').distinct()

In [12]:
# Creating Primary Key for the DIM_CARD_TYPE Table

DIM_CARD_TYPE = dim_card_type.select(row_number().over(Window.orderBy('card_type')).alias('card_type_id'),'card_type')

In [13]:
DIM_CARD_TYPE.count()

12

### ATM DIMENSION Table Creation Steps

In [16]:
# Creating ATM dimension table from ETL dataframe 
condition1 = [DIM_LOCATION.location == ETL.atm_location,
              DIM_LOCATION.streetname == ETL.atm_streetname,
              DIM_LOCATION.street_number == ETL.atm_street_number,
              DIM_LOCATION.zipcode == ETL.atm_zipcode,
              DIM_LOCATION.lat == ETL.atm_lat,
              DIM_LOCATION.lon == ETL.atm_lon]

dim_atm = DIM_LOCATION.join(ETL,condition1,'leftouter').select(DIM_LOCATION.location_id.alias('atm_location_id'),
       ETL.atm_manufacturer.alias('atm_manufacturer'),ETL.atm_id.alias('atm_number')).distinct()

In [17]:
# Creating primary key for the ATM Dimension Table
DIM_ATM = dim_atm.select(row_number().over(Window.orderBy('atm_number')).alias('atm_id'),'atm_number','atm_manufacturer','atm_location_id')

In [19]:
# Count of records in ATM DIMENSION table
DIM_ATM.count()

113

### DATE DIMENSION TABLE Creation Steps

In [20]:
# Mapping month name to month number for the timestamp creation
dim_date = ETL.select('year','month','day','hour','weekday',expr("CASE WHEN (month = 'January') THEN '01' " +
                          "WHEN (month = 'February') THEN '02' " +
                          "WHEN (month = 'March') THEN '03' " +
                          "WHEN (month = 'April') THEN '04' " +
                          "WHEN (month = 'May') THEN '05' " +
                          "WHEN (month = 'June') THEN '06' " +
                          "WHEN (month = 'July') THEN '07' " +
                          "WHEN (month = 'August') THEN '08' " +
                          "WHEN (month = 'September') THEN '09' " +
                          "WHEN (month = 'October') THEN '10' " +
                          "WHEN (month = 'November') THEN '11' " +
                          "ELSE '12' END").alias('month_number'))

In [21]:
# Achieving the date timestamp pattern by concatenating the necessary fields
dim_date1 = dim_date.select('*',concat('year',lit(':'),'month_number',lit(':'),'day',lit(' '),'hour',lit(':'),lit('00'),lit(':'),lit('00')).alias('date_time'))

In [22]:

# Parse the concatenated string into a timestamp column and keep one row per distinct hour
pattern = 'yyyy:MM:dd HH:mm:ss'
dim_date2 = dim_date1.withColumn('full_date_time',unix_timestamp('date_time',pattern).cast('timestamp')).distinct()

In [23]:
# Creating the Date Dimension Table and date_id as Primary Key 
DIM_DATE = dim_date2.select(row_number().over(Window.orderBy('full_date_time')).alias('date_id'),'full_date_time','year','month','day','hour','weekday')


In [24]:
DIM_DATE.count()

8685
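
The date steps map a month name to a number, concatenate the parts into a string, and parse that string into an hourly timestamp. A minimal plain-Python sketch of the same idea follows, which can be handy for unit-testing the mapping outside Spark. The names `MONTH_NUMBERS` and `to_hour_timestamp` are hypothetical. One deliberate difference: the `CASE` expression ends in `ELSE '12'`, so any unexpected or null month is treated as December, whereas this sketch raises an error instead. Note also that `day` and `hour` are concatenated without zero-padding in the notebook, which may not parse under stricter datetime parsers.

```python
from datetime import datetime

# Month names as they appear in the CASE expression
MONTH_NUMBERS = {
    "January": 1, "February": 2, "March": 3, "April": 4,
    "May": 5, "June": 6, "July": 7, "August": 8,
    "September": 9, "October": 10, "November": 11, "December": 12,
}


def to_hour_timestamp(year: int, month: str, day: int, hour: int) -> datetime:
    """Build the hourly timestamp for one source row."""
    if month not in MONTH_NUMBERS:
        # The notebook's CASE falls through to '12' here; this sketch fails loudly instead.
        raise ValueError(f"unexpected month name: {month!r}")
    return datetime(year, MONTH_NUMBERS[month], day, hour)


ts = to_hour_timestamp(2017, "March", 5, 14)
print(ts.strftime("%Y:%m:%d %H:%M:%S"))  # 2017:03:05 14:00:00
```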

### FACT_ATM_TRANS Fact Table Creation Steps

In [25]:
# 1st Transformation to get atm_id Foreign Key into Fact table : FACT_ATM_TRANS table
condition1 = [DIM_ATM.atm_number == ETL.atm_id]

fact_atm_trans1 = ETL.join(DIM_ATM,condition1,'leftouter').select(
    DIM_ATM.atm_id,'year','month','day','weekday','hour','atm_status', ETL.atm_manufacturer,'atm_location','atm_streetname','atm_street_number',
    'atm_zipcode','atm_lat','atm_lon','currency','card_type', 'transaction_amount','service','message_code',
    'message_text','weather_lat','weather_lon','weather_city_id','weather_city_name','temp','pressure',
    'humidity','wind_speed','wind_deg','rain_3h', 'clouds_all','weather_id','weather_main','weather_description'
    ,'atm_number','atm_location_id')

In [26]:
# 2nd Transformation to get date_id Foreign Key into Fact table : FACT_ATM_TRANS table
condition2 = [fact_atm_trans1.year == DIM_DATE.year,
              fact_atm_trans1.month == DIM_DATE.month,
              fact_atm_trans1.day == DIM_DATE.day,
              fact_atm_trans1.hour == DIM_DATE.hour,
              fact_atm_trans1.weekday == DIM_DATE.weekday]
fact_atm_trans2 =fact_atm_trans1.join(DIM_DATE,condition2,'leftouter').select(DIM_DATE.date_id,'atm_id',DIM_DATE.year,DIM_DATE.month,DIM_DATE.day,DIM_DATE.weekday,DIM_DATE.hour,'atm_status', ETL.atm_manufacturer,'atm_location','atm_streetname','atm_street_number',
    'atm_zipcode','atm_lat','atm_lon','currency','card_type', 'transaction_amount','service','message_code',
    'message_text','weather_lat','weather_lon','weather_city_id','weather_city_name','temp','pressure',
    'humidity','wind_speed','wind_deg','rain_3h', 'clouds_all','weather_id','weather_main','weather_description'
    ,'atm_number','atm_location_id')

In [27]:
# 3rd Transformation to get card_type_id Foreign Key into Fact table : FACT_ATM_TRANS table
condition3 = [fact_atm_trans2.card_type == DIM_CARD_TYPE.card_type]

fact_atm_trans3 = fact_atm_trans2.join(DIM_CARD_TYPE , condition3 , 'leftouter').select(DIM_CARD_TYPE.card_type_id,'date_id','atm_id','year','month','day','weekday','hour','atm_status','atm_manufacturer','atm_location','atm_streetname','atm_street_number',
    'atm_zipcode','atm_lat','atm_lon','currency',DIM_CARD_TYPE.card_type,'transaction_amount','service','message_code',
    'message_text','weather_lat','weather_lon','weather_city_id','weather_city_name','temp','pressure',
    'humidity','wind_speed','wind_deg','rain_3h', 'clouds_all','weather_id','weather_main','weather_description'
    ,'atm_number','atm_location_id')

In [29]:
# 4th Transformation to get weather_loc_id Foreign Key into Fact table : FACT_ATM_TRANS table
condition4 = [fact_atm_trans3.atm_location == DIM_LOCATION.location,
              fact_atm_trans3.atm_streetname == DIM_LOCATION.streetname,
              fact_atm_trans3.atm_street_number == DIM_LOCATION.street_number,
              fact_atm_trans3.atm_zipcode == DIM_LOCATION.zipcode,
              fact_atm_trans3.atm_lat == DIM_LOCATION.lat,
              fact_atm_trans3.atm_lon == DIM_LOCATION.lon]
fact_atm_trans4 = fact_atm_trans3.join(DIM_LOCATION,condition4,'leftouter').select(DIM_LOCATION.location_id.alias('weather_loc_id'),'card_type_id','date_id','atm_id','atm_status','currency','transaction_amount','service','message_code',
    'message_text','rain_3h', 'clouds_all','weather_id','weather_main','weather_description')

In [31]:
# Creating transaction id as primary key for the Fact Table : FACT_ATM_TRANS
FACT_ATM_TRANS = fact_atm_trans4.select(row_number().over(Window.orderBy('date_id')).alias('trans_id'),'atm_id','weather_loc_id','date_id','card_type_id','atm_status','currency','service','transaction_amount','message_code',
    'message_text','rain_3h', 'clouds_all','weather_id','weather_main','weather_description')

In [32]:
# Total Records in FACT_ATM_TRANS table
FACT_ATM_TRANS.count()

2468572
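
Each of the four transformations looks up a surrogate key with a left join on a natural key. Such a join preserves the row count only when the natural key is unique on the dimension side. The first join, for instance, matches on `atm_number` alone, while `DIM_ATM` is distinct over location, manufacturer, and ATM number together. Comparing the fact count with `ETL.count()` would be one way to confirm nothing fanned out (the notebook does not show that comparison). The plain-Python sketch below illustrates the effect; the function names and sample values are hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def duplicated_keys(dimension: Iterable[Tuple[int, str]]) -> List[str]:
    """Natural keys that occur more than once in (surrogate_id, natural_key) pairs."""
    counts = Counter(natural for _, natural in dimension)
    return sorted(key for key, n in counts.items() if n > 1)


def left_join_row_count(fact_keys: Iterable[str], dimension: Iterable[Tuple[int, str]]) -> int:
    """Number of rows a left join on the natural key would produce."""
    counts = Counter(natural for _, natural in dimension)
    # An unmatched fact row still yields one output row in a left join.
    return sum(max(counts[key], 1) for key in fact_keys)


# Hypothetical dimension in which ATM number "2" was registered twice.
dim_atm_sample = [(1, "1"), (2, "2"), (3, "2")]
fact_atm_numbers = ["1", "2", "2", "9"]

print(duplicated_keys(dim_atm_sample))                        # ['2']
print(left_join_row_count(fact_atm_numbers, dim_atm_sample))  # 6, from 4 input rows
```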

### Record-count validation of the fact table before loading it into S3 storage for further processing

In [33]:
FACT_ATM_TRANS.select('date_id').distinct().count()

8685

In [34]:
FACT_ATM_TRANS.select('card_type_id').distinct().count()

12

In [35]:
FACT_ATM_TRANS.select('atm_id').distinct().count()

113

In [36]:
FACT_ATM_TRANS.select('weather_loc_id').distinct().count()

109
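
The four distinct counts match the dimension sizes (8685, 12, 113, 109), which is a useful sanity check. On its own it does not show that every foreign key resolved: a left join leaves a null key on unmatched rows, and `distinct()` counts null as one more value. A stricter check looks for null keys and for keys missing from the dimension. The plain-Python sketch below shows the idea; `unresolved_keys` and the sample ids are hypothetical.

```python
from typing import Iterable, List, Optional, Set, Tuple


def unresolved_keys(
    fact_keys: Iterable[Optional[int]], dimension_ids: Set[int]
) -> Tuple[int, List[int]]:
    """Return (count of null keys, sorted non-null keys absent from the dimension)."""
    null_count = 0
    orphans = set()
    for key in fact_keys:
        if key is None:
            null_count += 1
        elif key not in dimension_ids:
            orphans.add(key)
    return null_count, sorted(orphans)


dim_card_type_ids = {1, 2, 3}
fact_card_type_ids = [1, 2, None, 2, 1]

# Distinct values are {1, 2, None}: three of them, equal to the dimension size,
# even though id 3 is never used and one row has no key at all.
print(len(set(fact_card_type_ids)))                            # 3
print(unresolved_keys(fact_card_type_ids, dim_card_type_ids))  # (1, [])
```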

### Load the Data into the S3 storage bucket at the path 's3://etlprojectdukavinods3'

In [35]:
# write the DIM_DATE data into s3 storage at "s3://etlprojectdukavinods3/DIM_DATE"
DIM_DATE.coalesce(1).write.format("csv").option("header","false").mode("Overwrite").save("s3://etlprojectdukavinods3/DIM_DATE")

In [36]:
# write the DIM_LOCATION data into s3 storage at "s3://etlprojectdukavinods3/DIM_LOCATION" 
DIM_LOCATION.coalesce(1).write.format("csv").option("header","false").mode("Overwrite").save("s3://etlprojectdukavinods3/DIM_LOCATION")

In [37]:
# write the DIM_CARD_TYPE data into s3 storage at "s3://etlprojectdukavinods3/DIM_CARD_TYPE"
DIM_CARD_TYPE.coalesce(1).write.format("csv").option("header","false").mode("Overwrite").save("s3://etlprojectdukavinods3/DIM_CARD_TYPE")

In [38]:
# write the DIM_ATM data into s3 storage at "s3://etlprojectdukavinods3/DIM_ATM"
DIM_ATM.coalesce(1).write.format("csv").option("header","false").mode("Overwrite").save("s3://etlprojectdukavinods3/DIM_ATM")

In [39]:
# write the FACT_ATM_TRANS data into s3 storage at "s3://etlprojectdukavinods3/FACT_ATM_TRANS"
FACT_ATM_TRANS.coalesce(1).write.format("csv").option("header","false").mode("Overwrite").save("s3://etlprojectdukavinods3/FACT_ATM_TRANS")
