# **ETL Assignment (PySpark)**

### Tasks performed in this sheet

* After the data has been imported into HDFS, read the files and build the target dimensional model in PySpark
* Set up the environment variables, initialize a SparkSession and obtain the SparkContext
* Create a custom schema to avoid data type issues
* Create four dimension tables and one fact table
* Clean and transform the data, and handle de-duplication
* Create a primary key for each dimension table
* Rearrange the fields where necessary
* Make all primary keys available for creating the fact table
* Write the DataFrames containing the fact and dimension tables directly to a folder in an S3 bucket

In [58]:
### Environment setup for running the code in a Jupyter notebook. Here we set the paths for JAVA_HOME and SPARK_HOME, as well as the Python libraries (PySpark and Py4J)
#### Import SparkSession: since Spark 2.0, SparkSession is the unified entry point of a Spark application. It provides access to Spark's functionality with fewer constructs.
#### Create the SparkContext entry point

In [60]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [62]:
from pyspark.sql import SparkSession

In [64]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()
spark

In [66]:
sc = spark.sparkContext

In [68]:
sc

#### **Reading the data from the files in HDFS by a specific schema using PySpark**

#### Creating an input schema using StructType to avoid data type mismatches

In [70]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

##### Defining Struct Types for each column based on Schema

In [72]:
fileSchema = StructType([StructField('year', IntegerType(),False),
                        StructField('month', StringType(),False),
                        StructField('day', IntegerType(),False),
                        StructField('weekday', StringType(),False),
                        StructField('hour', IntegerType(),False),
                        StructField('atm_status', StringType(),False),
                        StructField('atm_id', StringType(),False),
                        StructField('atm_manufacturer', StringType(),False),
                        StructField('atm_location', StringType(),False),
                        StructField('atm_streetname', StringType(),False),
                        StructField('atm_street_number', IntegerType(),False),
                        StructField('atm_zipcode', IntegerType(),False),
                        StructField('atm_lat', DoubleType(),False),
                        StructField('atm_lon', DoubleType(),False),
                        StructField('currency', StringType(),False),
                        StructField('card_type', StringType(),False),
                        StructField('transaction_amount', IntegerType(),False), 
                        StructField('service', StringType(),False),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),False),
                        StructField('weather_lon', DoubleType(),False),
                        StructField('weather_city_id', IntegerType(),False),
                        StructField('weather_city_name', StringType(),False), 
                        StructField('temp', DoubleType(),False),
                        StructField('pressure', IntegerType(),False), 
                        StructField('humidity', IntegerType(),False), 
                        StructField('wind_speed', IntegerType(),False), 
                        StructField('wind_deg', IntegerType(),False), 
                        StructField('rain_3h', DoubleType(),True), 
                        StructField('clouds_all', IntegerType(),False), 
                        StructField('weather_id', IntegerType(),False), 
                        StructField('weather_main', StringType(),False), 
                        StructField('weather_description', StringType(),False), 
                        ])

#### Creating a DataFrame by reading the CSV files with the custom schema defined above (fileSchema)

In [74]:
df = spark.read.csv("hdfs:/user/root/etl_assignment", header = False, schema = fileSchema)

In [76]:
### Checking the number of records loaded from HDFS

In [78]:
df.count()    # count is 2468572 which is as expected

2468572

In [80]:
df.printSchema()   # printSchema() shows the DataFrame schema as a readable tree. Spark file sources mark every column as nullable, so the False flags in fileSchema are not enforced here

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [82]:
### Check the columns in the DataFrame
df.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

### Importing libraries

In [84]:
## Import all SQL functions
## monotonically_increasing_id() generates 64-bit integers that are unique and monotonically increasing but not necessarily consecutive, so combining
## it with row_number() over a window gives consecutive numbers (1, 2, 3, ...) that can be used as primary keys
## Import Window for working with window functions
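To illustrate why `row_number()` is layered on top of `monotonically_increasing_id()`, the plain-Python sketch below models the ID layout described in the Spark documentation (partition ID in the upper 31 bits, record number within the partition in the lower 33 bits). The helper names and the two-partition input are hypothetical. Note also that `Window.orderBy(...)` without `partitionBy` moves all rows into a single partition, which is acceptable for small dimension tables like these.

```python
from typing import List, Sequence


def mono_ids(partitions: Sequence[Sequence[str]]) -> List[int]:
    """Model of monotonically_increasing_id(): (partition_id << 33) + position."""
    ids = []
    for partition_id, rows in enumerate(partitions):
        for position, _ in enumerate(rows):
            ids.append((partition_id << 33) + position)
    return ids


def row_numbers(ids: Sequence[int]) -> List[int]:
    """Model of row_number() over Window.orderBy(id): rank rows 1..n by ascending id."""
    order = sorted(range(len(ids)), key=lambda i: ids[i])
    ranks = [0] * len(ids)
    for rank, i in enumerate(order, start=1):
        ranks[i] = rank
    return ranks


# Hypothetical rows spread over two partitions.
parts = [["Vadum", "Slagelse"], ["Fredericia", "Kolding", "Ikast"]]
ids = mono_ids(parts)
print(ids)               # [0, 1, 8589934592, 8589934593, 8589934594] (unique, with a gap)
print(row_numbers(ids))  # [1, 2, 3, 4, 5] (consecutive surrogate keys)
```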

In [86]:
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

In [88]:
from pyspark.sql import functions as sf

In [90]:
#Above we checked the schema after assigning struct types, the number of rows loaded and the column names.
#Since all of them look as expected, let's create the dimensions as required.

In [92]:
#Creation of dimension tables using PySpark

In [94]:
#Create a DataFrame for each dimension according to the provided target schema (dimension model)

### LOCATION Dim
<table>
  <thead>
    <tr>
      <th>Column Name</th>
      <th>Data Type</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>atm_location</td>
      <td>VARCHAR(50)</td>
    </tr>
    <tr>
      <td>atm_streetname</td>
      <td>VARCHAR(255)</td>
    </tr>
<tr>
      <td>atm_streetnumber</td>
      <td>INT</td>
    </tr>
<tr>
      <td>atm_zipcode</td>
      <td>INT</td>
    </tr>
<tr>
      <td>atm_lat</td>
      <td>DECIMAL(10,3)</td>
    </tr>
<tr>
      <td>atm_lon</td>
       <td>DECIMAL(10,3)</td>
    </tr>
<tr>
      <td>PRIMARY KEY</td>
      <td>location_id</td>
    </tr>
  </tbody>
</table>

In [96]:
# Identify the LOCATION dimension columns in the source data.
temp_dim_loc = df.select('atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon')
# De-duplicate on all location columns (equivalent to distinct() here)
DIM_LOCS = temp_dim_loc.dropDuplicates(['atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'])
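`dropDuplicates()` keeps one row per distinct combination of the listed columns; when the list covers every selected column, as here, it behaves like `distinct()`. A minimal plain-Python model, using the duplicated Vejgaard rows visible in the `temp_dim_loc.show()` output as sample input (the helper name is hypothetical, and Spark does not guarantee which duplicate survives):

```python
from typing import Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, object]


def drop_duplicates(rows: Iterable[Row], subset: Sequence[str]) -> List[Row]:
    """Keep one row per distinct value of the subset columns (this model keeps the first)."""
    seen = set()
    result = []
    for row in rows:
        key: Tuple = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


cols = ["atm_location", "atm_streetname", "atm_street_number",
        "atm_zipcode", "atm_lat", "atm_lon"]
rows = [
    dict(zip(cols, ("Vejgaard", "Hadsundvej", 20, 9000, 57.043, 9.95))),
    dict(zip(cols, ("Vejgaard", "Hadsundvej", 20, 9000, 57.043, 9.95))),
    dict(zip(cols, ("Nibe", "Torvet", 1, 9240, 56.983, 9.639))),
]
unique_locs = drop_duplicates(rows, cols)
print(len(unique_locs))  # 2
```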

In [98]:
temp_dim_loc.show()

+------------------+--------------------+-----------------+-----------+-------+-------+
|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+------------------+--------------------+-----------------+-----------+-------+-------+
|        NÃƒÂ¦stved|         Farimagsvej|                8|       4700| 55.233| 11.763|
|          Vejgaard|          Hadsundvej|               20|       9000| 57.043|   9.95|
|          Vejgaard|          Hadsundvej|               20|       9000| 57.043|   9.95|
|             Ikast| RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|
|        Svogerslev|        BrÃƒÂ¸nsager|                1|       4000| 55.634| 12.018|
|              Nibe|              Torvet|                1|       9240| 56.983|  9.639|
|        Fredericia|    SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|         Hjallerup|   Hjallerup Centret|               18|       9320| 57.168| 10.148|
|       GlyngÃƒÂ¸re|         FÃƒ

In [100]:
DIM_LOCS.count()   # count is 109 which is as expected


109

In [102]:
#location_id is the primary key of the LOCATION dimension

In [104]:
DIM_LOC = DIM_LOCS.withColumn(
    "location_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [106]:
# Display the LOCATION dimension ordered by its primary key (orderBy returns a new DataFrame; DIM_LOC itself is not reordered)
DIM_LOC.orderBy('location_id', ascending=True)

DataFrame[atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, location_id: int]

In [108]:
DIM_LOC.show(5)

+-----------------+----------------+-----------------+-----------+-------+-------+-----------+
|     atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+-----------------+----------------+-----------------+-----------+-------+-------+-----------+
|            Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|          1|
|         Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|          2|
|       Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|          3|
|          Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|          4|
|HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|          5|
+-----------------+----------------+-----------------+-----------+-------+-------+-----------+
only showing top 5 rows



In [110]:
DIM_LOC= DIM_LOC.select('location_id','atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat',
                       'atm_lon')

In [112]:
DIM_LOC.show(5)

+-----------+-----------------+----------------+-----------------+-----------+-------+-------+
|location_id|     atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+-----------+-----------------+----------------+-----------------+-----------+-------+-------+
|          1|            Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|
|          2|         Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|
|          3|       Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|          4|          Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|
|          5|HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|
+-----------+-----------------+----------------+-----------------+-----------+-------+-------+
only showing top 5 rows



### ATM Dim
<table>
  <thead>
    <tr>
      <th>Column Name</th>
      <th>Data Type</th>
      <th>Comments/Foreign Key</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>atm_id</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>atm_number</td>
      <td>VARCHAR(20)</td>
    </tr>
   <tr>
      <td>atm_manufacturer</td>
      <td>VARCHAR(50)</td>
    </tr>
<tr>
      <td>atm_location_id</td>
      <td>INT</td>
    </tr>
<tr>
      <td>atm_lat</td>
      <td>DECIMAL(10,3)</td>
    </tr>
<tr>
      <td>atm_lon</td>
       <td>DECIMAL(10,3)</td>
    </tr>

<tr>
      <td>PRIMARY KEY</td>
      <td>atm_id</td>
    </tr>
<tr>
      <td>FOREIGN KEY</td>
      <td>atm_location_id</td>
      <td>REFERENCES DIM_LOCATION (location_id)</td>
    </tr>
  </tbody>
</table>

In [114]:
# Identify the ATM dimension columns in the source data.
#file_indexes = files.withColumnRenamed('atm_id','atm_number')
temp_dim_atm = df.select('atm_lat', 'atm_lon', 'atm_id', 'atm_manufacturer')
# De-duplicate on atm_id (the join below registers temp_dim_atm and de-duplicates with distinct() instead;
# DIM_ATM is rebuilt later from DIM_ATM_PK)
DIM_ATM = temp_dim_atm.dropDuplicates(['atm_id'])

In [116]:
## Register temporary views so the ATM rows can be joined with the location table to pick up location_id.

In [118]:
temp_dim_atm.createOrReplaceTempView("atm")   # registerTempTable is deprecated since Spark 2.0
DIM_LOC.createOrReplaceTempView("loc")

In [120]:
DIM_ATM_TEMP = spark.sql("select atm.atm_id, atm.atm_manufacturer, loc.* from atm left join loc on atm.atm_lat == loc.atm_lat and atm.atm_lon == loc.atm_lon").distinct()
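The SQL above is a left outer join on the pair (`atm_lat`, `atm_lon`). The plain-Python model below (hypothetical helper and sample rows) shows its two properties that matter for the later count checks: an ATM without a matching location keeps a `None` location_id, and an ATM whose coordinates match several location rows is repeated once per match. Joining on doubles relies on exact equality of the values; whether any locations in this dataset share coordinates is not checked here.

```python
from typing import Dict, List, Sequence, Tuple


def left_join(left: Sequence[dict], right: Sequence[dict],
              keys: Sequence[str]) -> List[dict]:
    """Model of a left outer join on equal key columns.

    Every left row is kept. A left row with no match gets None for the right-only
    columns; a left row with several matches is repeated once per match.
    """
    index: Dict[Tuple, List[dict]] = {}
    for r in right:
        index.setdefault(tuple(r[k] for k in keys), []).append(r)
    right_only = {c for r in right for c in r if c not in keys}
    out: List[dict] = []
    for row in left:
        matches = index.get(tuple(row[k] for k in keys), [])
        if not matches:
            out.append({**row, **{c: None for c in right_only}})
        for m in matches:
            out.append({**row, **m})
    return out


atms = [  # hypothetical ATM rows
    {"atm_id": "1", "atm_lat": 57.118, "atm_lon": 9.861},
    {"atm_id": "2", "atm_lat": 55.0, "atm_lon": 12.0},
]
locs = [{"location_id": 1, "atm_lat": 57.118, "atm_lon": 9.861}]
joined = left_join(atms, locs, ["atm_lat", "atm_lon"])
print([(r["atm_id"], r["location_id"]) for r in joined])  # [('1', 1), ('2', None)]

# If two location rows shared the same coordinates, the ATM row would be duplicated.
locs_shared = locs + [{"location_id": 2, "atm_lat": 57.118, "atm_lon": 9.861}]
print(len(left_join(atms[:1], locs_shared, ["atm_lat", "atm_lon"])))  # 2
```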

In [122]:
DIM_ATM_TEMP.count()   # count is 156 which is as expected

156

In [124]:
## Creating the primary key for DIM_ATM

In [126]:
DIM_ATM_PK = DIM_ATM_TEMP.withColumn("atm_prim_id",row_number().over(Window.orderBy(monotonically_increasing_id())))

In [128]:
# Display the ATM dimension ordered by its primary key (orderBy returns a new DataFrame; DIM_ATM_PK itself is not reordered)
DIM_ATM_PK.orderBy('atm_prim_id', ascending=True)

DataFrame[atm_id: string, atm_manufacturer: string, location_id: int, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, atm_prim_id: int]

In [130]:
DIM_ATM_PK.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- atm_prim_id: integer (nullable = true)



In [132]:
DIM_ATM = DIM_ATM_PK.select('atm_prim_id', 'atm_id', 'atm_manufacturer', 'location_id')

In [134]:
DIM_ATM.count()   ###156 which is as expected


156

In [136]:
DIM_ATM.show()

+-----------+------+----------------+-----------+
|atm_prim_id|atm_id|atm_manufacturer|location_id|
+-----------+------+----------------+-----------+
|          1|    35|             NCR|         15|
|          2|    51|             NCR|         65|
|          3|     9| Diebold Nixdorf|         76|
|          4|    62| Diebold Nixdorf|         30|
|          5|    34|             NCR|         43|
|          6|    76|             NCR|         41|
|          7|    49|             NCR|         80|
|          8|    88|             NCR|         66|
|          9|    59| Diebold Nixdorf|         90|
|         10|    93|             NCR|         55|
|         11|    42|             NCR|         31|
|         12|     8|             NCR|         21|
|         13|    30|             NCR|         90|
|         14|    89|             NCR|         88|
|         15|   113| Diebold Nixdorf|         49|
|         16|    79|             NCR|         29|
|         17|   104|             NCR|         68|


###  DATE Dim

<table>
  <thead>
    <tr>
      <th>Column Name</th>
      <th>Data Type</th>
       </tr>
  </thead>
  <tbody>
    <tr>
      <td>date_id</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>full_date_time</td>
      <td>TIMESTAMP</td>
    </tr>
<tr>
      <td>year</td>
      <td>INT</td>
    </tr>
<tr>
      <td>month</td>
      <td>VARCHAR(20)</td>
    </tr>
<tr>
      <td>day</td>
      <td>INT</td>
    </tr>
<tr>
      <td>hour</td>
      <td>INT</td>
    </tr>
<tr>
      <td>weekday</td>
       <td>VARCHAR(20)</td>
    </tr>

<tr>
      <td>PRIMARY KEY</td>
      <td>date_id</td>
    </tr>

  </tbody>
</table>

In [138]:
# Identifying DATE DIM From Source File.
temp_dim_date= df.select('year','month','day','hour','weekday')

In [140]:
### Convert the month name (e.g. "January") to a two-digit number (e.g. "01")
### pyspark.sql.functions.from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
date_conv = temp_dim_date.withColumn("month", from_unixtime(unix_timestamp(col("month"), 'MMM'), 'MM'))
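Here `unix_timestamp(col, 'MMM')` parses the month name and `from_unixtime(..., 'MM')` formats it back as a two-digit string. Spark 2.x parses with Java's `SimpleDateFormat`, which accepts full month names such as `January` for a `MMM` pattern; later Spark versions use a different datetime parser, so this pattern may behave differently there. The same conversion in plain Python, as a sketch with a hypothetical helper name and assuming the default English (C) locale:

```python
from datetime import datetime


def month_name_to_number(name: str) -> str:
    """Convert an English month name ('January') to a zero-padded number ('01')."""
    # %B matches the full month name in the C/English locale.
    return datetime.strptime(name, "%B").strftime("%m")


print(month_name_to_number("January"))   # 01
print(month_name_to_number("December"))  # 12
```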

In [142]:
date_conv.show()

+----+-----+---+----+-------+
|year|month|day|hour|weekday|
+----+-----+---+----+-------+
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
+----+-----+---+----+-------+
only showing top 20 rows



In [144]:
# Create a new column "full_date_time" by concatenating the date/time columns with literal separators ('/', ' ', ':') via lit()

In [146]:
from pyspark.sql import functions as sf
date_conv = date_conv.withColumn('full_date_time',sf.concat(sf.col('year'),sf.lit('/'),sf.col('month'),sf.lit('/'),sf.col('day'),sf.lit(' '),sf.col('hour'),sf.lit(':'),sf.lit('00:00')))

In [148]:
date_conv.show()

+----+-----+---+----+-------+-----------------+
|year|month|day|hour|weekday|   full_date_time|
+----+-----+---+----+-------+-----------------+
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0

In [150]:
# Parse the string and cast it to a timestamp
date_conv2 = date_conv.withColumn('full_date_time', unix_timestamp(date_conv['full_date_time'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))
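The two steps above build a string such as `2017/01/1 0:00:00`, in which day and hour are not zero-padded, and then parse it with `yyyy/MM/dd HH:mm:ss`; as the output shows, the parser accepts the single-digit fields. A plain-Python equivalent, sketched with a hypothetical helper name, makes the round trip explicit:

```python
from datetime import datetime


def build_full_date_time(year: int, month: str, day: int, hour: int) -> datetime:
    """Mirror concat(year,'/',month,'/',day,' ',hour,':','00:00') followed by a parse."""
    text = f"{year}/{month}/{day} {hour}:00:00"  # e.g. '2017/01/1 0:00:00'
    # strptime also accepts the non-padded day and hour.
    return datetime.strptime(text, "%Y/%m/%d %H:%M:%S")


print(build_full_date_time(2017, "01", 1, 0))    # 2017-01-01 00:00:00
print(build_full_date_time(2017, "02", 23, 18))  # 2017-02-23 18:00:00
```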

In [152]:
date_conv2.show()   # in timestamp format

+----+-----+---+----+-------+-------------------+
|year|month|day|hour|weekday|     full_date_time|
+----+-----+---+----+-------+-------------------+
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|


In [154]:
date_conv2.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- full_date_time: timestamp (nullable = true)



In [156]:
# De-duplicate on full_date_time (one row per hour)
DIM_DATE = date_conv2.dropDuplicates(['full_date_time'])

In [158]:
DIM_DATE.count()
# Count is 8685, as expected

8685

In [160]:
DIM_DATE.show()

+----+-----+---+----+---------+-------------------+
|year|month|day|hour|  weekday|     full_date_time|
+----+-----+---+----+---------+-------------------+
|2017|   01|  5|   1| Thursday|2017-01-05 01:00:00|
|2017|   01| 11|   8|Wednesday|2017-01-11 08:00:00|
|2017|   01| 21|  22| Saturday|2017-01-21 22:00:00|
|2017|   02|  7|  11|  Tuesday|2017-02-07 11:00:00|
|2017|   02|  9|   7| Thursday|2017-02-09 07:00:00|
|2017|   02|  9|  22| Thursday|2017-02-09 22:00:00|
|2017|   02| 16|   5| Thursday|2017-02-16 05:00:00|
|2017|   02| 23|  18| Thursday|2017-02-23 18:00:00|
|2017|   02| 25|  11| Saturday|2017-02-25 11:00:00|
|2017|   02| 26|  13|   Sunday|2017-02-26 13:00:00|
|2017|   03| 12|  13|   Sunday|2017-03-12 13:00:00|
|2017|   04|  1|   8| Saturday|2017-04-01 08:00:00|
|2017|   04|  6|  14| Thursday|2017-04-06 14:00:00|
|2017|   04| 21|  17|   Friday|2017-04-21 17:00:00|
|2017|   05|  6|  15| Saturday|2017-05-06 15:00:00|
|2017|   05|  9|  14|  Tuesday|2017-05-09 14:00:00|
|2017|   05|

In [162]:
#Adding primary key
DIM_DATE = DIM_DATE.withColumn(
    "date_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [164]:
DIM_DATE.count()

8685

In [165]:
DIM_DATE.show()

+----+-----+---+----+---------+-------------------+-------+
|year|month|day|hour|  weekday|     full_date_time|date_id|
+----+-----+---+----+---------+-------------------+-------+
|2017|   01|  5|   1| Thursday|2017-01-05 01:00:00|      1|
|2017|   01| 11|   8|Wednesday|2017-01-11 08:00:00|      2|
|2017|   01| 21|  22| Saturday|2017-01-21 22:00:00|      3|
|2017|   02|  7|  11|  Tuesday|2017-02-07 11:00:00|      4|
|2017|   02|  9|   7| Thursday|2017-02-09 07:00:00|      5|
|2017|   02|  9|  22| Thursday|2017-02-09 22:00:00|      6|
|2017|   02| 16|   5| Thursday|2017-02-16 05:00:00|      7|
|2017|   02| 23|  18| Thursday|2017-02-23 18:00:00|      8|
|2017|   02| 25|  11| Saturday|2017-02-25 11:00:00|      9|
|2017|   02| 26|  13|   Sunday|2017-02-26 13:00:00|     10|
|2017|   03| 12|  13|   Sunday|2017-03-12 13:00:00|     11|
|2017|   04|  1|   8| Saturday|2017-04-01 08:00:00|     12|
|2017|   04|  6|  14| Thursday|2017-04-06 14:00:00|     13|
|2017|   04| 21|  17|   Friday|2017-04-2

### Creating CARD Dim

<table>
  <thead>
    <tr>
      <th>Column Name</th>
      <th>Data Type</th>
       </tr>
  </thead>
  <tbody>
    <tr>
      <td>card_type_id</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>card_type</td>
      <td>VARCHAR(20)</td>
    </tr>
<tr>
      <td>PRIMARY KEY</td>
      <td>card_type_id</td>
    </tr>


  </tbody>
</table>

In [166]:
# Identify the CARD TYPE dimension column in the source data.
temp_dim_card = df.select('card_type')
# De-duplicate on card_type
DIM_CARD = temp_dim_card.dropDuplicates(['card_type'])

In [167]:
DIM_CARD = DIM_CARD.withColumn(
    "card_type_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)
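Because `monotonically_increasing_id()` depends on partitioning, keys assigned this way are not guaranteed to be identical if the DataFrame is recomputed (for example by a later join, since nothing here is cached). One option, sketched below in plain Python with a hypothetical helper, is to order by the natural key instead; in PySpark the analogous change would be `row_number().over(Window.orderBy('card_type'))`. The sample card types are taken from the DIM_CARD output, but the resulting numbers differ from the notebook's keys.

```python
from typing import Dict, Iterable


def assign_keys(values: Iterable[str]) -> Dict[str, int]:
    """Assign surrogate keys 1..n ordered by the natural key, so reruns agree."""
    return {v: i for i, v in enumerate(sorted(set(values)), start=1)}


run_a = assign_keys(["VISA", "Maestro", "VISA", "CIRRUS"])
run_b = assign_keys(["CIRRUS", "VISA", "Maestro"])  # same values, different arrival order
print(run_a)           # {'CIRRUS': 1, 'Maestro': 2, 'VISA': 3}
print(run_a == run_b)  # True
```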

In [168]:
DIM_CARD.orderBy('card_type_id', ascending=True)

DataFrame[card_type: string, card_type_id: int]

In [169]:
DIM_CARD.count()   #count 12

12

In [170]:
DIM_CARD.show()

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|     Dankort - on-us|           1|
|              CIRRUS|           2|
|         HÃƒÂ¦vekort|           3|
|                VISA|           4|
|  Mastercard - on-us|           5|
|             Maestro|           6|
|Visa Dankort - on-us|           7|
|        Visa Dankort|           8|
|            VisaPlus|           9|
|          MasterCard|          10|
|             Dankort|          11|
| HÃƒÂ¦vekort - on-us|          12|
+--------------------+------------+



#### **Steps for bringing the primary keys into the fact table**

* Used appropriate joins
* Rearranged fields where necessary
* Created a few temporary columns and dropped them later
* Checked the record counts at each step to make sure there is no data discrepancy
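The steps above amount to a surrogate-key lookup per dimension followed by two checks: the row count is unchanged and no key is null. The plain-Python sketch below models that pattern; the helper names are hypothetical, the two card keys are copied from the DIM_CARD output, and the transaction amounts are made up. A dictionary lookup cannot multiply rows, whereas a real join against a dimension with duplicate natural keys can, which is exactly what the row-count check guards against.

```python
from typing import Dict, List, Sequence


def attach_key(facts: Sequence[dict], dim_keys: Dict[str, int],
               natural_key: str, key_name: str) -> List[dict]:
    """Left-join style lookup: add the dimension's surrogate key to every fact row."""
    return [{**row, key_name: dim_keys.get(row[natural_key])} for row in facts]


def count_missing(after: Sequence[dict], key_name: str) -> int:
    """Number of fact rows whose key lookup failed (would be null in Spark)."""
    return sum(1 for row in after if row[key_name] is None)


card_keys = {"VISA": 4, "Maestro": 6}  # subset of DIM_CARD
facts = [{"card_type": "VISA", "transaction_amount": 100},
         {"card_type": "Maestro", "transaction_amount": 250}]
with_keys = attach_key(facts, card_keys, "card_type", "card_type_id")

# The same two checks the notebook runs after each join.
assert len(with_keys) == len(facts), "join changed the row count"
assert count_missing(with_keys, "card_type_id") == 0, "some rows have no key"
print([r["card_type_id"] for r in with_keys])  # [4, 6]
```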


In [171]:
# Left outer join of df with DIM_ATM_TEMP (the ATM/location join) to bring in location_id
df1 = df.join(DIM_ATM_TEMP,on=['atm_id','atm_manufacturer','atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'],how='left')

In [172]:
df1.printSchema()  ## contains location_id 

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [173]:
df1.count()  # count is as expected

2468572

In [174]:
df1.filter(df1.location_id.isNotNull()).count()   ## count is as expected 2468572

2468572

In [175]:
## Rename location_id to tlocation_id because the next join (with DIM_ATM_PK) brings in another location_id column.
## The temporary tlocation_id column is dropped after that join.


In [176]:
df1 = df1.withColumnRenamed('location_id','tlocation_id')

In [177]:
df2 = df1.join(DIM_ATM_PK,on=['atm_id','atm_manufacturer','atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'],how='left')

In [178]:
df2.printSchema()  #contains location_id and atm_prim_id

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [179]:
df2.count()  ## count is 2468572

2468572

In [180]:
df2.filter(df2.atm_prim_id.isNotNull()).count()  ## count as expected

2468572

In [181]:
df2 = df2.drop('tlocation_id')   ### drop the column as not required
df2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [182]:
## Joining the primary key for card type

In [183]:
# Rename the fact-side column so it does not clash with DIM_CARD.card_type in the join result
df3 = df2.withColumnRenamed('card_type','card_types')


In [184]:
df_index = df3.join(DIM_CARD, df3.card_types == DIM_CARD.card_type,how='LEFT')  

In [185]:
df_index.printSchema()  #contains location_id and atm_prim_id and card_type_id

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_types: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: s

In [186]:
df_index.select('year', 'month', 'card_type', 'card_type_id','card_types').show(5)

+----+-------+---------------+------------+---------------+
|year|  month|      card_type|card_type_id|     card_types|
+----+-------+---------------+------------+---------------+
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
+----+-------+---------------+------------+---------------+
only showing top 5 rows



In [187]:
df_index = df_index.drop('card_type')

In [188]:
df_index= df_index.withColumnRenamed('card_types','card_type')

In [189]:
df_index.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [190]:
## Checkpoint to make sure the earlier DataFrames are unchanged.
DIM_LOC.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)



In [191]:
DIM_LOC.count()  ## 109 as expected

109

In [192]:
##### Joining the primary key (date_id) of the date dimension

In [193]:
## Convert the month name (e.g. January) to a two-digit month number (e.g. 01)
df_index2 = df_index.withColumn("month", from_unixtime(unix_timestamp(col("month"), 'MMMM'), 'MM'))

In [194]:
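## Build a 'yyyy/MM/dd HH:mm:ss' string; day and hour are not zero-padded, e.g. 2017/01/1 0:00:00
df_index2 = df_index2.withColumn('full_date_time', sf.concat(
    sf.col('year'), sf.lit('/'), sf.col('month'), sf.lit('/'), sf.col('day'),
    sf.lit(' '), sf.col('hour'), sf.lit(':'), sf.lit('00:00')))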

In [195]:
df_index2 = df_index2.withColumn('full_date_time', unix_timestamp(df_index2['full_date_time'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))

In [196]:
df_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [197]:
#### Temporary renames so these columns do not clash with the same-named DIM_DATE columns in the join; they are renamed back once the join is done
df_index2 = df_index2.withColumnRenamed('year','ryear')
df_index2 = df_index2.withColumnRenamed('month','rmonth')
df_index2 = df_index2.withColumnRenamed('day','rday')
df_index2 = df_index2.withColumnRenamed('weekday','rweekday')
df_index2 = df_index2.withColumnRenamed('hour','rhour')
df_index2 = df_index2.withColumnRenamed('full_date_time','rfull_date_time')

In [198]:
df_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- ryear: integer (nullable = true)
 |-- rmonth: string (nullable = true)
 |-- rday: integer (nullable = true)
 |-- rweekday: string (nullable = true)
 |-- rhour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_nam

In [199]:
df_index2 = df_index2.join(DIM_DATE, df_index2.rfull_date_time == DIM_DATE.full_date_time,how='LEFT') 

In [200]:
df_index2.printSchema()  ## now contains the primary keys of all dimension tables, i.e. location_id, atm_prim_id, card_type_id and date_id

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- ryear: integer (nullable = true)
 |-- rmonth: string (nullable = true)
 |-- rday: integer (nullable = true)
 |-- rweekday: string (nullable = true)
 |-- rhour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_nam

In [201]:
df_index2.count()  ## count unchanged by the LEFT join, as expected

2468572

In [202]:
df_index2 = df_index2.drop('year','month','day','weekday','hour','full_date_time')

In [203]:
df_index2 = df_index2.withColumnRenamed('ryear','year')
df_index2 = df_index2.withColumnRenamed('rmonth','month')
df_index2 = df_index2.withColumnRenamed('rday','day')
df_index2 = df_index2.withColumnRenamed('rweekday','weekday')
df_index2 = df_index2.withColumnRenamed('rhour','hour')
df_index2 = df_index2.withColumnRenamed('rfull_date_time','full_date_time')

In [204]:
df_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [205]:
df_index2 = df_index2.drop('full_date_time')

In [206]:
df_index2.printSchema()   ## Final schema, with all the columns needed to create the FACT table

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

### Creating the Fact table

<table>
  <thead>
    <tr>
      <th>Column Name</th>
      <th>Data Type</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>trans_id</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>atm_id</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>weather_loc_id</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>date_id</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>card_type_id</td>
      <td>INT</td>
    </tr>
  </tbody>
</table>

In [207]:
### Primary key trans_id for fact table

In [208]:
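## Sequential surrogate key starting at 1. A window without partitionBy moves all rows
## into a single partition (Spark logs a warning), which works at this data size but does not scale
df_index2 = df_index2.withColumn(
    "trans_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)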

In [209]:
FACT_ATM_TRANS = df_index2.select('trans_id','atm_prim_id','location_id','date_id','card_type_id','atm_status','currency','service','transaction_amount','message_code','message_text',
                                     'rain_3h','clouds_all','weather_id','weather_main','weather_description')

In [210]:
FACT_ATM_TRANS.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_prim_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



In [211]:
### All rows loaded as expected: the count matches the 2468572 rows of df_index2

In [212]:
FACT_ATM_TRANS.count()  #2468572

2468572

In [213]:
FACT_ATM_TRANS.select('trans_id','atm_prim_id','location_id','date_id','card_type_id','atm_status').show(10)

+--------+-----------+-----------+-------+------------+----------+
|trans_id|atm_prim_id|location_id|date_id|card_type_id|atm_status|
+--------+-----------+-----------+-------+------------+----------+
|       1|        150|         98|      1|          10|    Active|
|       2|        113|         16|      1|          10|    Active|
|       3|         31|         55|      2|           1|    Active|
|       4|          8|         66|      2|           1|  Inactive|
|       5|         62|          6|      2|           1|    Active|
|       6|         36|          9|      2|           1|    Active|
|       7|         36|          9|      2|           1|    Active|
|       8|        135|         75|      2|           1|    Active|
|       9|        127|         32|      2|           3|    Active|
|      10|         57|         27|      2|           4|    Active|
+--------+-----------+-----------+-------+------------+----------+
only showing top 10 rows



In [214]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.atm_prim_id.isNotNull()).count()   #2468572

2468572

In [215]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.location_id.isNotNull()).count()   #2468572

2468572

In [216]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.date_id.isNotNull()).count()     #2468572

2468572

In [217]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.card_type_id.isNotNull()).count()   #2468572

2468572

#### Copying the FACT and DIM tables to S3 bucket "redshiftpiyushi", folder ETL

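**Steps followed**
1. Create the S3 bucket and folder in the S3 dashboard
2. Set spark.hadoop.fs.s3a.access.key and spark.hadoop.fs.s3a.secret.key in spark-defaults.conf (alternatively, set fs.s3a.access.key and fs.s3a.secret.key at runtime on the Hadoop configuration, as in cell In [219])
3. Write the fact and dimension tables to the S3 bucket/folder created for this purpose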
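Step 2 works because Spark copies every property prefixed with `spark.hadoop.` into the Hadoop configuration used by the S3A connector. A minimal sketch of supplying the same two settings from environment variables instead of a file, assuming the standard AWS variable names `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are set; the helper name `s3a_conf_from_env` is hypothetical.

```python
import os


def s3a_conf_from_env(environ=os.environ):
    """Map AWS credentials from the environment to Spark's S3A settings."""
    # The "spark.hadoop." prefix makes Spark forward these keys to Hadoop,
    # exactly like the spark-defaults.conf entries in step 2
    return {
        "spark.hadoop.fs.s3a.access.key": environ["AWS_ACCESS_KEY_ID"],
        "spark.hadoop.fs.s3a.secret.key": environ["AWS_SECRET_ACCESS_KEY"],
    }

# Usage: apply the settings before the session is first created, e.g.
# builder = SparkSession.builder.appName('demo').master('local')
# for key, value in s3a_conf_from_env().items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```

Keeping the keys in the environment also keeps them out of the notebook and any exported copy of it.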



**df.write.save("s3a://<redshiftbucket/folder/tablename>", format='csv', header='true')**

In [219]:
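import os
## Read the keys from environment variables (assumed to be set before starting the notebook)
## instead of hard-coding credentials in the notebook
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])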

In [220]:
DIM_LOC.write.save("s3a://redshiftpiyushi/ETL/DIM_LOC",format='csv',header='true')

In [221]:
DIM_ATM.write.save("s3a://redshiftpiyushi/ETL/DIM_ATM",format='csv',header='true')

In [222]:
DIM_DATE.write.save("s3a://redshiftpiyushi/ETL/DIM_DATE",format='csv',header='true')

In [227]:
DIM_CARD.write.save("s3a://redshiftpiyushi/ETL/DIM_CARD",format='csv',header='true')

In [226]:
FACT_ATM_TRANS.write.save("s3a://redshiftpiyushi/ETL/FACT_ATM_TRANS",format='csv',header='true')

#### Next, check the S3 bucket to confirm the files were created, then create the tables in Redshift and load this data into them for analytical queries
