## ETL Project - Rishabh Gupta

#### Steps followed in this notebook are to import spark and then load data from a HDFS location into a spark dataframe and then transform data to create a dimensional data model by creating 4 dimensional tables and a fact table, post which we write these tables in a S3 location

In [1]:
#Importing spark and java and cloudera manager

import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
##Checking spark context and whether spark is running fine

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("jupyter_Spark").setMaster("yarn-client")
sc = SparkContext(conf=conf)
sc


In [3]:
#Importing Spark session

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ETLproject').master("local").getOrCreate()
spark


#### Now that we have imported spark and tested connection, we'll go ahead and load our data in the required data types before transforming that into specific tables

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [5]:
fileSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday', StringType(),True),
                        StructField('hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True),
                        StructField('atm_id', StringType(),True),
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True),
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', DoubleType(),True),
                        StructField('atm_lon', DoubleType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(),True), 
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),True),
                        StructField('weather_lon', DoubleType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True), 
                        StructField('temp', DoubleType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_degree', IntegerType(),True),
                        StructField('rain_3h', DoubleType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True),
                         
                        ])

In [6]:
###Reading file into a dataframe

file1 = spark.read.csv("/user/root/atm_data/part-m-00000", header = True, schema = fileSchema)

In [7]:
##Print current schema 
file1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [9]:
file1.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+-----------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_degree|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+--------

In [8]:
##Checking preliminary count of data
file1.count()

2468571

#### Here we find that the count comes to be 1 less than the validation count, this is because we are including Header = true clause while reading the data frame, it is doen specifically beacause it'll help having Headers in future transformations

#### And for the sake of simplicity we take the count to be 2468571

In [9]:
file1.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_degree',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [10]:
file2 = file1.select('year','month','day','weekday','hour')

In [11]:
##Importing functions from pyspark.sql
from pyspark.sql import functions as sf


#### Now we go ahead and start our transformation on the data to create DIM_DATE first and for that we need a time stamp column and then a date_id which is more like a row_number and will work as a primary key

In [13]:
##Converting month names to month numbers to get a datatime field
file2 = file2.withColumn("month_num",sf.from_unixtime(sf.unix_timestamp(sf.col("month"),'MMMM'),'MM'))
file2.show(5)

+----+-------+---+-------+----+---------+
|year|  month|day|weekday|hour|month_num|
+----+-------+---+-------+----+---------+
|2017|January|  1| Sunday|   0|       01|
|2017|January|  1| Sunday|   0|       01|
|2017|January|  1| Sunday|   0|       01|
|2017|January|  1| Sunday|   0|       01|
|2017|January|  1| Sunday|   0|       01|
+----+-------+---+-------+----+---------+
only showing top 5 rows



#### Joining the day, month_num, year, hour to create a datetime field which afterward we'll change to timestamp data type

In [14]:
file2 = file2.withColumn('date', sf.concat(sf.col('day'),sf.lit(' '), sf.col('month_num'),sf.lit(' '), sf.col('year'),sf.lit(' '), sf.col('hour')))
file2.show(5)


+----+-------+---+-------+----+---------+-----------+
|year|  month|day|weekday|hour|month_num|       date|
+----+-------+---+-------+----+---------+-----------+
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|
+----+-------+---+-------+----+---------+-----------+
only showing top 5 rows



In [16]:
file2.printSchema

<bound method DataFrame.printSchema of DataFrame[year: int, month: string, day: int, weekday: string, hour: int, month_num: string, date: string]>

In [15]:
### Converting string datetime to timestamp

pattern1 = 'dd MM yyyy HH'
file3 = file2.withColumn('full_date_time', sf.unix_timestamp(file2['date'], pattern1).cast('timestamp'))
file3.show(5)

+----+-------+---+-------+----+---------+-----------+-------------------+
|year|  month|day|weekday|hour|month_num|       date|     full_date_time|
+----+-------+---+-------+----+---------+-----------+-------------------+
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|2017-01-01 00:00:00|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|2017-01-01 00:00:00|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|2017-01-01 00:00:00|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|2017-01-01 00:00:00|
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|2017-01-01 00:00:00|
+----+-------+---+-------+----+---------+-----------+-------------------+
only showing top 5 rows



In [16]:
### Checking the distinct records in our dataframe for DIM_DATE

file3 = file3.distinct()
file3.count()

8685

In [17]:
###Order by on the full_date_time column would help us do a date_id and help in transformations

file3 = file3.orderBy('full_date_time')
file3.show(5)

+----+-------+---+-------+----+---------+-----------+-------------------+
|year|  month|day|weekday|hour|month_num|       date|     full_date_time|
+----+-------+---+-------+----+---------+-----------+-------------------+
|2017|January|  1| Sunday|   0|       01|1 01 2017 0|2017-01-01 00:00:00|
|2017|January|  1| Sunday|   1|       01|1 01 2017 1|2017-01-01 01:00:00|
|2017|January|  1| Sunday|   2|       01|1 01 2017 2|2017-01-01 02:00:00|
|2017|January|  1| Sunday|   3|       01|1 01 2017 3|2017-01-01 03:00:00|
|2017|January|  1| Sunday|   4|       01|1 01 2017 4|2017-01-01 04:00:00|
+----+-------+---+-------+----+---------+-----------+-------------------+
only showing top 5 rows



#### We now create a surrogate key of row numbers which will act as a primary key. We use monotonically_increasing_id() function to do this and this has been the approach for all the primary key ID columns in all future tables

In [20]:
file3=file3.coalesce(1);
file4 = file3.withColumn('id',sf.monotonically_increasing_id())
file4 = file4.withColumn('date_id', sf.col('id')+1)
file4.show()

+----+-------+---+-------+----+---------+------------+-------------------+---+-------+
|year|  month|day|weekday|hour|month_num|        date|     full_date_time| id|date_id|
+----+-------+---+-------+----+---------+------------+-------------------+---+-------+
|2017|January|  1| Sunday|   0|       01| 1 01 2017 0|2017-01-01 00:00:00|  0|      1|
|2017|January|  1| Sunday|   1|       01| 1 01 2017 1|2017-01-01 01:00:00|  1|      2|
|2017|January|  1| Sunday|   2|       01| 1 01 2017 2|2017-01-01 02:00:00|  2|      3|
|2017|January|  1| Sunday|   3|       01| 1 01 2017 3|2017-01-01 03:00:00|  3|      4|
|2017|January|  1| Sunday|   4|       01| 1 01 2017 4|2017-01-01 04:00:00|  4|      5|
|2017|January|  1| Sunday|   5|       01| 1 01 2017 5|2017-01-01 05:00:00|  5|      6|
|2017|January|  1| Sunday|   6|       01| 1 01 2017 6|2017-01-01 06:00:00|  6|      7|
|2017|January|  1| Sunday|   7|       01| 1 01 2017 7|2017-01-01 07:00:00|  7|      8|
|2017|January|  1| Sunday|   8|       01| 1

In [21]:
###Moving the required columsn for DIm_DATE to datdf dataframe
datedf = file4.select('date_id','full_date_time','year','month','day','hour','weekday')
datedf.show()

+-------+-------------------+----+-------+---+----+-------+
|date_id|     full_date_time|year|  month|day|hour|weekday|
+-------+-------------------+----+-------+---+----+-------+
|      1|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|
|      2|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|
|      3|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|
|      4|2017-01-01 03:00:00|2017|January|  1|   3| Sunday|
|      5|2017-01-01 04:00:00|2017|January|  1|   4| Sunday|
|      6|2017-01-01 05:00:00|2017|January|  1|   5| Sunday|
|      7|2017-01-01 06:00:00|2017|January|  1|   6| Sunday|
|      8|2017-01-01 07:00:00|2017|January|  1|   7| Sunday|
|      9|2017-01-01 08:00:00|2017|January|  1|   8| Sunday|
|     10|2017-01-01 09:00:00|2017|January|  1|   9| Sunday|
|     11|2017-01-01 10:00:00|2017|January|  1|  10| Sunday|
|     12|2017-01-01 11:00:00|2017|January|  1|  11| Sunday|
|     13|2017-01-01 12:00:00|2017|January|  1|  12| Sunday|
|     14|2017-01-01 13:00:00|2017|Januar

In [22]:
#Counting the number of records to check with the validations given for the assignment
datedf.count()

8685

#### So the count of records for DIM_DATE matches with the number given in the validation doc

In [23]:
datedf.printSchema

<bound method DataFrame.printSchema of DataFrame[date_id: bigint, full_date_time: timestamp, year: int, month: string, day: int, hour: int, weekday: string]>

In [24]:
## We relized that dat_id is bigint and wwe want it as integer in our target dimensional model so we change the type to integer
datedf = datedf.withColumn("date_id", datedf["date_id"].cast(IntegerType()))
datedf.printSchema

<bound method DataFrame.printSchema of DataFrame[date_id: int, full_date_time: timestamp, year: int, month: string, day: int, hour: int, weekday: string]>

#### Now that we have done our transformation and created our table, we'll write the table in our S3 location

In [25]:
### To write to our S3 location, \
### we need to send the access_key and secret_access_key to spark context so that we can write without access issues 

#Setting S3 credentials
import configparser

config = configparser.ConfigParser()
#Read the credentials file
config.read(os.path.expanduser("~/.aws/credentials"))

#get sccess keys
access_id = config.get("default","aws_access_key_id") 
secret_access_key = config.get("default", "aws_secret_access_key")

#Set keys into context
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", access_id)
hadoop_conf.set("fs.s3n.awsSecretAccessKey",  secret_access_key)

In [27]:
datedf.write.csv("s3n://myetlproject/DIM_DATE",mode="overwrite")

#### Moving on to DIM_CARD_TYPE table now

In [28]:
file10  = file1.select('card_type').distinct()
file10.show()

+--------------------+
|           card_type|
+--------------------+
|     Dankort - on-us|
|              CIRRUS|
|         HÃƒÂ¦vekort|
|                VISA|
|  Mastercard - on-us|
|             Maestro|
|Visa Dankort - on-us|
|        Visa Dankort|
|            VisaPlus|
|          MasterCard|
|             Dankort|
| HÃƒÂ¦vekort - on-us|
+--------------------+



In [29]:
#Creating ID field as primary key using the same method

file10=file10.coalesce(1);
file11 = file10.withColumn('id',sf.monotonically_increasing_id())
file11 = file11.withColumn('card_type_id', sf.col('id')+1)
file11.show()

+--------------------+---+------------+
|           card_type| id|card_type_id|
+--------------------+---+------------+
|     Dankort - on-us|  0|           1|
|              CIRRUS|  1|           2|
|         HÃƒÂ¦vekort|  2|           3|
|                VISA|  3|           4|
|  Mastercard - on-us|  4|           5|
|             Maestro|  5|           6|
|Visa Dankort - on-us|  6|           7|
|        Visa Dankort|  7|           8|
|            VisaPlus|  8|           9|
|          MasterCard|  9|          10|
|             Dankort| 10|          11|
| HÃƒÂ¦vekort - on-us| 11|          12|
+--------------------+---+------------+



In [30]:
file11.printSchema

<bound method DataFrame.printSchema of DataFrame[card_type: string, id: bigint, card_type_id: bigint]>

In [31]:
###Converting to Integer type from big int
file11 = file11.withColumn("card_type_id", file11["card_type_id"].cast(IntegerType()))
file11.printSchema

<bound method DataFrame.printSchema of DataFrame[card_type: string, id: bigint, card_type_id: int]>

In [32]:
card_type_df = file11.select('card_type_id','card_type')
card_type_df.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|         HÃƒÂ¦vekort|
|           4|                VISA|
|           5|  Mastercard - on-us|
|           6|             Maestro|
|           7|Visa Dankort - on-us|
|           8|        Visa Dankort|
|           9|            VisaPlus|
|          10|          MasterCard|
|          11|             Dankort|
|          12| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



In [33]:
#Checking count for validation 
card_type_df.count()

12

#### We notice that validation count matches, so far so good

In [35]:
card_type_df.write.csv("s3n://myetlproject/DIM_CARD_TYPE",mode="overwrite")

#### Moving on to DIM_LOCATION table

In [36]:
file12 = file1.select('atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon',)
file12 = file12.distinct()
file12.show()

+--------------------+----------------+-----------------+-----------+-------+-------+
|        atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+--------------------+----------------+-----------------+-----------+-------+-------+
|               Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|
|            Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|
|          Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|             Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|
|   HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|
|                Aars| Himmerlandsgade|               70|       9600| 56.803|  9.518|
|                 Fur|      StenÃƒÂ¸re|               19|       7884| 56.805|   9.02|
|     Aarhus Lufthavn| Ny Lufthavnsvej|               24|       8560| 56.308| 10.627|
|            Hasseris|     Hasserisvej|              1

In [39]:
###Renaming column names based on target dimensional table

file12 = file12.withColumnRenamed("atm_location", "location")\
        .withColumnRenamed("atm_streetname", "streetname")\
        .withColumnRenamed("atm_street_number", "street_number")\
        .withColumnRenamed("atm_zipcode", "zipcode")\
        .withColumnRenamed("atm_lat", "lat")\
        .withColumnRenamed("atm_lon", "lon")\


In [40]:
file12.show(5)

+-----------------+----------------+-------------+-------+------+------+
|         location|      streetname|street_number|zipcode|   lat|   lon|
+-----------------+----------------+-------------+-------+------+------+
|            Vadum|  Ellehammersvej|           43|   9430|57.118| 9.861|
|         Slagelse| Mariendals Alle|           29|   4200|55.398|11.342|
|       Fredericia|SjÃƒÂ¦llandsgade|           33|   7000|55.564| 9.757|
|          Kolding|        Vejlevej|          135|   6000|55.505| 9.457|
|HÃƒÂ¸rning Hallen|        Toftevej|           53|   8362|56.091|10.033|
+-----------------+----------------+-------------+-------+------+------+
only showing top 5 rows



In [41]:
##Creating ID column
file12=file12.coalesce(1);
file13 = file12.withColumn('id',sf.monotonically_increasing_id())
file13 = file13.withColumn('location_id', sf.col('id')+1)
file13.show()

+--------------------+----------------+-------------+-------+------+------+---+-----------+
|            location|      streetname|street_number|zipcode|   lat|   lon| id|location_id|
+--------------------+----------------+-------------+-------+------+------+---+-----------+
|               Vadum|  Ellehammersvej|           43|   9430|57.118| 9.861|  0|          1|
|            Slagelse| Mariendals Alle|           29|   4200|55.398|11.342|  1|          2|
|          Fredericia|SjÃƒÂ¦llandsgade|           33|   7000|55.564| 9.757|  2|          3|
|             Kolding|        Vejlevej|          135|   6000|55.505| 9.457|  3|          4|
|   HÃƒÂ¸rning Hallen|        Toftevej|           53|   8362|56.091|10.033|  4|          5|
|                Aars| Himmerlandsgade|           70|   9600|56.803| 9.518|  5|          6|
|     Aarhus Lufthavn| Ny Lufthavnsvej|           24|   8560|56.308|10.627|  6|          7|
|                 Fur|      StenÃƒÂ¸re|           19|   7884|56.805|  9.02|  7| 

In [42]:
file13 = file13.withColumn("location_id", file13["location_id"].cast(IntegerType()))
file13.printSchema

<bound method DataFrame.printSchema of DataFrame[location: string, streetname: string, street_number: int, zipcode: int, lat: double, lon: double, id: bigint, location_id: int]>

In [43]:
location_df = file13.select('location_id','location','streetname','street_number','zipcode','lat','lon')
location_df.show(5)

+-----------+-----------------+----------------+-------------+-------+------+------+
|location_id|         location|      streetname|street_number|zipcode|   lat|   lon|
+-----------+-----------------+----------------+-------------+-------+------+------+
|          1|            Vadum|  Ellehammersvej|           43|   9430|57.118| 9.861|
|          2|         Slagelse| Mariendals Alle|           29|   4200|55.398|11.342|
|          3|       Fredericia|SjÃƒÂ¦llandsgade|           33|   7000|55.564| 9.757|
|          4|          Kolding|        Vejlevej|          135|   6000|55.505| 9.457|
|          5|HÃƒÂ¸rning Hallen|        Toftevej|           53|   8362|56.091|10.033|
+-----------+-----------------+----------------+-------------+-------+------+------+
only showing top 5 rows



In [44]:
###Checking final count of the records

location_df.count()

109

#### Our validation count matches

In [46]:
location_df.write.csv("s3n://myetlproject/DIM_LOCATION",mode="overwrite")

#### Moving on to DIM_ATM

In [48]:
file1.select('atm_id','atm_manufacturer','atm_lat','atm_lon').distinct().count()

113

In [49]:
### Joining with location table to get location ID into atm table

atm_f = file1.select('atm_id','atm_manufacturer','atm_lat','atm_lon')
left_join = atm_f.join(location_df, (atm_f.atm_lat == location_df.lat) & (atm_f.atm_lon == location_df.lon) ,how='left') # Could also use 'left_outer'
left_join.show(5)

+------+----------------+-------+-------+-----------+-----------+----------+-------------+-------+------+-----+
|atm_id|atm_manufacturer|atm_lat|atm_lon|location_id|   location|streetname|street_number|zipcode|   lat|  lon|
+------+----------------+-------+-------+-----------+-----------+----------+-------------+-------+------+-----+
|   109| Diebold Nixdorf| 57.005|  9.881|         35|Aalborg Syd|  Hobrovej|          440|   9200|57.005|9.881|
|   109| Diebold Nixdorf| 57.005|  9.881|         35|Aalborg Syd|  Hobrovej|          440|   9200|57.005|9.881|
|   109| Diebold Nixdorf| 57.005|  9.881|         35|Aalborg Syd|  Hobrovej|          440|   9200|57.005|9.881|
|   109| Diebold Nixdorf| 57.005|  9.881|         35|Aalborg Syd|  Hobrovej|          440|   9200|57.005|9.881|
|   109| Diebold Nixdorf| 57.005|  9.881|         35|Aalborg Syd|  Hobrovej|          440|   9200|57.005|9.881|
+------+----------------+-------+-------+-----------+-----------+----------+-------------+-------+------

In [50]:
atm_f3 = left_join.select('atm_id','atm_manufacturer','location_id')
atm_f3.distinct().count()

156

In [51]:
atm_f3 = atm_f3.distinct()
atm_f3.show(5)

+------+----------------+-----------+
|atm_id|atm_manufacturer|location_id|
+------+----------------+-----------+
|     6|             NCR|          3|
|   104|             NCR|         25|
|    62| Diebold Nixdorf|         30|
|    45|             NCR|         22|
|    95|             NCR|         10|
+------+----------------+-----------+
only showing top 5 rows



In [52]:
###Renaming atm table names based on target schema

atm_f3 = atm_f3.withColumnRenamed("atm_id", "atm_number")\
        .withColumnRenamed("location_id", "atm_location_id")
atm_f3.show(5)

+----------+----------------+---------------+
|atm_number|atm_manufacturer|atm_location_id|
+----------+----------------+---------------+
|       104|             NCR|             25|
|         6|             NCR|              3|
|        95|             NCR|             10|
|        62| Diebold Nixdorf|             30|
|        25| Diebold Nixdorf|             38|
+----------+----------------+---------------+
only showing top 5 rows



In [53]:
###Including ID column for the primary key

atm_f3=atm_f3.coalesce(1);
atm_f4 = atm_f3.withColumn('id',sf.monotonically_increasing_id())
atm_f4 = atm_f4.withColumn('atm_id', sf.col('id')+1)
atm_f4.show()

+----------+----------------+---------------+---+------+
|atm_number|atm_manufacturer|atm_location_id| id|atm_id|
+----------+----------------+---------------+---+------+
|         6|             NCR|              3|  0|     1|
|       104|             NCR|             25|  1|     2|
|        62| Diebold Nixdorf|             30|  2|     3|
|        95|             NCR|             10|  3|     4|
|        25| Diebold Nixdorf|             38|  4|     5|
|        25| Diebold Nixdorf|             94|  5|     6|
|        40| Diebold Nixdorf|             88|  6|     7|
|        51|             NCR|             65|  7|     8|
|        56| Diebold Nixdorf|             61|  8|     9|
|        16|             NCR|             34|  9|    10|
|        91|             NCR|             18| 10|    11|
|       110| Diebold Nixdorf|             37| 11|    12|
|         9| Diebold Nixdorf|             77| 12|    13|
|        82|             NCR|             79| 13|    14|
|        97|             NCR|  

In [54]:
atm_f4.printSchema()

root
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)
 |-- id: long (nullable = false)
 |-- atm_id: long (nullable = false)



In [55]:
atm_df = atm_f4.select('atm_id','atm_number','atm_manufacturer','atm_location_id')
atm_df.show(5)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|       104|             NCR|             25|
|     2|         6|             NCR|              3|
|     3|        62| Diebold Nixdorf|             30|
|     4|        95|             NCR|             10|
|     5|        25| Diebold Nixdorf|             38|
+------+----------+----------------+---------------+
only showing top 5 rows



In [56]:
atm_df = atm_df.withColumn("atm_id", atm_df["atm_id"].cast(IntegerType()))
atm_df.printSchema()

root
 |-- atm_id: integer (nullable = false)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)



In [57]:
##Checking count for validation purposes
atm_df.count()

156

#### Validation check for this table passed

In [58]:
atm_df.write.csv("s3n://myetlproject/DIM_ATM",mode="overwrite")

#### Moving to FACT_ATM_TRANS table

#### We need to do a couple of joins here to get foreign key fields like card_type_id, date_id, weather_loc_id

In [59]:
### Selecting important columns

fact_f = file1.select('atm_status','currency','service','transaction_amount','message_code','message_text', 'rain_3h',\
                      'clouds_all','weather_id','weather_main','weather_description','atm_id','atm_location', \
                      'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon','card_type','year','month','day','hour')

In [60]:
##Getting card_type_id and then checking count to maintain the count of the table
fact_f_join = fact_f.join(card_type_df, fact_f.card_type == card_type_df.card_type , how = 'left') 
                            
fact_f_join.count()

2468571

In [61]:
fact_f = fact_f_join.select('atm_status','currency','service','transaction_amount','message_code','message_text', 'rain_3h',\
                      'clouds_all','weather_id','weather_main','weather_description','atm_id','atm_location', \
                      'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon','card_type_id','year','month','day','hour')

In [62]:
### Joining with DIM_DATE to get date_id
fact_f_join10 = fact_f.join(datedf, (fact_f.year == datedf.year) & (fact_f.month == datedf.month) & \
                            (fact_f.day == datedf.day) & (fact_f.hour == datedf.hour), how = 'left') 

fact_f_join10.count()

2468571

In [63]:
fact_f = fact_f_join10.select('atm_status','currency','service','transaction_amount','message_code','message_text', 'rain_3h',\
                      'clouds_all','weather_id','weather_main','weather_description','atm_id','atm_location', \
                      'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon','card_type_id','date_id')

In [64]:
### Join with location table to get location Id which will be weather_loc_id

fact_f_join1 = fact_f.join(location_df, (fact_f.atm_lon  == location_df.lon) & \
                           (fact_f.atm_lat == location_df.lat) & (fact_f.atm_location == location_df.location) &\
                           (fact_f.atm_streetname == location_df.streetname) & (fact_f.atm_street_number == location_df.street_number) & \
                           (fact_f.atm_zipcode == location_df.zipcode) , how = 'left') 
                            
fact_f_join1.count()

2468571

In [65]:
### Choosing appropriate columns required for target dataframe

fact_f_join1 = fact_f_join1.select('atm_status','currency','service','transaction_amount','message_code','message_text', 'rain_3h',\
                      'clouds_all','weather_id','weather_main','weather_description','atm_id','location_id', 'card_type_id','date_id')


In [66]:
## Renaming fields

fact_f_join1 = fact_f_join1.withColumnRenamed("atm_id", "atm_num")\
        .withColumnRenamed("location_id", "weather_loc_id")


In [67]:
### Joining with DIM_ATM to get ATM_ID

fact_f_join2 = fact_f_join1.join(atm_df, (fact_f_join1.atm_num == atm_df.atm_number) & \
                                 (fact_f_join1.weather_loc_id == atm_df.atm_location_id), how = 'left') 
                            
fact_f_join2.count()

2468571

In [68]:
fact_f_join2 = fact_f_join2.select ('atm_status','currency','service','transaction_amount','message_code','message_text', 'rain_3h',\
                      'clouds_all','weather_id','weather_main','weather_description','atm_number','weather_loc_id','atm_id','card_type_id','date_id')

In [69]:
fact_tran_df  = fact_f_join2.select('atm_id','weather_loc_id','date_id','card_type_id','atm_status','currency','service',\
                                    'transaction_amount','message_code','message_text', 'rain_3h',\
                      'clouds_all','weather_id','weather_main','weather_description')


In [70]:
###Creating a primary key TRANS_ID

fact_tran_df=fact_tran_df.coalesce(1);
fact_atm_tran_df = fact_tran_df.withColumn('id',sf.monotonically_increasing_id())
fact_atm_tran_df = fact_atm_tran_df.withColumn('trans_id', sf.col('id')+1)
fact_atm_tran_df = fact_atm_tran_df.select('trans_id','atm_id','weather_loc_id','date_id','card_type_id','atm_status','currency','service',\
                                    'transaction_amount','message_code','message_text', 'rain_3h',\
                      'clouds_all','weather_id','weather_main','weather_description')

In [71]:
fact_atm_tran_df1 = fact_atm_tran_df.select('trans_id',  'atm_id', 'weather_loc_id','date_id' ,'card_type_id','atm_status',
                                            'clouds_all', 'weather_id','weather_main','weather_description','transaction_amount',
                                           'message_code', 'message_text', 'currency','service',)

In [72]:
fact_atm_tran_df1.write.csv("s3n://myetlproject/FACT_ATM_TYPE",mode="overwrite")

#### Since the count is matching the validation is also passed and we have written the tables into 5 different folders in the S3 location