In [40]:
%%html
<style>
/* Float rendered tables to the left and draw a solid border around them. */
table{float:left;border-style:solid;}
</style>

In [6]:
import findspark
# NOTE(review): findspark.init() is called before the pyspark import below;
# presumably it makes the local Spark installation importable - keep this order.
findspark.init()
from pyspark.sql import SparkSession
# Obtain the session used by every cell below (getOrCreate reuses an existing
# session if there is one); the application is named 'etl'.
spark = SparkSession.builder.appName('etl').getOrCreate()

In [7]:
# Echo the SparkSession object as the cell output.
spark

##### Types are imported for creating input schema

In [8]:
from pyspark.sql.types import *

##### The input schema is created according to the data in the CSV file imported using Sqoop

In [9]:
# Explicit schema of the imported CSV. Columns are listed in file order and
# every column is declared nullable.
schema = (
    StructType()
    # --- transaction timestamp parts ---
    .add("year", IntegerType(), True)
    .add("month", StringType(), True)
    .add("day", IntegerType(), True)
    .add("weekday", StringType(), True)
    .add("hour", IntegerType(), True)
    # --- ATM attributes ---
    .add("atm_status", StringType(), True)
    .add("atm_id", StringType(), True)
    .add("atm_manufacturer", StringType(), True)
    .add("atm_location", StringType(), True)
    .add("atm_streetname", StringType(), True)
    .add("atm_street_number", IntegerType(), True)
    .add("atm_zipcode", IntegerType(), True)
    .add("atm_lat", DoubleType(), True)
    .add("atm_lon", DoubleType(), True)
    # --- transaction attributes ---
    .add("currency", StringType(), True)
    .add("card_type", StringType(), True)
    .add("service", StringType(), True)
    .add("message_code", StringType(), True)
    .add("message_text", StringType(), True)
    # --- weather attributes ---
    .add("weather_lat", DoubleType(), True)
    .add("weather_lon", DoubleType(), True)
    .add("weather_city_id", IntegerType(), True)
    .add("weather_city_name", StringType(), True)
    .add("temp", IntegerType(), True)
    .add("preasure", IntegerType(), True)
    .add("humidity", IntegerType(), True)
    .add("wind_speed", IntegerType(), True)
    .add("wind_deg", IntegerType(), True)
    .add("rain_3h", DoubleType(), True)
    .add("clouds_all", IntegerType(), True)
    .add("weather_id", IntegerType(), True)
    .add("weather_main", StringType(), True)
    .add("weather_description", StringType(), True)
)

##### Importing Data from HDFS

1. The path of hdfs where the imported data is stored
2. The data is stored in CSV format, and the fields are not enclosed in quotes


<font color='red'>\* The numbered list items are comments for the code below</font>

In [10]:
# Location of the imported CSV file.
path = './csv/imported_data.csv'

# Read the file with the explicit schema; the quote character is set to an
# empty string for this read.
temp_atm = (
    spark.read
    .format('csv')
    .schema(schema)
    .option('quote', '')
    .load(path)
)

Here I have used rand function to generate random values for the column transaction amount

In [13]:
from pyspark.sql.functions import rand

# Synthetic transaction amount: an integer in [0, 10000).
# A fixed seed is passed so that re-running the notebook does not pick a new
# random seed each time; the values then stay reproducible for the same input
# and partitioning.
temp_atm = temp_atm.withColumn(
    'transaction_amount',
    (rand(seed=42) * 10000).cast('int')
)
temp_atm.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: integer (nullable

##### The count of the table imported using Sqoop was <u>2468572</u>

In [14]:
# Trigger a full read of the source and report how many rows were loaded.
record_count = temp_atm.count()
print('The count of records in the imported file is', record_count)

The count of records in the imported file is 2468572


- ## Fact Table
    1. [FACT_ATM_TRANS](#FACT_ATM_TRANS)
- ## Dimension Tables
    1. [DIM_DATE](#DIM_DATE)
    2. [DIM_LOCATION](#DIM_LOCATION)
    3. [DIM_ATM](#DIM_ATM)
    4. [DIM_CARD_TYPE](#DIM_CARD_TYPE)

##### The following imports will be used for creating IDs for the fact and dimension tables

In [15]:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window

In [16]:
# Print the schema of the source frame, which now includes transaction_amount.
temp_atm.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: integer (nullable

## Creating Location Dimension table

### **DIM_LOCATION**
|Columns|Data Type|Constraint|
| :- | :- |:-|
|location_id|int|**Primary**|
|location|string|
|street_name|string|
|street_number|int|
|zipcode|int|
|lat|double|
|lon|double|

2. A new column is created called 'location_id'.
3. 'location_id' is to be filled with value zero.
4. The data frame is rearranged and the headers are renamed.


12. Distinct will remove the duplicate.
13. The column *location_id* is selected for updating its values
14. IDs are generated such that the ID starts with number 2

<font color='red'>\* The numbered list items are comments for the code below</font>

In [17]:
# DIM_LOCATION: one row per distinct ATM location found in the source data.
# The location attributes are renamed to the dimension's column names and
# de-duplicated, then a surrogate key is generated.
#
# The window orders by every attribute, not only location and street name.
# Rows that share those two values would otherwise be tied, and the key they
# receive could change between evaluations of this lazily computed frame,
# which is evaluated again for each join and for the final write.
dim_loc = temp_atm.select(
    temp_atm.atm_location.alias('location'),
    temp_atm.atm_streetname.alias('street_name'),
    temp_atm.atm_street_number.alias('street_number'),
    temp_atm.atm_zipcode.alias('zipcode'),
    temp_atm.atm_lat.alias('lat'),
    temp_atm.atm_lon.alias('lon')
).distinct().withColumn(
    'location_id',
    # Keys are odd numbers starting at 20003: row_number * 2 + 20001.
    row_number().over(
        Window.orderBy(
            'location', 'street_name', 'street_number', 'zipcode', 'lat', 'lon'
        )
    )*lit(2)+lit(20001)
).select(
    # Put the key first, as in the DIM_LOCATION table layout.
    'location_id',
    'location',
    'street_name',
    'street_number',
    'zipcode',
    'lat',
    'lon'
)

# print(dim_loc.count())
# dim_loc.show()

## Creating Card Type Dimension table

### **DIM_CARD_TYPE**
|Columns|Data Type|Constraint|
|:-|:-|:-|
|card_type_id|int|**Primary**|
|card_type|string|

1. The distinct values are selected from *card_type*.
2. A new column *card_type_id* is created for storing IDs
3. IDs are generated such that the ID starts with number 3


4. The columns are rearranged.

<font color='red'>\* The numbered list items are comments for the code below</font>

In [18]:
# DIM_CARD_TYPE: one row per distinct card type, keyed by an odd number
# starting at 30003 (row_number * 2 + 30001), with the key column first.
card_type_window = Window().orderBy('card_type')
card_type_id = (row_number().over(card_type_window)*lit(2)+lit(30001)).alias('card_type_id')

dim_card_type = (
    temp_atm
    .select('card_type')
    .distinct()
    .select(card_type_id, 'card_type')
)
# dim_card_type.show()

## Creating ATM Dimension table

### **DIM_ATM**
|Columns|Data Type|Constraint|
|:-|:-|:-|
|atm_id|int|**Primary**|
|atm_number|string|
|atm_manufacturer|string|
|atm_location_id|int|**Foreign(dim_loc.location_id)**|

The *col* function will be used for renaming the columns

In [19]:
from pyspark.sql.functions import col

1. Creating a dataframe *dim_atm_s1* for storing the partial data collected from *temp_atm* dataframe. The columns are selected for performing join operation with *dim_loc* and the column *atm_id* is renamed as *atm_number*


10. The duplicates are removed using distinct.
11. A new column called *atm_id* is created for storing the ID.
12. The ID values are generated such that the ID starts with number 4
13. The Columns are rearranged.

<font color='red'>\* The numbered list items are comments for the code below</font>

In [20]:
# Stage 1 of DIM_ATM: distinct ATM rows together with the location attributes
# needed to look up location_id in dim_loc. The source column atm_id (the ATM
# number) is renamed to atm_number so that atm_id can hold the surrogate key.
atm_attribute_cols = [
    'atm_manufacturer',
    'atm_location',
    'atm_streetname',
    'atm_street_number',
    'atm_zipcode',
    'atm_lat',
    'atm_lon'
]
dim_atm_s1 = temp_atm.select(
    col('atm_id').alias('atm_number'),
    *atm_attribute_cols
).distinct().withColumn(
    'atm_id',
    # Keys are odd numbers starting at 40003: row_number * 2 + 40001.
    # The rows are distinct over all selected columns, so the ordering uses all
    # of them. Ordering by atm_number alone would leave rows that share an ATM
    # number tied, and their keys could change between evaluations.
    row_number().over(
        Window.orderBy('atm_number', *atm_attribute_cols)
    )*lit(2)+lit(40001)
).select(
    'atm_id',
    'atm_number',
    *atm_attribute_cols
)
# dim_atm_s1.count()
# dim_atm.show()

1. The two dataframes *dim_atm_s1* and *dim_loc* are joined with their respective columns.

In [21]:
# Stage 2 of DIM_ATM: look up location_id for every ATM row by matching all
# location attributes against dim_loc.
#
# The schema declares every key column nullable, and `==` never matches a NULL
# key (NULL == NULL is not true in Spark SQL), which would leave location_id
# empty for such rows. eqNullSafe treats two NULLs as equal.
dim_atm_s2 = dim_atm_s1.join(
    dim_loc,
    dim_atm_s1['atm_location'].eqNullSafe(dim_loc['location'])
    & dim_atm_s1['atm_streetname'].eqNullSafe(dim_loc['street_name'])
    & dim_atm_s1['atm_street_number'].eqNullSafe(dim_loc['street_number'])
    & dim_atm_s1['atm_zipcode'].eqNullSafe(dim_loc['zipcode'])
    & dim_atm_s1['atm_lat'].eqNullSafe(dim_loc['lat'])
    & dim_atm_s1['atm_lon'].eqNullSafe(dim_loc['lon']),
    'left'
)
# dim_atm_s1 is deliberately not deleted here: a DataFrame is only a lazy plan,
# and deleting the name made this cell fail with NameError when run again.
# dim_atm_s2.show()

1. The columns are rearranged and stored in *dim_atm*


5. Column *location_id* is renamed as *atm_location_id*

In [22]:
# Final DIM_ATM: keep the key, the ATM number and the manufacturer, and expose
# the looked-up location key under the name atm_location_id.
dim_atm = (
    dim_atm_s2
    .select('atm_id', 'atm_number', 'atm_manufacturer', 'location_id')
    .withColumnRenamed('location_id', 'atm_location_id')
)
# The intermediate frame is no longer needed.
del dim_atm_s2
# print(dim_atm.count())
# dim_atm.printSchema()

### **DIM_DATE**

|Columns|Data Type|Constraint|
| :- | :- |:-|
|date_id|int|**Primary**|
|full_date_time|string||
|year|int||
|month|string||
|day|int||
|hour|int||
|weekday|string||

1. The required library for creating the column called *full_date_time* are imported

In [23]:
from pyspark.sql.functions import format_string,udf

1. Adding a user defined function to return an integer by using a decorator.
2. *monthToNumber* will take input as string and return the month number.
3. The dictionary has month string as the key and value as month number


18. The string is capitalized and used as the key to return the value from the dictionary.


20. If the key does not exist in the dictionary 0 is returned implying that the month name is invalid.
21. The required columns are selected for creating the date dimension and and distinct is for removing duplicates.
22. *full_date_time* is created for storing the time stamp string generated from line 23
23. *to_timestamp* converts a string to a timestamp; for that to happen the string should be in this format "YYYY-MM-DD hh:mm:ss".
24. *format_string* will create a string from values of other columns or UDF function and return the string in a format appropriate for timestamp conversion.
25. The string or style is passed reformat the string.


27. The UDF function *monthToNumber* is used where the column containing month string is passed


33. A new column *date_id* is created for storing IDs
34. The ID values are generated such that the ID starts with number 5
35. The columns are rearranged

In [24]:
@udf(returnType=IntegerType())
def monthToNumber(sMonth:str):
    """Translate an English month name into its calendar month number.

    Args:
        sMonth: Month name in any letter case (it is capitalized before the
            lookup), or ``None`` when the source column value is null.

    Returns:
        int: 1-12 for a recognised month name; 0 when the value is null or is
        not a known month name.
    """
    dictMonth = {
        'January':1,
        'February':2,
        'March':3,
        'April':4,
        'May':5,
        'June':6,
        'July':7,
        'August':8,
        'September':9,
        'October':10,
        'November':11,
        'December':12
    }
    # A null column value reaches the UDF as None. Calling .capitalize() on it
    # would raise AttributeError, so report it as an invalid month instead.
    if sMonth is None:
        return 0
    return dictMonth.get(sMonth.capitalize(), 0)
# DIM_DATE: one row per distinct (year, month, day, hour, weekday) combination.
date_parts = temp_atm.select('year','month','day','hour','weekday').distinct()

# Text of the form "YYYY-MM-DD hh:00:00"; the month name is turned into its
# number by the monthToNumber UDF.
full_date_time_col = format_string(
    '%d-%02d-%02d %02d:00:00',
    'year',
    monthToNumber('month'),
    'day',
    'hour'
)

# Keys are odd numbers starting at 50003, assigned in full_date_time order.
date_id_col = row_number().over(Window().orderBy('full_date_time'))*lit(2)+lit(50001)

dim_date = (
    date_parts
    .withColumn('full_date_time', full_date_time_col)
    .withColumn('date_id', date_id_col)
    .select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')
)
dim_date.printSchema()

root
 |-- date_id: integer (nullable = false)
 |-- full_date_time: string (nullable = false)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [25]:
# Print the schema of the location dimension before it is joined to the facts.
dim_loc.printSchema()

root
 |-- location_id: integer (nullable = false)
 |-- location: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)



##### Stage 1
1. The dataframe *fact_atm_status_s1* will store the join of *temp_atm* and *dim_loc* on their respective columns.


10. The unnecessary columns are dropped before storing in *fact_atm_status_s1*

In [26]:
# Stage 1: attach location_id to every source row by matching all location
# attributes against dim_loc, then drop the attributes from both sides.
#
# The schema declares every key column nullable, and `==` never matches a NULL
# key (NULL == NULL is not true in Spark SQL), which would leave location_id
# empty for such rows. eqNullSafe treats two NULLs as equal.
fact_atm_status_s1 = temp_atm.join(
    dim_loc,
    temp_atm['atm_location'].eqNullSafe(dim_loc['location'])
    & temp_atm['atm_streetname'].eqNullSafe(dim_loc['street_name'])
    & temp_atm['atm_street_number'].eqNullSafe(dim_loc['street_number'])
    & temp_atm['atm_zipcode'].eqNullSafe(dim_loc['zipcode'])
    & temp_atm['atm_lat'].eqNullSafe(dim_loc['lat'])
    & temp_atm['atm_lon'].eqNullSafe(dim_loc['lon']),
    'left'
).drop(
    'atm_location',
    'location',
    'atm_streetname',
    'street_name',
    'atm_street_number',
    'street_number',
    'atm_zipcode',
    'zipcode',
    'atm_lat',
    'lat',
    'atm_lon',
    'lon'
)
# fact_atm_status_s1.printSchema()

##### Stage 2
1. The dataframe *fact_atm_status_s2* will store the join of *fact_atm_status_s1* and *dim_card_type* on their respective columns.


5. The unnecessary columns are dropped before storing in *fact_atm_status_s2*

In [27]:
# Stage 2: attach card_type_id via a left join on the card type text, then
# drop the text column (both copies) now that the key is present.
card_type_match = fact_atm_status_s1['card_type']==dim_card_type['card_type']
fact_atm_status_s2 = (
    fact_atm_status_s1
    .join(dim_card_type, on=card_type_match, how='left')
    .drop('card_type')
)
# fact_atm_status_s2.printSchema()

##### Stage 3
1. The column *atm_id* is renamed to *atm_number*.<br>The column *atm_id* has the same name as the column in *dim_atm* which are different attributes. The join operation will create an extra column called *atm_id*. This will cause an exception when selecting the columns.
2. The dataframe *fact_atm_status_s3* will store the join of *fact_atm_status_s2* and *dim_atm* on their respective columns.


6. The unnecessary columns are dropped before storing in *fact_atm_status_s3*

In [28]:
# Stage 3: attach the ATM surrogate key.
# The source column atm_id holds the ATM number, so it is renamed first to
# avoid clashing with dim_atm.atm_id (the surrogate key).
fact_atm_status_s2 = fact_atm_status_s2.withColumnRenamed('atm_id','atm_number')
# dim_atm has one row per distinct (ATM number, manufacturer, location)
# combination. Matching on number and manufacturer alone would duplicate every
# transaction of an ATM number that appears at more than one location, so the
# location key is part of the join condition as well. The location comparison
# is null-safe so rows whose location key is missing on both sides still match.
fact_atm_status_s3 = fact_atm_status_s2.join(
    dim_atm,
    (fact_atm_status_s2['atm_number']==dim_atm['atm_number'])&\
    (fact_atm_status_s2['atm_manufacturer']==dim_atm['atm_manufacturer'])&\
    (fact_atm_status_s2['location_id'].eqNullSafe(dim_atm['atm_location_id']))
).drop('atm_number','atm_manufacturer','atm_location_id')
# fact_atm_status_s3.printSchema()

##### Stage 4
1. The dataframe *fact_atm_status_s4* will store the join of *fact_atm_status_s3* and *dim_date* on their respective columns.


5. The unnecessary columns are dropped before storing in *fact_atm_status_s4*

In [29]:
# Stage 4: attach date_id by matching every date part against dim_date
# (inner join), then drop the date parts now that the key is present.
same_year = fact_atm_status_s3['year']==dim_date['year']
same_month = fact_atm_status_s3['month']==dim_date['month']
same_day = fact_atm_status_s3['day']==dim_date['day']
same_hour = fact_atm_status_s3['hour']==dim_date['hour']
same_weekday = fact_atm_status_s3['weekday']==dim_date['weekday']

fact_atm_status_s4 = fact_atm_status_s3.join(
    dim_date,
    same_year & same_month & same_day & same_hour & same_weekday,
    'inner'
).drop('year','month','day','hour','weekday','full_date_time')
# fact_atm_status_s4.printSchema()

### **FACT_ATM_TRANS**
|Columns|Data Type|Constraint|
|:-|:-|:-|
|tran_id|bigint|**Primary**|
|atm_id|int|**Foreign(dim_atm.atm_id)**|
|weather_loc_id|int|**Foreign(dim_location.location_id)**|
|date_id|int|**Foreign(dim_date.date_id)**|
|card_type_id|int|**Foreign(dim_card_type.card_type_id)**|
|atm_status|string|
|currency|string|
|service|string|
|transaction_amount|int|
|message_code|string|
|message_text|string|
|rain_3h|double|
|clouds_all|int|
|weather_id|int|
|weather_main|string|
|weather_description|string|

##### Transaction ID
2. A new column *tran_id* is created for storing transaction IDs.
3. *row_number* returns the row number of each line of the dataframe, to which a large number is added

The large number is for identifying the ID as transaction ID.<br>Since the tran_id data type is *bigint* a suitable number is added to match the size of the data type

<font color='red'>\* The numbered list items are comments for the code below</font>

In [30]:
# Transaction key: the row number over a window ordered by a constant (so no
# data column drives the numbering), offset by a large base value.
tran_window = Window().orderBy(lit('a'))
tran_id_col = row_number().over(tran_window)+lit(2017000000000001)

fact_atm_status_s4 = fact_atm_status_s4.withColumn('tran_id', tran_id_col)
# temp_atm.show(5)

1. The required columns are selected and renamed according to the above Fact table schema and stored in *fact_atm_tran*

In [31]:
# Column layout of FACT_ATM_TRANS, in output order.
fact_columns = [
    'tran_id',
    'atm_id',
    'weather_loc_id',
    'date_id',
    'card_type_id',
    'atm_status',
    'currency',
    'service',
    'transaction_amount',
    'message_code',
    'message_text',
    'rain_3h',
    'clouds_all',
    'weather_id',
    'weather_main',
    'weather_description',
]

# location_id is exposed as weather_loc_id; the result is collapsed into a
# single partition.
fact_atm_tran = (
    fact_atm_status_s4
    .withColumnRenamed('location_id', 'weather_loc_id')
    .select(*fact_columns)
    .coalesce(1)
)
# fact_atm_tran.printSchema()

2. The null values in the *fact_atm_tran* columns *message_code* and *message_text* are filled.
3. The double quotes in the strings were causing errors when the data was imported into Redshift. They are removed using the *regexp_replace* function

In [32]:
from pyspark.sql.functions import regexp_replace

# Fill missing message fields: a null code becomes '0000' and a null text
# becomes 'Success'.
fact_atm_tran = fact_atm_tran.fillna('0000',subset=['message_code']).fillna('Success',subset=['message_text'])

# Remove double quotes from every string column, as described in the notes
# above this cell (they caused errors on import into Redshift). The step was
# documented and regexp_replace imported, but it was never applied.
fact_atm_tran = fact_atm_tran.select([
    regexp_replace(fact_atm_tran[name], '"', '').alias(name) if dtype == 'string'
    else fact_atm_tran[name]
    for name, dtype in fact_atm_tran.dtypes
])
# fact_atm_tran.select('message_code','message_text').distinct().show()

##### S3 Path where csv files are created

In [26]:
# Base S3 prefix; each table's output name is appended to it when writing.
s3Path = 's3://sharads-etl-project/Data/'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Write the fact table and every dimension as CSV under the S3 prefix, in the
# same order as before; the output name is appended to s3Path.
for output_name, table_df in [
    ('fact_atm_tran1', fact_atm_tran),
    ('dim_loc', dim_loc),
    ('dim_atm', dim_atm),
    ('dim_date', dim_date),
    ('dim_card_type', dim_card_type),
]:
    table_df.write.csv(s3Path+output_name)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…