### PySpark ETL Code
Reads data from HDFS, transforms it, and creates dimension and fact tables to store in an S3 bucket.

Steps followed:
- Import the libraries needed for the Spark session and functions
- Read the raw file from the HDFS path where the Sqoop ingestion wrote it, forming a data frame
- Create a schema using StructType (as per the required target schema)
- Read the file again using that schema and validate the count of records loaded
- Clean and transform all dimension data frames as per the target dimension model
- Enrich the data frame for the fact table with id columns that reference all the dimension tables (done in stages)
- Clean and transform the fact table fields as per the target data model
- Load all dimension and fact table data into the S3 bucket for further consumption by Redshift for data analysis / analytical queries


In [1]:
#Importing necessary libraries for Spark session and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

#Libraries needed for creating primary key columns with row number
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1683435041541_0002,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

In [2]:
#Creating the Spark session (getOrCreate reuses the session if one already exists)
appName = "Spar Nord Bank ATM Tranform"
master = "local"
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

##### Loading data initially to verify if the file has proper data (sanity check)

In [3]:
#Reading the raw file from HDFS using PySpark
## Using the inferred (default) schema first
FileNamePath = "/user/root/spar_nord_bank_atm/part-m-00000"
df = spark.read.csv(FileNamePath,header = False, inferSchema = True )

In [4]:
#Checking that data is loaded, using the default schema
df.show(1)

+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
| _c0|    _c1|_c2|   _c3|_c4|   _c5|_c6|_c7|       _c8|        _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19| _c20|  _c21|   _c22|    _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|_c32|      _c33|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
|2017|January|  1|Sunday|  0|Active|  1|NCR|NÃƒÂ¦stved|Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null|55.23|11.761|2616038|Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|Rain|light rain|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+-

##### Creating StructType schema 

In [5]:
#Creating custom schema using the StructType class of PySpark (This will avoid data type mismatch)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [6]:
#Defining schema as per RDS Data 
Schema = StructType([StructField('year', IntegerType(), nullable = False),
                        StructField('month', StringType(), False),
                        StructField('day', IntegerType(), False),
                        StructField('weekday', StringType(), False),
                        StructField('hour', IntegerType(), False),
                        StructField('atm_status', StringType(), False),
                        StructField('atm_id', StringType(), False),  
                        StructField('atm_manufacturer', StringType(), False),
                        StructField('atm_location', StringType(), False),
                        StructField('atm_streetname', StringType(), False),
                        StructField('atm_street_number', IntegerType(), False),
                        StructField('atm_zipcode', IntegerType(), False),
                        StructField('atm_lat', DoubleType(), False),
                        StructField('atm_lon', DoubleType(), False),
                        StructField('currency', StringType(), False),
                        StructField('card_type', StringType(), False),
                        StructField('transaction_amount', IntegerType(), False),
                        StructField('service', StringType(), False),
                        StructField('message_code', StringType(), True),
                        StructField('message_text', StringType(), True),
                        StructField('weather_lat', DoubleType(), False),
                        StructField('weather_lon', DoubleType(), False),
                        StructField('weather_city_id', IntegerType(), False),
                        StructField('weather_city_name', StringType(), False),
                        StructField('temp', DoubleType(), False),
                        StructField('pressure', IntegerType(), False),
                        StructField('humidity', IntegerType(), False),
                        StructField('wind_speed', IntegerType(), False),
                        StructField('wind_deg', IntegerType(), False),
                        StructField('rain_3h', DoubleType(), True),
                        StructField('clouds_all', IntegerType(), False),
                        StructField('weather_id', IntegerType(), False),
                        StructField('weather_main', StringType(), False),
                        StructField('weather_description', StringType(), False)])


In [7]:
#Reading the file again, this time with the custom schema defined above
df = spark.read.csv(FileNamePath, header = False, schema = Schema)

##### Validating the count after reading all ATM information into the data frame

In [8]:
#Validation: Verifying the count of the records loaded into the Dataframe
df.select('*').count()


2468572

In [10]:
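#Validation: Manual verification of schema (column names, data types, nullability, etc.)
#Note: the output reports nullable = true for every column, even those declared with nullable = False in the schema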
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [11]:
#Sanity Check: Verify data loaded
df.show(1)


+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+------

### Transformation: Dataframes for Dimension and Fact tables

#### Creating data frames for the dimension according to the target schema

#### 1. Clean and transform data for date dimension:

In [12]:
#Load the unique date related fields for date dimension
df_dim_date = df.dropDuplicates(["year","month","day","hour","weekday"]).select("year","month","day","hour","weekday")

In [13]:
#Derive the full_date, full_time and full_date_time columns
df_dim_date = df_dim_date.withColumn('full_date', concat_ws('-', df_dim_date.year, df_dim_date.month, df_dim_date.day))

df_dim_date = df_dim_date.withColumn('full_time', concat_ws(':', df_dim_date.hour, lit('00'), lit('00')))
df_dim_date = df_dim_date.withColumn('full_date_time', concat_ws(' ', df_dim_date.full_date, df_dim_date.full_time))

pattern = 'yyyy-MMM-dd HH:mm:ss'
df_dim_date = df_dim_date.withColumn('full_date_time', unix_timestamp(df_dim_date.full_date_time, pattern).cast('timestamp'))

In [14]:
#Verify columns in date dimension:
df_dim_date.show(5, truncate = False)

+----+-------+---+----+--------+---------------+---------+-------------------+
|year|month  |day|hour|weekday |full_date      |full_time|full_date_time     |
+----+-------+---+----+--------+---------------+---------+-------------------+
|2017|January|5  |21  |Thursday|2017-January-5 |21:00:00 |2017-01-05 21:00:00|
|2017|January|22 |15  |Sunday  |2017-January-22|15:00:00 |2017-01-22 15:00:00|
|2017|April  |7  |9   |Friday  |2017-April-7   |9:00:00  |2017-04-07 09:00:00|
|2017|January|23 |18  |Monday  |2017-January-23|18:00:00 |2017-01-23 18:00:00|
|2017|March  |17 |1   |Friday  |2017-March-17  |1:00:00  |2017-03-17 01:00:00|
+----+-------+---+----+--------+---------------+---------+-------------------+
only showing top 5 rows
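
The `full_date_time` values above come from parsing strings such as `2017-April-7 9:00:00`. As a quick cross-check outside Spark, the same strings can be parsed with Python's standard library. This is only a sketch for spot-checking a few values: `parse_full_date_time` is a hypothetical helper, the `strptime` directives are an assumed equivalent of the notebook's pattern rather than anything Spark uses, and English month names are assumed.

```python
from datetime import datetime

# Assumed strptime equivalent of the 'yyyy-MMM-dd HH:mm:ss' layout used in the notebook.
# %B expects a full English month name such as "January" (locale dependent).
PY_PATTERN = "%Y-%B-%d %H:%M:%S"


def parse_full_date_time(year, month, day, hour):
    """Rebuild the string the notebook builds with concat_ws, then parse it."""
    full_date = "-".join(str(part) for part in (year, month, day))
    full_time = ":".join((str(hour), "00", "00"))
    return datetime.strptime(f"{full_date} {full_time}", PY_PATTERN)


# Values taken from the third row of the output above.
print(parse_full_date_time(2017, "April", 7, 9))  # 2017-04-07 09:00:00
```

Depending on the Spark version, the stricter datetime parser may not accept full month names with `MMM` and may need `MMMM` instead, so it is worth checking `full_date_time` for nulls after this step.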

In [15]:
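#Creating Primary key for date dimension (zipWithIndex assigns 0-based ids;
#the RDD round trip also turns the integer columns into long, as the schema below shows)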
df_dim_date_temp = df_dim_date.rdd.zipWithIndex().toDF()
df_dim_date_final = df_dim_date_temp.select(col("_1.*"),col("_2").alias('date_id'))

In [16]:
# Rearranging the columns according to the target model
df_dim_date_final = df_dim_date_final.select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

In [17]:
# Validating date dimension table
df_dim_date_final.printSchema() #verify schema of date dimension table
df_dim_date_final.show(1) #sanity check for date dimension table

#Validation: Verifying the count of the records loaded into the Dataframe
df_dim_date_final.select('*').count()

root
 |-- date_id: long (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: long (nullable = true)
 |-- month: string (nullable = true)
 |-- day: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- weekday: string (nullable = true)

+-------+-------------------+----+-------+---+----+--------+
|date_id|     full_date_time|year|  month|day|hour| weekday|
+-------+-------------------+----+-------+---+----+--------+
|      0|2017-01-05 21:00:00|2017|January|  5|  21|Thursday|
+-------+-------------------+----+-------+---+----+--------+
only showing top 1 row

8685

#### 2. Clean and transform data for location dimension:

In [52]:
#Load the unique location related fields for location dimension
df_dim_location = df.dropDuplicates(["atm_location", "atm_streetname","atm_street_number", "atm_zipcode","atm_lat", "atm_lon"]).select("atm_location", "atm_streetname","atm_street_number", "atm_zipcode","atm_lat", "atm_lon")


In [53]:
#Creating Primary key for location dimension
df_dim_location_temp = df_dim_location.rdd.zipWithIndex().toDF()
df_dim_location = df_dim_location_temp.select(col("_1.*"),col("_2").alias('atm_location_id'))


In [54]:
#Renaming the columns as per target model
df_dim_location_final = df_dim_location.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')


In [55]:
# Rearranging the columns according to the target model
df_dim_location_final = df_dim_location_final.select('atm_location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')


In [56]:
# Validating location dimension table
#verify schema of location dimension table
df_dim_location_final.printSchema() 
#sanity check for location dimension table
df_dim_location_final.show(1) 
# validating the count of the dataframe
df_dim_location_final.select('*').count()


root
 |-- atm_location_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: long (nullable = true)
 |-- zipcode: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

+---------------+--------+----------+-------------+-------+------+-----+
|atm_location_id|location|streetname|street_number|zipcode|   lat|  lon|
+---------------+--------+----------+-------------+-------+------+-----+
|              0| Kolding|  Vejlevej|          135|   6000|55.505|9.457|
+---------------+--------+----------+-------------+-------+------+-----+
only showing top 1 row

109

#### 3. Clean and transform data for ATM dimension:

In [57]:
#Load the unique ATM related fields for ATM dimension
df_dim_atm = df.dropDuplicates(['atm_id', 'atm_manufacturer', 'atm_location', 'atm_streetname','atm_street_number', 'atm_zipcode',  'atm_lat', 'atm_lon']).select('atm_id', 'atm_manufacturer', 'atm_location', 'atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon')

In [58]:
#Get location id by joining on the location columns (name, street, zipcode, latitude, longitude)
df_dim_atm = df_dim_atm.join(df_dim_location, on = ['atm_location', 'atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'], how = "left").select('atm_id', 'atm_manufacturer', 'atm_location_id')
df_dim_atm = df_dim_atm.withColumnRenamed('atm_id', 'atm_number')

In [59]:
#Creating Primary key for ATM dimension
df_dim_atm_temp = df_dim_atm.rdd.zipWithIndex().toDF()
df_dim_atm = df_dim_atm_temp.select(col("_1.*"),col("_2").alias('atm_id'))

In [60]:
# rearranging the columns according to the target model
df_dim_atm_final = df_dim_atm.select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')

In [61]:
# Validating atm dimension table
#verify schema of atm dimension table
df_dim_atm_final.printSchema() 
#sanity check for atm dimension table
df_dim_atm_final.show(1) 
# validating the count of the dataframe
df_dim_atm_final.select('*').count()


root
 |-- atm_id: long (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: long (nullable = true)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     0|        59| Diebold Nixdorf|             57|
+------+----------+----------------+---------------+
only showing top 1 row

113

#### 4. Clean and transform data for Card Type dimension:

In [62]:
#Load the unique card type dimension values
df_dim_card_type = df.dropDuplicates(["card_type"]).select("card_type")

In [63]:
#Creating Primary key for card type dimension
df_dim_card_type_temp = df_dim_card_type.rdd.zipWithIndex().toDF()
df_dim_card_type = df_dim_card_type_temp.select(col("_1.*"),col("_2").alias('card_type_id'))

In [64]:
# rearranging the columns according to the target model
df_dim_card_type_final = df_dim_card_type.select('card_type_id', 'card_type')

In [65]:
# Validating card type dimension table
#verify schema of card type dimension table
df_dim_card_type_final.printSchema() 
#sanity check for card type dimension table
df_dim_card_type_final.show(1) 
# validating the count of the card_type dataframe
df_dim_card_type_final.select('*').count()

root
 |-- card_type_id: long (nullable = true)
 |-- card_type: string (nullable = true)

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           0|Visa Dankort - on-us|
+------------+--------------------+
only showing top 1 row

12

### Joining transaction data frame with each dimension table for creating references. 
This is done in stages.
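
Conceptually, each stage is a left join that looks up a dimension's surrogate key from its natural-key columns and attaches it to the transaction row. The plain-Python sketch below models that idea on toy rows only; it is not the Spark code, `build_dimension` and `attach_key` are hypothetical helper names, and two of the three transaction amounts are made up. It also illustrates why a left join is used: a row with no match keeps a `None` key instead of being dropped, so the row count stays intact.

```python
def build_dimension(rows, natural_key):
    """Deduplicate rows on natural_key and assign 0-based surrogate ids."""
    dimension = {}
    for row in rows:
        key = tuple(row[col] for col in natural_key)
        if key not in dimension:
            dimension[key] = len(dimension)  # next unused id
    return dimension


def attach_key(rows, dimension, natural_key, id_name):
    """Left-join style lookup: every input row survives, unmatched rows get None."""
    return [
        {**row, id_name: dimension.get(tuple(row[col] for col in natural_key))}
        for row in rows
    ]


# Toy transactions (illustrative values, not the real data set).
transactions = [
    {"card_type": "MasterCard", "transaction_amount": 5643},
    {"card_type": "Visa Dankort - on-us", "transaction_amount": 1200},
    {"card_type": "MasterCard", "transaction_amount": 300},
]

dim_card_type = build_dimension(transactions, ["card_type"])
fact = attach_key(transactions, dim_card_type, ["card_type"], "card_type_id")
print(len(fact), [row["card_type_id"] for row in fact])  # 3 [0, 1, 0]
```

In the notebook the same lookup is repeated for location, ATM, date and card type, each stage adding one id column to the previous stage's result.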

#### 5. Stage 1 - Clean and transform data for location fact:

#### Creating the Transaction Fact Table according to Target Model

In [66]:
# Renaming the location columns of the fact data frame as per requirement
df_fact_loc = df.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')

In [67]:
# joining the dfs - dimension and fact for location
df_fact_loc = df_fact_loc.join(df_dim_location_final, on = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = "left")
df_fact_loc = df_fact_loc.withColumnRenamed('atm_id','atm_number')

#### 6. Stage 2 - Clean and transform data for atm fact:

In [68]:
# joining the dfs - dimension and fact for atm
df_fact_atm = df_fact_loc.join(df_dim_atm_final, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = "left")

In [69]:
# performing necessary transformations, for target data model
df_fact_atm = df_fact_atm.withColumnRenamed('atm_location_id', 'weather_loc_id')

#### 7. Stage 3 - Clean and transform data for date fact:


In [70]:
# joining the dfs - dimension and fact for date
df_fact_date = df_fact_atm.join(df_dim_date_final, on = ['year', 'month', 'day', 'hour', 'weekday'], how = "left")

#### 8. Stage 4 - Clean and transform data for card type fact:

In [71]:
# joining the dfs - dimension and fact for card type
df_fact_card_type = df_fact_date.join(df_dim_card_type_final, on = ['card_type'], how = "left")

##### Validation of data frame for transaction fact 

In [72]:
# Validating final fact table
#verify schema of the joined fact data frame
df_fact_card_type.printSchema() 
#sanity check for the joined fact data frame
df_fact_card_type.show(1) 
# validating the count of the joined fact data frame
df_fact_card_type.select('*').count()

root
 |-- card_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- weather_loc_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |

##### PK Creation for transaction fact data frame

In [73]:
# creating primary key of fact table
# Note: a window with orderBy but no partitionBy moves all rows into a single partition
from pyspark.sql.window import Window

w = Window.orderBy('date_id')
df_fact_trans = df_fact_card_type.withColumn("trans_id", row_number().over(w))

##### Cleanup and keep the fields as per target data model

In [74]:
# Arranging the required columns according to the target model
df_fact_trans = df_fact_trans.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id','atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h','clouds_all', 'weather_id', 'weather_main', 'weather_description')

In [75]:
# Validating name of attributes
df_fact_trans.columns

['trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

In [76]:
# Validating that the fact table row count is intact (it should match the source count)
df_fact_trans.select('*').count()

2468572

In [79]:
#Renamed atm location id as per required schema definition
df_dim_location_final = df_dim_location_final.withColumnRenamed('atm_location_id','location_id')

In [80]:
#Validate name of location dimension columns
df_dim_location_final.columns

['location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon']

##### Final step: write the data frames as CSV files to the Upgrad S3 bucket
The files are then used for data analysis in Redshift.

In [161]:
df_dim_location_final.coalesce(1).write.format('csv').option('header','false').save('s3://upgradbucket/dim_location', mode='overwrite')
df_dim_atm_final.coalesce(1).write.format('csv').option('header','false').save('s3://upgradbucket/dim_atm', mode='overwrite')
df_dim_date_final.coalesce(1).write.format('csv').option('header','false').save('s3://upgradbucket/dim_date', mode='overwrite')
df_dim_card_type_final.coalesce(1).write.format('csv').option('header','false').save('s3://upgradbucket/dim_card_type', mode='overwrite')
df_fact_trans.coalesce(1).write.format('csv').option('header','false').save('s3://upgradbucket/dim_fact_trans', mode='overwrite')

###### Note: Once CSV file creation was complete, the following steps were performed to complete the ETL project
    - Validated the files in the S3 bucket.
    - Created the Redshift cluster.
    - Defined the schema in the Redshift cluster database.
    - Created the dimension and fact tables in that schema.
    - Loaded data into the dimension and fact tables.
    - Ran analytical queries to derive insights.
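
The load step is typically done with Redshift's `COPY` command, one statement per S3 prefix. The sketch below only builds the SQL text and does not connect to anything. The IAM role ARN is a placeholder, `build_copy_sql` is a hypothetical helper, and naming each table after its S3 prefix is an assumption. No header-skipping option is included because the files were written with `header` set to `false`.

```python
def build_copy_sql(table, s3_path, iam_role):
    """Return a Redshift COPY statement for headerless CSV files under s3_path."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "FORMAT AS CSV;"
    )


# Placeholder role ARN (assumption); replace with the role attached to the cluster.
IAM_ROLE = "arn:aws:iam::123456789012:role/example-redshift-s3-read"

# S3 prefixes as written by the notebook; table names are assumed to match them.
PREFIXES = ["dim_location", "dim_atm", "dim_date", "dim_card_type", "dim_fact_trans"]

statements = [
    build_copy_sql(name, f"s3://upgradbucket/{name}", IAM_ROLE) for name in PREFIXES
]
print(statements[0])
```

`COPY` maps CSV fields to table columns by position, so the column order in each Redshift table definition needs to match the `select(...)` order used for the corresponding data frame.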
    