- # ETL Project

- ## Jupyter notebook for transformation actions on the dataset using Spark

## 1. Starting the Spark application and reading the data into a dataframe

In [1]:
# Starting the Spark application.
spark

Starting Spark application

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 0 | application_1638168361295_0002 | pyspark | idle | Link | Link | ✔ |

SparkSession available as 'spark'.

<pyspark.sql.session.SparkSession object at 0x7ff14024ead0>

In [2]:
# Spark session is now available. Reading the data into the dataframe.
atm_data = spark.read.load("part-m-00000", format="csv", sep=",", inferschema="true", header="false")

In [3]:
# Verifying the number of records retrieved in the dataframe.
atm_data.count()

2468572

- #### We have verified that the number of records in the dataframe is as expected, i.e. 2468572.

## 2. Assigning the correct column names and data types.

In [4]:
# Printing the schema of the dataframe to check the column names.
atm_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _c27: integer (nu

- ### As we can see, the column names do not describe the values they hold. So, we will rename the columns with meaningful names according to the data dictionary.

In [5]:
# Changing the column names of the dataframe according to the data dictionary.
column_names = [
    "year", "month", "day", "weekday", "hour", "atm_status", "atm_id",
    "atm_manufacturer", "atm_location", "atm_streetname", "atm_street_number",
    "atm_zipcode", "atm_lat", "atm_lon", "currency", "card_type",
    "transaction_amount", "service", "message_code", "message_text",
    "weather_lat", "weather_lon", "weather_city_id", "weather_city_name",
    "temp", "pressure", "humidity", "wind_speed", "wind_deg", "rain_3h",
    "clouds_all", "weather_id", "weather_main", "weather_description",
]
# toDF assigns the names positionally, so the list order must follow _c0 to _c33.
atm_data = atm_data.toDF(*column_names)

In [6]:
# Printing the dataframe schema again to check the column names.
atm_data.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: integer (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: 

In [7]:
# Checking the data in the dataframe.
atm_data.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

- ### The datatypes of 'atm_id' and 'message_code' do not match the datatypes given in the data dictionary. So, we will go ahead and change them.
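
A check like this can also be automated rather than done by eye. The following is a minimal sketch in plain Python, assuming the expected types for the two columns are strings (as the note above implies). The helper name `find_type_mismatches` and the `EXPECTED_TYPES` mapping are hypothetical and not part of the original notebook. It works on `(column_name, type_name)` pairs, which is the shape PySpark's `DataFrame.dtypes` returns.

```python
# Expected types for the two columns called out above. Per the notebook's
# data dictionary they should be strings. This mapping is an illustrative
# assumption; extend it to cover more columns as needed.
EXPECTED_TYPES = {"atm_id": "string", "message_code": "string"}


def find_type_mismatches(dtypes, expected):
    """Return {column: (actual, expected)} for columns whose type differs.

    `dtypes` is a list of (column_name, type_name) pairs, the same shape
    that PySpark's DataFrame.dtypes returns.
    """
    actual = dict(dtypes)
    return {
        name: (actual.get(name), wanted)
        for name, wanted in expected.items()
        if actual.get(name) != wanted
    }


# A hand-written excerpt of the inferred schema printed earlier.
inferred = [("year", "int"), ("atm_id", "int"), ("message_code", "int")]
mismatches = find_type_mismatches(inferred, EXPECTED_TYPES)
print(mismatches)
```

In the notebook itself the first argument would be `atm_data.dtypes`; an empty result would mean the inferred types already agree with the mapping.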

In [8]:
# Importing everything from pyspark.sql.types
from pyspark.sql.types import *

In [9]:
# Creating a new schema 'atm_schema' with the column names and datatypes according to the data dictionary.
atm_schema = StructType([StructField('year', IntegerType(), True),
StructField('month', StringType(), True),
StructField('day', IntegerType(), True),
StructField('weekday', StringType(), True),
StructField('hour', IntegerType(), True),
StructField('atm_status', StringType(), True),
StructField('atm_id', StringType(), True),
StructField('atm_manufacturer', StringType(), True),
StructField('atm_location', StringType(), True),
StructField('atm_streetname', StringType(), True),
StructField('atm_street_number', IntegerType(), True),
StructField('atm_zipcode', IntegerType(), True),
StructField('atm_lat', DoubleType(), True),
StructField('atm_lon', DoubleType(), True),
StructField('currency', StringType(), True),
StructField('card_type', StringType(), True),
StructField('transaction_amount', IntegerType(), True),
StructField('service', StringType(), True),
StructField('message_code', StringType(), True),
StructField('message_text', StringType(), True),
StructField('weather_lat', DoubleType(), True),
StructField('weather_lon', DoubleType(), True),
StructField('weather_city_id', IntegerType(), True),
StructField('weather_city_name', StringType(), True),
StructField('temp', DoubleType(), True),
StructField('pressure', IntegerType(), True),
StructField('humidity', IntegerType(), True),
StructField('wind_speed', IntegerType(), True),
StructField('wind_deg', IntegerType(), True),
StructField('rain_3h', DoubleType(), True),
StructField('clouds_all', IntegerType(), True),
StructField('weather_id', IntegerType(), True),
StructField('weather_main', StringType(), True),
StructField('weather_description', StringType(), True)])

In [10]:
# Creating a new dataframe 'atm_data1' using the newly created schema.
atm_data1 = spark.read.load("part-m-00000", format="csv", sep=",", schema=atm_schema, header="false")
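
The schema passed to the reader could also be written more compactly. The sketch below builds a DDL-formatted schema string from a list of `(name, type)` pairs that mirrors the fields of `atm_schema`. The names `FIELDS`, `to_ddl` and `atm_schema_ddl` are hypothetical, and the sketch assumes a PySpark version whose reader accepts a DDL string for `schema`.

```python
# (name, SQL type) pairs mirroring the StructType defined in the notebook.
FIELDS = [
    ("year", "INT"), ("month", "STRING"), ("day", "INT"),
    ("weekday", "STRING"), ("hour", "INT"), ("atm_status", "STRING"),
    ("atm_id", "STRING"), ("atm_manufacturer", "STRING"),
    ("atm_location", "STRING"), ("atm_streetname", "STRING"),
    ("atm_street_number", "INT"), ("atm_zipcode", "INT"),
    ("atm_lat", "DOUBLE"), ("atm_lon", "DOUBLE"), ("currency", "STRING"),
    ("card_type", "STRING"), ("transaction_amount", "INT"),
    ("service", "STRING"), ("message_code", "STRING"),
    ("message_text", "STRING"), ("weather_lat", "DOUBLE"),
    ("weather_lon", "DOUBLE"), ("weather_city_id", "INT"),
    ("weather_city_name", "STRING"), ("temp", "DOUBLE"),
    ("pressure", "INT"), ("humidity", "INT"), ("wind_speed", "INT"),
    ("wind_deg", "INT"), ("rain_3h", "DOUBLE"), ("clouds_all", "INT"),
    ("weather_id", "INT"), ("weather_main", "STRING"),
    ("weather_description", "STRING"),
]


def to_ddl(fields):
    """Join (name, type) pairs into a 'name TYPE, name TYPE, ...' string."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in fields)


atm_schema_ddl = to_ddl(FIELDS)
print(atm_schema_ddl)
```

Under that assumption, `atm_schema_ddl` could be supplied wherever `atm_schema` is used, for example `spark.read.load("part-m-00000", format="csv", sep=",", schema=atm_schema_ddl, header="false")`. Keeping the fields in one list also makes it easy to compare them against the data dictionary.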

In [11]:
# Printing the schema to check the column names and datatypes.
atm_data1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [12]:
# Checking the number of records in the dataframe.
atm_data1.count()

2468572

- ### We have the dataframe ready with the column names and datatypes according to the data dictionary.
- ### Also, the number of records in the dataframe is as expected, i.e. 2468572.

## 3. Creating Dimension tables

### 3.1 DIM_LOCATION

In [13]:
# Creating the DIM_LOCATION dataframe.
DIM_LOCATION = atm_data1.select("atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon")

In [14]:
# Changing the column names according to the DIM_LOCATION schema.
DIM_LOCATION = DIM_LOCATION.withColumnRenamed("atm_location","location").withColumnRenamed("atm_streetname","streetname").withColumnRenamed("atm_street_number","street_number").withColumnRenamed("atm_zipcode","zipcode").withColumnRenamed("atm_lat","lat").withColumnRenamed("atm_lon","lon")

In [15]:
# Printing the schema to check the column names and datatypes.
DIM_LOCATION.printSchema()

root
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

In [16]:
# Checking the number of records in the DIM_LOCATION dataframe.
DIM_LOCATION.count()

2468572

### 3.1.1 DIM_LOCATION contains a lot of duplicate records. So, we will go ahead and drop them.

In [17]:
# Dropping the duplicate records from DIM_LOCATION dataframe.
DIM_LOCATION = DIM_LOCATION.dropDuplicates()

In [18]:
# Verifying the number of records again in DIM_LOCATION dataframe.
DIM_LOCATION.count()

109

- ### Number of records in DIM_LOCATION is 109 as given in the validation document.

### 3.1.2  Adding the "location_id" column (Primary key) to DIM_LOCATION dataframe.

In [19]:
# Importing the Window class and the functions module (row_number is used as F.row_number).
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [20]:
# Adding the 'location_id' column.
DIM_LOCATION = DIM_LOCATION.select(F.row_number().over(Window.partitionBy().orderBy(DIM_LOCATION['location'])).alias("location_id"),"location","streetname","street_number","zipcode","lat","lon")

In [21]:
# Verifying the number of records again in DIM_LOCATION dataframe.
DIM_LOCATION.count()

109

In [22]:
# Printing the top 20 rows from DIM_LOCATION dataframe.
DIM_LOCATION.show(20)

+-----------+--------------------+------------------+-------------+-------+------+------+
|location_id|            location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+------------------+-------------+-------+------+------+
|          1|             Aabybro|      ÃƒËœstergade|            6|   9440|57.162|  9.73|
|          2|      Aalborg Hallen|      Europa Plads|            4|   9000|57.044| 9.913|
|          3|Aalborg Storcente...|          Hobrovej|          452|   9200|57.005| 9.876|
|          4|Aalborg Storcente...|          Hobrovej|          452|   9200|57.005| 9.876|
|          5|         Aalborg Syd|          Hobrovej|          440|   9200|57.005| 9.881|
|          6|           AalbÃƒÂ¦k|        Centralvej|            5|   9982|57.593|10.412|
|          7|              Aarhus|        Ceres Byen|           75|   8000|56.157|10.194|
|          8|              Aarhus|    SÃƒÂ¸nder Alle|           11|   8000|56.153|10.206|
|         

- ### 'DIM_LOCATION' is ready as per the schema provided.
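
One detail of the key assignment is worth illustrating. The window orders rows by `location` only, and some locations (for example 'Aarhus') appear more than once, so which of the tied rows receives which `location_id` is not fixed by the ordering. The sketch below is a plain-Python emulation, not the notebook's Spark code, and the helper name `assign_ids` is hypothetical. It shows that sorting on every column removes the ties, so the numbering is the same on every run.

```python
def assign_ids(rows, start=1):
    """Number rows from `start` after sorting on all of their fields.

    Sorting whole tuples breaks ties on the first field by falling back to
    the remaining fields, so the result does not depend on input order.
    Assumes no field is None (None cannot be compared with other values).
    """
    return [(i, *row) for i, row in enumerate(sorted(rows), start=start)]


# Three rows taken from the DIM_LOCATION output (location, streetname, number).
rows = [
    ("Aarhus", "Sønder Alle", 11),
    ("Aabybro", "Østergade", 6),
    ("Aarhus", "Ceres Byen", 75),
]
numbered = assign_ids(rows)
for row in numbered:
    print(row)
```

In Spark, the analogous change would be to order the window by all six columns, e.g. `Window.orderBy("location", "streetname", "street_number", "zipcode", "lat", "lon")`. A window without partitioning gathers all rows into a single partition, which is acceptable for a table of 109 rows.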

### 3.2 DIM_CARD_TYPE

In [23]:
# Creating the DIM_CARD_TYPE dataframe.
DIM_CARD_TYPE = atm_data1.select("card_type")

In [24]:
# Printing the schema to check the column names and datatypes.
DIM_CARD_TYPE.printSchema()

root
 |-- card_type: string (nullable = true)

In [25]:
# Checking the number of records in the DIM_CARD_TYPE dataframe.
DIM_CARD_TYPE.count()

2468572

### 3.2.1  DIM_CARD_TYPE contains a lot of duplicate records. So, we will go ahead and drop them.

In [26]:
# Dropping the duplicate records from DIM_CARD_TYPE dataframe.
DIM_CARD_TYPE = DIM_CARD_TYPE.dropDuplicates()

In [27]:
# Verifying the number of records in the DIM_CARD_TYPE dataframe.
DIM_CARD_TYPE.count()

12

- ### Number of records in DIM_CARD_TYPE is 12 as given in the validation document.

### 3.2.2  Adding the "card_type_id" column (Primary key) to DIM_CARD_TYPE dataframe.

In [28]:
# Adding the 'card_type_id' column.
DIM_CARD_TYPE = DIM_CARD_TYPE.select(F.row_number().over(Window.partitionBy().orderBy(DIM_CARD_TYPE['card_type'])).alias("card_type_id"),"card_type")

In [29]:
# Verifying the number of records in the DIM_CARD_TYPE dataframe.
DIM_CARD_TYPE.count()

12

In [30]:
# Printing all the rows from DIM_CARD_TYPE dataframe.
DIM_CARD_TYPE.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|              CIRRUS|
|           2|             Dankort|
|           3|     Dankort - on-us|
|           4|         HÃƒÂ¦vekort|
|           5| HÃƒÂ¦vekort - on-us|
|           6|             Maestro|
|           7|          MasterCard|
|           8|  Mastercard - on-us|
|           9|                VISA|
|          10|        Visa Dankort|
|          11|Visa Dankort - on-us|
|          12|            VisaPlus|
+------------+--------------------+

- ### 'DIM_CARD_TYPE' is ready as per the schema provided.

### 3.3 DIM_DATE

In [31]:
# Creating the DIM_DATE dataframe.
DIM_DATE = atm_data1.select("year","month","day","hour","weekday")

In [32]:
# Printing the schema to check the column names and datatypes.
DIM_DATE.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

In [33]:
# Checking the number of records in the DIM_DATE dataframe.
DIM_DATE.count()

2468572

### 3.3.1  DIM_DATE contains a lot of duplicate records. So, we will go ahead and drop them.

In [34]:
# Dropping the duplicate records from DIM_DATE dataframe.
DIM_DATE = DIM_DATE.dropDuplicates()

In [35]:
# Verifying the number of records in the DIM_DATE dataframe.
DIM_DATE.count()

8685

In [36]:
# Printing first 5 rows for reference.
DIM_DATE.show(5)

+----+-------+---+----+--------+
|year|  month|day|hour| weekday|
+----+-------+---+----+--------+
|2017|January|  5|  21|Thursday|
|2017|January| 22|  15|  Sunday|
|2017|  April|  7|   9|  Friday|
|2017|January| 23|  18|  Monday|
|2017|  March| 17|   1|  Friday|
+----+-------+---+----+--------+
only showing top 5 rows

- ### Number of records in DIM_DATE is 8685 as given in the validation document.
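
As a rough plausibility check on this figure, the count can be compared with the number of hours in a year. The sketch below assumes the data covers the single calendar year 2017, which the sample rows suggest but the notebook does not state. The helper name `hours_in_year` is hypothetical.

```python
import calendar


def hours_in_year(year):
    """Upper bound on distinct (month, day, hour) combinations in a year."""
    days = 366 if calendar.isleap(year) else 365
    return days * 24


distinct_hours_seen = 8685           # DIM_DATE row count from the notebook
upper_bound = hours_in_year(2017)    # 365 days * 24 hours
hours_without_rows = upper_bound - distinct_hours_seen
print(upper_bound, hours_without_rows)
```

Under that assumption, 8685 distinct date-hour combinations sit just below the bound of 8760, meaning 75 hours of the year have no transaction at all.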

### 3.3.2  Adding the full_date_time column to the DIM_DATE dataframe.

In [37]:
# Importing the required sql function.
from pyspark.sql.functions import concat_ws

In [38]:
# Adding the 'full_date_time' column to the DIM_DATE dataframe.
DIM_DATE = DIM_DATE.select(concat_ws('/',"year","month","day","hour").alias("full_date_time"),"year","month","day","hour","weekday")

In [39]:
# Printing the schema to check the datatype.
DIM_DATE.printSchema()

root
 |-- full_date_time: string (nullable = false)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

### 3.3.3  Data type of 'full_date_time' column is string. Now, we will change the datatype of 'full_date_time' column to TIMESTAMP.

In [40]:
# Printing first 5 rows for reference.
DIM_DATE.show(5)

+------------------+----+-------+---+----+--------+
|    full_date_time|year|  month|day|hour| weekday|
+------------------+----+-------+---+----+--------+
| 2017/January/5/21|2017|January|  5|  21|Thursday|
|2017/January/22/15|2017|January| 22|  15|  Sunday|
|    2017/April/7/9|2017|  April|  7|   9|  Friday|
|2017/January/23/18|2017|January| 23|  18|  Monday|
|   2017/March/17/1|2017|  March| 17|   1|  Friday|
+------------------+----+-------+---+----+--------+
only showing top 5 rows

In [41]:
# Importing everything from pyspark.sql.functions
from pyspark.sql.functions import *

In [42]:
# Making a pattern for timestamp and converting the 'full_date_time' column to TIMESTAMP datatype.
pattern1 = 'yyyy/MMM/dd/HH'
DIM_DATE = DIM_DATE.withColumn('full_date_time', unix_timestamp(DIM_DATE['full_date_time'], pattern1).cast('timestamp'))

In [43]:
# Verifying the datatype change.
DIM_DATE.printSchema()

root
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

In [44]:
# Printing first 5 rows for reference.
DIM_DATE.show(5)

+-------------------+----+-------+---+----+--------+
|     full_date_time|year|  month|day|hour| weekday|
+-------------------+----+-------+---+----+--------+
|2017-01-05 21:00:00|2017|January|  5|  21|Thursday|
|2017-01-22 15:00:00|2017|January| 22|  15|  Sunday|
|2017-04-07 09:00:00|2017|  April|  7|   9|  Friday|
|2017-01-23 18:00:00|2017|January| 23|  18|  Monday|
|2017-03-17 01:00:00|2017|  March| 17|   1|  Friday|
+-------------------+----+-------+---+----+--------+
only showing top 5 rows

- ### Datatype for 'full_date_time' column is successfully changed to TIMESTAMP.
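
The conversion can be cross-checked outside Spark. The sketch below parses the same `year/month/day/hour` strings with Python's standard library. It illustrates the expected result and is not a replacement for the Spark code; the helper name `parse_full_date_time` is hypothetical. `%B` matches full month names and relies on an English locale.

```python
from datetime import datetime


def parse_full_date_time(text):
    """Parse strings such as '2017/January/5/21' into a datetime.

    %B matches the full month name (locale-dependent, English assumed);
    %d and %H accept values with or without a leading zero.
    """
    return datetime.strptime(text, "%Y/%B/%d/%H")


# Strings copied from the 'full_date_time' column before the conversion.
samples = ["2017/January/5/21", "2017/April/7/9", "2017/March/17/1"]
parsed = [parse_full_date_time(s) for s in samples]
for value in parsed:
    print(value)
```

The printed values should match the timestamps shown in the converted column. Spark's datetime pattern handling changed in Spark 3.x, so on newer clusters a full month name may need `MMMM` rather than `MMM`. It is worth re-running this cell and checking for nulls in `full_date_time` after any upgrade.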

### 3.3.4  Adding the "date_id" column (Primary key) to DIM_DATE dataframe.

In [45]:
# Adding the 'date_id' column.
DIM_DATE = DIM_DATE.select(F.row_number().over(Window.partitionBy().orderBy(DIM_DATE['year'])).alias("date_id"),"full_date_time","year","month","day","hour","weekday")

In [46]:
# Verifying the number of records in the DIM_DATE dataframe.
DIM_DATE.count()

8685

In [47]:
# Printing the first 10 rows from DIM_DATE dataframe.
DIM_DATE.show(10)

+-------+-------------------+----+-------+---+----+---------+
|date_id|     full_date_time|year|  month|day|hour|  weekday|
+-------+-------------------+----+-------+---+----+---------+
|      1|2017-01-05 21:00:00|2017|January|  5|  21| Thursday|
|      2|2017-01-22 15:00:00|2017|January| 22|  15|   Sunday|
|      3|2017-04-07 09:00:00|2017|  April|  7|   9|   Friday|
|      4|2017-01-23 18:00:00|2017|January| 23|  18|   Monday|
|      5|2017-03-17 01:00:00|2017|  March| 17|   1|   Friday|
|      6|2017-03-27 13:00:00|2017|  March| 27|  13|   Monday|
|      7|2017-03-29 07:00:00|2017|  March| 29|   7|Wednesday|
|      8|2017-03-04 08:00:00|2017|  March|  4|   8| Saturday|
|      9|2017-03-12 12:00:00|2017|  March| 12|  12|   Sunday|
|     10|2017-01-04 04:00:00|2017|January|  4|   4|Wednesday|
+-------+-------------------+----+-------+---+----+---------+
only showing top 10 rows

- ### 'DIM_DATE' is ready as per the schema provided.

### 3.4 DIM_ATM

In [48]:
# Creating the DIM_ATM dataframe.
DIM_ATM = atm_data1.select("atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon")

In [49]:
# Printing the schema to check the column names and datatypes.
DIM_ATM.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)

### 3.4.1  Renaming the 'atm_id' column to 'atm_number'.

In [50]:
# Renaming the 'atm_id' column to 'atm_number'.
DIM_ATM = DIM_ATM.withColumnRenamed("atm_id","atm_number")

In [51]:
# Printing the schema to check the column names and datatypes.
DIM_ATM.printSchema()

root
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)

In [52]:
# Checking the number of records in the DIM_ATM dataframe.
DIM_ATM.count()

2468572

### 3.4.2  DIM_ATM contains a lot of duplicate records. So, we will go ahead and drop them.

In [53]:
# Dropping the duplicate records from DIM_ATM dataframe.
DIM_ATM = DIM_ATM.dropDuplicates()

In [54]:
# Verifying the number of records in the DIM_ATM dataframe.
DIM_ATM.count()

113

In [55]:
# Printing first 5 rows for reference.
DIM_ATM.show(5)

+----------+----------------+--------------------+-----------------+-----------------+-----------+-------+-------+
|atm_number|atm_manufacturer|        atm_location|   atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+----------+----------------+--------------------+-----------------+-----------------+-----------+-------+-------+
|        59| Diebold Nixdorf|NykÃƒÂ¸bing Mors ...|      Kirketorvet|                1|       7900| 56.795|   8.86|
|        41| Diebold Nixdorf|              Skagen|Sct. Laurentiivej|               36|       9990| 57.723|  10.59|
|        40| Diebold Nixdorf|       Frederikshavn|     Danmarksgade|               48|       9900| 57.441| 10.537|
|        87|             NCR|Aalborg Storcente...|         Hobrovej|              452|       9200| 57.005|  9.876|
|        31|             NCR|            Slagelse|  Mariendals Alle|               29|       4200| 55.398| 11.342|
+----------+----------------+--------------------+-----------------+------------

- ### Number of records in DIM_ATM is 113 as given in the validation document.

### 3.4.3  Adding the 'atm_location_id' column (Foreign key, referencing 'location_id' from 'DIM_LOCATION') to the DIM_ATM dataframe.

In [56]:
# Creating 'location_copy' dataframe as a copy of 'DIM_LOCATION' dataframe.
location_copy = DIM_LOCATION

In [57]:
# Printing first 5 rows for reference.
location_copy.show(5)

+-----------+--------------------+------------+-------------+-------+------+-----+
|location_id|            location|  streetname|street_number|zipcode|   lat|  lon|
+-----------+--------------------+------------+-------------+-------+------+-----+
|          1|             Aabybro|ÃƒËœstergade|            6|   9440|57.162| 9.73|
|          2|      Aalborg Hallen|Europa Plads|            4|   9000|57.044|9.913|
|          3|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          4|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          5|         Aalborg Syd|    Hobrovej|          440|   9200|57.005|9.881|
+-----------+--------------------+------------+-------------+-------+------+-----+
only showing top 5 rows

In [58]:
# Renaming the columns with appropriate column names for join with 'DIM_ATM' dataframe.
location_copy = location_copy.withColumnRenamed("location_id","atm_location_id").withColumnRenamed("location","atm_location").withColumnRenamed("streetname","atm_streetname").withColumnRenamed("street_number","atm_street_number").withColumnRenamed("zipcode","atm_zipcode").withColumnRenamed("lat","atm_lat").withColumnRenamed("lon","atm_lon")

In [59]:
# Printing first 5 rows for reference.
location_copy.show(5)

+---------------+--------------------+--------------+-----------------+-----------+-------+-------+
|atm_location_id|        atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+---------------+--------------------+--------------+-----------------+-----------+-------+-------+
|              1|             Aabybro|  ÃƒËœstergade|                6|       9440| 57.162|   9.73|
|              2|      Aalborg Hallen|  Europa Plads|                4|       9000| 57.044|  9.913|
|              3|Aalborg Storcente...|      Hobrovej|              452|       9200| 57.005|  9.876|
|              4|Aalborg Storcente...|      Hobrovej|              452|       9200| 57.005|  9.876|
|              5|         Aalborg Syd|      Hobrovej|              440|       9200| 57.005|  9.881|
+---------------+--------------------+--------------+-----------------+-----------+-------+-------+
only showing top 5 rows

In [60]:
# Performing the join operation on 'DIM_ATM' and 'location_copy' dataframes and saving in 'ATM_JOINED' dataframe.
ATM_JOINED = location_copy.join(DIM_ATM, on=['atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon'], how='left')

In [61]:
# Checking the number of records in 'ATM_JOINED' dataframe.
ATM_JOINED.count()

113

In [62]:
# Selecting the required columns from 'ATM_JOINED' dataframe as per the schema provided for 'DIM_ATM' dataframe.
DIM_ATM = ATM_JOINED.select("atm_number","atm_manufacturer",'atm_location_id')

In [63]:
# Checking the schema for 'DIM_ATM'.
DIM_ATM.printSchema()

root
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

In [64]:
# Checking the number of records in 'DIM_ATM' dataframe.
DIM_ATM.count()

113

- ### Number of records in DIM_ATM is 113 as given in the validation document.
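A left join emits one output row per matching right-hand row, so the joined count can exceed the number of rows on the left when several ATMs share one location. The following is a minimal plain-Python model of that behaviour, not Spark code; the `left_join` helper and the sample rows are hypothetical and only illustrate the row-count effect.

```python
def left_join(left, right, keys):
    """Plain-Python model of a left equi-join over lists of dicts (hypothetical helper)."""
    # Columns contributed only by the right side; filled with None when nothing matches.
    right_cols = set().union(*[r.keys() for r in right]) - set(keys)
    index = {}
    for r in right:
        index.setdefault(tuple(r[k] for k in keys), []).append(r)

    out = []
    for l in left:
        matches = index.get(tuple(l[k] for k in keys))
        if matches:
            # One output row per matching right row: this is where the count can grow.
            out.extend({**l, **r} for r in matches)
        else:
            # Unmatched left rows are kept, with right-side columns set to None.
            out.append({**l, **{c: None for c in right_cols}})
    return out


# Illustrative data: two locations, three ATMs, two of them at location "A".
locations = [
    {"atm_location_id": 1, "atm_location": "A"},
    {"atm_location_id": 2, "atm_location": "B"},
]
atms = [
    {"atm_number": "1", "atm_location": "A"},
    {"atm_number": "2", "atm_location": "A"},
    {"atm_number": "3", "atm_location": "B"},
]

joined = left_join(locations, atms, ["atm_location"])
print(len(locations), len(joined))  # 2 3
```

In this model the result has one row per ATM, and each row carries the `atm_location_id` of the location it matched.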

### 3.4.4  Adding the "atm_id" column (Primary key) to DIM_ATM dataframe.

In [65]:
# Adding the 'atm_id' column as a sequential row number ordered by 'atm_number'.
DIM_ATM = DIM_ATM.select(F.row_number().over(Window.partitionBy().orderBy(DIM_ATM['atm_number'])).alias("atm_id"),"atm_number","atm_manufacturer","atm_location_id")

In [66]:
# Verifying the number of records in the 'DIM_ATM' dataframe.
DIM_ATM.count()

113

In [67]:
# Printing the first 10 rows from 'DIM_ATM' dataframe.
DIM_ATM.show(10)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|         1|             NCR|             75|
|     2|        10|             NCR|             76|
|     3|       100|             NCR|             56|
|     4|       101|             NCR|             17|
|     5|       102|             NCR|              3|
|     6|       103| Diebold Nixdorf|            103|
|     7|       104|             NCR|             58|
|     8|       105| Diebold Nixdorf|             76|
|     9|       106|             NCR|             55|
|    10|       107| Diebold Nixdorf|             62|
+------+----------+----------------+---------------+
only showing top 10 rows

- ### 'DIM_ATM' is ready as per the schema provided.
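In the output above, `atm_id` 2 belongs to `atm_number` 10 rather than 2. The schema shows `atm_number` as a string column, so the window's ordering is lexicographic rather than numeric. The sketch below reproduces that ordering in plain Python with illustrative values; for digit-only strings Python's string sort gives the same order.

```python
atm_numbers = ["1", "2", "10", "100", "101", "20"]  # illustrative values only

as_strings = sorted(atm_numbers)            # lexicographic: compares character by character
as_integers = sorted(atm_numbers, key=int)  # numeric: compares the parsed values

# Equivalent of row_number(): the 1-based position in the chosen ordering.
atm_id_by_string = {n: i for i, n in enumerate(as_strings, start=1)}

print(as_strings)              # ['1', '10', '100', '101', '2', '20']
print(as_integers)             # ['1', '2', '10', '20', '100', '101']
print(atm_id_by_string["10"])  # 2
```

Either ordering yields unique ids from 1 to N, so the primary key is valid in both cases. If numeric order were preferred, casting the column to an integer inside the `orderBy` would be one option.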

## 4.  Creating Fact table

### 4.1 FACT_ATM_TRANS

In [68]:
# Creating the 'FACT_ATM_TRANS' dataframe.
FACT_ATM_TRANS = atm_data1.select("atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon","atm_id","atm_manufacturer","year","month","day","hour","weekday","card_type")

In [69]:
# Printing the schema to check the column names and datatypes.
FACT_ATM_TRANS.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string

In [70]:
# Renaming the 'atm_id' column to 'atm_number'.
FACT_ATM_TRANS = FACT_ATM_TRANS.withColumnRenamed("atm_id","atm_number")

In [71]:
# Printing the schema to check the column names and datatypes.
FACT_ATM_TRANS.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: st

In [72]:
# Checking the number of records in the 'FACT_ATM_TRANS' dataframe.
FACT_ATM_TRANS.count()

2468572

- ### Number of records in 'FACT_ATM_TRANS' is 2468572 as given in the validation document.

### 4.1.1  Adding the Foreign keys ('weather_loc_id', 'atm_id', 'date_id' and 'card_type_id') to the 'FACT_ATM_TRANS' dataframe.

### 4.1.1.1 Adding the 'weather_loc_id' column to the 'FACT_ATM_TRANS' dataframe by joining it with the 'DIM_LOCATION' dataframe.

In [73]:
# Creating 'location_copy1' as a copy of the 'DIM_LOCATION' dataframe for the join.
location_copy1 = DIM_LOCATION

In [74]:
# Renaming the columns to the 'atm_'-prefixed names used in 'FACT_ATM_TRANS' so they can serve as join keys.
location_copy1 = (
    location_copy1
    .withColumnRenamed("location", "atm_location")
    .withColumnRenamed("streetname", "atm_streetname")
    .withColumnRenamed("street_number", "atm_street_number")
    .withColumnRenamed("zipcode", "atm_zipcode")
    .withColumnRenamed("lat", "atm_lat")
    .withColumnRenamed("lon", "atm_lon")
)

In [75]:
# Checking the number of records in 'location_copy1' dataframe.
location_copy1.count()

109

In [76]:
# Performing the join operation on 'FACT_ATM_TRANS' and 'location_copy1' dataframes.
FACT_ATM_TRANS = FACT_ATM_TRANS.join(location_copy1, on=['atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon'], how='left')

In [77]:
# Checking the number of records in 'FACT_ATM_TRANS' dataframe.
FACT_ATM_TRANS.count()

2468572

- ### Number of records in 'FACT_ATM_TRANS' is 2468572 as given in the validation document.

### 4.1.1.2 Adding the 'atm_id' column to the 'FACT_ATM_TRANS' dataframe by joining it with the 'DIM_ATM' dataframe.

In [78]:
# Performing the join operation on 'FACT_ATM_TRANS' and 'DIM_ATM' dataframes.
FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_ATM, on=['atm_number','atm_manufacturer'], how='left')

In [79]:
# Checking the number of records in 'FACT_ATM_TRANS' dataframe.
FACT_ATM_TRANS.count()

2468572

- ### Number of records in 'FACT_ATM_TRANS' is 2468572 as given in the validation document.

### 4.1.1.3 Adding the 'date_id' column to the 'FACT_ATM_TRANS' dataframe by joining it with the 'DIM_DATE' dataframe.

In [80]:
# Performing the join operation on 'FACT_ATM_TRANS' and 'DIM_DATE' dataframes.
FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_DATE, on=['year','month','day','hour','weekday'], how='left')

In [81]:
# Checking the number of records in 'FACT_ATM_TRANS' dataframe.
FACT_ATM_TRANS.count()

2468572

- ### Number of records in 'FACT_ATM_TRANS' is 2468572 as given in the validation document.

### 4.1.1.4 Adding the 'card_type_id' column to the 'FACT_ATM_TRANS' dataframe by joining it with the 'DIM_CARD_TYPE' dataframe.

In [82]:
# Performing the join operation on 'FACT_ATM_TRANS' and 'DIM_CARD_TYPE' dataframes.
FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_CARD_TYPE, on=['card_type'], how='left')

In [83]:
# Checking the number of records in 'FACT_ATM_TRANS' dataframe.
FACT_ATM_TRANS.count()

2468572

- ### Number of records in 'FACT_ATM_TRANS' is 2468572 as given in the validation document.
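A left join keeps every row of the left table whether or not a match is found. An unchanged count therefore shows that the joins did not duplicate or drop transactions, but it does not show that every foreign key was resolved. SQL-style equality also never matches a NULL key, so a fact row with a NULL join key ends up with a NULL foreign key. The plain-Python model below (not Spark code) illustrates a complementary check that counts unresolved keys; the `lookup_fk` helper and the card type values are hypothetical.

```python
def lookup_fk(fact_rows, dim_rows, keys, fk):
    """Model of a left join that attaches dim_rows[fk] to each fact row (hypothetical helper)."""
    index = {}
    for d in dim_rows:
        key = tuple(d[k] for k in keys)
        if None not in key:  # a NULL key never satisfies an equality join condition
            index[key] = d[fk]
    # Every fact row is kept; the foreign key is None when no dimension row matched.
    return [{**f, fk: index.get(tuple(f[k] for k in keys))} for f in fact_rows]


dim_card_type = [
    {"card_type": "Visa", "card_type_id": 1},
    {"card_type": "MasterCard", "card_type_id": 2},
]
fact = [
    {"card_type": "Visa"},
    {"card_type": None},    # NULL key: cannot match
    {"card_type": "Amex"},  # value absent from the dimension
]

result = lookup_fk(fact, dim_card_type, ["card_type"], "card_type_id")
missing = sum(1 for r in result if r["card_type_id"] is None)
print(len(result), missing)  # 3 2
```

The row count is preserved here even though two of the three keys are unresolved, which is why a null count on each foreign key column is a useful addition to the count check.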

### 4.1.2  Removing the unwanted columns from the 'FACT_ATM_TRANS' dataframe according to the schema provided.

In [84]:
# Printing the schema to check the column names.
FACT_ATM_TRANS.printSchema()

root
 |-- card_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (null

In [85]:
# Selecting only the columns which are required according to the schema for 'FACT_ATM_TRANS'.
FACT_ATM_TRANS = FACT_ATM_TRANS.select("atm_id","location_id","date_id","card_type_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")

In [86]:
# Renaming the 'location_id' to 'weather_loc_id'.
FACT_ATM_TRANS = FACT_ATM_TRANS.withColumnRenamed("location_id","weather_loc_id")

In [87]:
# Printing the schema to verify the column names.
FACT_ATM_TRANS.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

### 4.1.3  Adding the "trans_id" column (Primary key) to 'FACT_ATM_TRANS' dataframe.

In [88]:
# Adding the 'trans_id' column as a sequential row number ordered by 'card_type_id'.
FACT_ATM_TRANS = FACT_ATM_TRANS.select(F.row_number().over(Window.partitionBy().orderBy(FACT_ATM_TRANS['card_type_id'])).alias("trans_id"),"atm_id","weather_loc_id","date_id","card_type_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")

In [89]:
# Verifying the number of records in 'FACT_ATM_TRANS' dataframe.
FACT_ATM_TRANS.count()

2468572
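The `trans_id` values come from a row number ordered by `card_type_id`, which is shared by many transactions. Spark does not guarantee the relative order of rows that tie on the ordering key, so the ids are unique but a given transaction may receive a different `trans_id` on a rerun. The plain-Python sketch below (illustrative rows, hypothetical `number_rows` helper, not Spark code) shows the effect and how a unique tie-breaker removes it.

```python
def number_rows(rows, order_key):
    """Assign 1-based row numbers after sorting by order_key (hypothetical helper)."""
    ordered = sorted(rows, key=order_key)
    return {r["row"]: i for i, r in enumerate(ordered, start=1)}


rows = [
    {"row": "a", "card_type_id": 1},
    {"row": "b", "card_type_id": 1},  # ties with "a" on card_type_id
    {"row": "c", "card_type_id": 2},
]
shuffled = [rows[1], rows[0], rows[2]]  # same data arriving in a different order


def by_card(r):
    return r["card_type_id"]


def by_card_then_row(r):
    return (r["card_type_id"], r["row"])  # unique tie-breaker added


# Ordering by the non-unique key alone: numbering depends on arrival order.
print(number_rows(rows, by_card) == number_rows(shuffled, by_card))  # False
# Ordering by a unique combination: numbering is reproducible.
print(number_rows(rows, by_card_then_row) == number_rows(shuffled, by_card_then_row))  # True
```

For a one-off load this may not matter. A window without `partitionBy` columns also moves all rows into a single partition, which can be slow for a table of this size.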

In [90]:
# Printing the schema to verify the column names and datatype.
FACT_ATM_TRANS.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

In [91]:
# Printing the first 5 rows for reference.
FACT_ATM_TRANS.show(5)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|    29|            79|   7593|           1|    Active|     DKK|Withdrawal|              6303|        null|        null|    0.0|         0|       800|       Clear|        Sky is Clear|
|       2|    29|            79|   7593|           1|    Active|     DKK|Withdrawal|              7624|        null|        null|    0.0|         0|       800|       Clear|        Sky is Clear|
|       3|    54|            1

- ### Number of records in 'FACT_ATM_TRANS' is 2468572 as given in the validation document.

- ### All the Foreign keys and Primary keys are added to the 'FACT_ATM_TRANS' dataframe.

- ### All the dimension and fact dataframes/tables are created and are ready to load into Amazon S3.

## 5.  Loading the dataframes to Amazon S3.

- ### The 'etl-project-saish' bucket has been created in S3. The dataframes/tables are now loaded into this bucket.

### 5.1 Loading the 'DIM_LOCATION' dataframe to the S3 bucket.

In [92]:
DIM_LOCATION.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project-saish/DIM_LOCATION',mode='overwrite')

### 5.2 Loading the 'DIM_ATM' dataframe to the S3 bucket.

In [93]:
DIM_ATM.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project-saish/DIM_ATM',mode='overwrite')

### 5.3 Loading the 'DIM_DATE' dataframe to the S3 bucket.

In [94]:
DIM_DATE.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project-saish/DIM_DATE',mode='overwrite')

### 5.4 Loading the 'DIM_CARD_TYPE' dataframe to the S3 bucket.

In [95]:
DIM_CARD_TYPE.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project-saish/DIM_CARD_TYPE',mode='overwrite')

### 5.5 Loading the 'FACT_ATM_TRANS' dataframe to the S3 bucket.

In [96]:
FACT_ATM_TRANS.coalesce(1).write.format('csv').option('header','false').save('s3://etl-project-saish/FACT_ATM_TRANS',mode='overwrite')

- ### All the dataframes/tables are loaded to the Amazon S3 bucket.
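The files are written with `header` set to `false`, so the column order of each dataframe is the only schema information a downstream reader receives. The sketch below uses Python's standard `csv` module on an in-memory buffer (no Spark or S3 involved) to show a headerless round trip. The column list follows the 'DIM_ATM' schema shown earlier and the two rows are taken from its printed output.

```python
import csv
import io

# Column order of 'DIM_ATM' as printed earlier in the notebook.
columns = ["atm_id", "atm_number", "atm_manufacturer", "atm_location_id"]
rows = [(1, "1", "NCR", 75), (2, "10", "NCR", 76)]

# Write without a header row, as the S3 export does.
buf = io.StringIO()
csv.writer(buf, lineterminator="\n").writerows(rows)

# A reader has to supply the column names itself, in the same order.
buf.seek(0)
records = [dict(zip(columns, rec)) for rec in csv.reader(buf)]

print(buf.getvalue())
print(records[1])
```

All values come back as strings, so the consumer also has to reapply the data types along with the column names.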