### ETL Project 
#### By Sowkhya Reddy Geeda

In [1]:
import os
import sys
# Tell PySpark which Python interpreter, JVM and Spark install to use.
_spark_env = {
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
}
os.environ.update(_spark_env)
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Put the bundled py4j and pyspark archives at the front of the import path.
# Every insert targets index 0, so pyspark.zip (inserted last) ends up first.
for _archive in ("/py4j-0.10.6-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + _archive)

In [2]:
# Import Spark Session
from pyspark.sql import SparkSession

In [3]:
## Build (or reuse) a Spark session named 'demo' that runs with the "local" master
session_builder = SparkSession.builder.appName('demo').master("local")
spark = session_builder.getOrCreate()
# Leave the session as the last expression so the notebook displays it
spark

In [4]:
## Import necessary Library to define various data types for Schema Creation,
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [5]:
## Import necessary Library for customized date field creation,
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import *

In [6]:
## Schema Preparation.
## Columns are listed as (name, Spark type) pairs in the order they are passed
## to StructType; every field is declared nullable.
schema_columns = [
    ('year', IntegerType),
    ('month', StringType),
    ('day', IntegerType),
    ('weekday', StringType),
    ('hour', IntegerType),
    ('atm_status', StringType),
    ('atm_id', StringType),
    ('atm_manufacturer', StringType),
    ('atm_location', StringType),
    ('atm_streetname', StringType),
    ('atm_street_number', IntegerType),
    ('atm_zipcode', IntegerType),
    ('atm_lat', DoubleType),
    ('atm_lon', DoubleType),
    ('currency', StringType),
    ('card_type', StringType),
    ('transaction_amount', IntegerType),
    ('service', StringType),
    ('message_code', StringType),
    ('message_text', StringType),
    ('weather_lat', DoubleType),
    ('weather_lon', DoubleType),
    ('weather_city_id', IntegerType),
    ('weather_city_name', StringType),
    ('temp', DoubleType),
    ('pressure', IntegerType),
    ('humidity', IntegerType),
    ('wind_speed', IntegerType),
    ('wind_deg', IntegerType),
    ('rain_3h', DoubleType),
    ('clouds_all', IntegerType),
    ('weather_id', IntegerType),
    ('weather_main', StringType),
    ('weather_description', StringType),
]
fileSchema = StructType(
    [StructField(field_name, field_type(), True) for field_name, field_type in schema_columns]
)

In [7]:
## Read the headerless, comma-separated transaction extract (HDFS path per the
## original note) and apply the explicit schema instead of inferring types
df = (
    spark.read
    .schema(fileSchema)
    .option("sep", ",")
    .option("header", "false")
    .csv("/user/root/SRC_ATM_TRANS/part-m-00000")
)

In [8]:
# Count the rows loaded into df (compared against the validation sheet in Check1)
df.count()

#### Check1: Total Row Count Matches with Validation Sheet

In [9]:
## Print the column names, types and nullability of the loaded DataFrame
df.printSchema()

### Location Dimension Table

In [10]:
## Location dimension: one row per distinct combination of ATM location attributes
location_columns = [
    'atm_location',
    'atm_streetname',
    'atm_street_number',
    'atm_zipcode',
    'atm_lat',
    'atm_lon',
]
DIM_LOCATION = df.select(*location_columns).distinct()

In [11]:
# Show the records (show() with no arguments prints the first 20 rows).
DIM_LOCATION.show()

In [12]:
## Number of distinct records present in the location dimension DataFrame.
DIM_LOCATION.count()

In [13]:
## Import necessary library to generate surrogate Key
from pyspark.sql.window import Window
from pyspark.sql.functions import col,row_number

In [14]:
## Count rows whose atm_location is NULL (the column the surrogate key is ordered by)
DIM_LOCATION.where(col("atm_location").isNull()).count()

In [15]:
## Create location_id (primary key / surrogate key).
## row_number() only gives reproducible ids when the window ordering is total.
## Ordering by "atm_location" alone can leave ties between rows that share a
## location name, and Spark may then number the tied rows differently each time
## this lazy plan is re-evaluated (DIM_ATM join, stage_1 join, final write).
## The rows are already distinct over these six columns, so ordering by all of
## them removes every tie while keeping atm_location as the leading sort key.
wi = Window.orderBy(
    "atm_location",
    "atm_streetname",
    "atm_street_number",
    "atm_zipcode",
    "atm_lat",
    "atm_lon",
)
DIM_LOCATION = DIM_LOCATION.select(row_number().over(wi).alias("location_id"), col("*"))

In [16]:
# Display the location dimension, now including the location_id column
DIM_LOCATION.show()

In [17]:
### Register DIM_LOCATION as a temporary view so it can be queried with spark.sql ######
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is
# its replacement and registers the view under the same name.
DIM_LOCATION.createOrReplaceTempView("DIM_LOCATION")

In [18]:
## Check the row count of the DIM_LOCATION temporary table through a SQL query
spark.sql("select  count(*)  from  DIM_LOCATION").show()

#### Check2: Location Dimension Count Matches with Validation sheet

### Card Type Dimension Table

In [19]:
# Card type dimension: one row per distinct card_type value in the source data
DIM_CARD_TYPE = df.select(df.card_type).distinct()

In [20]:
## Create card_type_id (primary key / surrogate key), numbering rows in card_type order
wi = Window.orderBy("card_type")
card_type_key = row_number().over(wi).alias("card_type_id")
DIM_CARD_TYPE = DIM_CARD_TYPE.select(card_type_key, col("*"))

In [21]:
# Display the data in the card type DataFrame
DIM_CARD_TYPE.show()

In [22]:
## Count the rows in the card type dimension (compared against the validation sheet in Check3)
DIM_CARD_TYPE.count()

#### Check3: Card Type Dimension Count Matches with Validation sheet

### ATM Dimension Table

In [23]:
## ATM dimension seed: distinct ATM id / manufacturer pairs together with the
## coordinates needed to look up their location
atm_seed_columns = ['atm_id', 'atm_manufacturer', 'atm_lat', 'atm_lon']
DIM_ATM = df.select(*atm_seed_columns).distinct()

In [24]:
## Left-join the ATM dimension to the location dimension on latitude/longitude,
## since more than one ATM can be present at the same coordinates
coordinate_keys = ['atm_lat', 'atm_lon']
DIM_ATM = DIM_ATM.join(DIM_LOCATION, on=coordinate_keys, how='left')

In [25]:
# Check the row count of the ATM dimension after the join with DIM_LOCATION
DIM_ATM.count()

In [26]:
# Check the schema of the ATM dimension after the join with DIM_LOCATION
DIM_ATM.printSchema()

In [27]:
# Keep only the ATM attributes and the location surrogate key, then drop duplicate rows
atm_dimension_columns = ['atm_id', 'atm_manufacturer', 'location_id']
DIM_ATM = DIM_ATM.select(*atm_dimension_columns).distinct()

In [28]:
# Check the row count after reducing DIM_ATM to distinct (atm_id, atm_manufacturer, location_id) rows
DIM_ATM.count()

In [29]:
## Rename the source atm_id column to atm_number, freeing the name atm_id for the surrogate key
DIM_ATM = DIM_ATM.withColumnRenamed(existing="atm_id", new="atm_number")

In the original data, `atm_id` holds the ATM number.

The ATM dimension table needs a separate `atm_id` column that will be used as its primary key.

The source `atm_id` column in the ATM dimension table is therefore renamed to `atm_number`, which is used later as a join key.


In [30]:
# Count ATM dimension rows whose atm_number is missing (NULL)
DIM_ATM.where(col("atm_number").isNull()).count()

0

In [31]:
# Add surrogate / primary key atm_id.
# DIM_ATM is distinct over (atm_number, atm_manufacturer, location_id), so
# atm_number on its own may repeat. Ordering by atm_number alone would leave
# ties, and row_number() could then assign different ids each time this lazy
# plan is re-evaluated (stage_2 join, final write). Ordering by all three
# columns gives a total order while keeping atm_number as the leading sort key.
wi = Window.orderBy("atm_number", "atm_manufacturer", "location_id")
DIM_ATM = DIM_ATM.select(row_number().over(wi).alias("atm_id"), col("*"))

In [32]:
## Display the first 5 rows of the ATM dimension
DIM_ATM.show(5)

In [33]:
## Check the overall row count of the ATM dimension (compared against the validation sheet in Check4)
DIM_ATM.count()

#### Check4: ATM Dimension Count Matches with Validation sheet

### DATE Dimension Table

In [34]:
# Date dimension seed: one row per distinct (year, month, day, hour, weekday) combination
date_columns = ['year', 'month', 'day', 'hour', 'weekday']
DIM_DATE = df.select(*date_columns).distinct()

In [35]:
from pyspark.sql.functions import *

In [36]:
# Add a temporary "month_number" column (e.g. January = 01, February = 02) that
# is used to build the full date-time value: parse the month name with the
# 'MMMM' pattern, then format the result back as a two-digit 'MM' string.
month_as_epoch = unix_timestamp(col("month"), 'MMMM')
DIM_DATE = DIM_DATE.withColumn("month_number", from_unixtime(month_as_epoch, 'MM'))

In [37]:
# Validate the change by displaying the first 8 rows, including month_number
DIM_DATE.show(8)

In [38]:
# Print the schema of the date dimension after adding month_number
DIM_DATE.printSchema()

In [39]:
# Import Python Function (built-in functions available for DataFrame)
from pyspark.sql import functions as F

In [40]:
## Create the customized "full date time" string [desired format yyyy-MM-dd HH:mm:ss].
## day and hour are integers, so they are left-padded to two digits. Without the
## padding a value such as "2017-01-1 0:00:00" does not literally match the
## pattern and only parses when the date parser is lenient.
## The helper column month_number is dropped again by the final select.
DIM_DATE = DIM_DATE.withColumn(
    'full_date_time',
    F.concat(
        F.col('year').cast('string'), F.lit('-'),
        F.col('month_number'), F.lit('-'),
        F.lpad(F.col('day').cast('string'), 2, '0'), F.lit(' '),
        F.lpad(F.col('hour').cast('string'), 2, '0'), F.lit(':00:00')
    )
).select("full_date_time", "year", "month", "day", "hour", "weekday")

In [41]:
# Display the first 5 rows to check the new full_date_time column
DIM_DATE.show(5)

In [42]:
## Check the null count: number of date rows whose year is NULL
DIM_DATE.where(col("year").isNull()).count()

In [43]:
# Add primary / surrogate key date_id.
# Ordering by "year" alone leaves every row of the same year tied, so
# row_number() could assign different ids each time this lazy plan is
# re-evaluated (stage_3 join, final write), and the date_id stored in the fact
# table might not match the one written for DIM_DATE.
# DIM_DATE is distinct over (year, month, day, hour, weekday), so ordering by
# all five columns is a total order; year stays the leading sort key.
wi = Window.orderBy("year", "month", "day", "hour", "weekday")
DIM_DATE = DIM_DATE.select(row_number().over(wi).alias("date_id"), col("*"))

In [44]:
## Convert full_date_time from a string to a timestamp column:
## parse it to epoch seconds with the given pattern, then cast to timestamp
pattern = "yyyy-MM-dd HH:mm:ss"
full_date_time_epoch = unix_timestamp(col("full_date_time"), pattern)
DIM_DATE = DIM_DATE.withColumn("full_date_time", full_date_time_epoch.cast("timestamp"))

In [45]:
## Data value check: display the first 5 rows after the timestamp conversion
DIM_DATE.show(5)

In [46]:
## Final schema check for the date dimension
DIM_DATE.printSchema()

In [47]:
## Count check for the date dimension (compared against the validation sheet in Check5)
DIM_DATE.count()

#### Check5: DATE Dimension Count Matches with Validation sheet

### Join and Create Fact Table

In [48]:
## Stage 1: left-join the original DataFrame to the location dimension on all
## six location attributes, which adds the location_id column
location_join_keys = [
    'atm_location',
    'atm_streetname',
    'atm_street_number',
    'atm_zipcode',
    'atm_lat',
    'atm_lon',
]
stage_1 = df.join(DIM_LOCATION, on=location_join_keys, how='left')
stage_1.count()

#### Check6: Total Row Count Matches with Validation sheet after Stage1

In [49]:
# Check the Stage 1 schema (source columns plus the joined location_id)
stage_1.printSchema()

In [50]:
# Rename the source atm_id column to atm_number so Stage 1 shares a join key name with DIM_ATM
stage_1 = stage_1.withColumnRenamed(existing="atm_id", new="atm_number")

In the original data, `atm_id` holds the ATM number.

The ATM dimension table has two ATM identifier columns: 1. `atm_id` (contains the generated row number) and 2. `atm_number` (the `atm_id` value of the original data).

To create a matching join key, the `atm_id` column of the original data is renamed to `atm_number`, so that the Stage 1 data and the ATM dimension

table can be joined with key = ['atm_number','atm_manufacturer','location_id'].

In [51]:
## Print the schema of the ATM dimension to confirm the columns available as join keys
DIM_ATM.printSchema()

In [52]:
## Stage 2: left-join Stage 1 to the ATM dimension on ATM number, manufacturer
## and location_id, which adds the atm_id surrogate key
atm_join_keys = ['atm_number', 'atm_manufacturer', 'location_id']
stage_2 = stage_1.join(DIM_ATM, on=atm_join_keys, how='left')
stage_2.count()

#### Check7: Total Row Count Matches with Validation sheet after Stage2

In [53]:
# Check the Stage 2 schema (Stage 1 columns plus the joined atm_id)
stage_2.printSchema()

In [54]:
## Stage 3: left-join Stage 2 to the date dimension on the date/time attributes,
## which adds the date_id and full_date_time columns
date_join_keys = ['year', 'month', 'day', 'hour', 'weekday']
stage_3 = stage_2.join(DIM_DATE, on=date_join_keys, how='left')
stage_3.count()

#### Check8: Total Row Count Matches with Validation sheet after Stage3

In [55]:
# Stage 3 schema check (Stage 2 columns plus the joined date dimension columns)
stage_3.printSchema()

In [56]:
## Stage 4: left-join Stage 3 to the card type dimension on card_type,
## which adds the card_type_id surrogate key
card_type_join_keys = ['card_type']
stage_4 = stage_3.join(DIM_CARD_TYPE, on=card_type_join_keys, how='left')
stage_4.count()

#### Check9: Total Row Count Matches with Validation sheet after Stage4

In [57]:
# Stage 4 schema check (Stage 3 columns plus the joined card_type_id)
stage_4.printSchema()

### Fact Table Creation
### Fact table with distinct() --- For Checking Purpose

In [58]:
### Build the checking-only fact table: the required columns from Stage 4 with duplicate rows removed
fact_check_columns = [
    'atm_id', 'location_id', 'date_id', 'card_type_id',
    'atm_status', 'currency', 'service', 'transaction_amount',
    'message_code', 'message_text',
    'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description',
]
fact_table = stage_4.select(*fact_check_columns).distinct()

In [59]:
## Add the trans_id primary / surrogate key, numbering rows in atm_id order
wI = Window.orderBy("atm_id")
trans_key = row_number().over(wI).alias("trans_id")
fact_table = fact_table.select(trans_key, col("*"))

In [60]:
# Row count of the distinct() fact table (reported as a mismatch with the validation sheet in Check10)
fact_table.count()

#### Check10: Total Row Count Mismatch with Validation sheet for Fact Table with distinct()
#### Check count for all the Stages in the creation of Transaction Fact table
#### Count of Records in each stage - 2468572
### Fact table without distinct()

In [61]:
### Build the fact table from the required Stage 4 columns, keeping every row (no distinct)
fact_columns = [
    'atm_id', 'location_id', 'date_id', 'card_type_id',
    'atm_status', 'currency', 'service', 'transaction_amount',
    'message_code', 'message_text',
    'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description',
]
FACT_ATM_TRANS = stage_4.select(*fact_columns)

In [62]:
## Add the trans_id primary / surrogate key, numbering rows in atm_id order
# NOTE(review): the window has no partitionBy; Spark normally warns that this
# moves all rows to a single partition - confirm that is acceptable for this volume.
wi = Window.orderBy("atm_id")
trans_key = row_number().over(wi).alias("trans_id")
FACT_ATM_TRANS = FACT_ATM_TRANS.select(trans_key, col("*"))

In [63]:
# Count validation for the fact table built without distinct() (compared against the validation sheet in Check11)
FACT_ATM_TRANS.count()

#### Check11: Total Row Count Matches with Validation sheet for Fact Table

In [64]:
## Verify the fact table data by displaying the first 5 rows
FACT_ATM_TRANS.show(5)

#### We will proceed without Distinct as per Data validation reference spreadsheet
#### Check count for the all the Stages in the creation of Transaction Fact table -
#### Count of Records in each stage - 2468572

### Create .csv files and store them in S3 (s3a://etlproject-testbucket)

In [78]:
## Save the location dimension DataFrame as headerless, comma-separated CSV
## under the s3a:// bucket path, overwriting any existing output
(
    DIM_LOCATION.write
    .mode('overwrite')
    .format('csv')
    .options(header=False, sep=',')
    .save('s3a://etlproject-testbucket/DIM_LOCATION/')
)

In [73]:
## Save the card type dimension DataFrame as headerless, comma-separated CSV
## under the s3a:// bucket path, overwriting any existing output
(
    DIM_CARD_TYPE.write
    .mode('overwrite')
    .format('csv')
    .options(header=False, sep=',')
    .save('s3a://etlproject-testbucket/DIM_CARD_TYPE/')
)

In [74]:
## Save the ATM dimension DataFrame as headerless, comma-separated CSV
## under the s3a:// bucket path, overwriting any existing output
(
    DIM_ATM.write
    .mode('overwrite')
    .format('csv')
    .options(header=False, sep=',')
    .save('s3a://etlproject-testbucket/DIM_ATM/')
)

In [75]:
## Save the date dimension DataFrame as headerless, comma-separated CSV
## under the s3a:// bucket path, overwriting any existing output
(
    DIM_DATE.write
    .mode('overwrite')
    .format('csv')
    .options(header=False, sep=',')
    .save('s3a://etlproject-testbucket/DIM_DATE/')
)

In [76]:
## Save the fact table DataFrame as headerless, comma-separated CSV
## under the s3a:// bucket path, overwriting any existing output
(
    FACT_ATM_TRANS.write
    .mode('overwrite')
    .format('csv')
    .options(header=False, sep=',')
    .save('s3a://etlproject-testbucket/FACT_ATM_TRANS/')
)