# ETL Project: SparNord ATM Usage Analysis

In [1]:
import os
import sys

# Point PySpark at the cluster's Python interpreter, JDK and Spark 2 install.
# NOTE(review): these paths are hard-coded for one Cloudera CDH node; adjust per environment.
_spark_env = {
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
}
_spark_env["PYLIB"] = _spark_env["SPARK_HOME"] + "/python/lib"
os.environ.update(_spark_env)

# Make the bundled py4j and pyspark archives importable. Each insert goes to
# position 0, so pyspark.zip ends up ahead of py4j on sys.path.
for _archive in ("/py4j-0.10.6-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + _archive)

In [2]:
#initiate session
from pyspark.sql.session import SparkSession

# Reuse the running session if there is one, otherwise start a local one with Hive support.
spark = (
    SparkSession.builder
    .appName('ATM_TRANS')
    .master("local")
    .enableHiveSupport()
    .getOrCreate()
)
spark

In [3]:
#importing required datatypes
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ByteType,DoubleType,ShortType,DateType,TimestampType
from pyspark.sql.functions import col,lit

# Explicit schema for the raw ATM transaction file. Fields are matched to these
# columns by position, so the order below must follow the file's field order.
# atm_id and transaction_amount are IntegerType rather than ByteType (max 127) /
# ShortType (max 32767): a value outside a narrow type's range does not parse, and
# under the CSV reader's default PERMISSIVE mode it is loaded as null instead of
# raising, which would silently drop data.
df_schema = StructType([
    StructField('year',StringType(),True),
    StructField('month',StringType(),True),
    StructField('day',StringType(),True),
    StructField('weekday',StringType(),True),
    StructField('hour',StringType(),True),
    StructField('atm_status',StringType(),True),
    StructField('atm_id',IntegerType(),True),
    StructField('atm_manufacturer',StringType(),True),
    StructField('atm_location',StringType(),True),
    StructField('atm_street_name',StringType(),True),
    StructField('atm_street_number',ShortType(),True),
    StructField('atm_zipcode',ShortType(),True),
    StructField('atm_lat',DoubleType(),True),
    StructField('atm_lon',DoubleType(),True),
    StructField('currency',StringType(),True),
    StructField('card_type',StringType(),True),
    StructField('transaction_amount',IntegerType(),True),
    StructField('service',StringType(),True),
    StructField('message_code',StringType(),True),
    StructField('message_text',StringType(),True),
    StructField('weather_lat',DoubleType(),True),
    StructField('weather_lon',DoubleType(),True),
    StructField('weather_city_id',IntegerType(),True),
    StructField('weather_city_name',StringType(),True),
    StructField('temp',DoubleType(),True),
    StructField('pressure',ShortType(),True),
    StructField('humidity',ShortType(),True),
    StructField('wind_speed',ByteType(),True),
    StructField('wind_deg',ShortType(),True),
    StructField('rain_3h',DoubleType(),True),
    StructField('clouds_all',ByteType(),True),
    StructField('weather_id',IntegerType(),True),
    StructField('weather_main',StringType(),True),
    StructField('weather_description',StringType(),True)
    ])



In [4]:
# Load the raw ATM transaction export using the explicit df_schema (no type inference).
# NOTE(review): text values print as e.g. 'HÃƒÂ¦vekort', which looks like doubly
# mis-decoded UTF-8 (probably 'Hævekort'); this read does not repair that — TODO
# confirm the source file's encoding.
df = spark.read.csv(
    "/user/root/atm_trans/part-m-00000",
    schema=df_schema,
    sep=",",
)

In [5]:
# Sanity check: total number of rows loaded from the raw file.
raw_row_count = df.count()
raw_row_count

2468572

In [6]:
#creating cardtype dataframe
# One row per distinct card_type value seen in the raw data.
dim_cardtype = df.select(df.card_type).distinct()
dim_cardtype.show()

+--------------------+
|           card_type|
+--------------------+
|     Dankort - on-us|
|              CIRRUS|
|         HÃƒÂ¦vekort|
|                VISA|
|  Mastercard - on-us|
|             Maestro|
|Visa Dankort - on-us|
|        Visa Dankort|
|            VisaPlus|
|          MasterCard|
|             Dankort|
| HÃƒÂ¦vekort - on-us|
+--------------------+



In [7]:
#creating card_type dimension
import pandas as pd
import numpy as np
import pyarrow
# The distinct card-type list is small (12 rows), so collect it to the driver as pandas.
pd_df = dim_cardtype.toPandas()
pd_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12 entries, 0 to 11
Data columns (total 1 columns):
card_type    12 non-null object
dtypes: object(1)
memory usage: 168.0+ bytes


In [8]:
# Give each card type a 1-based surrogate key in the frame's current row order.
# NOTE(review): that order comes from Spark's distinct(), which is not sorted, so the
# ids may change between runs — sort first if stable ids are required.
pd_df = pd_df.assign(card_type_id=pd_df.index + 1)
pd_df

Unnamed: 0,card_type,card_type_id
0,Dankort - on-us,1
1,CIRRUS,2
2,HÃƒÂ¦vekort,3
3,VISA,4
4,Mastercard - on-us,5
5,Maestro,6
6,Visa Dankort - on-us,7
7,Visa Dankort,8
8,VisaPlus,9
9,MasterCard,10


In [9]:
# Hand the keyed card-type table back to Spark so it can be joined with the fact data.
dim_cardtype = spark.createDataFrame(data=pd_df)
dim_cardtype.count()

12

In [10]:
#Creating location dimension
# Each distinct address + coordinate combination becomes one location row.
loc_source_cols = ['atm_location', 'atm_street_name', 'atm_street_number',
                   'atm_zipcode', 'atm_lat', 'atm_lon']
dim_loc = df.select(*loc_source_cols).distinct()
dim_loc.show()

+--------------------+----------------+-----------------+-----------+-------+-------+
|        atm_location| atm_street_name|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+--------------------+----------------+-----------------+-----------+-------+-------+
|               Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|
|            Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|
|          Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|             Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|
|   HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|
|                Aars| Himmerlandsgade|               70|       9600| 56.803|  9.518|
|     Aarhus Lufthavn| Ny Lufthavnsvej|               24|       8560| 56.308| 10.627|
|                 Fur|      StenÃƒÂ¸re|               19|       7884| 56.805|   9.02|
|            Hasseris|     Hasserisvej|              1

In [11]:
# Collect the distinct locations to the driver for key assignment.
pd_loc = dim_loc.toPandas()

In [12]:
# Add a 1-based loc_id surrogate key (row order as returned by toPandas())
# and make it the first column.
pd_loc['loc_id'] = pd_loc.index + 1
pd_loc = pd_loc[['loc_id'] + [c for c in pd_loc.columns if c != 'loc_id']]
pd_loc.head()

Unnamed: 0,loc_id,atm_location,atm_street_name,atm_street_number,atm_zipcode,atm_lat,atm_lon
0,1,Vadum,Ellehammersvej,43,9430,57.118,9.861
1,2,Slagelse,Mariendals Alle,29,4200,55.398,11.342
2,3,Fredericia,SjÃƒÂ¦llandsgade,33,7000,55.564,9.757
3,4,Kolding,Vejlevej,135,6000,55.505,9.457
4,5,HÃƒÂ¸rning Hallen,Toftevej,53,8362,56.091,10.033


In [13]:
# Drop the atm_ prefix so the location dimension has its own column names,
# then push it back to Spark.
pd_loc = pd_loc.rename(columns={
    'atm_location': 'location',
    'atm_street_name': 'street_name',
    'atm_street_number': 'street_number',
    'atm_zipcode': 'zipcode',
    'atm_lat': 'lat',
    'atm_lon': 'lon',
})
dim_loc = spark.createDataFrame(pd_loc)
dim_loc.count()

109

In [14]:
# Fact staging frame: every raw column except the weather-station metadata
# (weather_lat, weather_lon, weather_city_id, weather_city_name); column order is kept.
atm_trans = df.drop('weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name')

In [15]:
#merging location sid's to fact
# NOTE(review): this star import can shadow Python builtins (e.g. sum/max/min/round);
# prefer the F-prefixed functions module used further down.
from pyspark.sql.functions import *

# A transaction's location matches when every address and coordinate field is equal.
loc_match = (
    (atm_trans.atm_location == dim_loc.location)
    & (atm_trans.atm_lat == dim_loc.lat)
    & (atm_trans.atm_lon == dim_loc.lon)
    & (atm_trans.atm_street_name == dim_loc.street_name)
    & (atm_trans.atm_street_number == dim_loc.street_number)
    & (atm_trans.atm_zipcode == dim_loc.zipcode)
)
# Fact columns carried forward; raw address/coordinate columns are replaced by loc_id.
loc_keep = ['card_type', 'year', 'month', 'day', 'weekday', 'hour', 'atm_id',
            'atm_manufacturer', 'atm_status', 'currency', 'service', 'transaction_amount',
            'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id',
            'weather_main', 'weather_description']
atm_trans = (
    atm_trans
    .join(dim_loc, loc_match, 'left')
    .select(*([atm_trans[c] for c in loc_keep] + [dim_loc.loc_id]))
)
atm_trans.show()

+--------------------+----+-------+---+-------+----+------+----------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+------+
|           card_type|year|  month|day|weekday|hour|atm_id|atm_manufacturer|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|loc_id|
+--------------------+----+-------+---+-------+----+------+----------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+------+
|  Mastercard - on-us|2017|January|  1| Sunday|   0|    12|             NCR|  Inactive|     DKK|Withdrawal|              1329|        null|        null|   0.59|        92|       500|        Rain|         light rain|    25|
|  Mastercard - on-us|2017|January|  1| Sunday|   0|    12|             NCR|  Inactive|     DKK|Withdrawal| 

In [16]:
# Validate the location join by counting only rows that actually received a loc_id.
# select('loc_id').count() counts rows whether or not loc_id is null, so it always
# equals the total row count and cannot reveal transactions whose location did not
# match. Compare this value with the raw row count: any shortfall is unmatched rows.
atm_trans.where(atm_trans.loc_id.isNotNull()).count()

2468572

In [17]:
#creating atm dimension
# One row per (atm_id, manufacturer, location) combination seen in the transactions.
atm_key_cols = ['atm_id', 'atm_manufacturer', 'loc_id']
dim_atm = atm_trans.select(*atm_key_cols).distinct()
pd_atm = dim_atm.toPandas()
pd_atm.head()

Unnamed: 0,atm_id,atm_manufacturer,loc_id
0,21,NCR,86
1,68,NCR,81
2,12,NCR,25
3,79,NCR,29
4,100,NCR,34


In [18]:
# Rename to the ATM dimension's column names and add a 1-based atm_sid surrogate key.
pd_atm = pd_atm.rename(columns={'atm_id': 'atm_number', 'loc_id': 'atm_locationid'})
pd_atm = pd_atm.assign(atm_sid=pd_atm.index + 1)
pd_atm.head()

Unnamed: 0,atm_number,atm_manufacturer,atm_locationid,atm_sid
0,21,NCR,86,1
1,68,NCR,81,2
2,12,NCR,25,3
3,79,NCR,29,4
4,100,NCR,34,5


In [20]:
# Back into Spark so the fact rows can be joined to their atm_sid.
dim_atm = spark.createDataFrame(data=pd_atm)
dim_atm.count()

113

In [21]:
#creating date dimension
# Distinct hourly slots (with their weekday) that occur in the transactions.
slot_cols = ['year', 'month', 'day', 'hour', 'weekday']
dim_time = atm_trans.select(*slot_cols).distinct()
pd_time = dim_time.toPandas()
pd_time.head()

Unnamed: 0,year,month,day,hour,weekday
0,2017,January,10,8,Tuesday
1,2017,January,15,11,Sunday
2,2017,January,26,12,Thursday
3,2017,January,31,14,Tuesday
4,2017,February,11,17,Saturday


In [22]:

# Two-digit month number (MM) for each English month name, used to build timestamps.
# A single dict lookup replaces twelve copy-pasted .loc assignments; month names
# missing from the mapping yield NaN.
month_to_mm = {
    'January': '01', 'February': '02', 'March': '03', 'April': '04',
    'May': '05', 'June': '06', 'July': '07', 'August': '08',
    'September': '09', 'October': '10', 'November': '11', 'December': '12',
}
pd_time['MM'] = pd_time['month'].map(month_to_mm)

In [23]:
# Two-digit, zero-padded day (DD) and hour (HH) strings for the timestamp.
# str.zfill(2) pads single-digit values ('1' -> '01', '0' -> '00') and leaves
# two-character values unchanged, replacing nineteen copy-pasted .loc lines.
pd_time['DD'] = pd_time['day'].str.zfill(2)
pd_time['HH'] = pd_time['hour'].str.zfill(2)


In [24]:
# Build 'YYYY-MM-DD HH:00:00' strings from the padded parts, then round-trip them
# through pandas datetime parsing so every value has one canonical format.
from datetime import datetime
hour_start = (pd_time['year'] + '-' + pd_time['MM'] + '-' + pd_time['DD']
              + ' ' + pd_time['HH'] + ':00:00')
pd_time['timestamp'] = hour_start
pd_time['timestamp'] = pd.to_datetime(pd_time['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')
pd_time.head()

Unnamed: 0,year,month,day,hour,weekday,MM,DD,HH,timestamp
0,2017,January,10,8,Tuesday,1,10,8,2017-01-10 08:00:00
1,2017,January,15,11,Sunday,1,15,11,2017-01-15 11:00:00
2,2017,January,26,12,Thursday,1,26,12,2017-01-26 12:00:00
3,2017,January,31,14,Tuesday,1,31,14,2017-01-31 14:00:00
4,2017,February,11,17,Saturday,2,11,17,2017-02-11 17:00:00


In [25]:
# Keep the time-dimension attributes, order rows chronologically and number them
# 1..N as time_sid. The 'YYYY-MM-DD HH:MM:SS' strings sort correctly as text.
# A non-inplace chain with reset_index(drop=True) avoids the SettingWithCopyWarning
# that in-place sorting or assignment on a column-subset slice can raise, and
# removes the need to re-select columns to drop the old index.
dim_time_cols = ['year', 'month', 'day', 'hour', 'weekday', 'timestamp']
pd_time = pd_time[dim_time_cols].sort_values('timestamp').reset_index(drop=True)
pd_time['time_sid'] = pd_time.index + 1
pd_time.head()

Unnamed: 0,year,month,day,hour,weekday,timestamp,time_sid
0,2017,January,1,0,Sunday,2017-01-01 00:00:00,1
1,2017,January,1,1,Sunday,2017-01-01 01:00:00,2
2,2017,January,1,2,Sunday,2017-01-01 02:00:00,3
3,2017,January,1,3,Sunday,2017-01-01 03:00:00,4
4,2017,January,1,4,Sunday,2017-01-01 04:00:00,5


In [27]:
# Put the surrogate key and timestamp first, then hand the dimension to Spark.
time_dim_order = ['time_sid', 'timestamp', 'year', 'month', 'day', 'hour', 'weekday']
pd_time = pd_time[time_dim_order]
dim_time = spark.createDataFrame(data=pd_time)
dim_time.count()

8685

In [28]:
#merging time sid to fact
# Match each transaction to its hourly slot in the time dimension.
time_match = (
    (atm_trans.year == dim_time.year)
    & (atm_trans.month == dim_time.month)
    & (atm_trans.day == dim_time.day)
    & (atm_trans.hour == dim_time.hour)
)
# Calendar columns are replaced by time_sid; everything else is carried forward.
time_keep = ['card_type', 'atm_id', 'atm_manufacturer', 'atm_status', 'currency',
             'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h',
             'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'loc_id']
atm_trans = (
    atm_trans
    .join(dim_time, time_match, 'left')
    .select(*([atm_trans[c] for c in time_keep] + [dim_time.time_sid]))
)
atm_trans.show()

+--------------------+------+----------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+------+--------+
|           card_type|atm_id|atm_manufacturer|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|loc_id|time_sid|
+--------------------+------+----------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+------+--------+
|  Mastercard - on-us|    12|             NCR|  Inactive|     DKK|Withdrawal|              8480|        null|        null|   2.47|        92|       500|        Rain|          light rain|    25|    2696|
|  Mastercard - on-us|    43|             NCR|    Active|     DKK|Withdrawal|              9164|        null|        null|    0.0|        90|       500|        Rain|          light rain|  

In [29]:
#merging atm dimension
# An ATM is identified by its number, manufacturer and location key together.
atm_match = (
    (atm_trans.atm_id == dim_atm.atm_number)
    & (atm_trans.atm_manufacturer == dim_atm.atm_manufacturer)
    & (atm_trans.loc_id == dim_atm.atm_locationid)
)
# atm_id / atm_manufacturer are replaced by atm_sid.
atm_keep = ['card_type', 'atm_status', 'currency', 'service', 'transaction_amount',
            'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id',
            'weather_main', 'weather_description', 'loc_id', 'time_sid']
atm_trans = (
    atm_trans
    .join(dim_atm, atm_match, 'left')
    .select(*([atm_trans[c] for c in atm_keep] + [dim_atm.atm_sid]))
)

In [31]:
#creating transaction sid
from pyspark.sql import functions as F
# Append a generated row id as the transaction surrogate key.
# NOTE(review): per the Spark docs these ids are unique but not consecutive, and they
# are recomputed each time atm_trans is re-evaluated (nothing here caches it) — TODO
# confirm the ids stay stable across the show()/join/write actions that follow.
atm_trans = atm_trans.select('*', F.monotonically_increasing_id().alias("trans_sid"))
atm_trans.show()

+--------------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+------+--------+-------+---------+
|           card_type|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|loc_id|time_sid|atm_sid|trans_sid|
+--------------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+------+--------+-------+---------+
|          MasterCard|    Active|     DKK|Withdrawal|              1501|        null|        null|    0.0|        75|       520|        Rain|light intensity s...|    81|    2804|      2|        0|
|  Mastercard - on-us|    Active|     DKK|Withdrawal|              7344|        null|        null|    0.0|        75|       520|        Rain|light intensity s...|    81|    2804|      2|        1|
|  Mastercard -

In [32]:
#merging cardtype sid
# Swap the card_type string for its card_type_id key and fix the final fact column order:
# surrogate keys first, then card_type_id, then the transaction/weather attributes.
key_cols = [atm_trans[c] for c in ['trans_sid', 'atm_sid', 'loc_id', 'time_sid']]
attr_cols = [atm_trans[c] for c in ['atm_status', 'currency', 'service',
                                    'transaction_amount', 'message_code', 'message_text',
                                    'rain_3h', 'clouds_all', 'weather_id', 'weather_main',
                                    'weather_description']]
fact_atm_trans = (
    atm_trans
    .join(dim_cardtype, atm_trans.card_type == dim_cardtype.card_type, 'left')
    .select(*(key_cols + [dim_cardtype.card_type_id] + attr_cols))
)
fact_atm_trans.show()

+---------+-------+------+--------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_sid|atm_sid|loc_id|time_sid|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+---------+-------+------+--------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       75|      2|    81|    5801|           1|    Active|     DKK|Withdrawal|              5191|        null|        null|    0.0|        40|       520|        Rain|light intensity s...|
|       88|      2|    81|    8121|           1|    Active|     DKK|Withdrawal|              3335|        null|        null|    0.0|        40|       521|        Rain|proximity shower ...|
|       95|      2|    81|     701|           1|    Act

In [33]:
#final count
# The dimension joins are all left joins; this row count should equal the raw
# df.count(), otherwise some join fanned out into duplicate rows.
fact_row_count = fact_atm_trans.count()
fact_row_count

2468572

In [43]:
#dimension file upload
# Write each dimension as comma-separated text under its own S3 prefix (same order as before).
dimension_targets = [
    (dim_time, "s3a://atmtransbucket/time"),
    (dim_loc, "s3a://atmtransbucket/loc"),
    (dim_atm, "s3a://atmtransbucket/atm"),
    (dim_cardtype, "s3a://atmtransbucket/cardtype"),
]
for dim_frame, s3_path in dimension_targets:
    dim_frame.write.csv(s3_path)

In [42]:
#fact upload
# The fact table is written pipe-delimited rather than comma-delimited.
fact_atm_trans.write.csv("s3a://atmtransbucket/fact_text", sep="|")