#### ETL Project - Sagarmoy Sarkar - DS-38

### Reading the data files from HDFS and using PySpark to create the fact and dimension tables

In [1]:
spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 0 | application_1659205829306_0002 | pyspark | idle | Link | Link | | ✔ |


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7f5335ade850>

In [2]:
# importing PySpark Datatypes
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

- __Creating custom schema using the StructType class of PySpark, to avoid any data type mismatch__

In [3]:
# Creating custom schema using the StructType class of PySpark, to avoid any data type mismatch
fileSchema = StructType([StructField('year', IntegerType(),True),
                         StructField('month', StringType(),True),
                         StructField('day', IntegerType(),True),
                         StructField('weekDay', StringType(),True),
                         StructField('hour', IntegerType(),True),
                         StructField('atm_status', StringType(),True),
                         StructField('atm_id', StringType(),True),
                         StructField('atm_manufacturer', StringType(),True),
                         StructField('atm_location', StringType(),True),
                         StructField('atm_streetname', StringType(),True),
                         StructField('atm_street_number', IntegerType(),True),
                         StructField('atm_zipcode', IntegerType(),True),
                         StructField('atm_lat', DoubleType(),True),
                         StructField('atm_lon', DoubleType(),True),
                         StructField('currency', StringType(),True),
                         StructField('card_type', StringType(),True),
                         StructField('transaction_amount', IntegerType(),True), 
                         StructField('service', StringType(),True),
                         StructField('message_code', StringType(),True),
                         StructField('message_text', StringType(),True),
                         StructField('weather_lat', DoubleType(),True),
                         StructField('weather_lon', DoubleType(),True),
                         StructField('weather_city_id', IntegerType(),True),
                         StructField('weather_city_name', StringType(),True), 
                         StructField('temp', DoubleType(),True),
                         StructField('pressure', IntegerType(),True), 
                         StructField('humidity', IntegerType(),True), 
                         StructField('wind_speed', IntegerType(),True),
                         StructField('wind_deg', IntegerType(),True), 
                         StructField('rain_3h', DoubleType(),True), 
                         StructField('clouds_all', IntegerType(),True), 
                         StructField('weather_id', IntegerType(),True), 
                         StructField('weather_main', StringType(),True), 
                         StructField('weather_description', StringType(),True)])
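
To illustrate why an explicit schema helps, here is a plain-Python sketch (not Spark code) of schema-driven parsing: each field is cast to its declared type, and a value that does not fit becomes `None`, loosely mirroring how a null shows up in a typed column. The helper `cast_row` and the three-column `mini_schema` are hypothetical names used only for this illustration.

```python
import csv
import io

# Hypothetical three-column subset of the schema, for illustration only.
mini_schema = [("year", int), ("month", str), ("atm_lat", float)]

def cast_row(values, schema):
    """Cast each raw string to its declared type; unparseable values become None."""
    row = {}
    for (name, caster), raw in zip(schema, values):
        try:
            row[name] = caster(raw)
        except ValueError:
            row[name] = None
    return row

# Two made-up CSV lines: the second has a value that cannot be read as a float.
sample = io.StringIO("2017,January,55.505\n2017,January,not-a-number\n")
rows = [cast_row(values, mini_schema) for values in csv.reader(sample)]
```

Counting the `None` values per column after such a parse is a cheap way to spot columns whose declared type does not match the file.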

In [4]:
raw_file = spark.read.csv("/user/root/atm_data/part-m-00000", header = False, schema = fileSchema)

In [5]:
raw_file.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 ...

In [6]:
# Importing PySpark SQL functions and the Window specification
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [7]:
raw_file.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekDay|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [8]:
# Checking count after importing data into a dataframe
raw_file.count()

2468572

- __Number of records after importing data into a dataframe : 2468572__

### Creating the Fact and Dimension Tables

__1. Creating Location Dimension table__

In [9]:
# creating a location table
location_data = raw_file.select('atm_location',
                                'atm_streetname',
                                'atm_street_number',
                                'atm_zipcode',
                                'atm_lat',
                                'atm_lon').distinct()

In [10]:
location_data.show(5)

+----------------+-------------------+-----------------+-----------+-------+-------+
|    atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+----------------+-------------------+-----------------+-----------+-------+-------+
|         Kolding|           Vejlevej|              135|       6000| 55.505|  9.457|
|  Skelagervej 15|        Skelagervej|               15|       9000| 57.023|  9.891|
|Intern HolbÃƒÂ¦k|        Slotsvolden|                7|       4300| 55.718| 11.704|
|          Odense|       FÃƒÂ¦lledvej|                3|       5000| 55.394|  10.37|
|           Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|
+----------------+-------------------+-----------------+-----------+-------+-------+
only showing top 5 rows
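
The Danish names in this output look double-encoded (for example `HolbÃƒÂ¦k`, where `Holbæk` is presumably intended). A minimal sketch of one possible repair follows, assuming the text was twice decoded as Windows-1252 instead of UTF-8; `fix_mojibake` is a hypothetical helper, not part of this notebook or of PySpark.

```python
def fix_mojibake(text, rounds=2):
    """Undo up to `rounds` layers of UTF-8 text misread as Windows-1252.

    Each round re-encodes the string as cp1252 and decodes the bytes as UTF-8.
    If a round fails, the text is assumed to be clean already and is returned.
    """
    for _ in range(rounds):
        try:
            text = text.encode("cp1252").decode("utf-8")
        except (UnicodeEncodeError, UnicodeDecodeError):
            break
    return text

# Values copied from the sample output above.
repaired_location = fix_mojibake("Intern HolbÃƒÂ¦k")
repaired_street = fix_mojibake("FÃƒÂ¦lledvej")
```

Plain ASCII values such as `Kolding` pass through unchanged, so a function like this could be applied to a whole column, for instance through a UDF, if the assumption about the encoding holds for the full file.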

In [11]:
location_data.count()

109

In [12]:
location_data = location_data.withColumnRenamed('atm_location', 'location')
location_data = location_data.withColumnRenamed('atm_streetname', 'streetname')
location_data = location_data.withColumnRenamed('atm_street_number', 'street_number')
location_data = location_data.withColumnRenamed('atm_zipcode', 'zipcode')
location_data = location_data.withColumnRenamed('atm_lat', 'lat')
location_data = location_data.withColumnRenamed('atm_lon', 'lon')
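
The six renames above all strip the `atm_` prefix, so they can also be driven from a mapping. The sketch below is one way to do that; `rename_map` and `rename_columns` are hypothetical names, and the only DataFrame method it relies on is `withColumnRenamed`, which the cell above already uses.

```python
from functools import reduce

atm_columns = ['atm_location', 'atm_streetname', 'atm_street_number',
               'atm_zipcode', 'atm_lat', 'atm_lon']

# Map each source column to the same name without the 'atm_' prefix.
rename_map = {name: name[len('atm_'):] for name in atm_columns}

def rename_columns(df, mapping):
    """Call withColumnRenamed once per (old, new) pair and return the final result."""
    return reduce(lambda acc, pair: acc.withColumnRenamed(pair[0], pair[1]),
                  mapping.items(), df)
```

Usage would be `location_data = rename_columns(location_data, rename_map)`, which should yield the same column names as the six separate calls.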

In [13]:
location_data.printSchema()

root
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

In [14]:
location_data.show(5)

+----------------+-------------------+-------------+-------+------+------+
|        location|         streetname|street_number|zipcode|   lat|   lon|
+----------------+-------------------+-------------+-------+------+------+
|         Kolding|           Vejlevej|          135|   6000|55.505| 9.457|
|  Skelagervej 15|        Skelagervej|           15|   9000|57.023| 9.891|
|Intern HolbÃƒÂ¦k|        Slotsvolden|            7|   4300|55.718|11.704|
|          Odense|       FÃƒÂ¦lledvej|            3|   5000|55.394| 10.37|
|           Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|           12|   7430|56.139| 9.154|
+----------------+-------------------+-------------+-------+------+------+
only showing top 5 rows

In [15]:
# create location dimension table 
dim_location = location_data.select(F.row_number().over(Window.partitionBy().orderBy(location_data['location'])).alias("location_id"),
                                    'location',
                                    'streetname',
                                    'street_number',
                                    'zipcode',
                                    'lat',
                                    'lon')

In [16]:
dim_location.show(5)

+-----------+--------------------+------------+-------------+-------+------+-----+
|location_id|            location|  streetname|street_number|zipcode|   lat|  lon|
+-----------+--------------------+------------+-------------+-------+------+-----+
|          1|             Aabybro|ÃƒËœstergade|            6|   9440|57.162| 9.73|
|          2|      Aalborg Hallen|Europa Plads|            4|   9000|57.044|9.913|
|          3|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          4|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          5|         Aalborg Syd|    Hobrovej|          440|   9200|57.005|9.881|
+-----------+--------------------+------------+-------------+-------+------+-----+
only showing top 5 rows
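
The `location_id` column is a surrogate key: the distinct rows are ordered and numbered from 1. A plain-Python sketch of the same idea is shown below, using a small hypothetical list of location names rather than the real dimension.

```python
# Hypothetical subset of distinct location names, for illustration only.
locations = ["Kolding", "Aabybro", "Odense", "Ikast"]

# Sort by the ordering column, then number the rows starting at 1,
# which is what row_number() over an ordered window produces.
dim_rows = [
    {"location_id": key, "location": name}
    for key, name in enumerate(sorted(locations), start=1)
]
```

Because the window in the cell above has no partition columns, Spark evaluates it on a single partition. That is fine for a 109-row dimension but would not scale to a large table.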

In [17]:
dim_location.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

In [18]:
# Checking count for the Location Dimension
dim_location.count()

109

- __Number of records in Location Dimension ---> dim_location : 109__

__2. Creating ATM Dimension table__

In [19]:
# creating atm data
atm_data_temp1 = raw_file.select('atm_id',
                                 'atm_manufacturer',
                                 'atm_location',
                                 'atm_streetname',
                                 'atm_lat',
                                 'atm_lon').distinct()

In [20]:
atm_data_temp1.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)

In [21]:
atm_data_temp1.count()

113

In [22]:
atm_data_temp1.show(5)

+------+----------------+--------------------+--------------+-------+-------+
|atm_id|atm_manufacturer|        atm_location|atm_streetname|atm_lat|atm_lon|
+------+----------------+--------------------+--------------+-------+-------+
|    17|             NCR|             Randers|  ÃƒËœstervold| 56.462| 10.038|
|    23| Diebold Nixdorf|             Vodskov|    Vodskovvej| 57.104| 10.027|
|    73|             NCR|         HÃƒÂ¸jbjerg|Rosenvangsalle| 56.119| 10.192|
|    86|             NCR|HillerÃƒÂ¸d IdrÃƒ...|    Milnersvej| 55.921| 12.299|
|    18| Diebold Nixdorf|              Viborg|     Toldboden| 56.448|  9.401|
+------+----------------+--------------------+--------------+-------+-------+
only showing top 5 rows

In [23]:
atm_data_temp1 = atm_data_temp1.withColumnRenamed('atm_id', 'atm_number')

In [24]:
atm_data_temp1.printSchema()

root
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)

In [25]:
atm_data_temp1.show(5)

+----------+----------------+--------------------+--------------+-------+-------+
|atm_number|atm_manufacturer|        atm_location|atm_streetname|atm_lat|atm_lon|
+----------+----------------+--------------------+--------------+-------+-------+
|        17|             NCR|             Randers|  ÃƒËœstervold| 56.462| 10.038|
|        23| Diebold Nixdorf|             Vodskov|    Vodskovvej| 57.104| 10.027|
|        73|             NCR|         HÃƒÂ¸jbjerg|Rosenvangsalle| 56.119| 10.192|
|        86|             NCR|HillerÃƒÂ¸d IdrÃƒ...|    Milnersvej| 55.921| 12.299|
|        18| Diebold Nixdorf|              Viborg|     Toldboden| 56.448|  9.401|
+----------+----------------+--------------------+--------------+-------+-------+
only showing top 5 rows

In [26]:
# createOrReplaceTempView supersedes the deprecated registerTempTable
atm_data_temp1.createOrReplaceTempView('atm')
dim_location.createOrReplaceTempView('loc')

In [27]:
atm_data_temp2 = spark.sql("""
    select atm.atm_number, atm.atm_manufacturer, loc.location_id as atm_location_id
    from atm join loc
      on atm.atm_location = loc.location and atm.atm_streetname = loc.streetname
     and atm.atm_lat = loc.lat and atm.atm_lon = loc.lon
""")

In [28]:
atm_data_temp2.show(5)

+----------+----------------+---------------+
|atm_number|atm_manufacturer|atm_location_id|
+----------+----------------+---------------+
|        17|             NCR|             78|
|        23| Diebold Nixdorf|            107|
|        73|             NCR|             44|
|        86|             NCR|             36|
|        18| Diebold Nixdorf|            105|
+----------+----------------+---------------+
only showing top 5 rows
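
The join above replaces the descriptive location columns with the `location_id` surrogate key by matching on a natural key (location, street name, latitude, longitude). A plain-Python sketch of that lookup follows. The two location rows are taken from the `dim_location` sample shown earlier, while the ATM rows (`X1`, `X2`, `X3`) and the helper `natural_key` are hypothetical.

```python
dim_location_rows = [
    {"location_id": 2, "location": "Aalborg Hallen", "streetname": "Europa Plads", "lat": 57.044, "lon": 9.913},
    {"location_id": 5, "location": "Aalborg Syd", "streetname": "Hobrovej", "lat": 57.005, "lon": 9.881},
]

# Hypothetical ATM rows; X3 points at a location that is not in the dimension.
atm_rows = [
    {"atm_number": "X1", "atm_manufacturer": "NCR", "atm_location": "Aalborg Syd",
     "atm_streetname": "Hobrovej", "atm_lat": 57.005, "atm_lon": 9.881},
    {"atm_number": "X2", "atm_manufacturer": "Diebold Nixdorf", "atm_location": "Aalborg Hallen",
     "atm_streetname": "Europa Plads", "atm_lat": 57.044, "atm_lon": 9.913},
    {"atm_number": "X3", "atm_manufacturer": "NCR", "atm_location": "Nowhere",
     "atm_streetname": "Unknown", "atm_lat": 0.0, "atm_lon": 0.0},
]

def natural_key(row, prefix=""):
    """Return the tuple of columns the join condition compares."""
    return (row[prefix + "location"], row[prefix + "streetname"],
            row[prefix + "lat"], row[prefix + "lon"])

location_lookup = {natural_key(r): r["location_id"] for r in dim_location_rows}

dim_atm_rows = [
    {"atm_number": a["atm_number"],
     "atm_manufacturer": a["atm_manufacturer"],
     "atm_location_id": location_lookup[natural_key(a, "atm_")]}
    for a in atm_rows
    if natural_key(a, "atm_") in location_lookup  # inner join: unmatched rows are dropped
]
```

Unlike this dictionary lookup, a SQL join returns one row per match, so a natural key that is not unique in the dimension would multiply ATM rows. Comparing the row count before and after the join is a simple check against both dropped and duplicated rows.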

In [29]:
atm_data_temp2.count()

113

In [30]:
dim_atm = atm_data_temp2.select(F.row_number().over(Window.partitionBy().orderBy(atm_data_temp2['atm_number'])).alias("atm_id"),
                                'atm_number',
                                'atm_manufacturer',
                                'atm_location_id')

In [31]:
dim_atm.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

In [32]:
dim_atm.show(5)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|         1|             NCR|             74|
|     2|        10|             NCR|             76|
|     3|       100|             NCR|             56|
|     4|       101|             NCR|             17|
|     5|       102|             NCR|              3|
+------+----------+----------------+---------------+
only showing top 5 rows

In [33]:
# Checking count for the ATM Dimension
dim_atm.count()

113

- __Number of records in ATM Dimension ---> dim_atm : 113__

__3. Creating Date Dimension table__

In [34]:
date_data_temp1 = raw_file.select('year', 'month', 'day', 'hour', 'weekDay').distinct()

In [35]:
date_data_temp1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekDay: string (nullable = true)

In [36]:
date_data_temp1.select('month').distinct().count()

12

In [37]:
date_data_temp1.select('month').distinct().show()

+---------+
|    month|
+---------+
| February|
| December|
|September|
|     July|
|    March|
|    April|
|      May|
|   August|
|     June|
| November|
|  January|
|  October|
+---------+

In [38]:
date_data_temp1.show(5)

+----+--------+---+----+--------+
|year|   month|day|hour| weekDay|
+----+--------+---+----+--------+
|2017| January| 29|  23|  Sunday|
|2017| January| 29|   1|  Sunday|
|2017|February| 14|   5| Tuesday|
|2017|February|  2|   7|Thursday|
|2017|February| 13|   2|  Monday|
+----+--------+---+----+--------+
only showing top 5 rows

In [39]:
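# 'MMMM' is the pattern for full month names such as "January"; 'MMM' denotes the abbreviated form
date_data_temp1 = date_data_temp1.withColumn('month_num', from_unixtime(unix_timestamp(col("month"), 'MMMM'), 'MM'))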

In [40]:
date_data_temp1 = date_data_temp1.withColumn('day_num', format_string("%02d", col('day').cast('int')))

In [41]:
date_data_temp1 = date_data_temp1.withColumn('hour_num', format_string("%02d", col('hour').cast('int')))

In [42]:
date_data_temp1.show(8)

+----+--------+---+----+---------+---------+-------+--------+
|year|   month|day|hour|  weekDay|month_num|day_num|hour_num|
+----+--------+---+----+---------+---------+-------+--------+
|2017| January| 29|  23|   Sunday|       01|     29|      23|
|2017| January| 29|   1|   Sunday|       01|     29|      01|
|2017|February| 14|   5|  Tuesday|       02|     14|      05|
|2017|February|  2|   7| Thursday|       02|     02|      07|
|2017|February| 13|   2|   Monday|       02|     13|      02|
|2017| January| 11|  11|Wednesday|       01|     11|      11|
|2017|February| 12|  21|   Sunday|       02|     12|      21|
|2017| January| 13|  22|   Friday|       01|     13|      22|
+----+--------+---+----+---------+---------+-------+--------+
only showing top 8 rows

In [43]:
# concatenating year|month|day|hour columns to create new column full_date_time
date_data_temp2 = date_data_temp1.withColumn('full_date_time',concat(col('year'),
                                                                     lit('/'),
                                                                     col('month_num'),
                                                                     lit('/'),
                                                                     col('day_num'),
                                                                     lit(' '),
                                                                     col('hour_num'),
                                                                     lit(':00:00')))

In [44]:
date_data_temp2.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- month_num: string (nullable = true)
 |-- day_num: string (nullable = false)
 |-- hour_num: string (nullable = false)
 |-- full_date_time: string (nullable = true)

In [45]:
date_data_temp2.show(5)

+----+--------+---+----+--------+---------+-------+--------+-------------------+
|year|   month|day|hour| weekDay|month_num|day_num|hour_num|     full_date_time|
+----+--------+---+----+--------+---------+-------+--------+-------------------+
|2017| January| 29|  23|  Sunday|       01|     29|      23|2017/01/29 23:00:00|
|2017| January| 29|   1|  Sunday|       01|     29|      01|2017/01/29 01:00:00|
|2017|February| 14|   5| Tuesday|       02|     14|      05|2017/02/14 05:00:00|
|2017|February|  2|   7|Thursday|       02|     02|      07|2017/02/02 07:00:00|
|2017|February| 13|   2|  Monday|       02|     13|      02|2017/02/13 02:00:00|
+----+--------+---+----+--------+---------+-------+--------+-------------------+
only showing top 5 rows

In [46]:
pattern1 = 'yyyy/MM/dd HH:mm:ss'
date_data_temp3 = date_data_temp2.withColumn('full_date_time', unix_timestamp(date_data_temp2['full_date_time'], pattern1).cast('timestamp'))

In [47]:
date_data_temp3.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- month_num: string (nullable = true)
 |-- day_num: string (nullable = false)
 |-- hour_num: string (nullable = false)
 |-- full_date_time: timestamp (nullable = true)

In [48]:
date_data_temp3.show(5)

+----+--------+---+----+--------+---------+-------+--------+-------------------+
|year|   month|day|hour| weekDay|month_num|day_num|hour_num|     full_date_time|
+----+--------+---+----+--------+---------+-------+--------+-------------------+
|2017| January| 29|  23|  Sunday|       01|     29|      23|2017-01-29 23:00:00|
|2017| January| 29|   1|  Sunday|       01|     29|      01|2017-01-29 01:00:00|
|2017|February| 14|   5| Tuesday|       02|     14|      05|2017-02-14 05:00:00|
|2017|February|  2|   7|Thursday|       02|     02|      07|2017-02-02 07:00:00|
|2017|February| 13|   2|  Monday|       02|     13|      02|2017-02-13 02:00:00|
+----+--------+---+----+--------+---------+-------+--------+-------------------+
only showing top 5 rows
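
The cells above turn the year, month name, day and hour into a timestamp by zero-padding the parts, concatenating them into a string and parsing that string. The plain-Python sketch below shows the same conversion for a single row, to make the intended result explicit. `MONTH_NUMBER` and `to_timestamp` are hypothetical names, and the month names are listed explicitly so the mapping does not depend on the locale.

```python
from datetime import datetime

MONTHS = ("January", "February", "March", "April", "May", "June", "July",
          "August", "September", "October", "November", "December")

# Month name -> month number, e.g. "February" -> 2.
MONTH_NUMBER = {name: index for index, name in enumerate(MONTHS, start=1)}

def to_timestamp(year, month_name, day, hour):
    """Build an hourly timestamp from the four date parts of one row."""
    return datetime(year, MONTH_NUMBER[month_name], day, hour)

# The fourth sample row above: 2017, February, 2, 7.
ts = to_timestamp(2017, "February", 2, 7)
formatted = ts.strftime("%Y-%m-%d %H:%M:%S")
```

Here `formatted` matches the `full_date_time` value shown for that row in the output above.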

In [49]:
date_data_temp3.count()

8685

In [50]:
date_data_temp4 = date_data_temp3.select('full_date_time', 'year', 'month', 'day', 'hour', 'weekDay')

In [51]:
date_data_temp4.printSchema()

root
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekDay: string (nullable = true)

In [52]:
date_data_temp4.show(5)

+-------------------+----+--------+---+----+--------+
|     full_date_time|year|   month|day|hour| weekDay|
+-------------------+----+--------+---+----+--------+
|2017-01-29 23:00:00|2017| January| 29|  23|  Sunday|
|2017-01-29 01:00:00|2017| January| 29|   1|  Sunday|
|2017-02-14 05:00:00|2017|February| 14|   5| Tuesday|
|2017-02-02 07:00:00|2017|February|  2|   7|Thursday|
|2017-02-13 02:00:00|2017|February| 13|   2|  Monday|
+-------------------+----+--------+---+----+--------+
only showing top 5 rows

In [53]:
dim_date = date_data_temp4.select(F.row_number().over(Window.partitionBy().orderBy(date_data_temp4['full_date_time'])).alias("date_id"),
                                  'full_date_time',
                                  'year',
                                  'month',
                                  'day',
                                  'hour',
                                  'weekDay')

In [54]:
dim_date.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekDay: string (nullable = true)

In [55]:
dim_date.show()

+-------+-------------------+----+-------+---+----+-------+
|date_id|     full_date_time|year|  month|day|hour|weekDay|
+-------+-------------------+----+-------+---+----+-------+
|      1|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|
|      2|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|
|      3|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|
|      4|2017-01-01 03:00:00|2017|January|  1|   3| Sunday|
|      5|2017-01-01 04:00:00|2017|January|  1|   4| Sunday|
|      6|2017-01-01 05:00:00|2017|January|  1|   5| Sunday|
|      7|2017-01-01 06:00:00|2017|January|  1|   6| Sunday|
|      8|2017-01-01 07:00:00|2017|January|  1|   7| Sunday|
|      9|2017-01-01 08:00:00|2017|January|  1|   8| Sunday|
|     10|2017-01-01 09:00:00|2017|January|  1|   9| Sunday|
|     11|2017-01-01 10:00:00|2017|January|  1|  10| Sunday|
|     12|2017-01-01 11:00:00|2017|January|  1|  11| Sunday|
|     13|2017-01-01 12:00:00|2017|January|  1|  12| Sunday|
|     14|2017-01-01 13:00:00|2017|Januar

In [56]:
# Checking count for the Date Dimension
dim_date.count()

8685

- __Number of records in Date Dimension ---> dim_date : 8685__
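
A quick plausibility check on this count, assuming every record falls in calendar year 2017 (the sample rows suggest this, but the full file is not shown here): a non-leap year has 8760 hours, so 8685 distinct hours would mean a small number of hours with no transactions. The variable names below are illustrative only.

```python
import calendar

year = 2017  # assumption: all records belong to this year
days_in_year = 366 if calendar.isleap(year) else 365
hours_in_year = days_in_year * 24

distinct_hours_in_data = 8685  # dim_date.count() reported above
hours_without_transactions = hours_in_year - distinct_hours_in_data
```

Under that assumption, `hours_without_transactions` comes out at 75.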

__4. Creating Card Type Dimension table__

In [57]:
card_type_data = raw_file.select("card_type").distinct()

In [58]:
card_type_data.count()

12

In [59]:
card_type_data.printSchema()

root
 |-- card_type: string (nullable = true)

In [60]:
dim_card_type = card_type_data.select(F.row_number().over(Window.partitionBy().orderBy(card_type_data['card_type'])).alias("card_type_id"),
                                      'card_type')

In [61]:
dim_card_type.printSchema()

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)

In [62]:
dim_card_type.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|              CIRRUS|
|           2|             Dankort|
|           3|     Dankort - on-us|
|           4|         HÃƒÂ¦vekort|
|           5| HÃƒÂ¦vekort - on-us|
|           6|             Maestro|
|           7|          MasterCard|
|           8|  Mastercard - on-us|
|           9|                VISA|
|          10|        Visa Dankort|
|          11|Visa Dankort - on-us|
|          12|            VisaPlus|
+------------+--------------------+

In [63]:
# Checking count for the Card Type Dimension
dim_card_type.count()

12

- __Number of records in Card Type Dimension ---> dim_card_type : 12__
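The surrogate key above comes from `distinct()` followed by `row_number()` over a window ordered by `card_type`. As a rough plain-Python analogue of that logic (not Spark code, and assuming the column holds no nulls), the hypothetical helper `assign_surrogate_keys` below sorts the distinct values and numbers them from 1:

```python
def assign_surrogate_keys(values):
    """Map each distinct value to a 1-based key in sorted order.

    Plain-Python analogue of distinct() followed by row_number()
    over a window ordered by the value. Assumes no None values.
    """
    distinct_sorted = sorted(set(values))
    return {value: key for key, value in enumerate(distinct_sorted, start=1)}


# Sample values only; the real column has 12 distinct card types.
card_types = ["VISA", "Dankort", "CIRRUS", "VISA", "Dankort - on-us"]
keys = assign_surrogate_keys(card_types)
print(keys)
```

Because the keys depend on the sort order of the distinct values, adding a new card type later can shift existing keys, so the fact table should be rebuilt together with the dimension.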

__5. Creating Transaction Fact table__

- __Checking the Dimension tables__

In [64]:
dim_location.show(5)

+-----------+--------------------+------------+-------------+-------+------+-----+
|location_id|            location|  streetname|street_number|zipcode|   lat|  lon|
+-----------+--------------------+------------+-------------+-------+------+-----+
|          1|             Aabybro|   Østergade|            6|   9440|57.162| 9.73|
|          2|      Aalborg Hallen|Europa Plads|            4|   9000|57.044|9.913|
|          3|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          4|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          5|         Aalborg Syd|    Hobrovej|          440|   9200|57.005|9.881|
+-----------+--------------------+------------+-------------+-------+------+-----+
only showing top 5 rows

In [65]:
dim_atm.show(5)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|         1|             NCR|             74|
|     2|        10|             NCR|             76|
|     3|       100|             NCR|             56|
|     4|       101|             NCR|             17|
|     5|       102|             NCR|              3|
+------+----------+----------------+---------------+
only showing top 5 rows

In [66]:
dim_date.show(5)

+-------+-------------------+----+-------+---+----+-------+
|date_id|     full_date_time|year|  month|day|hour|weekDay|
+-------+-------------------+----+-------+---+----+-------+
|      1|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|
|      2|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|
|      3|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|
|      4|2017-01-01 03:00:00|2017|January|  1|   3| Sunday|
|      5|2017-01-01 04:00:00|2017|January|  1|   4| Sunday|
+-------+-------------------+----+-------+---+----+-------+
only showing top 5 rows

In [67]:
dim_card_type.show(5)

+------------+-------------------+
|card_type_id|          card_type|
+------------+-------------------+
|           1|             CIRRUS|
|           2|            Dankort|
|           3|    Dankort - on-us|
|           4|           Hævekort|
|           5|   Hævekort - on-us|
+------------+-------------------+
only showing top 5 rows

In [68]:
raw_file.columns

['year', 'month', 'day', 'weekDay', 'hour', 'atm_status', 'atm_id', 'atm_manufacturer', 'atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

In [69]:
raw_file.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

- __First, create a temporary transaction fact table from the raw file__
- __Then join each dimension table to it, one at a time, to pick up the surrogate keys needed for the final transaction fact table__
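Each join in this part of the notebook is followed by a `count()` to confirm that no rows were lost or duplicated. That join-then-count pattern could be wrapped in a small helper. This is a minimal sketch, assuming the inputs behave like PySpark DataFrames (they only need `count()` and `join()`); `join_keeping_row_count` is a hypothetical name, not part of the notebook or of PySpark:

```python
def join_keeping_row_count(fact, dim, condition, step):
    """Inner-join fact to dim and fail loudly if the row count changes.

    fact and dim are expected to behave like PySpark DataFrames.
    step is a free-text label used only in the error message.
    """
    expected = fact.count()
    joined = fact.join(dim, condition)  # PySpark's default join type is inner
    actual = joined.count()
    if actual != expected:
        raise ValueError(
            f"{step}: expected {expected} rows after the join, got {actual}"
        )
    return joined
```

A lower count would point to fact rows with no matching dimension row, and a higher count to duplicate keys in the dimension. Note that the helper triggers two Spark jobs per join because `count()` is an action.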

In [70]:
fact_data_temp = raw_file.select('*')

In [71]:
fact_data_temp.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [72]:
fact_data_temp.count()

2468572

- Join with location dimension table

In [73]:
dim_location.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

In [74]:
# creating a temporary reference to the location dimension; its columns are renamed before joining
dim_location_t = dim_location

In [75]:
# prefixing every column of the temp location dimension with 'dim_'
# so the names cannot clash with the raw-file columns in the join
for col_name in dim_location_t.columns:
    dim_location_t = dim_location_t.withColumnRenamed(col_name, 'dim_' + col_name)

In [76]:
dim_location_t.printSchema()

root
 |-- dim_location_id: integer (nullable = true)
 |-- dim_location: string (nullable = true)
 |-- dim_streetname: string (nullable = true)
 |-- dim_street_number: integer (nullable = true)
 |-- dim_zipcode: integer (nullable = true)
 |-- dim_lat: double (nullable = true)
 |-- dim_lon: double (nullable = true)
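The same `dim_` prefixing is applied to more than one dimension table, so it could be factored into a function. This is a sketch, assuming the argument behaves like a PySpark DataFrame (it uses the `columns` attribute and `toDF(*names)`, both of which exist on `pyspark.sql.DataFrame`); `prefix_columns` is a hypothetical helper name:

```python
def prefix_columns(df, prefix="dim_"):
    """Return df with every column renamed to prefix + original name.

    toDF(*names) renames all columns in one call, which replaces the
    loop of withColumnRenamed calls.
    """
    return df.toDF(*[prefix + name for name in df.columns])
```

With it, the renaming step for the location dimension could be written as `dim_location_t = prefix_columns(dim_location)`.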

In [77]:
fact_data_temp1 = fact_data_temp.join(
    dim_location_t,
    (fact_data_temp.atm_location == dim_location_t.dim_location)
    & (fact_data_temp.atm_streetname == dim_location_t.dim_streetname)
    & (fact_data_temp.atm_lat == dim_location_t.dim_lat)
    & (fact_data_temp.atm_lon == dim_location_t.dim_lon))

In [78]:
fact_data_temp1.count()

2468572
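The location join matches on four column pairs combined with `&`. One way to keep such multi-column conditions readable is to build them from a list of pairs. This is a sketch only, assuming the inputs behave like PySpark DataFrames, where `df["name"]` returns a column expression that supports `==` and `&`; `equi_join_condition` is a hypothetical helper name:

```python
from functools import reduce
from operator import and_


def equi_join_condition(left, right, column_pairs):
    """AND together left[l] == right[r] for every (l, r) in column_pairs.

    column_pairs must contain at least one pair.
    """
    return reduce(and_, (left[l] == right[r] for l, r in column_pairs))
```

For the location join, the pairs would be `("atm_location", "dim_location")`, `("atm_streetname", "dim_streetname")`, `("atm_lat", "dim_lat")` and `("atm_lon", "dim_lon")`, and the result would be passed as the second argument of `join`.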

In [79]:
fact_data_temp1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

- Join with ATM dimension table

In [80]:
dim_atm.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

In [81]:
# creating a temporary reference to the ATM dimension; its columns are renamed before joining
dim_atm_t = dim_atm

In [82]:
# prefixing every column of the temp ATM dimension with 'dim_'
# so the names cannot clash with the raw-file columns in the join
for col_name in dim_atm_t.columns:
    dim_atm_t = dim_atm_t.withColumnRenamed(col_name, 'dim_' + col_name)

In [83]:
dim_atm_t.printSchema()

root
 |-- dim_atm_id: integer (nullable = true)
 |-- dim_atm_number: string (nullable = true)
 |-- dim_atm_manufacturer: string (nullable = true)
 |-- dim_atm_location_id: integer (nullable = true)

In [84]:
fact_data_temp2 = fact_data_temp1.join(
    dim_atm_t,
    (fact_data_temp1.atm_id == dim_atm_t.dim_atm_number)
    & (fact_data_temp1.atm_manufacturer == dim_atm_t.dim_atm_manufacturer))

In [85]:
fact_data_temp2.count()

2468572

In [86]:
fact_data_temp2.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

- Join with Date dimension table

In [87]:
dim_date.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekDay: string (nullable = true)

In [88]:
fact_data_temp3 = fact_data_temp2.join(
    dim_date,
    (fact_data_temp2.year == dim_date.year)
    & (fact_data_temp2.month == dim_date.month)
    & (fact_data_temp2.day == dim_date.day)
    & (fact_data_temp2.hour == dim_date.hour))

In [89]:
fact_data_temp3.count()

2468572

In [90]:
fact_data_temp3.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

- Join with Card Type dimension table

In [91]:
dim_card_type.printSchema()

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)

In [92]:
fact_data_temp4 = fact_data_temp3.join(dim_card_type, (fact_data_temp3.card_type == dim_card_type.card_type))

In [93]:
fact_data_temp4.count()

2468572

In [94]:
fact_data_temp4.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekDay: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

- Creating main Transactional Fact Table

In [95]:
fact_atm_trans_temp = fact_data_temp4.select('dim_atm_id',   
                                             'dim_location_id',
                                             'date_id',
                                             'card_type_id',
                                             'atm_status',
                                             'currency',
                                             'service',
                                             'transaction_amount',
                                             'message_code',
                                             'message_text',
                                             'rain_3h',
                                             'clouds_all',
                                             'weather_id',
                                             'weather_main',
                                             'weather_description')

In [96]:
fact_atm_trans_temp.printSchema()

root
 |-- dim_atm_id: integer (nullable = true)
 |-- dim_location_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

In [97]:
fact_atm_trans_temp.count()

2468572

In [98]:
# renaming the columns as per the dimensional model
fact_atm_trans_temp = fact_atm_trans_temp.withColumnRenamed('dim_atm_id', 'atm_id')
fact_atm_trans_temp = fact_atm_trans_temp.withColumnRenamed('dim_location_id', 'weather_loc_id')

In [99]:
fact_atm_trans_temp.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

In [100]:
fact_atm_trans = fact_atm_trans_temp.select(F.row_number().over(Window.partitionBy().orderBy(fact_atm_trans_temp['atm_id'])).alias("trans_id"),
                                            'atm_id',
                                            'weather_loc_id',
                                            'date_id',
                                            'card_type_id',
                                            'atm_status',
                                            'currency',
                                            'service',
                                            'transaction_amount',
                                            'message_code',
                                            'message_text',
                                            'rain_3h',
                                            'clouds_all',
                                            'weather_id',
                                            'weather_main',
                                            'weather_description')

In [101]:
fact_atm_trans.show(5)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|       1|     1|            74|      1|           7|    Active|     DKK|Withdrawal|              5643|        null|        null|  0.215|        92|       500|        Rain|         light rain|
|       2|     1|            74|      2|           9|    Active|     DKK|Withdrawal|              1979|        null|        null|  0.215|        92|       500|        Rain|         light rain|
|       3|     1|            74|   

In [102]:
fact_atm_trans.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

In [103]:
# Checking the count after all the stages in the creation of the Transaction Fact table
fact_atm_trans.count()

2468572

- __Number of records in Transaction Fact table ---> fact_atm_trans: 2468572__

### Loading the dimension and fact tables into Amazon S3 bucket (etl-bucket-sagarmoy)

In [106]:
# loading Location Dimension table
dim_location.write.save("s3://etl-bucket-sagarmoy/dim_location/", format='csv', header='true')

In [107]:
# loading ATM Dimension table
dim_atm.write.save("s3://etl-bucket-sagarmoy/dim_atm/", format='csv', header='true')

In [108]:
# loading Date Dimension table
dim_date.write.save("s3://etl-bucket-sagarmoy/dim_date/", format='csv', header='true')

In [109]:
# loading Card Type Dimension table
dim_card_type.write.save("s3://etl-bucket-sagarmoy/dim_card_type/", format='csv', header='true')

In [110]:
# loading Transaction Fact table
fact_atm_trans.write.save("s3://etl-bucket-sagarmoy/fact_atm_trans/", format='csv', header='true')

- __The fact and dimension tables written to S3 above are the input for the analysis in Redshift__
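The five writes above follow one pattern: each table goes to a folder named after it, as CSV with a header row. That pattern could be expressed as a loop. This is a minimal sketch, assuming the values behave like PySpark DataFrames (`df.write.mode(...).csv(path, header=True)` is the standard `DataFrameWriter` API); `write_tables_as_csv` is a hypothetical helper name. The default `mode="error"` matches the notebook's behaviour of failing when the target path already exists:

```python
def write_tables_as_csv(tables, bucket, mode="error"):
    """Write each DataFrame to s3://<bucket>/<table name>/ as CSV with a header.

    tables maps a table name to a DataFrame-like object.
    Returns the mapping of table name to the path that was written.
    """
    paths = {}
    for name, df in tables.items():
        path = f"s3://{bucket}/{name}/"
        df.write.mode(mode).csv(path, header=True)
        paths[name] = path
    return paths
```

In this notebook the call would take a dictionary such as `{"dim_location": dim_location, "dim_atm": dim_atm, "dim_date": dim_date, "dim_card_type": dim_card_type, "fact_atm_trans": fact_atm_trans}` and the bucket name `"etl-bucket-sagarmoy"`. Passing `mode="overwrite"` would allow the notebook to be rerun without first deleting the existing folders.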

### Thank you