# Importing required libraries and setting up SparkSession

In [1]:
# Importing Libraries

import os
import sys
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/usr/lib/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1712144607327_0004,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Setting up SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('jupyter_Spark').master("local").getOrCreate()
spark

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f73b714d2b0>

# Loading Data into Spark

### Reading Data from HDFS with Default Schema to Ensure Successful Loading

In [3]:
# Storing data from HDFS in 'tempdf' Dataframe

tempdf = spark.read.csv("/user/hadoop/ETL/ATMDATA/part-m-00000", header = False, inferSchema = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Reading 1 row of 'tempdf'

tempdf.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
| _c0|    _c1|_c2|   _c3|_c4|   _c5|_c6|_c7|       _c8|        _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19| _c20|  _c21|   _c22|    _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|_c32|      _c33|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
|2017|January|  1|Sunday|  0|Active|  1|NCR|NÃƒÂ¦stved|Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|NULL|NULL|55.23|11.761|2616038|Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|Rain|light rain|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+-

### Generating a Custom Input Schema with StructType and Importing the Data

In [5]:
# Importing DataTypes for Schema

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# creating Schema

Schema = StructType([StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), True),
                        StructField('day', IntegerType(), True),
                        StructField('weekday', StringType(), True),
                        StructField('hour', IntegerType(), True),
                        StructField('atm_status', StringType(), True),
                        StructField('atm_id', StringType(), True),
                        StructField('atm_manufacturer', StringType(), True),
                        StructField('atm_location', StringType(), True),
                        StructField('atm_streetname', StringType(), True),
                        StructField('atm_street_number', IntegerType(), True),
                        StructField('atm_zipcode', IntegerType(), True),
                        StructField('atm_lat', DoubleType(), True),
                        StructField('atm_lon', DoubleType(), True),
                        StructField('currency', StringType(), True),
                        StructField('card_type', StringType(), True),
                        StructField('transaction_amount', IntegerType(), True),
                        StructField('service', StringType(), True),
                        StructField('message_code', StringType(), True),
                        StructField('message_text', StringType(), True),
                        StructField('weather_lat', DoubleType(), True),
                        StructField('weather_lon', DoubleType(), True),
                        StructField('weather_city_id', IntegerType(), True),
                        StructField('weather_city_name', StringType(), True),
                        StructField('temp', DoubleType(), True),
                        StructField('pressure', IntegerType(), True),
                        StructField('humidity', IntegerType(), True),
                        StructField('wind_speed', IntegerType(), True),
                        StructField('wind_deg', IntegerType(), True),
                        StructField('rain_3h', DoubleType(), True),
                        StructField('clouds_all', IntegerType(), True),
                        StructField('weather_id', IntegerType(), True),
                        StructField('weather_main', StringType(), True),
                        StructField('weather_description', StringType(), True)])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Reading data with the schema

tempdf = spark.read.format("csv")\
                .option("header", "false")\
                .option("inferSchema", "false")\
                .schema(Schema)\
                .option("mode", "FAILFAST")\
                .load("/user/hadoop/ETL/ATMDATA/part-m-00000")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
tempdf.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+------

In [9]:
# Checking the number of records in the Dataframe

tempdf.select('*').count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [10]:
# Checking schema and datatypes of dataframe

tempdf.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

###### Observation:
Based on the above results, we observe the following:
- The record count from the Spark dataframe aligns with the source data in the MySQL table (2468572).
- All attributes contain the necessary datatypes according to the data dictionary provided.

Now, let's divide this single dataframe into dimension and fact tables/files and establish appropriate relationships between dimensions and fact data.

# Creating Dimension and Fact Tables

### Location Dimension

###### Generating a DataFrame for the Location Dimension Based on the Target Dimension Model

In [11]:
# Creating a temporary DataFrame, selecting necessary columns, and ensuring distinct records

location = tempdf.select("atm_location", "atm_streetname", "atm_street_number", "atm_zipcode", "atm_lat", "atm_lon").distinct()
location.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

109

In [12]:
# Importing required pyspark.sql functions

from pyspark.sql.window import Window
from pyspark.sql.functions import *

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# creating the primary key column 'location_id'

dim_location = location.select(row_number().over(Window.orderBy(location[0])).alias("location_id"), "*")
dim_location.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+--------------+-----------------+-----------+-------+-------+
|location_id|        atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+-----------+--------------------+--------------+-----------------+-----------+-------+-------+
|          1|             Aabybro|  ÃƒËœstergade|                6|       9440| 57.162|   9.73|
|          2|      Aalborg Hallen|  Europa Plads|                4|       9000| 57.044|  9.913|
|          3|Aalborg Storcente...|      Hobrovej|              452|       9200| 57.005|  9.876|
|          4|Aalborg Storcente...|      Hobrovej|              452|       9200| 57.005|  9.876|
|          5|         Aalborg Syd|      Hobrovej|              440|       9200| 57.005|  9.881|
+-----------+--------------------+--------------+-----------------+-----------+-------+-------+
only showing top 5 rows

In [14]:
# renaming the columns as per requirement
DIM_LOCATION = dim_location.withColumnRenamed('atm_location','location')\
                            .withColumnRenamed('atm_streetname','streetname')\
                            .withColumnRenamed('atm_street_number','street_number')\
                            .withColumnRenamed('atm_zipcode','zipcode')\
                            .withColumnRenamed('atm_lat','lat')\
                            .withColumnRenamed('atm_lon','lon')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Ensuring that all required columns are present and named correctly
DIM_LOCATION.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon']

In [16]:
# Verifying the Count of Records in the DataFrame
DIM_LOCATION.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

109

###### Observation:
- The creation of location dimension data has been completed successfully, ensuring unique records with unique keys assigned to each location.
- The dataset contains a total of 109 locations.

### ATM Dimension

###### Generating a DataFrame for the ATM Dimension Based on the Target Dimension Model

In [17]:
# Creating  a temporary DataFrame and selecting necessary columns
atm = tempdf.select('atm_id','atm_manufacturer', 'atm_location', 'atm_streetname','atm_street_number','atm_zipcode','atm_lat', 'atm_lon').distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Creating temporary dataframes and joining the dim_location and atm dataframes

df1 = tempdf.select('atm_id','atm_manufacturer', 'atm_location', 'atm_streetname','atm_street_number','atm_zipcode','atm_lat', 'atm_lon').distinct()
df2 = DIM_LOCATION
df1 = df1.alias('df1')
df2 = df2.alias('df2')

atm = df1.join(df2, (df1['atm_location'] == df2['location']) & 
                        (df1['atm_streetname'] == df2['streetname']) & 
                        (df1['atm_street_number'] == df2['street_number']) & 
                        (df1['atm_zipcode'] == df2['zipcode']) & 
                        (df1['atm_lat'] == df2['lat']) & 
                        (df1['atm_lon'] == df2['lon'])).select('df1.atm_id', 'df1.atm_manufacturer', 'df1.atm_location', 'df2.location_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Renaming columns as required
atm = atm.withColumnRenamed('atm_id', 'atm_number').withColumnRenamed('location_id', 'atm_location_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Selecting the necessary columns
atm = atm.select('atm_number', 'atm_manufacturer', 'atm_location_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Creating the primary key column

df_temp = atm.rdd.zipWithIndex().toDF()
dim_atm = df_temp.select(col("_1.*"),col("_2").alias('atm_id'))
dim_atm.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------------+---------------+------+
|atm_number|atm_manufacturer|atm_location_id|atm_id|
+----------+----------------+---------------+------+
|        45|             NCR|             11|     0|
|       109| Diebold Nixdorf|              5|     1|
|        14|             NCR|             46|     2|
|        73|             NCR|             44|     3|
|       113| Diebold Nixdorf|             89|     4|
+----------+----------------+---------------+------+
only showing top 5 rows

In [22]:
# Rearranging the columns according to the target model

DIM_ATM = dim_atm.select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Confirming that all required columns are present and named correctly
DIM_ATM.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']

In [24]:
# Checking the dataframe
DIM_ATM.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     0|        45|             NCR|             11|
|     1|       109| Diebold Nixdorf|              5|
|     2|        14|             NCR|             46|
|     3|        73|             NCR|             44|
|     4|       113| Diebold Nixdorf|             89|
+------+----------+----------------+---------------+
only showing top 5 rows

In [25]:
# Verifying the Count of Records in the DataFrame
DIM_ATM.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

###### Observation:
 - The creation of ATM dimension data has been successfully accomplished, ensuring unique records with unique keys assigned to each type.
 - The ATM dimension dataset comprises a total of 113 records.

### Date Dimension

###### Generating a DataFrame for the Date Dimension Based on the Target Dimension Model

In [26]:
# creating a temporary dataframe and selecting required columns
date = tempdf.select('year', 'month', 'day', 'hour', 'weekday')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Creating a 'full_date_time' column
date = date.withColumn('full_date_time', to_timestamp(concat(date.year, lit('-'), date.month, lit('-'),date.day, lit(' '), date.hour), 'yyyy-MMMM-d H'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
date.show(5, truncate = False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+---+----+-------+-------------------+
|year|month  |day|hour|weekday|full_date_time     |
+----+-------+---+----+-------+-------------------+
|2017|January|1  |0   |Sunday |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-01-01 00:00:00|
+----+-------+---+----+-------+-------------------+
only showing top 5 rows

In [29]:
# Selecting the necessary columns and ensuring uniqueness of records
date = date.select('full_date_time', 'year', 'month', 'day', 'hour', 'weekday').distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# Creating primary key column and creating final dimension
DIM_DATE = date.select(row_number().over(Window.orderBy(date[0])).alias('date_id'), '*')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Verifying the Presence and Correct Naming of All Required Columns
DIM_DATE.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday']

In [32]:
# Checking the dataframe
DIM_DATE.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------+----+-------+---+----+-------+
|date_id|     full_date_time|year|  month|day|hour|weekday|
+-------+-------------------+----+-------+---+----+-------+
|      1|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|
|      2|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|
|      3|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|
|      4|2017-01-01 03:00:00|2017|January|  1|   3| Sunday|
|      5|2017-01-01 04:00:00|2017|January|  1|   4| Sunday|
+-------+-------------------+----+-------+---+----+-------+
only showing top 5 rows

In [33]:
# Verifying the Count of Records in the DataFrame
DIM_DATE.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8685

###### Observation:
- The creation of date dimension data has been completed successfully, ensuring unique records with unique keys assigned for each type.
- The date dimension dataset comprises a total of 8685 records.

### Card Type Dimension

###### Generating a DataFrame for the Card Type Dimension Based on the Target Dimension Model

In [34]:
# Creating a temporary DataFrame, selecting necessary columns, and ensuring distinct records.
card_type = tempdf.select('card_type').distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Creating the primary key column
DIM_CARD_TYPE = card_type.select(row_number().over(Window.orderBy(card_type[0])).alias("card_type_id"), "*")
DIM_CARD_TYPE.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-------------------+
|card_type_id|          card_type|
+------------+-------------------+
|           1|             CIRRUS|
|           2|            Dankort|
|           3|    Dankort - on-us|
|           4|        HÃƒÂ¦vekort|
|           5|HÃƒÂ¦vekort - on-us|
+------------+-------------------+
only showing top 5 rows

In [36]:
# Validating the presence and correct naming of all required columns.
DIM_CARD_TYPE.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['card_type_id', 'card_type']

In [37]:
# Verifying the Count of Records in the DataFrame
DIM_CARD_TYPE.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12

###### Observation:
- The creation of card type dimension data has been completed successfully, ensuring unique records with unique keys assigned to each type.
- The dataset contains a total of 12 card types.

### Transaction Fact table

###### Generating a DataFrame for the Transaction Fact table Based on the Target Model

###### Step 1: Joining the original DataFrame with DIM_LOC to create FACT_ATM_TRANS.

In [38]:
# Renaming columns as required
fact_loc = tempdf.withColumnRenamed('atm_location','location')\
    .withColumnRenamed('atm_streetname','streetname')\
    .withColumnRenamed('atm_street_number','street_number')\
    .withColumnRenamed('atm_zipcode','zipcode')\
    .withColumnRenamed('atm_lat','lat')\
    .withColumnRenamed('atm_lon','lon')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
# Joining original dataframe with DIM_LOCTION
fact_loc = fact_loc.join(DIM_LOCATION, on = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = "left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
# Viewing the columns
fact_loc.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'year', 'month', 'day', 'weekday', 'hour', 'atm_status', 'atm_id', 'atm_manufacturer', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'location_id']

###### Step 2: Joining the DataFrame with DIM_ATM.

In [41]:
# Renaming columns as required
fact_loc = fact_loc.withColumnRenamed('atm_id', 'atm_number').withColumnRenamed('location_id', 'atm_location_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
# Joining the dataframe with DIM_ATM
fact_atm = fact_loc.join(DIM_ATM, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = "left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
# Applying the same transformations as those performed on the ATM table.
fact_atm = fact_atm.withColumnRenamed('atm_location_id', 'weather_loc_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
# Viewing the columns
fact_atm.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['atm_number', 'atm_manufacturer', 'weather_loc_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'year', 'month', 'day', 'weekday', 'hour', 'atm_status', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'atm_id']

###### Step 3: Joining the DataFrame with DIM_DATE.

In [45]:
# Joining the dataframe with DIM_DATE
fact_date = fact_atm.join(DIM_DATE, on = ['year', 'month', 'day', 'hour', 'weekday'], how = "left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
# Viewing the columns
fact_date.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['year', 'month', 'day', 'hour', 'weekday', 'atm_number', 'atm_manufacturer', 'weather_loc_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'atm_status', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'atm_id', 'date_id', 'full_date_time']

###### Step 4: Joining the DataFrame with DIM_CARD_TYPE.

In [47]:
# Joining the dataframe with DIM_CARD_TYPE
fact_atm_trans = fact_date.join(DIM_CARD_TYPE, on = ['card_type'], how = "left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

###### Creating the final fact table

In [48]:
# Creating primary key of fact table
FACT_ATM_TRANS = fact_atm_trans.withColumn("trans_id", row_number().over(Window.orderBy('date_id')))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
# Viewing the list of columns
FACT_ATM_TRANS.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['card_type', 'year', 'month', 'day', 'hour', 'weekday', 'atm_number', 'atm_manufacturer', 'weather_loc_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'atm_status', 'currency', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'atm_id', 'date_id', 'full_date_time', 'card_type_id', 'trans_id']

In [50]:
# Selecting and arranging only the necessary columns based on the target model.
FACT_ATM_TRANS = FACT_ATM_TRANS.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 
'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 
'clouds_all', 'weather_id', 'weather_main', 'weather_description')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
# Viewing and verifying the columns
FACT_ATM_TRANS.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

In [52]:
# Checking the dataframe
FACT_ATM_TRANS.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|     2|            46|      1|          11|  Inactive|     DKK|Withdrawal|              1133|        NULL|        NULL|    0.0|        92|       300|     Drizzle|light intensity d...|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
only showing top 1 row

In [53]:
# Verifying the Count of Records in the DataFrame
FACT_ATM_TRANS.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

###### Observation:
- The creation of ATM transaction fact table data has been successfully completed, with established primary-foreign key relationships with dimension tables.
- The fact dataset contains a total of 2468572 records.

# Saving the PySpark Dataframes to AWS S3 Storage in CSV Format

In [54]:
# Defining function for moving dataframes to S3
def SaveDF(table_name,file_name):
    table_name.write.format('csv').option('header','false').save(f's3://etl-project-bucket/{file_name}', mode='overwrite')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
# Saving all the dimention and fact table created to S3
SaveDF(DIM_LOCATION,'dim_location')
SaveDF(DIM_ATM,'dim_atm')
SaveDF(DIM_DATE,'dim_date')
SaveDF(DIM_CARD_TYPE,'dim_card_type')
SaveDF(FACT_ATM_TRANS,'fact_atm_trans')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…