## Previous steps performed
Data has been ingested to HDFS using the "sqoop import" statement.

## PySpark Code
Read data in HDFS to Spark

#### Required Imports

In [1]:
import os
import sys

In [2]:
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window
from pyspark.sql import functions as sf

In [4]:
conf = SparkConf().setAppName("jupyter_Spark").setMaster("yarn-client")
sc = SparkContext.getOrCreate(conf=conf)
sc

#### Creation of Schema

In [5]:
fileSchema = StructType([StructField('year', IntegerType(),False),
                        StructField('month', StringType(),False),
                        StructField('day', IntegerType(),False),
                        StructField('weekday', StringType(),False),
                        StructField('hour', IntegerType(),False),
                        StructField('atm_status', StringType(),False),
                        StructField('atm_id', StringType(),False),
                        StructField('atm_manufacturer', StringType(),False),
                        StructField('atm_location', StringType(),False),
                        StructField('atm_streetname', StringType(),False),
                        StructField('atm_street_number', IntegerType(),False),
                        StructField('atm_zipcode', IntegerType(),False),
                        StructField('atm_lat', DoubleType(),False),
                        StructField('atm_lon', DoubleType(),False),
                        StructField('currency', StringType(),False),
                        StructField('card_type', StringType(),False),
                        StructField('transaction_amount', IntegerType(),False), 
                        StructField('service', StringType(),False),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),False),
                        StructField('weather_lon', DoubleType(),False),
                        StructField('weather_city_id', IntegerType(),False),
                        StructField('weather_city_name', StringType(),False), 
                        StructField('temp', DoubleType(),False),
                        StructField('pressure', IntegerType(),False), 
                        StructField('humidity', IntegerType(),False), 
                        StructField('wind_speed', IntegerType(),False), 
                        StructField('wind_deg', IntegerType(),False), 
                        StructField('rain_3h', DoubleType(),True), 
                        StructField('clouds_all', IntegerType(),False), 
                        StructField('weather_id', IntegerType(),False), 
                        StructField('weather_main', StringType(),False), 
                        StructField('weather_description', StringType(),False), 
                        ])

In [6]:
spark = SparkSession.builder.appName('jupyter_Spark').master("local").getOrCreate()
spark

#### Import data from HDFS and describe the data

In [7]:
files = spark.read.csv("hdfs:/user/root/etlassignment", header = False, schema = fileSchema)
files.count()

2468572

In [8]:
files.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [9]:
files.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

#### Creation of Location Dimension table

In [10]:
# Identifying Location Dimension table fields
dim1_loc= files.select('atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon','atm_id')
#De-duplicate data based on Location parameters
DIM_LOCS = dim1_loc.dropDuplicates((['atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon']))
dim1_loc.show()

+------------------+--------------------+-----------------+-----------+-------+-------+------+
|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_id|
+------------------+--------------------+-----------------+-----------+-------+-------+------+
|        NÃƒÂ¦stved|         Farimagsvej|                8|       4700| 55.233| 11.763|     1|
|          Vejgaard|          Hadsundvej|               20|       9000| 57.043|   9.95|     2|
|          Vejgaard|          Hadsundvej|               20|       9000| 57.043|   9.95|     2|
|             Ikast| RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|     3|
|        Svogerslev|        BrÃƒÂ¸nsager|                1|       4000| 55.634| 12.018|     4|
|              Nibe|              Torvet|                1|       9240| 56.983|  9.639|     5|
|        Fredericia|    SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|     6|
|         Hjallerup|   Hjallerup Centret|         

In [11]:
DIM_LOCS.count()

109

In [12]:
DIM_LOC = DIM_LOCS.withColumn(
    "location_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [13]:
#Sorting location dimension table by location id
DIM_LOC.orderBy('location_id', ascending=True)

DataFrame[atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, atm_id: string, location_id: int]

In [14]:
DIM_LOC.show(5)

+-----------------+----------------+-----------------+-----------+-------+-------+------+-----------+
|     atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_id|location_id|
+-----------------+----------------+-----------------+-----------+-------+-------+------+-----------+
|            Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|    33|          1|
|         Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|    31|          2|
|       Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|     6|          3|
|HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|   108|          4|
|          Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|    19|          5|
+-----------------+----------------+-----------------+-----------+-------+-------+------+-----------+
only showing top 5 rows



#### Creation of ATM Dimension table

In [15]:
dim2_atm= files.select('atm_lat','atm_lon','atm_id','atm_manufacturer')
#De-Duplicating Data based on ATM parameters
DIM_ATM = dim2_atm.dropDuplicates((['atm_id']))

In [16]:
DIM_ATM.count()

113

In [17]:
dim2_atm.registerTempTable("atm")
DIM_LOC.registerTempTable("loc")

In [18]:
## Creating a temporary table to join with location table to copy the atm and location dimension tables.
DIM_ATM_1= spark.sql("select atm.atm_id,atm.atm_manufacturer,loc.*from atm left join loc on atm.atm_lat==loc.atm_lat and atm.atm_lon==loc.atm_lon").distinct()

In [19]:
DIM_ATM_1.count()

156

In [20]:
DIM_ATM_2 =  DIM_ATM_1.withColumn(
    "atm_prim_id",row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [21]:
#Sorting ATM dimension table by primary index
DIM_ATM_2.orderBy('atm_prim_id', ascending=True)

DataFrame[atm_id: string, atm_manufacturer: string, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, atm_id: string, location_id: int, atm_prim_id: int]

In [22]:
DIM_ATM_2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- atm_prim_id: integer (nullable = true)



In [23]:
DIM_ATM = DIM_ATM_2.select('atm_prim_id','atm.atm_id','atm_manufacturer','location_id')
DIM_ATM.count()

156

In [24]:
DIM_ATM.show()

+-----------+------+----------------+-----------+
|atm_prim_id|atm_id|atm_manufacturer|location_id|
+-----------+------+----------------+-----------+
|          1|    56| Diebold Nixdorf|         61|
|          2|    53|             NCR|         60|
|          3|     6|             NCR|          3|
|          4|    87|             NCR|         83|
|          5|    21|             NCR|         25|
|          6|    43|             NCR|          6|
|          7|    73|             NCR|         85|
|          8|    36|             NCR|        109|
|          9|    22|             NCR|         54|
|         10|    93|             NCR|         55|
|         11|    85| Diebold Nixdorf|         10|
|         12|    64|             NCR|         62|
|         13|    40| Diebold Nixdorf|         92|
|         14|    98|             NCR|         94|
|         15|   102|             NCR|         66|
|         16|    17|             NCR|         48|
|         17|    44|             NCR|         57|


#### Creation of Date Dimension table

In [25]:
# Identifying DATE dimension columns from source data
dim3_date= files.select('year','month','day','hour','weekday')
#Converting Month from String to Numeric
date_conv = dim3_date.withColumn("month",from_unixtime(unix_timestamp(col("Month"),'MMM'),'MM'))

In [26]:
date_conv.show()

+----+-----+---+----+-------+
|year|month|day|hour|weekday|
+----+-----+---+----+-------+
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
+----+-----+---+----+-------+
only showing top 20 rows



In [27]:
date_conv = date_conv.withColumn('full_date_time',sf.concat(sf.col('year'),sf.lit('/'),sf.col('month'),sf.lit('/'),sf.col('day'),sf.lit(' '),sf.col('hour'),sf.lit(':'),sf.lit('00:00')))
date_conv.show()

+----+-----+---+----+-------+-----------------+
|year|month|day|hour|weekday|   full_date_time|
+----+-----+---+----+-------+-----------------+
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0

In [28]:
date_conv2 = date_conv.withColumn('full_date_time', unix_timestamp(date_conv['full_date_time'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))
date_conv2.show()

+----+-----+---+----+-------+-------------------+
|year|month|day|hour|weekday|     full_date_time|
+----+-----+---+----+-------+-------------------+
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|


In [29]:
date_conv2.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- full_date_time: timestamp (nullable = true)



In [30]:
#De-Duplicating Data based on DATE parameters
DIM_DATE = date_conv2.dropDuplicates((['full_date_time']))
DIM_DATE.count()

8685

In [31]:
#Adding primary key
DIM_DATE = DIM_DATE.withColumn(
    "date_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)
DIM_DATE.count()

8685

In [32]:
DIM_DATE.show()

+----+-----+---+----+---------+-------------------+-------+
|year|month|day|hour|  weekday|     full_date_time|date_id|
+----+-----+---+----+---------+-------------------+-------+
|2017|   01|  5|   1| Thursday|2017-01-05 01:00:00|      1|
|2017|   01| 11|   8|Wednesday|2017-01-11 08:00:00|      2|
|2017|   01| 21|  22| Saturday|2017-01-21 22:00:00|      3|
|2017|   02|  7|  11|  Tuesday|2017-02-07 11:00:00|      4|
|2017|   02|  9|   7| Thursday|2017-02-09 07:00:00|      5|
|2017|   02|  9|  22| Thursday|2017-02-09 22:00:00|      6|
|2017|   02| 16|   5| Thursday|2017-02-16 05:00:00|      7|
|2017|   02| 23|  18| Thursday|2017-02-23 18:00:00|      8|
|2017|   02| 25|  11| Saturday|2017-02-25 11:00:00|      9|
|2017|   02| 26|  13|   Sunday|2017-02-26 13:00:00|     10|
|2017|   03| 12|  13|   Sunday|2017-03-12 13:00:00|     11|
|2017|   04|  1|   8| Saturday|2017-04-01 08:00:00|     12|
|2017|   04|  6|  14| Thursday|2017-04-06 14:00:00|     13|
|2017|   04| 21|  17|   Friday|2017-04-2

#### Creation of Card Type Dimension table

In [33]:
# Identifying CARD TYPE Dimension table From source data
dim4_card= files.select('card_type')
#De-Duplicating Data based on DATE parameters
DIM_CARD = dim4_card.dropDuplicates((['card_type']))

In [34]:
DIM_CARD = DIM_CARD.withColumn(
    "card_type_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)
DIM_CARD.orderBy('card_type_id', ascending=True)

DataFrame[card_type: string, card_type_id: int]

In [35]:
DIM_CARD.count()

12

In [36]:
DIM_CARD.show()

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|     Dankort - on-us|           1|
|              CIRRUS|           2|
|         HÃƒÂ¦vekort|           3|
|                VISA|           4|
|  Mastercard - on-us|           5|
|             Maestro|           6|
|Visa Dankort - on-us|           7|
|        Visa Dankort|           8|
|            VisaPlus|           9|
|          MasterCard|          10|
|             Dankort|          11|
| HÃƒÂ¦vekort - on-us|          12|
+--------------------+------------+



In [37]:
ff = files.join(DIM_ATM_1,on=['atm_id','atm_manufacturer','atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'],how='left')
ff.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [38]:
ff.count()

2468572

In [39]:
ff.filter(ff.location_id.isNotNull()).count()

2468572

In [40]:
#Renaming location_id in ATM dimension table for convenience to avoid confusion
ff = ff.withColumnRenamed('location_id','location_id_atm')

In [41]:
ff_1 = ff.join(DIM_ATM_2,on=['atm_id','atm_manufacturer','atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'],how='left')

In [42]:
ff_1.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [43]:
ff_1.count()

2468572

In [44]:
ff_1.filter(ff_1.atm_prim_id.isNotNull()).count()

2468572

In [45]:
ff_1 = ff_1.drop('location_id_atm')
ff_1.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [46]:
ff1 = ff_1.withColumnRenamed('card_type','card_types')
files_index = ff1.join(DIM_CARD, ff1.card_types == DIM_CARD.card_type,how='LEFT')
files_index.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_types: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: s

In [47]:
files_index.select('year', 'month', 'card_type', 'card_type_id','card_types').show(5)

+----+-------+---------------+------------+---------------+
|year|  month|      card_type|card_type_id|     card_types|
+----+-------+---------------+------------+---------------+
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
+----+-------+---------------+------------+---------------+
only showing top 5 rows



In [48]:
files_index = files_index.drop('card_type')
files_index= files_index.withColumnRenamed('card_types','card_type')
files_index.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

#### Check point to make sure no previous data is disturbed.

In [49]:
DIM_LOC.printSchema()

root
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- location_id: integer (nullable = true)



In [50]:
DIM_LOC.count()

109

In [51]:
files_index2 = files_index.withColumn("month",from_unixtime(unix_timestamp(col("Month"),'MMM'),'MM'))
files_index2 = files_index2.withColumn('full_date_time',sf.concat(sf.col('year'),sf.lit('/'),sf.col('month'),sf.lit('/'),sf.col('day'),sf.lit(' '),sf.col('hour'),sf.lit(':'),sf.lit('00:00')))
files_index2 = files_index2.withColumn('full_date_time', unix_timestamp(files_index2['full_date_time'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [52]:
files_index2 = files_index2.withColumnRenamed('year','ryear')
files_index2 = files_index2.withColumnRenamed('month','rmonth')
files_index2 = files_index2.withColumnRenamed('day','rday')
files_index2 = files_index2.withColumnRenamed('weekday','rweekday')
files_index2 = files_index2.withColumnRenamed('hour','rhour')
files_index2 = files_index2.withColumnRenamed('full_date_time','rfull_date_time')
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- ryear: integer (nullable = true)
 |-- rmonth: string (nullable = true)
 |-- rday: integer (nullable = true)
 |-- rweekday: string (nullable = true)
 |-- rhour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_nam

In [53]:
files_index2 = files_index2.join(DIM_DATE, files_index2.rfull_date_time == DIM_DATE.full_date_time,how='LEFT') 
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- ryear: integer (nullable = true)
 |-- rmonth: string (nullable = true)
 |-- rday: integer (nullable = true)
 |-- rweekday: string (nullable = true)
 |-- rhour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_nam

In [54]:
files_index2.count()

2468572

In [55]:
files_index2 = files_index2.drop('year','month','day','weekday','hour','full_date_time')
files_index2 = files_index2.withColumnRenamed('ryear','year')
files_index2 = files_index2.withColumnRenamed('rmonth','month')
files_index2 = files_index2.withColumnRenamed('rday','day')
files_index2 = files_index2.withColumnRenamed('rweekday','weekday')
files_index2 = files_index2.withColumnRenamed('rhour','hour')
files_index2 = files_index2.withColumnRenamed('rfull_date_time','full_date_time')
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [56]:
files_index2 = files_index2.drop('full_date_time')
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [57]:
files_index2 = files_index2.withColumn(
    "trans_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)
FACT_ATM_TRANS = files_index2.select('trans_id','atm_prim_id','location_id','date_id','card_type_id','atm_status','currency','service','transaction_amount','message_code','message_text',
                                     'rain_3h','clouds_all','weather_id','weather_main','weather_description')
FACT_ATM_TRANS.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_prim_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



In [58]:
FACT_ATM_TRANS.count()

2468572

In [59]:
FACT_ATM_TRANS.select('trans_id','atm_prim_id','location_id','date_id','card_type_id','atm_status').show(10)

+--------+-----------+-----------+-------+------------+----------+
|trans_id|atm_prim_id|location_id|date_id|card_type_id|atm_status|
+--------+-----------+-----------+-------+------------+----------+
|       1|         27|         99|      1|          10|    Active|
|       2|         63|         16|      1|          10|    Active|
|       3|         41|         32|      2|           3|    Active|
|       4|         36|         65|      2|           5|    Active|
|       5|         74|         22|      2|           5|    Active|
|       6|        100|         87|      2|           5|    Active|
|       7|        100|         87|      2|           5|    Active|
|       8|        155|         33|      2|           5|    Active|
|       9|         27|         99|      2|           5|    Active|
|      10|         63|         16|      2|           5|    Active|
+--------+-----------+-----------+-------+------------+----------+
only showing top 10 rows



In [60]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.atm_prim_id.isNotNull()).count()

2468572

In [61]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.location_id.isNotNull()).count()

2468572

In [62]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.date_id.isNotNull()).count()

2468572

In [63]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.card_type_id.isNotNull()).count()

2468572

### Copying FACT & DIMS to S3 bucket "redshiftetlproject"

In [64]:
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId","AKIA476ZL7KITD6C2ESI")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey","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")

In [69]:
DIM_LOC.write.format('csv').option('header','true').save('s3://sgetlproject/ETL/DIM_LOC/DIM_LOC.csv',mode='overwrite')

In [70]:
DIM_ATM.write.format('csv').option('header','true').save('s3://sgetlproject/ETL/DIM_ATM/DIM_ATM.csv',mode='overwrite')

In [71]:
DIM_DATE.write.format('csv').option('header','true').save('s3://sgetlproject/ETL/DIM_DATE/DIM_DATE.csv',mode='overwrite')

In [72]:
DIM_CARD.write.format('csv').option('header','true').save('s3://sgetlproject/ETL/DIM_CARD/DIM_CARD.csv',mode='overwrite')

In [73]:
FACT_ATM_TRANS.coalesce(2).write.format('csv').save('s3://sgetlproject/ETL/FACT_ATM_TRANS/FACT_ATM_TRANS.csv',header='true')