### Abhilasha - ETL Project - SPAR NORD Bank ATM Data Mart

#### Using PySpark to Transform Data Fetched with Sqoop

### Problem Statement

Banks have to refill ATMs when the cash level drops below a specific threshold. How quickly that happens depends on the activity at a particular ATM and the area where it is located, as well as on the weather, the day of the week, and so on. In this project, `Spar Nord Bank` wants to observe withdrawal behavior and the factors it depends on in order to manage the refill frequency optimally. Apart from this, other insights also have to be drawn from the data.

`Spar Nord Bank` has also built a dimensional model datastore (ATM Data Mart) on this ATM transaction data to understand the ATM usage pattern.

1. Reading the data from the files in HDFS with a specific schema using PySpark
2. Command to create an input schema using StructType (a custom schema built with PySpark's `StructType` class is recommended, to avoid any data type mismatch)
3. Commands to read the data using the input schema and to verify the data using the count function
4. Creation of dimension tables using PySpark
5. Command to create a data frame for each dimension according to the target schema (dimension model) provided
6. Commands to clean and transform the data
7. Making sure that duplicate records are removed (avoid duplicate information, especially in the dimension tables)
8. Making sure that appropriate primary keys are present for the dimensions (a primary key has to be generated for each dimension table; for the 'Date' dimension, for example, one way is to add "date" as a prefix to the row number, i.e. 'date1', 'date2' and so on)
9. Rearranging the fields if necessary (according to the target schema)
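
The primary-key suggestion in the list above (a prefix plus the row number, e.g. `date1`, `date2`) can be pictured without Spark. The sketch below is plain Python over a small in-memory list. The sample rows and the helper name `assign_prefixed_keys` are made up for illustration. The notebook itself generates plain integer keys with `row_number()` rather than prefixed strings.

```python
def assign_prefixed_keys(rows, prefix):
    """Drop duplicate rows (keeping the first occurrence) and prepend prefix + running number."""
    seen = set()
    keyed = []
    for row in rows:
        if row in seen:
            continue  # duplicate record: skip it
        seen.add(row)
        keyed.append((f"{prefix}{len(keyed) + 1}",) + row)
    return keyed


# Hypothetical (year, month, day, hour, weekday) rows; the second one is a duplicate.
sample_rows = [
    (2017, "January", 1, 0, "Sunday"),
    (2017, "January", 1, 0, "Sunday"),
    (2017, "January", 1, 1, "Sunday"),
]

dim_date_rows = assign_prefixed_keys(sample_rows, "date")
for row in dim_date_rows:
    print(row)
```

Deduplicating before numbering keeps the keys dense (no gaps), which is the same order of operations the notebook uses: `distinct()` first, key generation second.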

##### Broadly, you will be performing the following tasks:

1. Extracting the transactional data from a given MySQL RDS server to HDFS (on an EC2 instance) using Sqoop.
2. Transforming the transactional data according to the given target schema using PySpark.
3. Loading the transformed data into an S3 bucket.
4. Creating the Redshift tables according to the given schema.
5. Loading the data from Amazon S3 to Redshift tables.
6. Performing the analysis queries.

In [1]:
## Importing Libraries
import os
import sys

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1662279780186_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
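## Get (or create) the PySpark session
from pyspark.sql import SparkSession
# No .master("local") here: the notebook runs through Livy on YARN (see the
# SparkContext output below), and getOrCreate() returns that existing session.
spark = SparkSession.builder.appName('ETL Spar Nord ATM').getOrCreate()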
spark

<pyspark.sql.session.SparkSession object at 0x7feb78e36b90>

In [3]:
sc = spark.sparkContext
sc

<SparkContext master=yarn appName=livy-session-0>

#### Reading data to Spark

In [4]:
### Reading data from HDFS with an inferred schema
df = spark.read.csv("/user/root/SRC_ATM_TRANS/part-m-00000", header = False, inferSchema = True)

In [5]:
df.show(5)

+----+-------+---+------+---+--------+---+---+----------+-------------------+----+----+------+------+----+----------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
| _c0|    _c1|_c2|   _c3|_c4|     _c5|_c6|_c7|       _c8|                _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19|  _c20|  _c21|   _c22|          _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|   _c32|                _c33|
+----+-------+---+------+---+--------+---+---+----------+-------------------+----+----+------+------+----+----------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
|2017|January|  1|Sunday|  0|  Active|  1|NCR|NÃƒÂ¦stved|        Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null| 55.23|11.761|2616038|      Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|   Rain|          

In [6]:
df.count()

2468572

##### Creating a custom schema using StructType

In [7]:
### Importing StructType and the field types for the schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [8]:
### Defining Schema
FileSchema = StructType([StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), True),
                        StructField('day', IntegerType(), True),
                        StructField('weekday', StringType(), True),
                        StructField('hour', IntegerType(), True),
                        StructField('atm_status', StringType(), True),
                        StructField('atm_id', StringType(), True),
                        StructField('atm_manufacturer', StringType(), True),
                        StructField('atm_location', StringType(), True),
                        StructField('atm_streetname', StringType(), True),
                        StructField('atm_street_number', IntegerType(), True),
                        StructField('atm_zipcode', IntegerType(), True),
                        StructField('atm_lat', DoubleType(), True),
                        StructField('atm_lon', DoubleType(), True),
                        StructField('currency', StringType(), True),
                        StructField('card_type', StringType(), True),
                        StructField('transaction_amount', IntegerType(), True),
                        StructField('service', StringType(), True),
                        StructField('message_code', StringType(), True),
                        StructField('message_text', StringType(), True),
                        StructField('weather_lat', DoubleType(), True),
                        StructField('weather_lon', DoubleType(), True),
                        StructField('weather_city_id', IntegerType(), True),
                        StructField('weather_city_name', StringType(), True),
                        StructField('temp', DoubleType(), True),
                        StructField('pressure', IntegerType(), True),
                        StructField('humidity', IntegerType(), True),
                        StructField('wind_speed', IntegerType(), True),
                        StructField('wind_deg', IntegerType(), True),
                        StructField('rain_3h', DoubleType(), True),
                        StructField('clouds_all', IntegerType(), True),
                        StructField('weather_id', IntegerType(), True),
                        StructField('weather_main', StringType(), True),
                        StructField('weather_description', StringType(), True)])
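
As a rough plain-Python picture of what reading with a declared schema means, each raw field is cast to the type named for its column. The sketch below does this for the first five columns only. The sample lines and the helper `parse_line` are illustrative assumptions, and returning `None` for a value that fails to cast is only an approximation of a permissive CSV reader, not a statement about Spark's exact behavior.

```python
import csv
import io

# First five columns of the schema above, with Python stand-ins for the Spark types.
COLUMNS = [("year", int), ("month", str), ("day", int), ("weekday", str), ("hour", int)]


def parse_line(line):
    """Split one CSV line and cast each field to its declared type (None if the cast fails)."""
    fields = next(csv.reader(io.StringIO(line)))
    record = {}
    for (name, caster), raw in zip(COLUMNS, fields):
        try:
            record[name] = caster(raw)
        except ValueError:
            record[name] = None
    return record


good = parse_line("2017,January,1,Sunday,0")
bad = parse_line("2017,January,first,Sunday,0")  # 'first' is not a valid integer day
print(good)
print(bad)
```

Declaring the types up front makes a bad value visible as a missing field instead of silently changing the type of the whole column.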

In [9]:
### Importing the data again using the defined schema
df_main = spark.read.csv("/user/root/SRC_ATM_TRANS/part-m-00000", header = False, schema = FileSchema)

In [10]:
### Verifying the row count with the defined schema
df_main.count()

2468572

In [11]:
### Checking schema
df_main.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [12]:
## validate data in new schema
df_main.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [13]:
df_main.columns

['year', 'month', 'day', 'weekday', 'hour', 'atm_status', 'atm_id', 'atm_manufacturer', 'atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

#### Creating Dimensions and Fact tables

In [14]:
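# Import only the functions used below; a wildcard import would shadow
# Python builtins such as sum, min, max and round.
from pyspark.sql.functions import col, concat_ws, from_unixtime, unix_timestamp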

In [15]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

#### Creating Dimension Table - DIM_LOCATION

In [16]:
dl_df=df_main.select("atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon").distinct()
dl_df.count()

109

In [17]:
DIM_LOCATION = dl_df.withColumn('location_id', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [18]:
DIM_LOCATION.show(3)

+----------------+--------------+-----------------+-----------+-------+-------+-----------+
|    atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+----------------+--------------+-----------------+-----------+-------+-------+-----------+
|         Kolding|      Vejlevej|              135|       6000| 55.505|  9.457|          1|
|  Skelagervej 15|   Skelagervej|               15|       9000| 57.023|  9.891|          2|
|Intern HolbÃƒÂ¦k|   Slotsvolden|                7|       4300| 55.718| 11.704|          3|
+----------------+--------------+-----------------+-----------+-------+-------+-----------+
only showing top 3 rows

#### Creating Dimension Table - DIM_ATM

In [19]:
da_df=df_main.select("atm_id","atm_manufacturer","atm_location","atm_streetname","atm_lat","atm_lon").distinct()
da_df.count()

113

In [20]:
### RENAME atm_id to atm_number.
da_df = da_df.withColumnRenamed("atm_id","atm_number")

In [21]:
DIM_ATM = da_df.withColumn('atm_id', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [22]:
DIM_ATM.show(3)

+----------+----------------+------------+--------------+-------+-------+------+
|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_lat|atm_lon|atm_id|
+----------+----------------+------------+--------------+-------+-------+------+
|        17|             NCR|     Randers|  ÃƒËœstervold| 56.462| 10.038|     1|
|        23| Diebold Nixdorf|     Vodskov|    Vodskovvej| 57.104| 10.027|     2|
|        73|             NCR| HÃƒÂ¸jbjerg|Rosenvangsalle| 56.119| 10.192|     3|
+----------+----------------+------------+--------------+-------+-------+------+
only showing top 3 rows

#### Creating Dimension Table - DIM_DATE

In [23]:
dd_df=df_main.select("year","month","day","hour","weekday").distinct()
dd_df.count()

8685

In [24]:
DIM_DATE = dd_df.withColumn('date_id', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [25]:
DIM_DATE.show(3)

+----+-------+---+----+--------+-------+
|year|  month|day|hour| weekday|date_id|
+----+-------+---+----+--------+-------+
|2017|January|  5|  21|Thursday|      1|
|2017|January| 22|  15|  Sunday|      2|
|2017|  April|  7|   9|  Friday|      3|
+----+-------+---+----+--------+-------+
only showing top 3 rows

#### Creating Dimension Table - DIM_CARD_TYPE

In [26]:
dct_df=df_main.select("card_type").distinct()
dct_df.count()

12

In [27]:
DIM_CARD_TYPE = dct_df.withColumn('card_id', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [28]:
DIM_CARD_TYPE.show(3)

+--------------------+-------+
|           card_type|card_id|
+--------------------+-------+
|Visa Dankort - on-us|      1|
|  Mastercard - on-us|      2|
|         HÃƒÂ¦vekort|      3|
+--------------------+-------+
only showing top 3 rows

#### Creating Fact Table - FACT_ATM_TRANS

In [29]:
fat_df=df_main.select("year","month","day","hour","card_type","atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")
fat_df.count()

2468572

In [30]:
### RENAME atm_id to atm_number.
fat_df=fat_df.withColumnRenamed("atm_id","atm_number")

In [31]:
FACT_ATM_TRANS = fat_df.withColumn('trans_id', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [32]:
FACT_ATM_TRANS.show(3)

+----+-------+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|year|  month|day|hour| card_type|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|trans_id|
+----+-------+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|2017|January|  1|   0|MasterCard|         1|             NCR|  NÃƒÂ¦stved|   Farimagsvej|                8|       4700| 55.233| 11.763|    Activ

In [33]:
from pyspark.sql import functions as func
from pyspark.sql.functions import col, concat_ws, from_unixtime, unix_timestamp
from pyspark.sql.types import TimestampType

#### Converting month into numeric format

In [34]:
DIM_DATE=DIM_DATE.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MMM'),'MM'))
cols=["year","month","day"]
DIM_DATE=DIM_DATE.withColumn("date",concat_ws("-",*cols).cast("date"))

#### Creating full_date_time

In [35]:
# Format with a 24-hour pattern (HH:mm:ss) so the string matches the pattern used
# to parse full_date_time back into a timestamp below; a 12-hour pattern (hh)
# would turn hour 21 into 09.
DIM_DATE=DIM_DATE.withColumn("full_date_time",func.date_format(func.to_timestamp(func.concat("date","hour"),"yyyy-MM-ddHH"),"yyyy-MM-dd HH:mm:ss"))

#### Dropping extra column

In [36]:
DIM_DATE=DIM_DATE.drop("date")

#### Reverting the month from numeric back to string

In [37]:
DIM_DATE=DIM_DATE.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MM'),'MMM'))
DIM_DATE=DIM_DATE.withColumn('full_date_time',func.unix_timestamp('full_date_time','yyyy-MM-dd HH:mm:ss').cast(TimestampType()))
DIM_DATE.show(3)

+----+-----+---+----+--------+-------+-------------------+
|year|month|day|hour| weekday|date_id|     full_date_time|
+----+-----+---+----+--------+-------+-------------------+
|2017|  Jan|  5|  21|Thursday|      1|2017-01-05 21:00:00|
|2017|  Jan| 22|  15|  Sunday|      2|2017-01-22 15:00:00|
|2017|  Apr|  7|   9|  Friday|      3|2017-04-07 09:00:00|
+----+-----+---+----+--------+-------+-------------------+
only showing top 3 rows
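
A quick sanity check for this step is that a row with `hour` 21 ends up with 21:00:00 in `full_date_time`, not 09:00:00. A 12-hour pattern (`hh` in Spark's date patterns, `%I` in Python) loses that distinction as soon as the AM/PM marker is not read back. The plain-Python sketch below reproduces the effect with `datetime`. It is an analogy for the Spark patterns, not the notebook's code.

```python
from datetime import datetime

moment = datetime(2017, 1, 5, 21)  # 21:00 on 5 January 2017

as_12h = moment.strftime("%Y-%m-%d %I:%M:%S")  # 12-hour clock, no AM/PM marker
as_24h = moment.strftime("%Y-%m-%d %H:%M:%S")  # 24-hour clock

# Read both strings back with a 24-hour pattern.
from_12h = datetime.strptime(as_12h, "%Y-%m-%d %H:%M:%S")
from_24h = datetime.strptime(as_24h, "%Y-%m-%d %H:%M:%S")

print(as_12h, "->", from_12h.hour)  # 2017-01-05 09:00:00 -> 9
print(as_24h, "->", from_24h.hour)  # 2017-01-05 21:00:00 -> 21
```

Only the 24-hour round trip preserves the original hour, so comparing `hour` with the hour part of `full_date_time` on a few afternoon rows is a cheap check.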

#### Performing the same transformation on FACT_ATM_TRANS

In [38]:
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MMM'),'MM'))
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MM'),'MMM'))
FACT_ATM_TRANS.show(3)

+----+-----+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|year|month|day|hour| card_type|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|trans_id|
+----+-----+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|2017|  Jan|  1|   0|MasterCard|         1|             NCR|  NÃƒÂ¦stved|   Farimagsvej|                8|       4700| 55.233| 11.763|    Active|     D
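
The two `from_unixtime`/`unix_timestamp` calls above turn a full month name into its three-letter form by way of a month number. The same mapping can be written as a direct lookup, shown here in plain Python with the standard `calendar` module as an illustration only. It assumes the default English locale, and `MONTH_NUMBER` and `MONTH_ABBR` are names chosen for this sketch.

```python
import calendar

# calendar.month_name[1] is "January" and calendar.month_abbr[1] is "Jan";
# index 0 of both sequences is an empty string, so the range starts at 1.
MONTH_NUMBER = {calendar.month_name[i]: i for i in range(1, 13)}
MONTH_ABBR = {calendar.month_name[i]: calendar.month_abbr[i] for i in range(1, 13)}

print(MONTH_NUMBER["January"], MONTH_ABBR["January"])  # 1 Jan
print(MONTH_NUMBER["April"], MONTH_ABBR["April"])      # 4 Apr
```

What matters for the join that follows is that the fact table and `DIM_DATE` hold `month` in the same representation, whichever way the conversion is done.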

#### Rearranging Columns

In [39]:
DIM_DATE = DIM_DATE.select("date_id","full_date_time","year","month","day","hour","weekday")
DIM_CARD_TYPE = DIM_CARD_TYPE.select("card_id","card_type")
DIM_ATM = DIM_ATM.select("atm_id","atm_number","atm_manufacturer","atm_location","atm_streetname","atm_lat","atm_lon")
DIM_LOCATION = DIM_LOCATION.select("location_id","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon")

#### Performing join for FACT table

In [40]:
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_DATE,on=['year','month','day','hour'],how="left")
FACT_ATM_TRANS.show(3)
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_CARD_TYPE,on=['card_type'],how="left")
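# Bring in only the join keys and the surrogate key from each dimension, so the
# result does not carry duplicate atm_* columns that make later references ambiguous.
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_ATM.select("atm_id","atm_number","atm_manufacturer"),on=['atm_number','atm_manufacturer'],how="left")
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_LOCATION.select("location_id","atm_location","atm_streetname","atm_lat","atm_lon"),on=['atm_location','atm_streetname','atm_lat','atm_lon'],how="left")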

+----+-----+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+-------+-------------------+-------+
|year|month|day|hour| card_type|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|trans_id|date_id|     full_date_time|weekday|
+----+-----+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+-------+-------------------+-------+
|2017|  Jan|  1|   0|MasterCard|         1|
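
Because the fact table is joined to each dimension with a left join, two things are worth checking. The natural key used for the join should be unique in the dimension, otherwise fact rows multiply. Every fact row should also find a match, otherwise its surrogate key is null. The sketch below shows both checks in plain Python on made-up rows. `build_lookup` and `attach_key` are hypothetical helpers, not part of the notebook.

```python
def build_lookup(dim_rows, key_fields, id_field):
    """Map natural key -> surrogate key, refusing duplicate natural keys."""
    lookup = {}
    for row in dim_rows:
        key = tuple(row[f] for f in key_fields)
        if key in lookup:
            raise ValueError(f"duplicate natural key in dimension: {key}")
        lookup[key] = row[id_field]
    return lookup


def attach_key(fact_rows, lookup, key_fields, id_field):
    """Left-join behaviour: every fact row is kept; unmatched rows get None."""
    joined = []
    for row in fact_rows:
        key = tuple(row[f] for f in key_fields)
        joined.append({**row, id_field: lookup.get(key)})
    return joined


# Made-up sample data for illustration.
dim_card_type = [
    {"card_id": 1, "card_type": "MasterCard"},
    {"card_id": 2, "card_type": "VISA"},
]
facts = [
    {"trans_id": 1, "card_type": "MasterCard"},
    {"trans_id": 2, "card_type": "Dankort"},  # no match in the dimension
]

lookup = build_lookup(dim_card_type, ["card_type"], "card_id")
joined = attach_key(facts, lookup, ["card_type"], "card_id")
unmatched = [r["trans_id"] for r in joined if r["card_id"] is None]
print(len(joined), unmatched)  # 2 [2]
```

In the notebook, the fact row count is 2468572 both before and after the joins, which is consistent with the join keys being unique in each dimension. A count of null surrogate keys would cover the second check.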

#### Rearranging the fact table and renaming columns per target schema

In [41]:
# select() keeps only the target-schema columns, so no separate drop() is needed.
FACT_ATM_TRANS = FACT_ATM_TRANS.select("trans_id","atm_id","location_id","date_id","card_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumnRenamed("location_id","weather_loc_id")
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumnRenamed("card_id","card_type_id")

#### Performing join to import location_id to DIM_ATM

In [42]:
DIM_ATM=DIM_ATM.join(DIM_LOCATION,on=['atm_location','atm_streetname','atm_lat','atm_lon'],how="left")
DIM_ATM=DIM_ATM.select("atm_id","atm_number","atm_manufacturer","location_id")
DIM_ATM=DIM_ATM.withColumnRenamed("location_id","atm_location_id")

#### Rename according to target schema

In [43]:
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_location","location")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_streetname","streetname")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_zipcode","zipcode")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_lat","lat")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_lon","lon")

In [44]:
### Verify the columns - per target schema
DIM_DATE.printSchema()
DIM_ATM.printSchema()
DIM_LOCATION.printSchema()
DIM_CARD_TYPE.printSchema()
FACT_ATM_TRANS.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

root
 |-- card_id: integer (nullable = true)
 |-- card_type: string (nullable = true)

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nu

In [45]:
DIM_DATE.count()

8685

In [46]:
DIM_ATM.count()

113

In [47]:
DIM_LOCATION.count()

109

In [49]:
DIM_CARD_TYPE.count()

12

In [48]:
FACT_ATM_TRANS.count()

2468572

#### Write to S3

In [102]:
DIM_DATE.write.csv('s3://s3etlbucket/dim_date/')

In [107]:
DIM_ATM.write.csv('s3://s3etlbucket/dim_atm/')

In [108]:
DIM_LOCATION.write.csv('s3://s3etlbucket/dim_location/')

In [109]:
DIM_CARD_TYPE.write.csv('s3://s3etlbucket/dim_card_type/')

In [50]:
FACT_ATM_TRANS.coalesce(1).write.format('csv').option('header','false').save('s3://s3etlbucket/fact_atm_trans', mode='overwrite')
