# ETL Project

In [1]:
# setting environment for jupyter notebook 
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
# impoting required function and libraries
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [3]:
conf = SparkConf().setAppName("ATM_TRANS").setMaster("yarn-client")
sc = SparkContext(conf=conf)
sc

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ATM_TRANS").master("local").getOrCreate()
spark

In [6]:
# create an input schema using StructType
fileSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday',StringType(),True),
                        StructField('hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True),
                        StructField('atm_id', StringType(),True),
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True),
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', FloatType(),True),
                        StructField('atm_lon', FloatType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', StringType(),True), 
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', FloatType(),True),
                        StructField('weather_lon', FloatType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True), 
                        StructField('temp', FloatType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),                        
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', FloatType(),True),
                        StructField('cloud_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True),
                        ])

In [7]:
#read the data using the input schema created
file1 = spark.read.csv("atm_trans/part-m-00000" ,header = True, schema = fileSchema)

In [8]:
# verifying the data using the count function 
spark.read.csv("atm_trans/part-m-00000" , schema = fileSchema).count()

2468572

In [9]:
file1.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+---------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+---------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location| atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|cloud_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+---------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+---

In [10]:
file1.count()

2468571

# Creating all the dimension and fact tables from the given data 


## creating location dimension table 

In [11]:
# selecting the required columns
loc_dim= file1.select(["atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon"]).distinct()

In [12]:
dim_loc=file1.select(["atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon"]).distinct().count()

In [13]:
#count of values
dim_loc

109

In [15]:
# adding location id to location dinmesion table 
loc_dim_rdd=loc_dim.rdd.zipWithIndex().map(lambda (row,rowId) : ( list(row) + [rowId+1]))
loc_dim= loc_dim.withColumn("location_id",lit("1"))
loc_dim.show(4)

+--------------+--------------+-----------------+-----------+-------+-------+-----------+
|  atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+--------------+--------------+-----------------+-----------+-------+-------+-----------+
|      NÃ¦stved|   Farimagsvej|                8|       4700| 55.233| 11.763|          1|
|Skelagervej 15|   Skelagervej|               15|       9000| 57.023|  9.891|          1|
|     Svenstrup|  GodthÃ¥bsvej|               14|       9230| 56.973|  9.851|          1|
|         Durup|        Torvet|                4|       7870| 56.745|  8.949|          1|
+--------------+--------------+-----------------+-----------+-------+-------+-----------+
only showing top 4 rows



In [16]:
# converting the rrd to dataframe
sqlContext = SQLContext(sc)
DIM_LOCATION = sqlContext.createDataFrame(loc_dim_rdd, schema= loc_dim.schema).select(["location_id","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon"])

In [17]:
DIM_LOCATION.show()

+-----------+--------------------+--------------------+-----------------+-----------+-------+-------+
|location_id|        atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+-----------+--------------------+--------------------+-----------------+-----------+-------+-------+
|          1|            NÃ¦stved|         Farimagsvej|                8|       4700| 55.233| 11.763|
|          2|      Skelagervej 15|         Skelagervej|               15|       9000| 57.023|  9.891|
|          3|           Svenstrup|        GodthÃ¥bsvej|               14|       9230| 56.973|  9.851|
|          4|               Durup|              Torvet|                4|       7870| 56.745|  8.949|
|          5|           Svendborg|  Sankt Nicolai Gade|                1|       5700| 55.058| 10.609|
|          6|           HjÃ¸rring|          Ã˜stergade|                8|       9800| 57.459|  9.988|
|          7|   Ã˜sterÃ¥  MÃ¸ller|            Ã˜sterÃ¥|               12|       90

In [18]:
# renaming column headers
dim_location = DIM_LOCATION.selectExpr("location_id as location_id","atm_location as location","atm_streetname as streetname","atm_street_number as street_number","atm_zipcode as zipcode","atm_lat as lat","atm_lon as lon")

In [19]:
dim_location.show(5)

+-----------+--------------+------------------+-------------+-------+------+------+
|location_id|      location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------+------------------+-------------+-------+------+------+
|          1|      NÃ¦stved|       Farimagsvej|            8|   4700|55.233|11.763|
|          2|     Svenstrup|      GodthÃ¥bsvej|           14|   9230|56.973| 9.851|
|          3|Skelagervej 15|       Skelagervej|           15|   9000|57.023| 9.891|
|          4|     Svendborg|Sankt Nicolai Gade|            1|   5700|55.058|10.609|
|          5|         Durup|            Torvet|            4|   7870|56.745| 8.949|
+-----------+--------------+------------------+-------------+-------+------+------+
only showing top 5 rows



## creating atm dimnesion table 

In [20]:
# selecting the required columns
atm_dim= file1.select(["atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon"]).distinct()

In [21]:
atm_dim.show(5)

+------+----------------+--------------------+--------------------+-----------------+-----------+-------+-------+
|atm_id|atm_manufacturer|        atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+------+----------------+--------------------+--------------------+-----------------+-----------+-------+-------+
|    45|             NCR|          Abildgaard|        HjÃ¸rringvej|              144|       9900| 57.447| 10.506|
|     4|             NCR|          Svogerslev|          BrÃ¸nsager|                1|       4000| 55.634| 12.018|
|    47|             NCR|       Frederiksberg|     Gammel Kongevej|              157|       1850| 55.677| 12.537|
|    72|             NCR|Daglig Brugsen Ã˜...|           Kystvejen|               51|       9560| 56.804| 10.271|
|    11|             NCR|           Sauersvej|Fridtjof Nansens Vej|                2|       9210| 57.023|   9.94|
+------+----------------+--------------------+--------------------+-----------------+---

In [22]:
# renaming the column
atm_dim1 = atm_dim.selectExpr("atm_id as atm_number","atm_manufacturer as atm_manufacturer","atm_location as location","atm_streetname as streetname","atm_street_number as street_number","atm_zipcode as zipcode","atm_lat as lat","atm_lon as lon")

In [23]:
atm_dim1.show(5)

+----------+----------------+--------------------+--------------------+-------------+-------+------+------+
|atm_number|atm_manufacturer|            location|          streetname|street_number|zipcode|   lat|   lon|
+----------+----------------+--------------------+--------------------+-------------+-------+------+------+
|        45|             NCR|          Abildgaard|        HjÃ¸rringvej|          144|   9900|57.447|10.506|
|         4|             NCR|          Svogerslev|          BrÃ¸nsager|            1|   4000|55.634|12.018|
|        47|             NCR|       Frederiksberg|     Gammel Kongevej|          157|   1850|55.677|12.537|
|        11|             NCR|           Sauersvej|Fridtjof Nansens Vej|            2|   9210|57.023|  9.94|
|        72|             NCR|Daglig Brugsen Ã˜...|           Kystvejen|           51|   9560|56.804|10.271|
+----------+----------------+--------------------+--------------------+-------------+-------+------+------+
only showing top 5 rows



In [24]:
# adding a atm id to atm dimension table 
atm_dim_rdd=atm_dim1.rdd.zipWithIndex().map(lambda (row,rowId) : ( list(row) + [rowId+1]))
atm_dim= atm_dim1.withColumn("atm_id",lit("1"))
atm_dim.show(5)

+----------+----------------+--------------------+--------------------+-------------+-------+------+------+------+
|atm_number|atm_manufacturer|            location|          streetname|street_number|zipcode|   lat|   lon|atm_id|
+----------+----------------+--------------------+--------------------+-------------+-------+------+------+------+
|         4|             NCR|          Svogerslev|          BrÃ¸nsager|            1|   4000|55.634|12.018|     1|
|        45|             NCR|          Abildgaard|        HjÃ¸rringvej|          144|   9900|57.447|10.506|     1|
|        47|             NCR|       Frederiksberg|     Gammel Kongevej|          157|   1850|55.677|12.537|     1|
|        72|             NCR|Daglig Brugsen Ã˜...|           Kystvejen|           51|   9560|56.804|10.271|     1|
|        11|             NCR|           Sauersvej|Fridtjof Nansens Vej|            2|   9210|57.023|  9.94|     1|
+----------+----------------+--------------------+--------------------+---------

In [25]:
# converting the rrd to dataframe
dim_atm_df= sqlContext.createDataFrame(atm_dim_rdd, schema= atm_dim.schema).select(["atm_id","atm_number","atm_manufacturer","location","streetname","street_number","zipcode","lat","lon"])

In [26]:
dim_atm_df.show()

+------+----------+----------------+--------------------+--------------------+-------------+-------+------+------+
|atm_id|atm_number|atm_manufacturer|            location|          streetname|street_number|zipcode|   lat|   lon|
+------+----------+----------------+--------------------+--------------------+-------------+-------+------+------+
|     1|         4|             NCR|          Svogerslev|          BrÃ¸nsager|            1|   4000|55.634|12.018|
|     2|        45|             NCR|          Abildgaard|        HjÃ¸rringvej|          144|   9900|57.447|10.506|
|     3|        47|             NCR|       Frederiksberg|     Gammel Kongevej|          157|   1850|55.677|12.537|
|     4|        11|             NCR|           Sauersvej|Fridtjof Nansens Vej|            2|   9210|57.023|  9.94|
|     5|        72|             NCR|Daglig Brugsen Ã˜...|           Kystvejen|           51|   9560|56.804|10.271|
|     6|        84|             NCR|           Svendborg|  Sankt Nicolai Gade|  

In [27]:
dim_atm = dim_atm_df.join(dim_location,on=['lat','lon'],how='left').select("atm_id","atm_number","atm_manufacturer","location_id").drop(*['location','streetname','street_number','zipcode','lat','lon']) 

In [28]:
dim_atm.show()

+------+----------+----------------+-----------+
|atm_id|atm_number|atm_manufacturer|location_id|
+------+----------+----------------+-----------+
|   105|        18| Diebold Nixdorf|         41|
|    29|       101|             NCR|         32|
|    84|         9| Diebold Nixdorf|         92|
|    24|        64|             NCR|         58|
|    93|        59| Diebold Nixdorf|         20|
|    93|        59| Diebold Nixdorf|         50|
|    99|        30|             NCR|         20|
|    99|        30|             NCR|         50|
|    62|        21|             NCR|         93|
|    62|        21|             NCR|         94|
|    62|        21|             NCR|          7|
|    94|        12|             NCR|         93|
|    94|        12|             NCR|         94|
|    94|        12|             NCR|          7|
|    53|       104|             NCR|         93|
|    53|       104|             NCR|         94|
|    53|       104|             NCR|          7|
|    54|        39| 

In [29]:
dim_atm.count() 

156

## creating data dimension table 

In [30]:
# selecting the required columns
date_df =file1.select(["year","month","day","weekday","hour"]).distinct()

In [31]:
date_df.count()

8685

In [32]:
date_df = date_df.withColumn("month_number",from_unixtime(unix_timestamp(col("month"),'MMMM'),'MM'))

In [33]:
date_df.show(5)

+----+------+---+--------+----+------------+
|year| month|day| weekday|hour|month_number|
+----+------+---+--------+----+------------+
|2017|  July| 18| Tuesday|   9|          07|
|2017|  July| 18| Tuesday|  22|          07|
|2017|  July| 20|Thursday|   0|          07|
|2017|  July| 21|  Friday|  19|          07|
|2017|August|  1| Tuesday|   4|          08|
+----+------+---+--------+----+------------+
only showing top 5 rows



In [34]:


dim_date = date_df.withColumn('full_date_time', F.concat('year',F.lit('-'),'month_number',F.lit('-'),'day',F.lit(' '),'hour',F.lit(':00'))).select("full_date_time","year","month","day","hour","weekday")

In [35]:
dim_date.show(5)

+----------------+----+------+---+----+--------+
|  full_date_time|year| month|day|hour| weekday|
+----------------+----+------+---+----+--------+
| 2017-07-18 9:00|2017|  July| 18|   9| Tuesday|
|2017-07-18 22:00|2017|  July| 18|  22| Tuesday|
| 2017-07-20 0:00|2017|  July| 20|   0|Thursday|
|2017-07-21 19:00|2017|  July| 21|  19|  Friday|
|  2017-08-1 4:00|2017|August|  1|   4| Tuesday|
+----------------+----+------+---+----+--------+
only showing top 5 rows



In [36]:
# adding a date id to date dimension table
w = Window.orderBy("year")
dim_date = dim_date.select(row_number().over(w).alias("date_id"),col("*"))

In [37]:
format = "yyyy-MM-dd HH:mm:ss"
dim_date = dim_date.withColumn("full-date_time",unix_timestamp("full_date_time", format).cast("timestamp"))

In [38]:
dim_date = dim_date.select(["date_id","full_date_time","year","month","day","weekday","hour"])

In [39]:
dim_date.count()

8685

In [40]:
dim_date.show(4)

+-------+----------------+----+-----+---+--------+----+
|date_id|  full_date_time|year|month|day| weekday|hour|
+-------+----------------+----+-----+---+--------+----+
|      1| 2017-07-18 9:00|2017| July| 18| Tuesday|   9|
|      2|2017-07-18 22:00|2017| July| 18| Tuesday|  22|
|      3| 2017-07-20 0:00|2017| July| 20|Thursday|   0|
|      4|2017-07-21 19:00|2017| July| 21|  Friday|  19|
+-------+----------------+----+-----+---+--------+----+
only showing top 4 rows



## creating card type dimension

In [41]:
# selecting the required columns
dim_card = file1.select(["card_type"]).distinct()

In [42]:
# adding a card id to card dimension table
w = Window.orderBy("card_type")
dim_card = dim_card.select(row_number().over(w).alias("card_type_id"),col("*"))

In [43]:
dim_card.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|              CIRRUS|
|           2|             Dankort|
|           3|     Dankort - on-us|
|           4|           HÃ¦vekort|
|           5|   HÃ¦vekort - on-us|
|           6|             Maestro|
|           7|          MasterCard|
|           8|  Mastercard - on-us|
|           9|                VISA|
|          10|        Visa Dankort|
|          11|Visa Dankort - on-us|
|          12|            VisaPlus|
+------------+--------------------+



In [44]:
dim_card.count()

12

### Creating Fact table for all atm transation 

In [45]:
# selecting the columns for creating fact table 
fact_atm_trans = file1.select(["year","month","day","weekday","hour","atm_status","atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon","currency","card_type","transaction_amount","service","message_code","message_text","weather_lat","weather_lon","weather_city_id","weather_city_name","temp","pressure","humidity","wind_speed","wind_deg","rain_3h","cloud_all","weather_id","weather_main","weather_description"])

In [46]:
# count excluding the header 
fact_atm_trans.count()

2468571

In [47]:
#creating Trans_id for fact table 
fact_rdd=fact_atm_trans.rdd.zipWithIndex().map(lambda (row,rowId) : ( list(row) + [rowId+1]))
fact_atm_trans= fact_atm_trans.withColumn("trans_id",lit("1"))

In [48]:
# converting the rrd to a dataframe 
fact_atm_trans = sqlContext.createDataFrame(fact_rdd,schema= fact_atm_trans.schema).select(["trans_id","year","month","day","weekday","hour","atm_status","atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon","currency","card_type","transaction_amount","service","message_code","message_text","weather_lat","weather_lon","weather_city_id","weather_city_name","temp","pressure","humidity","wind_speed","wind_deg","rain_3h","cloud_all","weather_id","weather_main","weather_description"])

In [49]:
# renaming the column header
facts_atm_trans = fact_atm_trans.withColumnRenamed("atm_id","atm_number") \
                .withColumnRenamed('atm_location','location') \
                .withColumnRenamed("atm_streetname","streetname") \
                .withColumnRenamed("atm_street_number","street_number") \
                .withColumnRenamed("atm_zipcode","zipcode") \
                .withColumnRenamed("atm_lat","lat") \
                .withColumnRenamed("atm_lon","lon") 

In [50]:
# joining loation dimension table with fact table 
facts_atm_trans= facts_atm_trans.join(dim_location,on=['location','streetname','street_number','zipcode','lat','lon'],how='left').select("*").drop(*['location','streetname','street_number','zipcode','lat','lon']) 

In [51]:
# joining atm dimension table with fact table 
facts_atm_trans= facts_atm_trans.join(dim_atm,on=["atm_number","atm_manufacturer","location_id"],how='left').select("*").drop(*["atm_number","atm_manufacturer"]) 

In [52]:
# joining date dimension table with fact table 
facts_atm_trans= facts_atm_trans.join(dim_date,on=["year","month","day","weekday","hour"],how='left').select("*").drop(*["full_date_time","year","month","day","weekday","hour"]) 

In [53]:
# joining card dimension table with fact table 
facts_atm_trans= facts_atm_trans.join(dim_card,on=["card_type"],how='left').select("*").drop(*["card_type"]) 

In [54]:
# rearranging the column and selecting required tables
fact_atm_trans = facts_atm_trans.select(["trans_id","atm_id","location_id","date_id","card_type_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","cloud_all","weather_id","weather_main","weather_description"])

In [55]:
fact_atm_trans.show(2)

+--------+------+-----------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+---------+----------+------------+-------------------+
|trans_id|atm_id|location_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|cloud_all|weather_id|weather_main|weather_description|
+--------+------+-----------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+---------+----------+------------+-------------------+
|  598777|    53|         94|   8616|           3|    Active|     DKK|Withdrawal|              7993|        null|        null|    0.0|       75|       803|      Clouds|      broken clouds|
|  598943|    53|         94|   8616|           3|    Active|     DKK|Withdrawal|               101|        null|        null|    0.0|       75|       803|      Clouds|      broken clouds|
+--------+------+-----------+-------+------------+-----

In [56]:
# final count of fact table excluding header
fact_atm_trans.count()

2468571

### Loading the dataframe to S3 buckets

In [None]:
dim_location.coalesce(1).write.csv("s3a://devilatom/dim_location/ ",mode='overwrite')

In [61]:
dim_atm.coalesce(1).write.csv("s3a://devilatom/dim_atm ",mode='overwrite')

In [62]:
dim_date.coalesce(1).write.csv("s3a://devilatom/dim_date",mode='overwrite')

In [63]:
dim_card.coalesce(1).write.csv("s3a://devilatom/dim_card ",mode='overwrite')

In [None]:
fact_atm_trans.coalesce(1).write.csv("s3a://devilatom/fact_atm_trans ",mode='overwrite')