ETL Project

In [1]:
# Importing the necessary modules and pointing Python at the cluster's Spark install
import os
import sys

os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
# os.path.join avoids the doubled "//" that plain concatenation produces,
# because SPARK_HOME above ends with a slash.
os.environ["PYLIB"] = os.path.join(os.environ["SPARK_HOME"], "python", "lib")

# Put py4j and pyspark at the front of sys.path (pyspark ends up first).
# Existing copies are removed first so re-running this cell does not keep
# stacking duplicate entries.
for archive in ("py4j-0.10.6-src.zip", "pyspark.zip"):
    archive_path = os.path.join(os.environ["PYLIB"], archive)
    while archive_path in sys.path:
        sys.path.remove(archive_path)
    sys.path.insert(0, archive_path)

In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session for this notebook and display it.
spark = (
    SparkSession.builder
    .appName('jupyter_Spark')
    .master("local")
    .getOrCreate()
)
spark

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Preview read of the SRC_ATM_TRANS export, letting Spark infer column types
# (the file has no header row).
df = spark.read.csv(
    "/user/root/SRC_ATM_TRANS/part-m-00000",
    header=False,
    inferSchema=True,
)

In [None]:
# Peek at the first loaded row.
df.show(n=1)

In [None]:
# Spark SQL types used to declare the custom input schema (StructType).
from pyspark.sql.types import (
    BooleanType,
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

In [None]:
# Explicit input schema for SRC_ATM_TRANS: (column name, Spark type) in file
# column order. Every field is declared nullable.
_schema_columns = [
    ('year', IntegerType()),
    ('month', StringType()),
    ('day', IntegerType()),
    ('weekday', StringType()),
    ('hour', IntegerType()),
    ('atm_status', StringType()),
    ('atm_id', StringType()),
    ('atm_manufacturer', StringType()),
    ('atm_location', StringType()),
    ('atm_streetname', StringType()),
    ('atm_street_number', IntegerType()),
    ('atm_zipcode', IntegerType()),
    ('atm_lat', DoubleType()),
    ('atm_lon', DoubleType()),
    ('currency', StringType()),
    ('card_type', StringType()),
    ('transaction_amount', IntegerType()),
    ('service', StringType()),
    ('message_code', StringType()),
    ('message_text', StringType()),
    ('weather_lat', DoubleType()),
    ('weather_lon', DoubleType()),
    ('weather_city_id', IntegerType()),
    ('weather_city_name', StringType()),
    ('temp', DoubleType()),
    ('pressure', IntegerType()),
    ('humidity', IntegerType()),
    ('wind_speed', IntegerType()),
    ('wind_deg', IntegerType()),
    ('rain_3h', DoubleType()),
    ('clouds_all', IntegerType()),
    ('weather_id', IntegerType()),
    ('weather_main', StringType()),
    ('weather_description', StringType()),
]
Schema = StructType([StructField(name, dtype, True) for name, dtype in _schema_columns])

In [None]:
# Load SRC_ATM_TRANS again, this time typed by the explicit Schema.
df = spark.read.csv("/user/root/SRC_ATM_TRANS/part-m-00000", schema=Schema, header=False)

In [None]:
# Number of records loaded into df.
df.count()

In [None]:
# Print the column names, types and nullability of the schema-typed frame.
df.printSchema()

Creating the Fact and Dimension Tables

In [None]:
# Location dimension source: one row per distinct combination of ATM location attributes.
location_columns = ['atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon']
location = df.select(*location_columns).distinct()

In [None]:
# Import only the Spark SQL functions this notebook uses. A wildcard import
# would shadow Python built-ins such as sum, max, min, round and abs.
from pyspark.sql.functions import col, concat_ws, lit, row_number, unix_timestamp

In [None]:
# Surrogate key: zipWithIndex numbers each distinct location row from 0;
# the (row, index) pairs are flattened back into the row's columns plus location_id.
indexed_locations = location.rdd.zipWithIndex().toDF()
dim_location = indexed_locations.select("_1.*", col("_2").alias('location_id'))
dim_location.show(5)

In [None]:
# Give the location dimension the target model's column names (drop the atm_ prefix).
location_column_names = {
    'atm_location': 'location',
    'atm_streetname': 'streetname',
    'atm_street_number': 'street_number',
    'atm_zipcode': 'zipcode',
    'atm_lat': 'lat',
    'atm_lon': 'lon',
}
DIM_LOCATION = dim_location
for old_name, new_name in location_column_names.items():
    DIM_LOCATION = DIM_LOCATION.withColumnRenamed(old_name, new_name)

In [None]:
# Confirm the renamed location columns.
DIM_LOCATION.schema.names

In [None]:
# Working frame for the ATM dimension: ATM identity plus coordinates for the location lookup.
atm_source_columns = ['atm_id', 'atm_manufacturer', 'atm_lat', 'atm_lon']
atm = df.select(*atm_source_columns)

In [None]:
# The raw atm_id is the ATM's number; the name atm_id is reserved for the surrogate key.
atm = atm.withColumnRenamed(existing='atm_id', new='atm_number')

In [None]:
# Look up location_id for each ATM row by matching coordinates (left join keeps unmatched ATMs).
atm = atm.join(dim_location, ['atm_lat', 'atm_lon'], 'left')

In [None]:
# Inspect the columns produced by the join.
atm.schema.names

In [None]:
# Keep one row per (ATM number, manufacturer, location key) combination.
atm = atm.select('atm_number', 'atm_manufacturer', 'location_id')
atm = atm.distinct()

In [None]:
# Name the location foreign key as the target ATM dimension expects.
atm = atm.withColumnRenamed(existing='location_id', new='atm_location_id')

In [None]:
# Confirm the rename took effect.
atm.schema.names

In [None]:
# Surrogate key for the ATM dimension: 0-based zipWithIndex position, flattened
# next to the ATM's columns as atm_id.
indexed_atms = atm.rdd.zipWithIndex().toDF()
dim_atm = indexed_atms.select("_1.*", col("_2").alias('atm_id'))
dim_atm.show(5)

In [None]:
# Put the ATM dimension columns in target-model order.
dim_atm_order = ['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']
DIM_ATM = dim_atm.select(*dim_atm_order)

In [None]:
# Verify the ATM dimension's column names and order.
DIM_ATM.schema.names

In [None]:
# Working frame for the date dimension: the raw calendar/time parts of each transaction.
date_columns = ['year', 'month', 'day', 'hour', 'weekday']
date = df.select(date_columns)

In [None]:
# Calendar date string "year-month-day" (concat_ws skips null parts).
date = date.withColumn('full_date', concat_ws('-', col('year'), col('month'), col('day')))

In [None]:
# Hour-granularity time string "hour:00:00" — the schema carries no minute/second fields.
date = date.withColumn('full_time', concat_ws(':', col('hour'), lit('00'), lit('00')))

In [None]:
# Combined "date time" string, ready to be parsed into a timestamp.
date = date.withColumn('full_date_time', concat_ws(' ', col('full_date'), col('full_time')))

In [None]:
# Parse the assembled string into a real timestamp. 'MMM' reads the month as
# text, so this presumably relies on the month column holding month names — confirm.
# Strings that do not match the pattern become null.
pattern = 'yyyy-MMM-dd HH:mm:ss'
parsed_seconds = unix_timestamp(col('full_date_time'), pattern)
date = date.withColumn('full_date_time', parsed_seconds.cast('timestamp'))

In [None]:
# Eyeball the derived date/time columns without truncation.
date.show(n=5, truncate=False)

In [None]:
# One row per distinct date/hour combination; helper string columns are dropped.
date = (
    date
    .select('year', 'month', 'day', 'hour', 'weekday', 'full_date_time')
    .distinct()
)

In [None]:
# Surrogate key for the date dimension: 0-based zipWithIndex position,
# flattened next to the date columns as date_id.
indexed_dates = date.rdd.zipWithIndex().toDF()
DIM_DATE = indexed_dates.select("_1.*", col("_2").alias('date_id'))
DIM_DATE.show(5)

In [None]:
# Put the date dimension columns in target-model order.
dim_date_order = ['date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday']
DIM_DATE = DIM_DATE.select(*dim_date_order)

In [None]:
# Verify the date dimension's column names and order.
DIM_DATE.schema.names

In [None]:
# Card-type dimension source: each distinct card_type value once.
card_type = (
    df
    .select('card_type')
    .distinct()
)

In [None]:
# Surrogate key for the card-type dimension: 0-based zipWithIndex position as card_type_id.
indexed_card_types = card_type.rdd.zipWithIndex().toDF()
DIM_CARD_TYPE = indexed_card_types.select("_1.*", col("_2").alias('card_type_id'))
DIM_CARD_TYPE.show(5)

In [None]:
# Key first, then the card type text, per the target model.
dim_card_type_order = ['card_type_id', 'card_type']
DIM_CARD_TYPE = DIM_CARD_TYPE.select(*dim_card_type_order)

In [None]:
# Verify the card-type dimension's column names and order.
DIM_CARD_TYPE.schema.names

In [None]:
# Number of distinct card types in the dimension.
DIM_CARD_TYPE.count()

In [None]:
# Rename the raw location columns so they match DIM_LOCATION's names for the join below.
fact_loc = df
for source_name, target_name in [
    ('atm_location', 'location'),
    ('atm_streetname', 'streetname'),
    ('atm_street_number', 'street_number'),
    ('atm_zipcode', 'zipcode'),
    ('atm_lat', 'lat'),
    ('atm_lon', 'lon'),
]:
    fact_loc = fact_loc.withColumnRenamed(source_name, target_name)

In [None]:
# Attach location_id by matching on every location attribute (left join keeps all transactions).
location_keys = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon']
fact_loc = fact_loc.join(DIM_LOCATION, location_keys, 'left')

In [None]:
# Inspect the columns after the location join.
fact_loc.schema.names

In [None]:
# Align with DIM_ATM's naming: the raw atm_id is the ATM number, and the
# location key becomes atm_location_id.
fact_loc = (
    fact_loc
    .withColumnRenamed('atm_id', 'atm_number')
    .withColumnRenamed('location_id', 'atm_location_id')
)

In [None]:
# Attach the ATM surrogate key (atm_id) via the ATM dimension's natural key.
atm_keys = ['atm_number', 'atm_manufacturer', 'atm_location_id']
fact_atm = fact_loc.join(DIM_ATM, atm_keys, 'left')

In [None]:
# Expose the ATM location key under the fact table's weather_loc_id name.
fact_atm = fact_atm.withColumnRenamed(existing='atm_location_id', new='weather_loc_id')

In [None]:
# Attach the date surrogate key (date_id) by matching the calendar/hour parts.
date_keys = ['year', 'month', 'day', 'hour', 'weekday']
fact_date = fact_atm.join(DIM_DATE, date_keys, 'left')

In [None]:
# Attach card_type_id (needed by the final fact column selection) by joining
# DIM_CARD_TYPE on card_type. This defines fact_atm_trans, which the following
# cells build on.
fact_atm_trans = fact_date.join(DIM_CARD_TYPE, on=['card_type'], how="left")

# Validating the count of the df at the end of Stage 4. DIM_CARD_TYPE holds one
# row per distinct card_type, so this left join should not change the row count.
fact_atm_trans.count()

In [None]:
# Fact primary key: a 1-based row_number over all rows ordered by date_id.
# NOTE(review): the window has no partitionBy, so Spark evaluates it in a single
# partition, and rows sharing a date_id are numbered in no guaranteed order.
from pyspark.sql.window import Window

by_date = Window.orderBy('date_id')
FACT_ATM_TRANS = fact_atm_trans.withColumn("trans_id", row_number().over(by_date))
FACT_ATM_TRANS.show(n=1, truncate=True)

In [None]:
# Inspect everything available before trimming to the target columns.
FACT_ATM_TRANS.schema.names

In [None]:
# Keep only the target model's fact columns, in target order.
fact_columns = [
    'trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id',
    'atm_status', 'currency', 'service', 'transaction_amount',
    'message_code', 'message_text',
    'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description',
]
FACT_ATM_TRANS = FACT_ATM_TRANS.select(*fact_columns)

In [None]:
# Verify the final fact table's column names and order.
FACT_ATM_TRANS.schema.names

Storing the Tables in AWS S3 in CSV Format

In [None]:
# DIM_LOCATION -> s3a://etlprojectrohit/dim_location as one headerless CSV part file,
# replacing any previous output.
(DIM_LOCATION
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'false')
    .csv('s3a://etlprojectrohit/dim_location'))

In [None]:
# DIM_ATM -> s3a://etlprojectrohit/dim_atm as one headerless CSV part file,
# replacing any previous output.
(DIM_ATM
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'false')
    .csv('s3a://etlprojectrohit/dim_atm'))

In [None]:
# DIM_DATE -> s3a://etlprojectrohit/dim_date as one headerless CSV part file,
# replacing any previous output.
(DIM_DATE
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'false')
    .csv('s3a://etlprojectrohit/dim_date'))

In [None]:
# DIM_CARD_TYPE -> s3a://etlprojectrohit/dim_card_type as one headerless CSV part file,
# replacing any previous output.
(DIM_CARD_TYPE
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'false')
    .csv('s3a://etlprojectrohit/dim_card_type'))

In [None]:
# FACT_ATM_TRANS -> s3a://etlprojectrohit/fact_atm_trans as one headerless CSV part file,
# replacing any previous output.
(FACT_ATM_TRANS
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'false')
    .csv('s3a://etlprojectrohit/fact_atm_trans'))