In [None]:
# Point PySpark at the cluster's Python interpreter, JDK and Spark 2.3 installation,
# then make the pyspark / py4j archives shipped with Spark importable from this kernel.

import os
import sys

os.environ.update({
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
})
# SPARK_HOME already ends with "/", so PYLIB contains a (harmless) double slash.
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Each insert(0) pushes the previous entry down, so pyspark.zip ends up ahead of py4j.
for archive in ("py4j-0.10.6-src.zip", "pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + "/" + archive)

In [None]:
# Creating Spark Context

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("etl_assignment").setMaster("yarn-client")
sc = SparkContext(conf = conf)
sc

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("assignment").master("").getOrCreate()
spark

#### Loading data from the Hadoop cluster.
This data was saved as a Parquet file during the Sqoop import.

In [None]:
transactionDF = spark.read.parquet('/user/root/atm_trans/99b03bca-c9e7-4fb6-916f-1baa29e96797.parquet')

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF.count()

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, DecimalType, DateType, TimestampType, NullType

In [None]:
# Type casting columns to required datatype.
transactionDF = transactionDF.withColumn("atm_lon", transactionDF['atm_lon'].cast('double')).withColumn("atm_lat", transactionDF['atm_lat'].cast('double')).withColumn("rain_3h", transactionDF['rain_3h'].cast('double'))

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF.count()

In [None]:
### Creating LOCATION Dimension table ###

In [None]:
uniqueLocationDF = transactionDF.select(transactionDF.atm_location, transactionDF.atm_streetname, transactionDF.atm_street_number, transactionDF.atm_zipcode, transactionDF.atm_lat, transactionDF.atm_lon).distinct()

In [None]:
uniqueLocationDF.printSchema()

In [None]:
uniqueLocationDF.count()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

Generating a unique id for each row of the location dimension, used to link it to the transaction fact table.

In [None]:
uniqueLocationDFWithID = uniqueLocationDF.withColumn('location_id', monotonically_increasing_id())

In [None]:
uniqueLocationDFWithID.printSchema()

In [None]:
uniqueLocationDFWithID.take(5)

In [None]:
# using spark sql to rename columns as required

In [None]:
uniqueLocationDFWithID.createOrReplaceTempView("DIM_LOCATION")

#### Location Dimension Table

In [None]:
DIM_LOCATION_DF = spark.sql("select location_id, atm_location as location, atm_streetname as streetname, atm_street_number as street_number, atm_zipcode as zipcode, atm_lat as lat, atm_lon as lon from DIM_LOCATION")

In [None]:
DIM_LOCATION_DF.printSchema()

In [None]:
DIM_LOCATION_DF.take(5)

In [None]:
DIM_LOCATION_DF.count()

In [None]:
DIM_LOCATION_DF_FINAL = DIM_LOCATION_DF.withColumn("lon", DIM_LOCATION_DF['lon'].cast('double')).withColumn("lat", DIM_LOCATION_DF['lat'].cast('double'))

In [None]:
DIM_LOCATION_DF_FINAL.take(2)

In [None]:
DIM_LOCATION_DF_FINAL.select("location_id").distinct().count()

### Check Point
The DIM_LOCATION dimension table contains 109 unique entries.

In [None]:
# Display the schema of the finished location dimension.
DIM_LOCATION_DF_FINAL.printSchema()

#### ATM Dimension Table

In [None]:
# type casting few columns as per requirement
dim_atm_schema = StructType([StructField("atm_id", StringType(), True), StructField("atm_manufacturer", StringType(), True), StructField("atm_location", StringType(), True) ])

In [None]:
dim_atm_df = transactionDF.select('atm_id', 'atm_manufacturer','atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon').distinct()

In [None]:
dim_atm_df.printSchema()

In [None]:
dim_atm_df.count()

In [None]:
dim_atm_df_loc_id = dim_atm_df.join(DIM_LOCATION_DF_FINAL, (DIM_LOCATION_DF_FINAL.lat == dim_atm_df.atm_lat) & (DIM_LOCATION_DF_FINAL.lon == dim_atm_df.atm_lon), how= 'inner')

In [None]:
dim_atm_df_loc_id.printSchema()

In [None]:
dim_atm_df_loc_id.count()

In [None]:
# Renaming columns as required
dim_atm_df_loc_id_1 = dim_atm_df_loc_id.withColumnRenamed('atm_id','atm_number').withColumnRenamed('location_id','atm_location_id')

In [None]:
dim_atm_df_loc_id_1.printSchema()

In [None]:
dim_atm_df_loc_id_2 =dim_atm_df_loc_id_1.withColumn('atm_id', dim_atm_df_loc_id_1['atm_number'].cast('integer'))

In [None]:
dim_atm_df_loc_id_2.printSchema()

In [None]:
dim_atm_df_loc_id_2.filter(dim_atm_df_loc_id_2.atm_id == 102).show()

In [None]:
dim_atm_df_final = dim_atm_df_loc_id_2.select('atm_id', 'atm_number','atm_manufacturer','atm_location_id')

In [None]:
dim_atm_df_final.printSchema()

In [None]:
dim_atm_df_final.count()

### Check Point.
Retrieved 156 unique records from ATM_Dimension table.

In [None]:
# Pull a few ATM-dimension rows to the driver for a visual check.
dim_atm_df_final.take(5)

#### Date Dimension Table

In [None]:
dim_date_1 = transactionDF.select('year', 'month', 'day', 'weekday', 'hour').distinct()

In [None]:
dim_date_1.printSchema()

In [None]:
dim_date_1.count()

In [None]:
dim_date_1.take(10)

In [None]:
from pyspark.sql.functions import *

In [None]:
dim_date_1.select('month').distinct().show()

#### Creating a helper column "mon" (the month number) used to build the full_date_time column in later steps.

In [None]:
dim_date_1 = dim_date_1.withColumn('mon', when(col('month')=='January',1).when(col('month')=='February',2).when(col('month')=='March',3).when(col('month')=='April',4).when(col('month')=='May',5).when(col('month')=='June',6).when(col('month')=='July',7).when(col('month')=='August',8).when(col('month')=='September',9).when(col('month')=='October',10).when(col('month')=='November',11).when(col('month')=='December',12))

In [None]:
dim_date_1 =dim_date_1.withColumn('full_date_time', to_timestamp(concat_ws(" ",concat_ws("-",dim_date_1['year'],dim_date_1['mon'], dim_date_1['day']),dim_date_1['hour'])))

In [None]:
dim_date_1.printSchema()

In [None]:
dim_date_1.show(5)

In [None]:
dim_date_1 = dim_date_1.withColumn('date_id', monotonically_increasing_id())

In [None]:
dim_date_1.printSchema()

In [None]:
dim_date_df_final = dim_date_1.select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

In [None]:
dim_date_df_final.printSchema()

In [None]:
dim_date_df_final.show(5)

In [None]:
dim_date_df_final.count()

### Check Point.
Retrieved 8685 unique records from Date_Dimension table.

#### Card_Type Dimension Table

In [None]:
transactionDF.printSchema()

In [None]:
dim_card_df = transactionDF.select('card_type').distinct()

In [None]:
dim_card_df.show()

In [None]:
dim_card_df.printSchema()

In [None]:
dim_card_df.count()

In [None]:
dim_card_df_final = dim_card_df.withColumn('card_type_id' , monotonically_increasing_id())

In [None]:
dim_card_df_final = dim_card_df_final.select('card_type_id', 'card_type')

In [None]:
dim_card_df_final.printSchema()

In [None]:
dim_card_df_final.count()

### Check Point.
12 unique records are retrieved from Card_Dimension table.

### Final Transaction Fact Table
A final fact table is created by joining the dimension tables with the original transaction table and excluding unwanted columns, in order to get a normalized table.

In [None]:
# Schema of the transaction table before the dimension keys are joined in.
transactionDF.printSchema()

#### 1. Joining Card Dimension table and fact table.

In [None]:
transactionDF = transactionDF.join(dim_card_df_final, (dim_card_df_final.card_type == transactionDF.card_type), how='inner')

In [None]:
transactionDF.count()

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF = transactionDF.select([c for c in transactionDF.columns if c not in ['card_type']])

In [None]:
transactionDF.printSchema()

In [None]:
# *** Check Point- if 2468572 records are retrieved
transactionDF.count()

#### 2. Joining Date dimension table and fact table

In [None]:
transactionDF.filter("year == 2017 and month == 'April'").count()

In [None]:
transactionDF = transactionDF.join(dim_date_df_final, (dim_date_df_final.year==transactionDF.year)&(dim_date_df_final.month==transactionDF.month)&(dim_date_df_final.day==transactionDF.day)&(dim_date_df_final.hour==transactionDF.hour)&(dim_date_df_final.weekday==transactionDF.weekday),how='inner')

In [None]:
transactionDF.count()

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF = transactionDF.select([c for c in transactionDF.columns if c not in ['year','month','day','weekday','hour','full_date_time']])

In [None]:
transactionDF.printSchema()

In [None]:
# *** Check Point- if 2468572 records are retrieved
transactionDF.count()

#### 3. Joining Location dimension table and Fact table

In [None]:
DIM_LOCATION_DF_FINAL.printSchema()

In [None]:
transactionDF.filter(transactionDF.atm_id == 102).select('atm_id','atm_lat','atm_lon').count()

In [None]:
transactionDF = transactionDF.join(DIM_LOCATION_DF_FINAL,(transactionDF.atm_location==DIM_LOCATION_DF_FINAL.location)&(transactionDF.atm_streetname==DIM_LOCATION_DF_FINAL.streetname)&(transactionDF.atm_street_number==DIM_LOCATION_DF_FINAL.street_number)&(transactionDF.atm_zipcode==DIM_LOCATION_DF_FINAL.zipcode)&(transactionDF.atm_lat==DIM_LOCATION_DF_FINAL.lat)&(transactionDF.atm_lon==DIM_LOCATION_DF_FINAL.lon), how='inner')

In [None]:
transactionDF.filter(transactionDF.atm_id == 102).select('atm_id','atm_lat','atm_lon').count()

In [None]:
transactionDF.count()

In [None]:
DIM_LOCATION_DF_FINAL.columns

In [None]:
transactionDF.printSchema()

In [None]:
#transactionDF.columns

In [None]:
transactionDF = transactionDF.select([c for c in transactionDF.columns if c not in [
 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'atm_location',
    'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon']])

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF = transactionDF.withColumnRenamed(existing='atm_id',new='atm_id_r')

In [None]:
# *** Check Point- if 2468572 records are retrieved
transactionDF.count()

#### Joining ATM dimension table and Fact table

In [None]:
dim_atm_df_final.printSchema()

In [None]:
transactionDF = transactionDF.join(dim_atm_df_final, (transactionDF.atm_id_r==dim_atm_df_final.atm_number)&(transactionDF.atm_manufacturer==dim_atm_df_final.atm_manufacturer)&(transactionDF.location_id==dim_atm_df_final.atm_location_id), how='left')

In [None]:
# *** Check Point- if 2468572 records are retrieved
transactionDF.count()

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF = transactionDF.select([c for c in transactionDF.columns if c not in ['atm_number','atm_manufacturer','atm_location_id']])

### Checking if all the dimension tables are created as per requirement

In [None]:
# Print the schema of every dimension table, in the order location, ATM, card type, date.
for dimension_df in (DIM_LOCATION_DF_FINAL, dim_atm_df_final, dim_card_df_final, dim_date_df_final):
    dimension_df.printSchema()

#### Creating "trans_id" column that acts as primary key of the Fact table.

In [None]:
transactionDF = transactionDF.withColumn('trans_id', monotonically_increasing_id())

In [None]:
transactionDF.printSchema()

In [None]:
# Removing unwanted columns and renaming the columns as per requirement, if any.

In [None]:
transactionDF = transactionDF.select([c for c in transactionDF.columns if c not in ['weather_lat','weather_lon','weather_city_id','temp','pressure','humidity','wind_speed','wind_deg']])

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF = transactionDF.select([c for c in transactionDF.columns if c not in ['weather_city_name']])

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF = transactionDF.withColumnRenamed(existing='location_id',new='weather_loc_id')

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF.select('trans_id').distinct().count()

In [None]:
transactionDF = transactionDF.withColumn('atm_id_r',transactionDF.atm_id_r.cast(dataType='integer'))

In [None]:
transactionDF = transactionDF.select([c for c in transactionDF.columns if c not in ['atm_id']])

In [None]:
transactionDF = transactionDF.withColumnRenamed(existing='atm_id_r',new='atm_id')

In [None]:
transactionDF = transactionDF.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description')

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF.count()

### Check Point:
Retrieved 2468572 records from the final fact table, matching the number of entries in the original data. A good sign!

#### Setting "access key" and "secret key" to load the above dataframes into S3 buckets.

In [None]:
import os

# S3A reads static credentials from fs.s3a.access.key / fs.s3a.secret.key. The previously used
# fs.s3a.awsAccessKeyId / fs.s3a.awsSecretAccessKey are not S3A options (they mimic the legacy
# s3n connector's key names), so S3A never saw them.
# The keys are read from the environment rather than pasted into the notebook, where they would
# leak via version control; a missing variable raises KeyError here instead of failing at write time.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

#### Loading data into S3 buckets

In [None]:
dim_card_df_final.write.csv(header=True, path='s3a://etlassignment-deepthi/dim_cardtype_df/')

In [None]:
dim_atm_df_final.write.csv(path='s3a://etlassignment-deepthi/dim_atm_df/')

In [None]:
dim_date_df_final.write.csv(path='s3a://etlassignment-deepthi/dim_date_df/')

In [None]:
DIM_LOCATION_DF_FINAL.write.csv(path='s3a://etlassignment-deepthi/dim_location_df/')

In [None]:
transactionDF.printSchema()

In [None]:
transactionDF.write.csv(path='s3a://etlassignment-deepthi/fact_trans_df/')

#### Checking if data is properly loaded into S3 buckets.

In [None]:
test_df = spark.read.csv('s3a://etlassignment-deepthi/fact_trans_df/')

In [None]:
test_df.printSchema()

In [None]:
test_df.count()