In [1]:
from pyspark.sql import SparkSession

In [2]:
# Creating Spark session
spark = SparkSession.builder.appName('SparNordBank').master('local').getOrCreate()

In [3]:
sc =  spark.sparkContext

In [4]:
spark

In [5]:
# importing required libraries and types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, LongType, BooleanType

### Reading the data from the files in HDFS by a specific schema using PySpark

In [6]:
# Creating custom schema
custom_schema = StructType([
                        StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), nullable = True),
                        StructField('day', IntegerType(), nullable = True),
                        StructField('weekday', StringType(), nullable = True),
                        StructField('hour', IntegerType(), nullable = True),
                        StructField('atm_status', StringType(), nullable = True),
                        StructField('atm_id', StringType(), nullable = True),
                        StructField('atm_manufacturer', StringType(), nullable = True),
                        StructField('atm_location', StringType(), nullable = True),
                        StructField('atm_streetname', StringType(), nullable = True),
                        StructField('atm_street_number', IntegerType(), nullable = True),
                        StructField('atm_zipcode', IntegerType(), nullable = True),
                        StructField('atm_lat', DoubleType(), nullable = True),
                        StructField('atm_lon', DoubleType(), nullable = True),
                        StructField('currency', StringType(), nullable = True),
                        StructField('card_type', StringType(), nullable = True),
                        StructField('transaction_amount', IntegerType(), nullable = True),
                        StructField('service', StringType(), nullable = True),
                        StructField('message_code', StringType(), nullable = True),
                        StructField('message_text', StringType(), nullable = True),
                        StructField('weather_lat', DoubleType(), nullable = True),
                        StructField('weather_lon', DoubleType(), nullable = True),
                        StructField('weather_city_id', IntegerType(), nullable = True),
                        StructField('weather_city_name', StringType(), nullable = True),
                        StructField('temp', DoubleType(), nullable = True),
                        StructField('pressure', IntegerType(), nullable = True),
                        StructField('humidity', IntegerType(), nullable = True),
                        StructField('wind_speed', IntegerType(), nullable = True),
                        StructField('wind_deg', IntegerType(), nullable = True),
                        StructField('rain_3h', DoubleType(), nullable = True),
                        StructField('clouds_all', IntegerType(), nullable = True),
                        StructField('weather_id', IntegerType(), nullable = True),
                        StructField('weather_main', StringType(), nullable = True),
                        StructField('weather_description', StringType(), nullable = True)
                        ])

In [7]:
# Reading the dataset
etl_df = spark.read.csv('/user/root/ETL_Project_SPAR_NORD/part-m-00000' , header=False, schema = custom_schema)

In [8]:
# Validating the record count
etl_df.count()

1250001

In [9]:
# Checking the imported dataset
etl_df.show(1)

+----+-----+----+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+---------+------------------+------------+------------+------------+-----------+-----------+---------------+-----------------+----+--------+--------+----------+--------+-------+----------+----------+-------------------+-------------------+
|year|month| day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|card_type|transaction_amount|     service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|       weather_main|weather_description|
+----+-----+----+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+---------+------------------+------------+------------+------------+--------

In [10]:
# Checking the schema
etl_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [11]:
# Checking the columns
etl_df.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

### Creation of 4 Dimensions & 1 Fact tables using PySpark

•	ATM dimension - This dimension will have the data related to the various ATMs present in the dataset along with the ATM number(ATM ID in the original dataset), ATM manufacturer and a reference to the ATM location and is very important for solving analytical queries related where ATM data will be used.

•	Location dimension - This is a very important dimension containing all the location data including location name, street name, street number, zip code and even the latitude and longitude. This information will be very important for solving problems related to the particular location at which a transaction took place and can help banks in things like pinpointing locations where ATMs where demand is higher as compared to other locations. Combined with weather data in the transaction table, this can be used to further do analysis such as how weather affects the demand at ATMs at a particular location.

•	Date dimension - This is another very important dimension which is almost always present where data such as transactional data is being dealt with. This dimension includes fields such as the full date and time timestamp, year, month, day, hour as well as the weekday for a transaction. This all can help in analysing the transaction behaviour with respect to the time at which the transaction took place and also how the transaction activity varies between weekdays and weekends.

•	Card type dimension - This dimension has the information about the particular card type with which a particular transaction took place. This can help in performing analysis on how the number of transactions varies with respect to each different card type.

•	Transaction fact - This is the actual fact table for the data set which contains all of the numerical data such as the currency of the transaction, service, transaction amount, message code and text as well as weather info such as description, weather id etc.


### *********************************************Location dimension*************************************************
location_id INT
location VARCHAR(50)
streetname VARCHAR(255)
street_number INT
zipcode INT
lat DECIMAL(10,3)
lon DECIMAL(10,3)

In [12]:
# Importing the required libraries
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [13]:
# Creating the Location dimension
df_location = etl_df.select('atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon')\
            .distinct()

In [14]:
# Validating the record count
df_location.count()

106

In [15]:
df_location.show(3)

+------------+--------------------+-----------------+-----------+-----------+-----------+
|atm_location|      atm_streetname|atm_street_number|atm_zipcode|    atm_lat|    atm_lon|
+------------+--------------------+-----------------+-----------+-----------+-----------+
|  Middelfart|             Brogade|                9|       5500|55.50686976| 9.72743835|
|     Randers|           Østervold|               16|       8900|56.46187753|10.03778506|
|    Sæby Syd|Trafikcenter Sæby...|                1|       9300|  57.313053| 10.4496415|
+------------+--------------------+-----------------+-----------+-----------+-----------+
only showing top 3 rows



In [16]:
# Creating the primary key location_id & renaming columns below as per requirement
df_location = df_location.select(row_number().over(Window.orderBy(df_location['atm_location'])).alias('location_id'),'*')\
                            .withColumnRenamed('atm_location','location')\
                            .withColumnRenamed('atm_streetname','streetname')\
                            .withColumnRenamed('atm_street_number','street_number')\
                            .withColumnRenamed('atm_zipcode','zipcode')
                            #.withColumnRenamed('atm_lat','lat')\
                            #.withColumnRenamed('atm_lon','lon')

In [17]:
# Validating changes in the dimension records
df_location.orderBy(df_location['location']).show(5)

+-----------+--------------------+------------+-------------+-------+-----------+-----------+
|location_id|            location|  streetname|street_number|zipcode|    atm_lat|    atm_lon|
+-----------+--------------------+------------+-------------+-------+-----------+-----------+
|          1|             Aabybro|   Østergade|            6|   9440|57.16215457| 9.73008178|
|          2|      Aalborg Hallen|Europa Plads|            4|   9000|57.04365629| 9.91267605|
|          3|Aalborg Storcente...|    Hobrovej|          452|   9200|57.00479614| 9.87593478|
|          4|Aalborg Storcente...|    Hobrovej|          452|   9200|57.00479614| 9.87593478|
|          5|              Aalbæk|  Centralvej|            5|   9982|57.59326139|10.41166458|
+-----------+--------------------+------------+-------------+-------+-----------+-----------+
only showing top 5 rows



In [18]:
# Validating the record count
df_location.count()

106

### ***********************************************ATM dimension*************************************************************
atm_id INT
atm_number VARCHAR(20)
atm_manufacturer VARCHAR(50)
atm_location_id

In [19]:
# Creating the ATM dimension & renaming the columns as below
df_atm = etl_df.select('atm_id', 'atm_manufacturer', 'atm_lat', 'atm_lon','atm_location')\
        .withColumnRenamed('atm_id','atm_number').distinct()\
        .withColumnRenamed('atm_location','location')

In [20]:
# Validating changes in the dimension records
df_atm.show(2)

+----------+----------------+-----------+-----------+-----------+
|atm_number|atm_manufacturer|    atm_lat|    atm_lon|   location|
+----------+----------------+-----------+-----------+-----------+
|        25| Diebold Nixdorf|55.39420346|10.36993204|     Odense|
|        91|             NCR|56.56678482| 9.02660419|Skive Lobby|
+----------+----------------+-----------+-----------+-----------+
only showing top 2 rows



In [21]:
# Validating the record count
df_atm.count()

109

In [22]:
# Joining with location dimension to fetch location_id column
df_atm_dim = df_atm.join(df_location, on=['location', 'atm_lat','atm_lon'], how='left')

In [23]:
# Validating the record count
df_atm_dim.count()

109

In [24]:
# Validating changes in the dimension records
df_atm_dim.show(3)

+-----------+-----------+-----------+----------+----------------+-----------+-------------+-------------+-------+
|   location|    atm_lat|    atm_lon|atm_number|atm_manufacturer|location_id|   streetname|street_number|zipcode|
+-----------+-----------+-----------+----------+----------------+-----------+-------------+-------------+-------+
|Nørresundby| 57.0586772|  9.9224726|       105| Diebold Nixdorf|         73|       Torvet|            6|   9400|
|   Hillerød|55.93325641|12.31363816|        57|             NCR|         33|Københavnsvej|           31|   3400|
|     Odense|55.39420346|10.36993204|        25| Diebold Nixdorf|         74|    Fælledvej|            3|   5000|
+-----------+-----------+-----------+----------+----------------+-----------+-------------+-------------+-------+
only showing top 3 rows



In [25]:
# Creating the primary key atm_id & renaming columns below as per requirement
DIM_ATM = df_atm_dim.select(row_number().over(Window.orderBy('atm_number')).alias('atm_id'),\
            'atm_number', 'atm_manufacturer','location_id')\
            .withColumnRenamed('location_id','atm_location_id')

In [26]:
# Validating the record count
DIM_ATM.count()

109

In [27]:
# Renaming the columns as below in location dimension
DIM_LOCATION = df_location.withColumnRenamed('atm_lat','lat')\
                             .withColumnRenamed('atm_lon','lon')

In [28]:
# Validating the record count
DIM_LOCATION.count()

106

### ***********************************************Date dimension **********************************************************
date_id INT
full_date_time TIMESTAMP
year INT
month VARCHAR(20)
day INT
hour INT
weekday VARCHAR(20)

In [29]:
# Creating the date dimension
df_date = etl_df.select('year', 'month', 'day', 'hour', 'weekday').distinct()

In [30]:
# Validating the record count
df_date.count()

4348

In [31]:
# Validating the records
df_date.show(3)

+----+-------+---+----+-------+
|year|  month|day|hour|weekday|
+----+-------+---+----+-------+
|2017|January|  1|   9| Sunday|
|2017|January|  3|   5|Tuesday|
|2017|January|  8|  19| Sunday|
+----+-------+---+----+-------+
only showing top 3 rows



In [32]:
# Creating the logic for full_date_time column as timestamp
df_date\
.withColumn('full_date_time',from_unixtime(unix_timestamp(concat_ws(' ', 'year', 'month', 'day', 'hour'),'yyyy MMMM d H')))\
.show(3, truncate=False)

+----+-------+---+----+-------+-------------------+
|year|month  |day|hour|weekday|full_date_time     |
+----+-------+---+----+-------+-------------------+
|2017|January|1  |9   |Sunday |2017-01-01 09:00:00|
|2017|January|3  |5   |Tuesday|2017-01-03 05:00:00|
|2017|January|8  |19  |Sunday |2017-01-08 19:00:00|
+----+-------+---+----+-------+-------------------+
only showing top 3 rows



In [33]:
# Including the full_date_time column in date dimension
df_date = df_date\
.withColumn('full_date_time',from_unixtime(unix_timestamp(concat_ws(' ', 'year', 'month', 'day', 'hour'),'yyyy MMMM d H')))

In [34]:
# Validating the record count
df_date.count()

4348

In [35]:
# Creating the primary key location_id & selecting columns below as per requirement
DIM_DATE = df_date.select(row_number().over(Window.orderBy('full_date_time')).alias('date_id'),\
              'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

In [36]:
# Validating the record count
DIM_DATE.count()

4348

In [37]:
# Validating the records
DIM_DATE.show(4)

+-------+-------------------+----+-------+----+----+-------+
|date_id|     full_date_time|year|  month| day|hour|weekday|
+-------+-------------------+----+-------+----+----+-------+
|      1|               null|null|  month|null|null|weekday|
|      2|2017-01-01 00:00:00|2017|January|   1|   0| Sunday|
|      3|2017-01-01 01:00:00|2017|January|   1|   1| Sunday|
|      4|2017-01-01 02:00:00|2017|January|   1|   2| Sunday|
+-------+-------------------+----+-------+----+----+-------+
only showing top 4 rows



### ***********************************************Card type dimension*******************************************************
card_type_id INT
card_type VARCHAR(30)

In [38]:
# Creating the cardtype dimension
df_card = etl_df.select('card_type').distinct()

In [39]:
# Validating the record count
df_card.count()

13

In [40]:
# Validating the records
df_card.show(2)

+---------------+
|      card_type|
+---------------+
|Dankort - on-us|
|         CIRRUS|
+---------------+
only showing top 2 rows



In [41]:
# Creating the primary key card_type_id
DIM_CARD_TYPE = df_card.select(row_number().over(Window.orderBy('card_type')).alias('card_type_id'),'card_type')

In [42]:
# Validating the record count
DIM_CARD_TYPE.count()

13

### ***********************************************Transaction fact*******************************************************
trans_id BIGINT
atm_id INT
weather_loc_id INT
date_id INT
card_type_id INT
atm_status VARCHAR(20)
currency VARCHAR(10)
service VARCHAR(20)
transaction_amount INT
message_code VARCHAR(255)
message_text VARCHAR(255)
rain_3h DECIMAL(10,3)
clouds_all INT
weather_id INT
weather_main VARCHAR(50)
weather_description VARCHAR(255)

In [43]:
# Creating the Transaction fact 
FACT_ATM_TRANS = etl_df\
# Rename below columns as per requirement
    .withColumnRenamed('atm_location','location')\
    .withColumnRenamed('atm_streetname','streetname')\
    .withColumnRenamed('atm_street_number','street_number')\
    .withColumnRenamed('atm_zipcode','zipcode')\
    .withColumnRenamed('atm_lat','lat')\
    .withColumnRenamed('atm_lon','lon')\
# Join with DIM_LOCATION
    .join(DIM_LOCATION, on=['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = 'left')\
# Rename 'atm_id'-> 'atm_number'   &   'location_id' -> 'atm_location_id'
    .withColumnRenamed('atm_id', 'atm_number')\
    .withColumnRenamed('location_id', 'atm_location_id')\
# Join with DIM_ATM
    .join(DIM_ATM, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = 'left')\
# Join with DIM_DATE
    .join(DIM_DATE, on = ['year', 'month', 'day', 'hour', 'weekday'], how = 'left')\
# Join with DIM_CARD_TYPE
    .join(DIM_CARD_TYPE, on = ['card_type'], how = 'left')\
# Rename atm_location_id to weather_loc_id
    .withColumnRenamed('atm_location_id', 'weather_loc_id')\
# Create trans_id 
    .withColumn("trans_id", row_number().over(Window.orderBy('date_id')))\
# Select only required columns in final fact 
    .select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id','atm_status', 'currency',\
            'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h',\
            'clouds_all', 'weather_id', 'weather_main', 'weather_description')


In [44]:
# Validating the record count
FACT_ATM_TRANS.count()

1250001

In [45]:
# Checking the original record count
etl_df.count()

1250001

In [46]:
# Validationg the records
FACT_ATM_TRANS.show(2)

+--------+------+--------------+-------+------------+----------+--------+------------+------------------+------------+------------+-------+----------+----------+-------------------+-------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|     service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|       weather_main|weather_description|
+--------+------+--------------+-------+------------+----------+--------+------------+------------------+------------+------------+-------+----------+----------+-------------------+-------------------+
|       1|  null|          null|   null|          13|atm_status|currency|message_code|              null|message_text| weather_lat|   null|      null|      null|weather_description|               null|
|       2|    26|            38|      2|           8|    Active|     DKK|        null|              null|        null|   56.643059|   92.0|       500|      null|         light rain|           

### Loading the dimension and fact tables into Amazon S3 bucket

In [None]:
DIM_LOCATION.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project/dim_location', mode='overwrite')

In [None]:
# writing data from pyspark df 'dim_atm' in csv format to dim_atm folder in S3 bucket 'upgrad-bucket-s3'
DIM_ATM.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project/dim_atm', mode='overwrite')

In [None]:
# writing data from pyspark df 'dim_data' in csv format to dim_data folder in S3 bucket 'upgrad-bucket-s3'
DIM_DATE.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project/dim_date', mode='overwrite')

In [None]:
# writing data from pyspark df 'dim_card_type' in csv format to dim_card_type folder in S3 bucket 'upgrad-bucket-s3'
DIM_CARD_TYPE.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project/dim_card_type', mode='overwrite')

In [None]:
# writing data from pyspark df 'fact_atm_trans' in csv format to fact_atm_trans folder in S3 bucket 'upgrad-bucket-s3'
FACT_ATM_TRANS.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project/fact_atm_trans', mode='overwrite')