# ETL Project By Deepansh Jha

### Setting Up Spark Session

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("DJ_Spark_Notebook").master("local").getOrCreate()
sc=spark.sparkContext
spark

### Reading Data From Hdfs Location, Where We Extracted Data Using Sqoop, With Default Schema For Now.

In [2]:
df = spark.read.csv("/user/root/SRC_ATM_TRANS/part-m-00000",header=False,inferSchema=True)

#### Checking Dataset

In [3]:
df.show(10)

+----+-------+---+------+---+--------+---+---------------+-----------+-------------------+----+----+------+------+----+------------------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
| _c0|    _c1|_c2|   _c3|_c4|     _c5|_c6|            _c7|        _c8|                _c9|_c10|_c11|  _c12|  _c13|_c14|              _c15|_c16|      _c17|_c18|_c19|  _c20|  _c21|   _c22|          _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|   _c32|                _c33|
+----+-------+---+------+---+--------+---+---------------+-----------+-------------------+----+----+------+------+----+------------------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
|2017|January|  1|Sunday|  0|  Active|  1|            NCR| NÃƒÂ¦stved|        Farimagsvej|   8|4700|55.233|11.763| DKK|        MasterCard|5643|Withdrawal|null|null| 55.23|11.761|

#### Using StructType to change the datatype of every column according to our preferences.

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [5]:
d_type = StructType([StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), True),
                        StructField('day', IntegerType(), True),
                        StructField('weekday', StringType(), True),
                        StructField('hour', IntegerType(), True),
                        StructField('atm_status', StringType(), True),
                        StructField('atm_id', StringType(), True),
                        StructField('atm_manufacturer', StringType(), True),
                        StructField('atm_location', StringType(), True),
                        StructField('atm_streetname', StringType(), True),
                        StructField('atm_street_number', IntegerType(), True),
                        StructField('atm_zipcode', IntegerType(), True),
                        StructField('atm_lat', DoubleType(), True),
                        StructField('atm_lon', DoubleType(), True),
                        StructField('currency', StringType(), True),
                        StructField('card_type', StringType(), True),
                        StructField('transaction_amount', IntegerType(), True),
                        StructField('service', StringType(), True),
                        StructField('message_code', StringType(), True),
                        StructField('message_text', StringType(), True),
                        StructField('weather_lat', DoubleType(), True),
                        StructField('weather_lon', DoubleType(), True),
                        StructField('weather_city_id', IntegerType(), True),
                        StructField('weather_city_name', StringType(), True),
                        StructField('temp', DoubleType(), True),
                        StructField('pressure', IntegerType(), True),
                        StructField('humidity', IntegerType(), True),
                        StructField('wind_speed', IntegerType(), True),
                        StructField('wind_deg', IntegerType(), True),
                        StructField('rain_3h', DoubleType(), True),
                        StructField('clouds_all', IntegerType(), True),
                        StructField('weather_id', IntegerType(), True),
                        StructField('weather_main', StringType(), True),
                        StructField('weather_description', StringType(), True)])

#This 'd_type' Will Be Used as Parameter in schema below and datatypes of each column will be assigned according to it.

In [6]:
df = spark.read.csv("D://Upgrad//Data Engineering//Spark//ETL Project//Spar-Nord-Bank_ETL-Project-main//SRC_ATM_TRANS//part-m-00000", header = False, schema = d_type)

# Here We are reading dataset which we extracted using sqoop in hdfs directory.
# but insted of using inferschema, which by default assign datatypes to each column, we have used d_type(structType) insted.

In [7]:
df.show(5) # Just checking Dataset.

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

#### Checking Count Of Records.

In [8]:
df.select('*').count() 

2468572

#### Checking Columns

In [9]:
df.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [10]:
len(df.columns) #Number Of Columns

34

#### Checking Datatypes,Columns.

In [11]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

### Creating Dimention And Fact Tables

##### Creating a dataframe for Location Dimension according to Target Dimension Model.

In [12]:
location = df.select('atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon').distinct()

#Selecting Some Columns With Distinct Values And Assigning Variable 'location' to it.

In [13]:
from pyspark.sql.functions import * #importing everything from pyspark.sql.functions

In [14]:
df_temp = location.rdd.zipWithIndex().toDF()
dim_location = df_temp.select(col("_1.*"),col("_2").alias('location_id'))
dim_location.show(10)

#Creating Primary Key Column

+--------------------+----------------+-----------------+-----------+-------+-------+-----------+
|        atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+--------------------+----------------+-----------------+-----------+-------+-------+-----------+
|               Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|          0|
|            Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|          1|
|          Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|          2|
|             Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|          3|
|                Aars| Himmerlandsgade|               70|       9600| 56.803|  9.518|          4|
|     Aarhus Lufthavn| Ny Lufthavnsvej|               24|       8560| 56.308| 10.627|          5|
|                 Fur|      StenÃƒÂ¸re|               19|       7884| 56.805|   9.02|          6|
|            Hasseri

##### Renaming Columns

In [15]:
DIM_LOCATION = dim_location.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')

#atm_location to location
#atm_streetname to streetname
#atm_street_number to street_number
#atm_zipcode to zipcode
#atm_lat to lat
#atm_lon to lon

In [16]:
DIM_LOCATION = DIM_LOCATION.select('location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')

#Selecting The Columns Who's Name Has Been Renamed And Assigning Them In 'DIM_LOCATION' Variable.

In [17]:
DIM_LOCATION.columns

# Making Sure Everything Is Fine.
# We Can See Those Renamed Columns.

['location_id',
 'location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon']

In [18]:
DIM_LOCATION.select('*').count()

#Counting Number Of Records Present In DIM_LOCATION's Columns.

109

#### Creating a dataframe for ATM Dimension according to Target Dimension Model

In [19]:
atm = df.select('atm_id', 'atm_manufacturer', 'atm_lat', 'atm_lon')

#Selecting Some Columns And Assigning Them To Variable 'atm'.

In [20]:
atm = atm.withColumnRenamed('atm_id', 'atm_number')

#Renaming 'atm_id' to 'atm_number'

In [21]:
atm = atm.join(dim_location, on = ['atm_lat', 'atm_lon'], how = "left")

#Joining dim_location and atm columns using left join and assigning it to variable 'atm'.

In [22]:
atm.columns

#Checking If Everything Is As Per Requirement.

['atm_lat',
 'atm_lon',
 'atm_number',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'location_id']

In [23]:
len(atm.columns)

# Checking Number Of Columns

9

In [24]:
atm = atm.select('atm_number','atm_manufacturer','location_id').distinct()

#Selecting Columns Which Are Required And Taking Distinct Values Of Those Columns.

In [25]:
atm = atm.withColumnRenamed('location_id', 'atm_location_id')

#Renaming location_id to atm_location_id

In [26]:
atm.columns

#Checking All The Columns
#location_id is renamed to atm_location_id

['atm_number', 'atm_manufacturer', 'atm_location_id']

In [27]:
df_temp = atm.rdd.zipWithIndex().toDF()
dim_atm = df_temp.select(col("_1.*"),col("_2").alias('atm_id'))
dim_atm.show(10)

#Creating Primary Key Column

+----------+----------------+---------------+------+
|atm_number|atm_manufacturer|atm_location_id|atm_id|
+----------+----------------+---------------+------+
|        12|             NCR|             23|     0|
|        98|             NCR|             88|     1|
|       101|             NCR|             30|     2|
|        36|             NCR|             67|     3|
|        85| Diebold Nixdorf|            100|     4|
|        43|             NCR|              4|     5|
|        21|             NCR|             23|     6|
|        33|             NCR|              0|     7|
|        84|             NCR|             41|     8|
|         3|             NCR|             44|     9|
+----------+----------------+---------------+------+
only showing top 10 rows



In [28]:
DIM_ATM = dim_atm.select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')

#Selecting Columns As Per The Target Dimention Model And Assigning 'DIM_ATM' To Them.

In [29]:
DIM_ATM.columns

#Checking If Columns Are Assigned To DIM_ATM Variable.

['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']

In [30]:
DIM_ATM.select('*').count()

#Counting The Number Of Records In The Columns Selected Above

156

#### Creating a dataframe for Date Dimension according to Target Dimension Model

In [31]:
date = df.select('year', 'month', 'day', 'hour', 'weekday')

#Selecting Columns As Per The Requirement.

In [32]:
date = date.withColumn('full_date', concat_ws('-', date.year, date.month, date.day))

#concatenating year-month-day with a '-' in between and assigning new column 'full_date' to it.

In [33]:
date = date.withColumn('full_time', concat_ws(':', date.hour, lit('00'), lit('00')))

#concatenating hour:00:00 with ':' in between and assigning it to new column 'full_time'

In [34]:
date = date.withColumn('full_date_time', concat_ws(' ', date.full_date, date.full_time))

#No concatenating full_date full_time with a ' ' in between and assigning it to new column full_date_time.
#The process was to make everything in this pattern 'yyyy-MMM-dd HH:mm:ss' to convert it into datetime(timestamp) datatype.

#### The process was to make everything in this pattern 'yyyy-MMM-dd HH:mm:ss' to convert it into datetime(timestamp) datatype

In [35]:
pattern = 'yyyy-MMM-dd HH:mm:ss'
date = date.withColumn('full_date_time', unix_timestamp(date.full_date_time, pattern).cast('timestamp'))

#Converting datatype To Datetime(timestamp).

In [36]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [37]:
date.show(10, truncate = False)

#checking date dataframe.

#when values are so huge, just to read the whole value we use 'truncate=False' parameter.

+----+-------+---+----+-------+--------------+---------+-------------------+
|year|month  |day|hour|weekday|full_date     |full_time|full_date_time     |
+----+-------+---+----+-------+--------------+---------+-------------------+
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|

In [38]:
date = date.select('year', 'month', 'day', 'hour', 'weekday', 'full_date_time').distinct()

#Selecting The Required Columns With Distinct Values.

In [39]:
df_temp = date.rdd.zipWithIndex().toDF()
DIM_DATE = df_temp.select(col("_1.*"),col("_2").alias('date_id'))
DIM_DATE.show(10)

#Creating Primary-Key Column

+----+--------+---+----+---------+-------------------+-------+
|year|   month|day|hour|  weekday|     full_date_time|date_id|
+----+--------+---+----+---------+-------------------+-------+
|2017| January|  9|  16|   Monday|2017-01-09 16:00:00|      0|
|2017| January| 11|  17|Wednesday|2017-01-11 17:00:00|      1|
|2017| January| 30|  21|   Monday|2017-01-30 21:00:00|      2|
|2017|February|  4|  19| Saturday|2017-02-04 19:00:00|      3|
|2017|February|  8|   9|Wednesday|2017-02-08 09:00:00|      4|
|2017|February| 14|   2|  Tuesday|2017-02-14 02:00:00|      5|
|2017|February| 14|  21|  Tuesday|2017-02-14 21:00:00|      6|
|2017|February| 25|   5| Saturday|2017-02-25 05:00:00|      7|
|2017|February| 27|  23|   Monday|2017-02-27 23:00:00|      8|
|2017|   March| 12|   0|   Sunday|2017-03-12 00:00:00|      9|
+----+--------+---+----+---------+-------------------+-------+
only showing top 10 rows



In [40]:
DIM_DATE = DIM_DATE.select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

#Arranging Columns According To Target Dimention Model.

In [41]:
DIM_DATE.columns

#Checking The Columns

['date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday']

In [42]:
DIM_DATE.select('*').count()

#Total Number Of Records In DIM_Date.

8685

#### Creating a dataframe for Card Type Dimension according to Target Dimension Model

In [43]:
card_type = df.select('card_type').distinct()

#selecting card_type column from df and assigning it to card_type variable.

In [44]:
df_temp = card_type.rdd.zipWithIndex().toDF()
DIM_CARD_TYPE = df_temp.select(col("_1.*"),col("_2").alias('card_type_id'))
DIM_CARD_TYPE.show(10)

#Creating Primary-Key Column

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|     Dankort - on-us|           0|
|              CIRRUS|           1|
|         HÃƒÂ¦vekort|           2|
|                VISA|           3|
|  Mastercard - on-us|           4|
|             Maestro|           5|
|Visa Dankort - on-us|           6|
|        Visa Dankort|           7|
|            VisaPlus|           8|
|          MasterCard|           9|
+--------------------+------------+
only showing top 10 rows



In [45]:
DIM_CARD_TYPE = DIM_CARD_TYPE.select('card_type_id', 'card_type')

#Arranging Columns As Per Requirement.

In [46]:
DIM_CARD_TYPE.columns

#Checking Columns

['card_type_id', 'card_type']

In [47]:
DIM_CARD_TYPE.select('*').count()

#Checking Total Number Of Records In DIM_CARD_TYPE

12

#### Creating the Transaction Fact Table according to Target Model Dimension

In [48]:
fact_loc = df.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')

#renaming Column atm_location to location.
#renaming atm_streetname to streetname
#renaming atm_street_number to street_number
#renaming atm_zipcode to zipcode
#renaming atm_lat to lat

In [49]:
fact_loc = fact_loc.join(DIM_LOCATION, on = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = "left")

#Joining DIM_LOCATION With The Above Columns using Left Join.

In [50]:
fact_loc.columns

#Checking Columns

['location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon',
 'year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description',
 'location_id']

In [51]:
fact_loc.select('*').count()

#Counting The Number Of Records In fact_loc

2468572

In [52]:
fact_loc = fact_loc.withColumnRenamed('atm_id', 'atm_number').withColumnRenamed('location_id', 'atm_location_id')

#renaming atm_id to atm_number
#renaming location_id to atm_location_id

In [53]:
fact_atm = fact_loc.join(DIM_ATM, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = "left")

#joining DIM_ATM with Above Given Columns Using Left Join.

In [54]:
fact_atm = fact_atm.withColumnRenamed('atm_location_id', 'weather_loc_id')

#Renaming atm_location_id to weather_loc_id is also required.

In [55]:
fact_atm.select('*').count()

#Counting Total Number Of Records In fact_atm.

2468572

#### Stage 3 of FACT_ATM_TRANS Table -> joining the dataframe with DIM_DATE

In [56]:
fact_date = fact_atm.join(DIM_DATE, on = ['year', 'month', 'day', 'hour', 'weekday'], how = "left")

#joining DIM_DATE with Above Columns Using Left Join.

In [57]:
fact_date.select('*').count()

#Counting Total Number Of Records In fact_date

2468572

#### Stage 4 of FACT_ATM_TRANS Table -> joining the dataframe with DIM_CARD_TYPE

In [58]:
fact_atm_trans = fact_date.join(DIM_CARD_TYPE, on = ['card_type'], how = "left")

#Joining fact_date with card_type column using left join.

In [59]:
fact_atm_trans.select('*').count()

#Counting Total Number Of Records In fact_atm_trans

2468572

In [60]:
from pyspark.sql.window import Window

w = Window().orderBy('date_id')
FACT_ATM_TRANS = fact_atm_trans.withColumn("trans_id", row_number().over(w))
FACT_ATM_TRANS.show(5, True)

# Creating primary key of FACT_ATM_TRANS and viewing 5 record of the table and using truncate=True.

+--------------------+----+-------+---+----+-------+----------+----------------+--------------+--------------+----------------+-------------+-------+------+-----+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+------+-------+-------------------+------------+--------+
|           card_type|year|  month|day|hour|weekday|atm_number|atm_manufacturer|weather_loc_id|      location|      streetname|street_number|zipcode|   lat|  lon|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|atm_id|date_id|     full_date_time|card_type_id|trans_id|
+--------------------+----+-------+---+----+-------+----------+-----------

In [61]:
FACT_ATM_TRANS.columns

#Checking Columns

['card_type',
 'year',
 'month',
 'day',
 'hour',
 'weekday',
 'atm_number',
 'atm_manufacturer',
 'weather_loc_id',
 'location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon',
 'atm_status',
 'currency',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description',
 'atm_id',
 'date_id',
 'full_date_time',
 'card_type_id',
 'trans_id']

In [62]:
FACT_ATM_TRANS = FACT_ATM_TRANS.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 
'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 
'clouds_all', 'weather_id', 'weather_main', 'weather_description')

#Selecting Some Required Columns.

In [63]:
FACT_ATM_TRANS.columns

#Checking Number Of Columns

['trans_id',
 'atm_id',
 'weather_loc_id',
 'date_id',
 'card_type_id',
 'atm_status',
 'currency',
 'service',
 'transaction_amount',
 'message_code',
 'message_text',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [64]:
len(FACT_ATM_TRANS.columns)

#Checking Number Of Columns

16

In [65]:
FACT_ATM_TRANS.select('*').count()

#Checking The Count Of Number Of Total Records In FACT_ATM_TRANS.

2468572

In [67]:
DIM_LOCATION.coalesce(1).write.format('csv').option('header','false').save('s3://dj-etlproject/dim_location', mode='overwrite')
#Saving To s3 Bucket With Folder Name dim_location in csv format.

In [68]:
DIM_ATM.coalesce(1).write.format('csv').option('header','false').save('s3://dj-etlproject/dim_atm', mode='overwrite')

#Saving To s3 Bucket With Folder Name dim_atm in csv format.

In [69]:
DIM_DATE.coalesce(1).write.format('csv').option('header','false').save('s3://dj-etlproject/dim_date', mode='overwrite')

#Saving To s3 Bucket With Folder Name dim_date in csv format.

In [70]:
DIM_CARD_TYPE.coalesce(1).write.format('csv').option('header','false').save('s3://dj-etlproject/dim_card_type', mode='overwrite')

#Saving To s3 Bucket With Folder Name dim_card_type in csv format.

In [71]:
FACT_ATM_TRANS.coalesce(1).write.format('csv').option('header','false').save('s3://dj-etlproject/fact_atm_trans', mode='overwrite')

#Saving To s3 Bucket With Folder Name fact_atm_trans in csv format.