## ETL Project (Spar Nord Bank ATM Data Mart)

In [4]:
### Importing necessary libraries and setting up SparkSession
import os
import sys
# Tell PySpark which Python interpreter, JRE and Spark 2 installation to use.
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Each archive is pushed to the front of sys.path in turn, so pyspark.zip
# ends up ahead of the py4j archive.
for archive_name in ("/py4j-0.10.6-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + archive_name)

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, FloatType
from pyspark.sql.functions import col,lit
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import concat
from pyspark.sql.functions import lpad

ModuleNotFoundError: No module named 'pyspark'

In [7]:
from pyspark.sql import SparkSession
# Get (or create) a local-mode session named 'ETL'; the bare name on the
# last line makes the notebook echo the session object.
session_builder = SparkSession.builder.appName('ETL').master("local")
spark = session_builder.getOrCreate()
spark

ModuleNotFoundError: No module named 'pyspark'

In [8]:
# Load the raw extract from HDFS into `etl`.
# header=False: the first line is read as data; column types are inferred by Spark.
etl = spark.read.csv(
    "/user/root/SRC_ATM_TRANS/part-m-00000",
    header=False,
    inferSchema=True,
)

NameError: name 'spark' is not defined

In [None]:
# Preview the first row of the schema-inferred load.
# The frame read from HDFS is named `etl`; `df` is never defined in this notebook.
etl.show(1)

### Creating a custom input schema using StructType and reading the data

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [None]:
# Explicit input schema `Schema`: (column name, Spark type) pairs listed in
# column order. Every field is declared nullable.
schema_columns = [
    ('year', IntegerType()),
    ('month', StringType()),
    ('day', IntegerType()),
    ('weekday', StringType()),
    ('hour', IntegerType()),
    ('atm_status', StringType()),
    ('atm_id', StringType()),
    ('atm_manufacturer', StringType()),
    ('atm_location', StringType()),
    ('atm_streetname', StringType()),
    ('atm_street_number', IntegerType()),
    ('atm_zipcode', IntegerType()),
    ('atm_lat', DoubleType()),
    ('atm_lon', DoubleType()),
    ('currency', StringType()),
    ('card_type', StringType()),
    ('transaction_amount', IntegerType()),
    ('service', StringType()),
    ('message_code', StringType()),
    ('message_text', StringType()),
    ('weather_lat', DoubleType()),
    ('weather_lon', DoubleType()),
    ('weather_city_id', IntegerType()),
    ('weather_city_name', StringType()),
    ('temp', DoubleType()),
    ('pressure', IntegerType()),
    ('humidity', IntegerType()),
    ('wind_speed', IntegerType()),
    ('wind_deg', IntegerType()),
    ('rain_3h', DoubleType()),
    ('clouds_all', IntegerType()),
    ('weather_id', IntegerType()),
    ('weather_main', StringType()),
    ('weather_description', StringType()),
]
Schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in schema_columns]
)

In [None]:
# Read the extract again, this time applying the explicit `Schema`
# instead of letting Spark infer the types.
etl_df = spark.read.schema(Schema).csv("/user/root/etl_atm_project/part-m-00000")

In [None]:
# Display the first 5 rows of the typed input frame
etl_df.show(n=5)

In [None]:
# Number of records loaded into etl_df (selecting every column first does not change the row count)
etl_df.count()

In [None]:
# List the column names of etl_df (these come from `Schema`)
etl_df.columns

In [None]:
# Print each etl_df column with its data type and nullability
etl_df.printSchema()

In [None]:
# checking null values
# `count_missings` is not defined in any cell of this notebook, so it is
# defined here before being used.
from pyspark.sql.functions import count, when


def count_missings(df):
    """Return a one-row DataFrame holding the number of nulls in each column of `df`.

    `when(...)` yields a non-null value only for null cells and `count`
    counts non-null values, so each output column is that column's null count.
    """
    return df.select(
        [count(when(col(name).isNull(), name)).alias(name) for name in df.columns]
    )


count_missings(etl_df).show()

##### As seen above, a few columns contain null values, but the data dictionary says only the message columns should contain nulls (a null there indicates that no error occurred)

## Creating the Dimension and Fact tables

### Creating a dataframe for Location Dimension according to Target Dimension Model

In [None]:
# Location dimension: take the ATM address/coordinate columns from etl_df,
# rename them (source name -> target name) and keep one row per distinct combination.
location_source_to_target = [
    ("atm_location", "location"),
    ("atm_streetname", "streetname"),
    ("atm_street_number", "street_number"),
    ("atm_zipcode", "zipcode"),
    ("atm_lat", "lat"),
    ("atm_lon", "lon"),
]
dim_location = etl_df.select(
    [etl_df[source].alias(target) for source, target in location_source_to_target]
).distinct()

In [None]:
# Number of distinct location rows in dim_location
dim_location.count()

In [None]:
# Display the first 5 location rows without truncating long values
dim_location.show(n=5, truncate=False)

In [None]:
# Print the dim_location columns and their types after the renames
dim_location.printSchema()

In [None]:
# Add a helper column holding the same constant ("ABC") on every row.
# It is used as the window partition key when the primary key is generated.
dim_location= dim_location.withColumn("new_column",lit("ABC"))

In [None]:
# Surrogate key atm_location_id: every row sits in the same constant partition,
# so row_number() numbers the whole table. The ordering is by a literal,
# so which row gets which number is not determined by the data.
w = Window.partitionBy('new_column').orderBy(lit('A'))
numbered_locations = dim_location.withColumn("atm_location_id", row_number().over(w))
dim_location = numbered_locations.drop("new_column")

In [None]:
# Print the dim_location schema again to confirm atm_location_id was added
dim_location.printSchema()

In [None]:
# Collect the location rows whose generated key is greater than 106
dim_location.where(dim_location.atm_location_id > 106).collect()

In [None]:
# Final location dimension: same columns, with the key first
location_column_order = [
    "atm_location_id",
    "location",
    "streetname",
    "street_number",
    "zipcode",
    "lat",
    "lon",
]
location = dim_location.select(location_column_order)

In [None]:
# List the column names of `location` to check they are all present and correctly named
location.columns

In [None]:
# Row count of the final location dimension
location.count()

In [None]:
# Print the columns and types of the final location dimension
location.printSchema()

### Creating a dataframe for ATM Dimension according to Target Dimension Model

In [None]:
# ATM-related columns from etl_df; atm_id becomes atm_number, and the
# coordinates are renamed to lat/lon.
atm_source_to_target = [
    ("atm_id", "atm_number"),
    ("atm_manufacturer", "atm_manufacturer"),
    ("atm_lat", "lat"),
    ("atm_lon", "lon"),
]
dim_atm = etl_df.select(
    [etl_df[source].alias(target) for source, target in atm_source_to_target]
)

In [None]:
# Left outer join with the location dimension on the coordinates, so each
# ATM row picks up atm_location_id to use as a foreign key.
coordinate_keys = ["lat", "lon"]
dim_atm = dim_atm.join(location, coordinate_keys, "leftouter")

In [None]:
# dim_atm still has one row per input transaction; keep only the distinct rows
atm_distinct =dim_atm.distinct()

In [None]:
# Surrogate key atm_id: add a constant helper column, number all rows with
# row_number() over that single partition, then remove the helper column.
w = Window.partitionBy('new_column').orderBy(lit('A'))
atm_distinct = (
    atm_distinct
    .withColumn("new_column", lit("ABC"))
    .withColumn("atm_id", row_number().over(w))
    .drop("new_column")
)

In [None]:
# Display 2 rows to inspect the generated atm_id
atm_distinct.show(n=2)

In [None]:
# Final ATM dimension: key, ATM number, manufacturer and the location foreign key
atm_column_order = ['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']
atm = atm_distinct.select(atm_column_order)

In [None]:
# Display the first 5 rows of the ATM dimension
atm.show(n=5)

In [None]:
# Row count of the ATM dimension
atm.count()

In [None]:
# Print the columns and types of the ATM dimension
atm.printSchema()

### Creating a dataframe for Date Dimension according to Target Dimension Model

In [None]:
# Date dimension: the distinct (year, month, day, hour, weekday) combinations in etl_df
date_part_names = ["year", "month", "day", "hour", "weekday"]
dim_date = etl_df.select(
    [etl_df[part].alias(part) for part in date_part_names]
).distinct()

In [None]:
from pyspark.sql.functions  import date_format
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_date

In [None]:
# month_new: parse the month column with the 'MMMMM' pattern, format it as
# 'MM' and cast the result to an integer.
parsed_month = to_date(col('month'), 'MMMMM')
month_number = date_format(parsed_month, 'MM').cast(IntegerType())
dim_date = dim_date.withColumn('month_new', month_number)

In [None]:
# Left-pad month, day and hour with '0' to a width of 2.
# month_new is overwritten in place; day and hour get new *_new columns.
for padded_name, source_name in (('month_new', 'month_new'), ('day_new', 'day'), ('hour_new', 'hour')):
    dim_date = dim_date.withColumn(padded_name, lpad(col(source_name), 2, '0'))

In [None]:
# full_date_time: year, padded month, padded day and padded hour concatenated, followed by a literal '00'
timestamp_digits = concat(
    col('year'),
    col('month_new'),
    col('day_new'),
    col('hour_new'),
    lit('00'),
)
dim_date_final = dim_date.withColumn("full_date_time", timestamp_digits)

In [None]:
# Build full_date_time by concatenating year, month name, zero-padded day and
# zero-padded hour, parsing that string and re-formatting it as a timestamp string.
#
# Two fixes:
#  * the column is named full_date_time, because the date dimension selects
#    that name and this assignment replaces dim_date_final;
#  * the output pattern is 'yyyy-MM-dd HH:mm:ss'. In the previous pattern
#    'YYYY' is the week-based year, 'DD' the day of the year and 'SS'
#    milliseconds, which gives wrong dates.
date_parts = concat(
    dim_date.year.cast(StringType()),
    dim_date.month.cast(StringType()),
    lpad(dim_date.day.cast(StringType()), 2, '0'),
    lpad(dim_date.hour.cast(StringType()), 2, '0'),
)
dim_date_final = dim_date.withColumn(
    "full_date_time",
    from_unixtime(unix_timestamp(date_parts, 'yyyyMMMMMddHH'), 'yyyy-MM-dd HH:mm:ss'),
)

In [None]:
# Surrogate key date_id: add a constant helper column, number all rows with
# row_number() over that single partition, then remove the helper column.
w = Window.partitionBy('new_column').orderBy(lit('A'))
dim_date_final = (
    dim_date_final
    .withColumn("new_column", lit("ABC"))
    .withColumn("date_id", row_number().over(w))
    .drop("new_column")
)

In [None]:
# Final date dimension: the key and timestamp first, then the individual date parts
date_column_order = ["date_id", "full_date_time", "year", "month", "day", "hour", "weekday"]
date = dim_date_final.select(date_column_order)

In [None]:
# Display the first 5 rows of the date dimension
date.show(n=5)

In [None]:
# Row count of the date dimension, for validation
date.count()

In [None]:
# Print the columns and types of the date dimension
date.printSchema()

### Creating a dataframe for Card Type Dimension according to Target Dimension Model

In [9]:
# Card type dimension: the distinct card_type values found in etl_df
card_type_column = etl_df["card_type"].alias("card_type")
dim_card_type = etl_df.select(card_type_column).distinct()

NameError: name 'etl_df' is not defined

In [None]:
# Surrogate key card_type_id: add a constant helper column, number all rows with
# row_number() over that single partition, then remove the helper column.
w = Window.partitionBy('new_column').orderBy(lit('A'))
dim_card_type = (
    dim_card_type
    .withColumn("new_column", lit("ABC"))
    .withColumn("card_type_id", row_number().over(w))
    .drop("new_column")
)

In [None]:
# Final card type dimension, with the key column first
card_type_column_order = ["card_type_id", "card_type"]
card_type = dim_card_type.select(card_type_column_order)

In [None]:
# Row count of the card type dimension, for validation
card_type.count()

In [None]:
# Display the first 5 rows of the card type dimension
card_type.show(n=5)

In [None]:
# Print the columns and types of the card type dimension
card_type.printSchema()

### Creating the Transaction Fact Table according to Target Model

In [None]:
# Give each frame an alias equal to its variable name, so the joins below
# can refer to columns as '<alias>.<column>' or '<alias>.*'.
etl_df, date, dim_card_type, dim_location, atm = (
    etl_df.alias('etl_df'),
    date.alias('date'),
    dim_card_type.alias('dim_card_type'),
    dim_location.alias('dim_location'),
    atm.alias('atm'),
)

- Creating the fact table takes 4 steps, each one left outer joining the input table with a dimension table
- Columns are dropped as required, except the primary keys of the dimension tables, which act as foreign keys

In [None]:
# Step 1: left join the input frame with the date dimension on the date parts,
# keep the input columns plus date_id, then drop the date part columns.
date_join_keys = ['year', 'month', 'day', 'hour', 'weekday']
joined_with_date = etl_df.join(date, on=date_join_keys, how='left')
first_df = joined_with_date.select('etl_df.*', 'date.date_id').drop(*date_join_keys)

In [None]:
# Alias first_df so the next join can select 'first_df.*'
first_df = first_df.alias("first_df")

In [None]:
# Print the first_df schema to check the columns left after the date join
first_df.printSchema()

In [None]:
# Row count after step 1, for validation
first_df.count()

In [None]:
# Step 2: left join first_df with the card type dimension on card_type,
# keep the first_df columns plus card_type_id, then drop card_type.
card_join_keys = ['card_type']
joined_with_card_type = first_df.join(dim_card_type, on=card_join_keys, how='left')
second_df = joined_with_card_type.select('first_df.*', 'dim_card_type.card_type_id').drop(*card_join_keys)

In [None]:
# Alias second_df so the next join can select 'second_df.*'
second_df = second_df.alias('second_df')

In [None]:
# Print the second_df schema to check the columns left after the card type join
second_df.printSchema()

In [None]:
# Row count after step 2, for validation
second_df.count()

In [None]:
# Step 3: rename the atm_* address columns to the names used in dim_location,
# left join on all six of them, keep 'second_df.*' plus atm_location_id,
# then drop the six join columns.
fact_location_renames = [
    ('atm_location', 'location'),
    ('atm_lat', 'lat'),
    ('atm_lon', 'lon'),
    ('atm_streetname', 'streetname'),
    ('atm_street_number', 'street_number'),
    ('atm_zipcode', 'zipcode'),
]
renamed_second_df = second_df
for old_name, new_name in fact_location_renames:
    renamed_second_df = renamed_second_df.withColumnRenamed(old_name, new_name)

location_join_keys = ['location', 'lat', 'lon', 'streetname', 'street_number', 'zipcode']
joined_with_location = renamed_second_df.join(dim_location, on=location_join_keys, how='left')
third_df = joined_with_location.select('second_df.*', 'dim_location.atm_location_id').drop(*location_join_keys)

In [None]:
# Alias third_df so the next join can select 'third_df.*'
third_df= third_df.alias('third_df')

In [None]:
# Print the third_df schema to check the columns left after the location join
third_df.printSchema()

In [None]:
# Row count after step 3, for validation
third_df.count()

In [None]:
# The atm_id column here comes from the input data; rename it to atm_number
# so it matches the atm dimension's join column. The dimension's own atm_id
# is the generated key.
third_df = third_df.withColumnRenamed('atm_id',"atm_number")

In [None]:
# Step 4: left join third_df with the atm dimension, keep the third_df
# columns plus the dimension's generated atm_id, then drop the ATM columns
# that the key replaces.
atm_join_keys = ['atm_number', 'atm_manufacturer', 'atm_location_id']
fourth_df = (
    third_df.join(atm, on=atm_join_keys, how='left')
    .select('third_df.*', 'atm.atm_id')
    # 'atm_number' was previously misspelled 'atm_nummber'. drop() ignores
    # names that are not in the schema, so the column was never removed.
    .drop('atm_manufacturer', 'atm_number')
)

In [None]:
# Print the fourth_df schema to check the columns left after the atm join
fourth_df.printSchema()

In [None]:
# Row count after step 4, for validation
fourth_df.count()

In [None]:
# The fact table starts as fourth_df under the alias 'fact_atm_trans'
fact_atm_trans = fourth_df.alias("fact_atm_trans")

In [None]:
# Surrogate key trans_id: add a constant helper column, number all rows with
# row_number() over that single partition, then remove the helper column.
w = Window.partitionBy('new_column').orderBy(lit('A'))
fact_atm_trans = (
    fact_atm_trans
    .withColumn("new_column", lit("ABC"))
    .withColumn("trans_id", row_number().over(w))
    .drop("new_column")
)

In [None]:
# Print the fact table schema after adding trans_id
fact_atm_trans.printSchema()

In [None]:
# Remove the weather location and measurement columns that are not kept in the fact table
unused_weather_columns = [
    'weather_lat',
    'weather_lon',
    'weather_city_id',
    'weather_city_name',
    'temp',
    'pressure',
    'humidity',
    'wind_speed',
    'wind_deg',
]
fact_atm_trans = fact_atm_trans.drop(*unused_weather_columns)

In [None]:
# Rename atm_location_id to weather_loc_id, the name used in the fact table schema
fact_atm_trans = fact_atm_trans.withColumnRenamed("atm_location_id","weather_loc_id")

In [None]:
# Print the fact table schema after the drop and rename
fact_atm_trans.printSchema()

In [None]:
# Final fact table projection: the keys first, then the transaction
# attributes, then the retained weather columns.
fact_column_order = [
    'trans_id',
    'atm_id',
    'weather_loc_id',
    'date_id',
    'card_type_id',
    'atm_status',
    'currency',
    'service',
    'transaction_amount',
    'message_code',
    'message_text',
    'rain_3h',
    'clouds_all',
    'weather_id',
    'weather_main',
    'weather_description',
]
fact_atm_trans1 = fact_atm_trans.select(fact_column_order)

In [None]:
# Alias the final fact frame with its own variable name
fact_atm_trans1 = fact_atm_trans1.alias('fact_atm_trans1')

In [None]:
# Display the first 5 rows of the final fact table
fact_atm_trans1.show(n=5)

In [None]:
# Print the columns and types of the final fact table
fact_atm_trans1.printSchema()

## Writing the PySpark Dataframes to AWS S3 Storage in csv format

### dim-atm

In [None]:
# Write the ATM dimension to S3 as CSV without a header row; coalesce(1)
# reduces the frame to one partition before writing.
# The stray trailing comma is removed: it wrapped the call in a 1-tuple.
atm.coalesce(1).write.save("s3a://moushamk/dim-atm", format='csv', header='false')

### dim-card-type

In [None]:
# Write the card type dimension to S3 as CSV without a header row; coalesce(1)
# reduces the frame to one partition before writing.
# The stray trailing comma is removed: it wrapped the call in a 1-tuple.
card_type.coalesce(1).write.save("s3a://moushamk/dim-card-type", format='csv', header='false')

### dim-date

In [None]:
# Write the date dimension to S3 as CSV without a header row; coalesce(1)
# reduces the frame to one partition before writing.
# The stray trailing comma is removed: it wrapped the call in a 1-tuple.
date.coalesce(1).write.save("s3a://moushamk/dim-date", format='csv', header='false')

### dim-location

In [None]:
# Write the location dimension to S3 as CSV without a header row; coalesce(1)
# reduces the frame to one partition before writing.
# The stray trailing comma is removed: it wrapped the call in a 1-tuple.
location.coalesce(1).write.save("s3a://moushamk/dim-location", format='csv', header='false')