### Transforming data to match the target dimension model (using Spark)

`Sqoop` has ingested the data from RDS and stored it on HDFS. The extracted data is read from HDFS with PySpark and transformed to match the target dimension model by creating fact and dimension tables. Finally, the tables are stored in separate folders in an AWS S3 bucket.

**Approach:**
1. Reading the data from the HDFS file using a specific schema
2. Creation of dimension tables - DIM_LOCATION, DIM_ATM, DIM_DATE, DIM_CARD_TYPE 
3. Creation of fact table
4. Storing the dimension tables and fact tables to AWS S3 bucket


**Sqoop has extracted the data to the HDFS path `/user/root/bank_atm_project/part-m-00000`**

In [1]:
# Display the Spark context; running the first cell starts the Spark application
sc

Starting Spark application


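| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 2 | application_1671258357477_0003 | pyspark | idle | Link | Link | ✔ |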


SparkSession available as 'spark'.


<SparkContext master=yarn appName=livy-session-2>

In [2]:
# Read the data from HDFS without defining any custom schema
df = spark.read.csv('/user/root/bank_atm_project/part-m-00000', header = False, inferSchema = True)

In [3]:
# View first record
df.show(1)

+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
| _c0|    _c1|_c2|   _c3|_c4|   _c5|_c6|_c7|       _c8|        _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19| _c20|  _c21|   _c22|    _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|_c32|      _c33|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
|2017|January|  1|Sunday|  0|Active|  1|NCR|NÃƒÂ¦stved|Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null|55.23|11.761|2616038|Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|Rain|light rain|
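+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+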

## Step 1: Reading the data from the HDFS file using a specific schema

**Creating a custom schema to load the data**

In [4]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DoubleType

In [5]:
atmSchema = StructType([
                StructField('year',IntegerType(), True),
                StructField('month',StringType(), True),
                StructField('day',IntegerType(), True),
                StructField('weekday',StringType(), True),
                StructField('hour',IntegerType(), True),
                StructField('atm_status',StringType(), True),
                StructField('atm_id',IntegerType(), True),
                StructField('atm_manufacturer',StringType(), True),
                StructField('atm_location',StringType(), True),
                StructField('atm_streetname',StringType(), True),
                StructField('atm_street_number',IntegerType(), True),
                StructField('atm_zipcode',IntegerType(), True),
                StructField('atm_lat',DoubleType(), True),
                StructField('atm_lon',DoubleType(), True),
                StructField('currency',StringType(), True),
                StructField('card_type',StringType(), True),
                StructField('transaction_amount',IntegerType(), True),
                StructField('service',StringType(), True),
                StructField('message_code',StringType(), True),
                StructField('message_text',StringType(), True),
                StructField('weather_lat',DoubleType(), True),
                StructField('weather_lon',DoubleType(), True),
                StructField('weather_city_id',IntegerType(), True),
                StructField('weather_city_name',StringType(), True),
                StructField('temp',DoubleType(), True),
                StructField('pressure',IntegerType(), True),
                StructField('humidity',IntegerType(), True),
                StructField('wind_speed',IntegerType(), True),
                StructField('wind_deg',IntegerType(), True),
                StructField('rain_3h',DoubleType(), True),
                StructField('clouds_all',IntegerType(), True),
                StructField('weather_id',IntegerType(), True),
                StructField('weather_main',StringType(), True),
                StructField('weather_description',StringType(), True)])
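
The field list above is long and repetitive. As a hedged alternative, `spark.read.csv` also accepts the schema as a DDL-formatted string (for example `year INT, month STRING`) in recent Spark versions, so the same schema can be generated from a compact list of name/type pairs. The sketch below only builds that string in plain Python; `COLUMNS`, `build_ddl` and `atm_schema_ddl` are illustrative names that do not appear in the original notebook.

```python
# Column names and Spark SQL types, in file order (mirrors atmSchema above)
COLUMNS = [
    ('year', 'INT'), ('month', 'STRING'), ('day', 'INT'), ('weekday', 'STRING'),
    ('hour', 'INT'), ('atm_status', 'STRING'), ('atm_id', 'INT'),
    ('atm_manufacturer', 'STRING'), ('atm_location', 'STRING'),
    ('atm_streetname', 'STRING'), ('atm_street_number', 'INT'),
    ('atm_zipcode', 'INT'), ('atm_lat', 'DOUBLE'), ('atm_lon', 'DOUBLE'),
    ('currency', 'STRING'), ('card_type', 'STRING'), ('transaction_amount', 'INT'),
    ('service', 'STRING'), ('message_code', 'STRING'), ('message_text', 'STRING'),
    ('weather_lat', 'DOUBLE'), ('weather_lon', 'DOUBLE'), ('weather_city_id', 'INT'),
    ('weather_city_name', 'STRING'), ('temp', 'DOUBLE'), ('pressure', 'INT'),
    ('humidity', 'INT'), ('wind_speed', 'INT'), ('wind_deg', 'INT'),
    ('rain_3h', 'DOUBLE'), ('clouds_all', 'INT'), ('weather_id', 'INT'),
    ('weather_main', 'STRING'), ('weather_description', 'STRING'),
]


def build_ddl(columns):
    """Join (name, type) pairs into a DDL-formatted schema string."""
    return ', '.join(f'{name} {dtype}' for name, dtype in columns)


atm_schema_ddl = build_ddl(COLUMNS)

# Assumed usage on the cluster (not executed here):
# df = spark.read.csv('/user/root/bank_atm_project/part-m-00000',
#                     header=False, schema=atm_schema_ddl)
```

Keeping the names and types in one list makes it easier to spot a column that is out of order, which matters here because the file has no header row.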

In [6]:
# Load data by defining a custom schema
df = spark.read.csv('/user/root/bank_atm_project/part-m-00000', header = False, schema = atmSchema)

In [7]:
# Print the schema
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
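 |-- weather_city_name: string (nullable = true)
 |-- temp: double (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)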

In [8]:
# Print the count of records
df.count()

2468572

In [9]:
# View first record
df.show(1)

+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+------

## Step 2: Creation of dimension tables

**DIM_LOCATION**

In [10]:
# Selecting the columns for location dimension
location = df.select('atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon').distinct()

In [11]:
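# Importing the functions required for the transformations
# (explicit names avoid shadowing Python built-ins such as sum and max)
from pyspark.sql.functions import col, concat_ws, lit, row_number, unix_timestamp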

In [12]:
# Generating the primary key column 
location_withPK = location.rdd.zipWithIndex().toDF()
location = location_withPK.select(col("_1.*"),col("_2").alias('location_id'))
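
`zipWithIndex` pairs every row with a consecutive 0-based index, which is why `location_id` starts at 0. The plain-Python sketch below mimics that behaviour on a few made-up rows; `add_surrogate_key` is a hypothetical helper for illustration and not a Spark API.

```python
def add_surrogate_key(rows):
    """Attach a consecutive 0-based key to each distinct row, like distinct() + zipWithIndex()."""
    distinct_rows = list(dict.fromkeys(rows))  # de-duplicate, keeping first-seen order
    return [(index, *row) for index, row in enumerate(distinct_rows)]


# Made-up (location, zipcode) rows with one duplicate
sample = [('Kolding', 6000), ('Odense', 5000), ('Kolding', 6000)]
keyed = add_surrogate_key(sample)
print(keyed)  # [(0, 'Kolding', 6000), (1, 'Odense', 5000)]
```

In Spark, `distinct()` gives no ordering guarantee, so the id assigned to a given row may differ between runs even though the ids are always consecutive.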

In [13]:
# Renaming the columns to match the target dimension model
DIM_LOCATION = (location
                .withColumnRenamed('atm_location', 'location')
                .withColumnRenamed('atm_streetname', 'streetname')
                .withColumnRenamed('atm_street_number', 'street_number')
                .withColumnRenamed('atm_zipcode', 'zipcode')
                .withColumnRenamed('atm_lat', 'lat')
                .withColumnRenamed('atm_lon', 'lon'))

In [14]:
# View the columns 
DIM_LOCATION.columns

['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'location_id']

In [15]:
# Rearranging the columns to match the target dimension model
DIM_LOCATION = DIM_LOCATION.select('location_id','location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')

In [16]:
# View first 5 records
DIM_LOCATION.show(5)

+-----------+----------------+-------------------+-------------+-------+------+------+
|location_id|        location|         streetname|street_number|zipcode|   lat|   lon|
+-----------+----------------+-------------------+-------------+-------+------+------+
|          0|         Kolding|           Vejlevej|          135|   6000|55.505| 9.457|
|          1|  Skelagervej 15|        Skelagervej|           15|   9000|57.023| 9.891|
|          2|Intern HolbÃƒÂ¦k|        Slotsvolden|            7|   4300|55.718|11.704|
|          3|          Odense|       FÃƒÂ¦lledvej|            3|   5000|55.394| 10.37|
|          4|           Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|           12|   7430|56.139| 9.154|
+-----------+----------------+-------------------+-------------+-------+------+------+
only showing top 5 rows

In [17]:
# Verify the count of records
DIM_LOCATION.count()

109

**DIM_ATM**

In [18]:
# Selecting the columns for ATM dimension
atm_temp = df.select('atm_id','atm_manufacturer','atm_lat','atm_lon')

In [19]:
# Renaming the column
atm_temp = atm_temp.withColumnRenamed('atm_id', 'atm_number')

In [20]:
# To get the atm_location_id col, join atm_temp with location table 
atm_temp = atm_temp.join(location, on = ['atm_lat','atm_lon'], how = 'left')
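
The join above uses only `atm_lat` and `atm_lon`, so it assumes that each coordinate pair identifies exactly one location. A quick way to test that assumption is to count how often each pair occurs in the location table. The sketch below shows the check in plain Python on made-up rows; `duplicated_coordinates` is an illustrative helper and not part of the notebook.

```python
from collections import Counter


def duplicated_coordinates(locations):
    """Return the (lat, lon) pairs that belong to more than one location row."""
    counts = Counter((row['atm_lat'], row['atm_lon']) for row in locations)
    return sorted(pair for pair, n in counts.items() if n > 1)


locations = [
    {'location_id': 0, 'atm_streetname': 'Vejlevej', 'atm_lat': 55.505, 'atm_lon': 9.457},
    {'location_id': 1, 'atm_streetname': 'Skelagervej', 'atm_lat': 57.023, 'atm_lon': 9.891},
    # Hypothetical second site at the same coordinates as location 1
    {'location_id': 2, 'atm_streetname': 'Example Street', 'atm_lat': 57.023, 'atm_lon': 9.891},
]
duplicates = duplicated_coordinates(locations)
print(duplicates)  # [(57.023, 9.891)]
```

An empty result means the two-column join cannot fan out. Otherwise, joining on all six location columns, as the fact table does in Step 3, is the safer option.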

In [21]:
# Selecting distinct values
atm_temp = atm_temp.select('atm_number','atm_manufacturer','location_id').distinct()

In [22]:
# Renaming the columns to match the target dimension model
atm_temp = atm_temp.withColumnRenamed('location_id', 'atm_location_id')

In [23]:
# Generating the primary key column 
atm_withPK = atm_temp.rdd.zipWithIndex().toDF()
DIM_ATM = atm_withPK.select(col("_1.*"),col("_2").alias('atm_id'))

In [24]:
# Rearranging the columns to match the target dimension model
DIM_ATM = DIM_ATM.select('atm_id','atm_number','atm_manufacturer','atm_location_id')

In [25]:
# View the columns 
DIM_ATM.columns

['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']

In [26]:
# View first 5 records
DIM_ATM.show(5)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     0|        35|             NCR|             41|
|     1|        16|             NCR|              8|
|     2|        37|             NCR|             21|
|     3|        68|             NCR|             76|
|     4|        18| Diebold Nixdorf|             40|
+------+----------+----------------+---------------+
only showing top 5 rows

In [27]:
# Verify the count of records
DIM_ATM.count()

156

**DIM_DATE**

In [28]:
# Selecting the columns for Date dimension
date = df.select('year', 'month', 'day', 'hour', 'weekday')

In [29]:
# Generating fulltime column using hour column and lit() method
date = date.withColumn('fulltime', concat_ws(':',date.hour,lit('00'), lit('00')))

In [30]:
# Concatenating year, month, day columns to get fulldate
date = date.withColumn('fulldate', concat_ws('-',date.year,date.month,date.day))

In [31]:
# Generating full_date_time column by concatenating fulldate and fulltime columns
date = date.withColumn('full_date_time', concat_ws(' ',date.fulldate, date.fulltime) )

In [32]:
# Verifying the generated full_date_time column
date.select('fulldate','fulltime','full_date_time').show(3,truncate=False)

+--------------+--------+----------------------+
|fulldate      |fulltime|full_date_time        |
+--------------+--------+----------------------+
|2017-January-1|0:00:00 |2017-January-1 0:00:00|
|2017-January-1|0:00:00 |2017-January-1 0:00:00|
|2017-January-1|0:00:00 |2017-January-1 0:00:00|
+--------------+--------+----------------------+
only showing top 3 rows

In [33]:
# View the columns 
date.columns

['year', 'month', 'day', 'hour', 'weekday', 'fulltime', 'fulldate', 'full_date_time']

In [34]:
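# Casting the full_date_time column to timestamp using a pattern
# Note: parsing the full month name 'January' with 'MMM' relies on the lenient legacy parser of Spark 2.x;
# on Spark 3.x a pattern such as 'yyyy-MMMM-d H:mm:ss' would likely be needed instead
pattern = 'yyyy-MMM-dd HH:mm:ss'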
date = date.withColumn('full_date_time', unix_timestamp(date.full_date_time, pattern).cast('timestamp'))
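
The same timestamp can also be assembled from the numeric parts without parsing a month name out of a formatted string. The plain-Python sketch below shows the idea with an explicit English month lookup; `MONTHS`, `MONTH_NUMBER` and `to_timestamp_parts` are illustrative names, and the approach is an alternative rather than what the notebook does.

```python
from datetime import datetime

MONTHS = ['January', 'February', 'March', 'April', 'May', 'June', 'July',
          'August', 'September', 'October', 'November', 'December']
# Map each English month name to its 1-based month number
MONTH_NUMBER = {name: number for number, name in enumerate(MONTHS, start=1)}


def to_timestamp_parts(year, month_name, day, hour):
    """Build a timestamp at the start of the given hour from the raw columns."""
    return datetime(year, MONTH_NUMBER[month_name], day, hour)


ts = to_timestamp_parts(2017, 'January', 1, 0)
print(ts)  # 2017-01-01 00:00:00
```

An explicit lookup does not depend on locale settings or on how lenient the date parser is, at the cost of failing loudly on an unexpected month spelling.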

In [35]:
# Viewing the date and time columns to verify the full_date_time column
date.select('fulldate','fulltime','full_date_time').show(3,truncate=False)

+--------------+--------+-------------------+
|fulldate      |fulltime|full_date_time     |
+--------------+--------+-------------------+
|2017-January-1|0:00:00 |2017-01-01 00:00:00|
|2017-January-1|0:00:00 |2017-01-01 00:00:00|
|2017-January-1|0:00:00 |2017-01-01 00:00:00|
+--------------+--------+-------------------+
only showing top 3 rows

In [36]:
# Selecting required date related columns and keeping only distinct values
DIM_DATE = date.select('full_date_time','year','month','day','hour','weekday').distinct()

In [37]:
# Generating the primary key column 
date_withPK = DIM_DATE.rdd.zipWithIndex().toDF()
DIM_DATE = date_withPK.select(col("_1.*"),col("_2").alias('date_id'))

In [38]:
# Rearranging the columns to match the target dimension model
DIM_DATE = DIM_DATE.select('date_id','full_date_time','year','month','day','hour','weekday')

In [39]:
# View first 5 records
DIM_DATE.show(5)

+-------+-------------------+----+--------+---+----+-------+
|date_id|     full_date_time|year|   month|day|hour|weekday|
+-------+-------------------+----+--------+---+----+-------+
|      0|2017-01-06 08:00:00|2017| January|  6|   8| Friday|
|      1|2017-01-16 18:00:00|2017| January| 16|  18| Monday|
|      2|2017-01-29 08:00:00|2017| January| 29|   8| Sunday|
|      3|2017-02-26 08:00:00|2017|February| 26|   8| Sunday|
|      4|2017-03-14 06:00:00|2017|   March| 14|   6|Tuesday|
+-------+-------------------+----+--------+---+----+-------+
only showing top 5 rows

In [40]:
# Verify the count of records
DIM_DATE.count()

8685
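
This count can be sanity-checked with simple arithmetic. Assuming the extract covers only the calendar year 2017 (the sample rows suggest this, but it is an assumption), there are at most 365 × 24 hourly slots, so 8685 distinct rows would mean that a small number of hours had no transaction at all.

```python
from datetime import datetime

# Number of whole hours in calendar year 2017 (assumed coverage of the data)
hours_in_2017 = int((datetime(2018, 1, 1) - datetime(2017, 1, 1)).total_seconds() // 3600)

observed = 8685  # DIM_DATE.count() reported above
missing = hours_in_2017 - observed
print(hours_in_2017, missing)  # 8760 75
```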

**DIM_CARD_TYPE**

In [41]:
# Select distinct values for card type 
card = df.select('card_type').distinct()

In [42]:
# Generating the primary key column
card_withPK = card.rdd.zipWithIndex().toDF()
DIM_CARD_TYPE = card_withPK.select(col("_1.*"),col("_2").alias('card_type_id'))

In [43]:
# Rearranging the columns to match the target dimension model
DIM_CARD_TYPE = DIM_CARD_TYPE.select('card_type_id','card_type')

In [44]:
# Verify the count of records
DIM_CARD_TYPE.count()

12

## Step 3: Creation of fact table

**FACT_ATM_TRANS**

**Stage 1: Get the atm_location_id for all records**

In [45]:
# To get the location_id col for all records, join df with location table 
fact1 = df.join(location, on = ['atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon'], how = 'left')

In [46]:
# Rename the location_id to atm_location_id
fact1 = fact1.withColumnRenamed('location_id', 'atm_location_id')

In [47]:
# Verify the count of records
fact1.count()

2468572
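
The count matches the 2468572 records read from HDFS, which is what a left join gives when the join key is unique in the right-hand table. The plain-Python sketch below illustrates that invariant on made-up rows; `left_join` is a simplified stand-in for the DataFrame join, not a Spark API.

```python
def left_join(left_rows, right_rows, keys):
    """Left join two lists of dicts on the given key columns."""
    index = {}
    for right in right_rows:
        index.setdefault(tuple(right[k] for k in keys), []).append(right)
    joined = []
    for left in left_rows:
        # Unmatched left rows are kept once, without the right-hand columns
        matches = index.get(tuple(left[k] for k in keys), [None])
        for match in matches:
            joined.append({**left, **(match or {})})
    return joined


transactions = [{'atm_location': 'Kolding', 'amount': 5643},
                {'atm_location': 'Odense', 'amount': 100},
                {'atm_location': 'Kolding', 'amount': 250}]
dim_unique = [{'atm_location': 'Kolding', 'location_id': 0},
              {'atm_location': 'Odense', 'location_id': 3}]
# Hypothetical faulty dimension with a repeated key
dim_duplicated = dim_unique + [{'atm_location': 'Odense', 'location_id': 4}]

same_count = len(left_join(transactions, dim_unique, ['atm_location']))
inflated_count = len(left_join(transactions, dim_duplicated, ['atm_location']))
print(same_count, inflated_count)  # 3 4
```

This is why each stage re-checks the count: a higher number after a join would point to duplicate keys in the dimension table.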

**Stage 2: Get the atm_id for all records**

In [48]:
# Rename the atm_id to atm_number to join with DIM_ATM table
fact2 = fact1.withColumnRenamed('atm_id', 'atm_number')

In [49]:
# To get the atm_id col for all records, join fact2 with DIM_ATM table 
fact2 = fact2.join(DIM_ATM, on =['atm_number','atm_manufacturer','atm_location_id'] , how='left')

In [50]:
# Rename atm_location_id to weather_loc_id to match the target fact table
fact2 = fact2.withColumnRenamed('atm_location_id', 'weather_loc_id')

In [51]:
# Verify the count of records 
fact2.count()

2468572

**Stage 3: Get the date_id for all records**

In [52]:
# To get the date_id col for all records, join fact2 with DIM_DATE table 
fact3 = fact2.join(DIM_DATE, on = ['year','month','day','hour','weekday'], how = 'left')

In [53]:
# Verify the count of records 
fact3.count()

2468572

**Stage 4: Get the card_type_id for all records**

In [54]:
# To get the card_type_id col for all records, join fact3 with DIM_CARD_TYPE table 
fact4 = fact3.join(DIM_CARD_TYPE, on = ['card_type'], how = 'left')

In [55]:
# Verify the count of records 
fact4.count()

2468572

In [56]:
# Generating primary key of fact table
from pyspark.sql.window import Window

# A window without partitionBy moves all rows to a single partition, which can be slow on large data
w = Window.orderBy('date_id')
FACT_ATM_TRANS = fact4.withColumn("trans_id", row_number().over(w))
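
`row_number()` over a window ordered by `date_id` numbers the rows 1, 2, 3, ... in `date_id` order, unlike `zipWithIndex`, which starts at 0. Rows that share a `date_id` are tied, so their relative order is not guaranteed to be the same between runs. A plain-Python analogue on made-up rows:

```python
# Made-up fact rows in arbitrary order
rows = [{'date_id': 2, 'amount': 300},
        {'date_id': 0, 'amount': 4447},
        {'date_id': 1, 'amount': 120}]

# Sort by the ordering column, then number from 1 as row_number() does
ordered = sorted(rows, key=lambda row: row['date_id'])
numbered = [{'trans_id': n, **row} for n, row in enumerate(ordered, start=1)]
print([(row['trans_id'], row['date_id']) for row in numbered])  # [(1, 0), (2, 1), (3, 2)]
```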

In [57]:
# Rearranging the columns to match the target dimension model
FACT_ATM_TRANS = FACT_ATM_TRANS.select('trans_id','atm_id','weather_loc_id','date_id','card_type_id','atm_status','currency','service','transaction_amount','message_code','message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')

In [58]:
# Verify the count of records 
FACT_ATM_TRANS.count()

2468572

In [59]:
# View first 3 records
FACT_ATM_TRANS.show(3,False)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|service   |transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|1       |56    |1             |0      |10          |Inactive  |DKK     |Withdrawal|4447              |null        |null        |0.0    |12        |801       |Clouds      |few clouds         |
|2       |65    |99            |0      |0           |Active    |DKK     |Withdrawal|9483              |null        |null        |0.0    |8         |800       |Clear       |sky is clear       |
|3       |27    |9             |0  

In [60]:
# View columns in fact table
FACT_ATM_TRANS.columns

['trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

## Step 4: Storing the dimension tables and fact tables to AWS S3 bucket

**`atm-upgrad-proj`** is the AWS S3 bucket

**DIM_LOCATION**

In [61]:
# writing DIM_LOCATION df to dim_location folder in AWS S3 bucket 
DIM_LOCATION.coalesce(1).write.format('csv').option('header','false').save('s3://atm-upgrad-proj/dim_location', mode='overwrite')

**DIM_ATM**

In [62]:
# writing DIM_ATM df to dim_atm folder in AWS S3 bucket 
DIM_ATM.coalesce(1).write.format('csv').option('header','false').save('s3://atm-upgrad-proj/dim_atm', mode='overwrite')

**DIM_DATE**

In [63]:
# writing DIM_DATE df to dim_date folder in AWS S3 bucket 
DIM_DATE.coalesce(1).write.format('csv').option('header','false').save('s3://atm-upgrad-proj/dim_date', mode='overwrite')

**DIM_CARD_TYPE**

In [64]:
# writing DIM_CARD_TYPE df to dim_card_type folder in AWS S3 bucket
DIM_CARD_TYPE.coalesce(1).write.format('csv').option('header','false').save('s3://atm-upgrad-proj/dim_card_type', mode='overwrite')

**FACT_ATM_TRANS**

In [65]:
# writing data from FACT_ATM_TRANS df to fact_atm_trans folder in AWS S3 bucket
FACT_ATM_TRANS.coalesce(1).write.format('csv').option('header','false').save('s3://atm-upgrad-proj/fact_atm_trans', mode='overwrite')
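
The five write statements differ only in the DataFrame and the folder name, and each folder is the lower-cased table name. A loop over the table names could therefore replace them. The sketch below builds the target paths in plain Python; `BUCKET`, `TABLE_NAMES` and `s3_path` are illustrative names, and the commented-out write call assumes a dict `tables` that maps each name to its DataFrame.

```python
BUCKET = 'atm-upgrad-proj'
TABLE_NAMES = ['DIM_LOCATION', 'DIM_ATM', 'DIM_DATE', 'DIM_CARD_TYPE', 'FACT_ATM_TRANS']


def s3_path(table_name):
    """Return the S3 folder for a table: the bucket plus the lower-cased table name."""
    return f's3://{BUCKET}/{table_name.lower()}'


paths = {name: s3_path(name) for name in TABLE_NAMES}
for name, path in paths.items():
    print(name, '->', path)

# Assumed usage on the cluster (not executed here), with `tables` a dict of DataFrames:
# for name, path in paths.items():
#     tables[name].coalesce(1).write.csv(path, header=False, mode='overwrite')
```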
