## ETL Project (Spar Nord Bank ATM Data Mart)

### Importing libraries and setting up SparkSession

In [2]:
import os 
import sys 
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda/bin/python'
os.environ["JAVA_HOME"] = '/usr/java/jdk1.8.0_232-cloudera/jre'
os.environ['SPARK_HOME'] = '/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/'
os.environ['PYLIB'] = os.environ["SPARK_HOME"] + '/python/lib'
sys.path.insert(0,os.environ["PYLIB"] +'/py4j-0.10.6-src.zip')
sys.path.insert(0,os.environ['PYLIB'] + 'pyspark.zip')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('jupyter_Spark').master('local').getOrCreate()
spark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f1d46191bd0>

### Reading the data from HDFS

In [5]:
sprdf = spark.read.csv('/user/root/SRC_ATM_TRANS/part-m-00000', header = False,inferSchema = True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
sprdf.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
| _c0|    _c1|_c2|   _c3|_c4|   _c5|_c6|_c7|       _c8|        _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19| _c20|  _c21|   _c22|    _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|_c32|      _c33|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
|2017|January|  1|Sunday|  0|Active|  1|NCR|NÃƒÂ¦stved|Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null|55.23|11.761|2616038|Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|Rain|light rain|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+-

## Creating custom input schema using StrucType and reading the data

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType,LongType

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
SparSchema = StructType([StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), True),
                        StructField('day', IntegerType(), True),
                        StructField('weekday', StringType(), True),
                        StructField('hour', IntegerType(), True),
                        StructField('atm_status', StringType(), True),
                        StructField('atm_id', StringType(), True),
                        StructField('atm_manufacturer', StringType(), True),
                        StructField('atm_location', StringType(), True),
                        StructField('atm_streetname', StringType(), True),
                        StructField('atm_street_number', IntegerType(), True),
                        StructField('atm_zipcode', IntegerType(), True),
                        StructField('atm_lat', DoubleType(), True),
                        StructField('atm_lon', DoubleType(), True),
                        StructField('currency', StringType(), True),
                        StructField('card_type', StringType(), True),
                        StructField('transaction_amount', IntegerType(), True),
                        StructField('service', StringType(), True),
                        StructField('message_code', StringType(), True),
                        StructField('message_text', StringType(), True),
                        StructField('weather_lat', DoubleType(), True),
                        StructField('weather_lon', DoubleType(), True),
                        StructField('weather_city_id', IntegerType(), True),
                        StructField('weather_city_name', StringType(), True),
                        StructField('temp', DoubleType(), True),
                        StructField('pressure', IntegerType(), True),
                        StructField('humidity', IntegerType(), True),
                        StructField('wind_speed', IntegerType(), True),
                        StructField('wind_deg', IntegerType(), True),
                        StructField('rain_3h', DoubleType(), True),
                        StructField('clouds_all', IntegerType(), True),
                        StructField('weather_id', IntegerType(), True),
                        StructField('weather_main', StringType(), True),
                        StructField('weather_description', StringType(), True)])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
sprdf = spark.read.csv('/user/root/SRC_ATM_TRANS/part-m-00000',header = False, schema = SparSchema)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
#verifying the count of the records for validation
sprdf.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### Count of Records - <strong><span style="color:Green">2468572</span></strong>

In [11]:
#checking Schema,Dataframe and the columns
sprdf.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [12]:
sprdf.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+------

In [13]:
sprdf.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['year', 'month', 'day', 'weekday', 'hour', 'atm_status', 'atm_id', 'atm_manufacturer', 'atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

## Creating the Dimension and Fact tables

####  creating a dataframe for location dimension according to target dimension model

In [14]:
#temporary dataframe
loc = sprdf.select('atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon').distinct()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
loc.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

109

In [16]:
from pyspark.sql.functions import *

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
#primary key column
sprdf_temp = loc.rdd.zipWithIndex().toDF()
dim_loc = sprdf_temp.select(col("_1.*"),col("_2").alias('location_id'))
dim_loc.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
|    atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
|         Kolding|           Vejlevej|              135|       6000| 55.505|  9.457|          0|
|  Skelagervej 15|        Skelagervej|               15|       9000| 57.023|  9.891|          1|
|Intern HolbÃƒÂ¦k|        Slotsvolden|                7|       4300| 55.718| 11.704|          2|
|          Odense|       FÃƒÂ¦lledvej|                3|       5000| 55.394|  10.37|          3|
|           Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|          4|
+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
only showing top 5 rows

In [18]:
#renaming the columns
DIM_LOC = dim_loc.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# rearranging the columns according to the target model
DIM_LOC = DIM_LOC.select('location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# checking that all required columns are present and named correctly
DIM_LOC.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon']

In [21]:
# validating the count of the dataframe
DIM_LOC.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

109

### Count of locations - <strong><span style="color:Green">109</span></strong>

In [22]:
DIM_LOC.orderBy(col("lat"), col("lon")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+------------------+-------------+-------+------+------+
|location_id|            location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+------------------+-------------+-------+------+------+
|         25|           Svendborg|Sankt Nicolai Gade|            1|   5700|55.058|10.609|
|         11|          NÃƒÂ¦stved|       Farimagsvej|            8|   4700|55.233|11.763|
|         80|              Nyborg|        Vestergade|           35|   5800|55.318|10.781|
|         59|      Intern  Odense|      FÃƒÂ¦lledvej|            3|   5000|55.394| 10.37|
|          3|              Odense|      FÃƒÂ¦lledvej|            3|   5000|55.394| 10.37|
|         27|            Slagelse|Mariendals AllÃƒÂ¨|           29|   4200|55.398|11.342|
|         14|            Slagelse|   Mariendals Alle|           29|   4200|55.398|11.342|
|         47|             KÃƒÂ¸ge|    SÃƒÂ¸ndre Alle|            1|   4600|55.454|12.181|
|         

In [23]:
DIM_LOC.filter(col("location").contains("Intern")).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12

## Creating a dataframe for ATM Dimension according to Target Dimension Model

In [24]:
#creating a temporary df and selecting required columns
atm_df = sprdf.select('atm_id', 'atm_manufacturer', 'atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon').distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
#renaming the column atm_id to atm_number as per requirement
atm_df = atm_df.withColumnRenamed('atm_id', 'atm_number')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
#joining the dim_location and atm dataframes
atm_df = atm_df.join(DIM_LOC, 
                     (atm_df['atm_location'] == DIM_LOC['location']) &
                     (atm_df['atm_streetname'] == DIM_LOC['streetname']) &
                     (atm_df['atm_street_number'] == DIM_LOC['street_number']) &
                     (atm_df['atm_zipcode'] == DIM_LOC['zipcode']) &
                     (atm_df['atm_lat'] == DIM_LOC['lat']) &
                     (atm_df['atm_lon'] == DIM_LOC['lon'])
                     , how = "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# checking to columns in the joined df
atm_df.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['atm_number', 'atm_manufacturer', 'atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon', 'location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon']

In [28]:
atm_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

In [29]:
# selecting the required columns and making sure records are distinct
atm_df = atm_df.select('atm_number', 'atm_manufacturer', 'location_id').distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# renaming the colums as per requirement
atm_df = atm_df.withColumnRenamed('location_id', 'atm_location_id')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# viewing changes in columns
atm_df.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['atm_number', 'atm_manufacturer', 'atm_location_id']

In [32]:
# creating the primary key column
sprdf_temp = atm_df.rdd.zipWithIndex().toDF()
dim_atm = sprdf_temp.select(col("_1.*"),col("_2").alias('atm_id'))
dim_atm.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------------+---------------+------+
|atm_number|atm_manufacturer|atm_location_id|atm_id|
+----------+----------------+---------------+------+
|        40| Diebold Nixdorf|             86|     0|
|        28|             NCR|             33|     1|
|        53|             NCR|             28|     2|
|        66|             NCR|             20|     3|
|        42|             NCR|             96|     4|
+----------+----------------+---------------+------+
only showing top 5 rows

In [79]:
# rearranging the columns according to the target model
DIM_ATM = dim_atm.select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [80]:
# checking that all required columns are present and named correctly
DIM_ATM.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']

In [81]:
# validating the count of the dataframe
DIM_ATM.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

### Count of atms - <strong><span style="color:Green">113</span></strong>

## Creating a dataframe for Date Dimension according to Target Dimension Model

In [82]:
# creating a temporary df and selecting required columns
date_df = sprdf.select('year', 'month', 'day', 'hour', 'weekday')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [83]:
date_df = date_df.withColumn('full_date', concat_ws('-', date_df.year, date_df.month, date_df.day))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [84]:
date_df = date_df.withColumn('full_time', concat_ws(':', date_df.hour, lit('00'), lit('00')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [85]:
date_df = date_df.withColumn('full_date_time', concat_ws(' ', date_df.full_date, date_df.full_time))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [86]:
Pattern = 'yyyy-MMM-dd HH:mm:ss'
date_df = date_df.withColumn('full_date_time', unix_timestamp(date_df.full_date_time, Pattern).cast('timestamp'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [87]:
date_df.show(5, truncate = False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+---+----+-------+--------------+---------+-------------------+
|year|month  |day|hour|weekday|full_date     |full_time|full_date_time     |
+----+-------+---+----+-------+--------------+---------+-------------------+
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
+----+-------+---+----+-------+--------------+---------+-------------------+
only showing top 5 rows

In [88]:
# selecting the required columns and making sure records are distinct
date_df = date_df.select('year', 'month', 'day', 'hour', 'weekday', 'full_date_time').distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [89]:
# creating the primary key column
sprdf_temp = date_df.rdd.zipWithIndex().toDF()
DIM_DATE = sprdf_temp.select(col("_1.*"),col("_2").alias('Date_id'))
DIM_DATE.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------+---+----+--------+-------------------+-------+
|year|   month|day|hour| weekday|     full_date_time|Date_id|
+----+--------+---+----+--------+-------------------+-------+
|2017|February| 26|   8|  Sunday|2017-02-26 08:00:00|      0|
|2017|   March| 14|   6| Tuesday|2017-03-14 06:00:00|      1|
|2017|   March| 28|  12| Tuesday|2017-03-28 12:00:00|      2|
|2017|   March| 12|  13|  Sunday|2017-03-12 13:00:00|      3|
|2017|   March| 23|  23|Thursday|2017-03-23 23:00:00|      4|
+----+--------+---+----+--------+-------------------+-------+
only showing top 5 rows

In [90]:
# rearranging the columns according to the target model
DIM_DATE = DIM_DATE.select('Date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
# checking that all required columns are present and named correctly
DIM_DATE.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['Date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday']

In [92]:
# validating the count of the dataframe
DIM_DATE.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8685

### Count of date dimension - <strong><span style="color:Green">8685</span></strong>

## creating a dataframe for Card Type Dimension according to Target Dimension Model

In [93]:
# creating a temporary df and selecting required columns and making sure records are distinct
card_type_df = sprdf.select('card_type').distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
# creating the primary key column
sprdf_temp = card_type_df.rdd.zipWithIndex().toDF()
DIM_CARD_TYPE = sprdf_temp.select(col("_1.*"),col("_2").alias('card_type_id'))
DIM_CARD_TYPE.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|Visa Dankort - on-us|           0|
|  Mastercard - on-us|           1|
|         HÃƒÂ¦vekort|           2|
|            VisaPlus|           3|
|     Dankort - on-us|           4|
+--------------------+------------+
only showing top 5 rows

In [95]:
# rearranging the columns according to the target model
DIM_CARD_TYPE = DIM_CARD_TYPE.select('card_type_id', 'card_type')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [96]:
# checking that all required columns are present and named correctly
DIM_CARD_TYPE.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['card_type_id', 'card_type']

In [97]:
# validating the count of the dataframe
DIM_CARD_TYPE.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12

### Count of Card Type - <strong><span style="color:Green">12</span></strong>

### Creating the Transaction Fact Table according to Target Model

In [98]:
# Stage 1 of FACT_ATM_TRANS Table -> joining original dataframe with DIM_LOC

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [99]:
# renaming the colums as per requirement
fact_location = sprdf.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [100]:
# joining the dfs
fact_location = fact_location.join(DIM_LOC, on = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = "left")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [101]:
# viewing the columns
fact_location.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'year', 'month', 'day', 'weekday', 'hour', 'atm_status', 'atm_id', 'atm_manufacturer', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'location_id']

In [102]:
# Validating the count of the df at the end of Stage 1
fact_location.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [103]:
# Stage 2 of FACT_ATM_TRANS Table -> joining the dataframe with DIM_ATM

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [104]:
# renaming the colums as per requirement
fact_location = fact_location.withColumnRenamed('atm_id', 'atm_number').withColumnRenamed('location_id', 'atm_location_id')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [105]:
# joining the dfs
fac_atm = fact_location.join(DIM_ATM, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = "left")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [106]:
# performing necessary transformations, same as done to atm table
fac_atm = fac_atm.withColumnRenamed('atm_location_id', 'weather_loc_id')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [107]:
# Validating the count of the df at the end of Stage 2
fac_atm.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [108]:
# Stage 3 of FACT_ATM_TRANS Table -> joining the dataframe with DIM_DATE

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [109]:
# joining the dfs
fac_date = fac_atm.join(DIM_DATE, on = ['year', 'month', 'day', 'hour', 'weekday'], how = "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [110]:
# Validating the count of the df at the end of Stage 3
fac_date.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### Stage 4 of fac_atmtrans Table -> joining the dataframe with DIM_CARD_TYPE

In [112]:
# joining the dfs
fac_atmtrans = fac_date.join(DIM_CARD_TYPE, on = ['card_type'], how = "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [113]:
# Validating the count of the df at the end of Stage 4
fac_atmtrans.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [114]:
# creating primary key of fact table and viewing 1st record of the table
from pyspark.sql.window import Window

w = Window().orderBy('date_id')
FAC_ATMTRANS = fac_atmtrans.withColumn("trans_id", row_number().over(w))
FAC_ATMTRANS.show(1, True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----+-------+---+----+-------+----------+----------------+--------------+-------------------+------------+-------------+-------+------+-----+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+------+-------+-------------------+------------+--------+
|           card_type|year|  month|day|hour|weekday|atm_number|atm_manufacturer|weather_loc_id|           location|  streetname|street_number|zipcode|   lat|  lon|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|atm_id|Date_id|     full_date_time|card_type_id|trans_id|
+--------------------+----+-------+---+----+-------+----------+---------

In [115]:
# viewing the list of columns
FAC_ATMTRANS.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['card_type', 'year', 'month', 'day', 'hour', 'weekday', 'atm_number', 'atm_manufacturer', 'weather_loc_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'atm_status', 'currency', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'atm_id', 'Date_id', 'full_date_time', 'card_type_id', 'trans_id']

In [116]:
# selecting and arranging only the required columns according to the target model
FAC_ATMTRANS = FAC_ATMTRANS.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 
'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 
'clouds_all', 'weather_id', 'weather_main', 'weather_description')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [117]:
# checking that all required columns are present and named correctly
FAC_ATMTRANS.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

In [118]:
# validating the count of the dataframe
FAC_ATMTRANS.select('*').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### Count of Atm transactions - <strong><span style="color:Green">2468572</span></strong>

## Writing the PySpark Dataframes to AWS S3 Storage in csv format

In [120]:
# writing data from pyspark df 'DIM_LOC' in csv format to dim_location folder in S3 bucket
DIM_LOC.coalesce(1).write.format('csv').option('header','false').save('s3://etlprojectfolder/dim_location/', mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [121]:
# writing data from pyspark df 'DIM_ATM' in csv format to dim_atm folder in S3 bucket
DIM_ATM.coalesce(1).write.format('csv').option('header','false').save('s3://etlprojectfolder/dim_atm/', mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [122]:
# writing data from pyspark df 'DIM_DATE' in csv format to dim_data folder in S3 bucket
DIM_DATE.coalesce(1).write.format('csv').option('header','false').save('s3://etlprojectfolder/dim_date/', mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [123]:
# writing data from pyspark df 'DIM_CARD_TYPE' in csv format to dim_card_type folder in S3 bucket
DIM_CARD_TYPE.coalesce(1).write.format('csv').option('header','false').save('s3://etlprojectfolder/dim_card_type/', mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [124]:
# writing data from pyspark df 'FAC_ATMTRANS' in csv format to fact_atm_trans folder in S3 bucket
FAC_ATMTRANS.coalesce(1).write.format('csv').option('header','false').save('s3://etlprojectfolder/fact_atm_trans/', mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…