# Initialize Spark

In [1]:
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings('ignore')
spark=SparkSession.builder.appName('ETL Project').getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/07 00:20:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/07 00:20:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
spark

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.window import Window
import pyspark.sql.functions as f

# Set input and O/P Paths

In [4]:
input_data_path = "/user/hadoop/data_extract/part*"
dim_date_path = "s3://girish-722691534181/atm_data/warehouse/dim_date/"
dim_location_path = "s3://girish-722691534181/atm_data/warehouse/dim_location/"
dim_card_type_path = "s3://girish-722691534181/atm_data/warehouse/dim_card_type/"
dim_atm_path = "s3://girish-722691534181/atm_data/warehouse/dim_atm/"
fact_atm_trans_path = "s3://girish-722691534181/atm_data/warehouse/fact_atm_trans/"

# Function to Save DF

In [5]:
def write_to_s3(df, location):
    # print("writing to parquet")
    # df.write.parquet(location)
    print(f"writing {df.count()} rows to s3(csv)")
    df.write.mode("overwrite").csv(location)
    print("write complete")

## Define Schema and Read Data

In [6]:
schema = StructType([StructField('year',IntegerType(),False),
                     StructField('month',StringType(), False),
                     StructField('day', IntegerType(), False),
                     StructField('weekday', StringType(), False),
                     StructField('hour',IntegerType(),False),
                     StructField('atm_status',StringType(), False),
                     StructField('atm_id', StringType(), False),
                     StructField('atm_manufacturer', StringType(), False),
                     StructField('atm_location', StringType(), False),
                     StructField('atm_streetname',StringType(),False),
                     StructField('atm_street_number', IntegerType(), False),
                     StructField('atm_zipcode',IntegerType(), False),
                     StructField('atm_lat', FloatType(), False),
                     StructField('atm_lon', FloatType(), False),
                     StructField('currency', StringType(), False),
                     StructField('card_type',StringType(),False),
                     StructField('transaction_amount',IntegerType(), False),
                     StructField('service', StringType(), False),
                     StructField('message_code', StringType(), True),
                     StructField('message_text', StringType(), True),
                     StructField('weather_lat',FloatType(),False),
                     StructField('weather_lon',FloatType(), False),
                     StructField('weather_city_id', IntegerType(), False),
                     StructField('weather_city_name', StringType(), False),
                     StructField('temp', FloatType(), False),
                     StructField('pressure',IntegerType(),False),
                     StructField('humidity',IntegerType(), False),
                     StructField('wind_speed', IntegerType(), False),
                     StructField('wind_deg', IntegerType(), False),
                     StructField('rain_3h', FloatType(), True),
                     StructField('clouds_all', IntegerType(), False),
                     StructField('weather_id', IntegerType(), False),
                     StructField('weather_main', StringType(), False),
                     StructField('weather_description', StringType(), False)]
                     )

In [7]:
df = spark.read.csv(input_data_path, header=False, schema=schema, encoding="utf-8")

In [8]:
# Repartition and Cache Data to improve performance
df=df.repartition(4)
df.cache()

DataFrame[year: int, month: string, day: int, weekday: string, hour: int, atm_status: string, atm_id: string, atm_manufacturer: string, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: float, atm_lon: float, currency: string, card_type: string, transaction_amount: int, service: string, message_code: string, message_text: string, weather_lat: float, weather_lon: float, weather_city_id: int, weather_city_name: string, temp: float, pressure: int, humidity: int, wind_speed: int, wind_deg: int, rain_3h: float, clouds_all: int, weather_id: int, weather_main: string, weather_description: string]

In [9]:
print(f"Input Count {df.count()}")

[Stage 1:>                                                          (0 + 4) / 4]

Input Count 2468572


                                                                                

## Create DIM_DATE

In [10]:
dates_dim_df = df.select(["year","month","day","weekday","hour"]).distinct()

In [11]:
dates_dim_df =  dates_dim_df.withColumn("day",f.when(f.col("day")>=10,f.col("day").cast(StringType())).otherwise(f.concat(f.lit("0"),
                                                                                                         f.col("day").cast(StringType()))))

In [12]:
dates_dim_df = dates_dim_df.withColumn("full_date_time",f.to_timestamp(f.concat(f.col("year").cast(StringType()),f.lit("-"),
                                                                    f.col("month").cast(StringType()),f.lit("-"),
                                                                     f.col("day").cast(StringType()),f.lit(" "),
                                                                     f.col("hour").cast(StringType())),
                                                            "yyyy-MMMM-dd H"))
                             

In [13]:
windowSpec  = Window.partitionBy().orderBy(f.col("full_date_time").asc())
dates_dim_df = dates_dim_df.withColumn("date_id", f.row_number().over(windowSpec))

In [14]:
dates_dim_df = dates_dim_df.select(["date_id","full_date_time","year","month","day","hour","weekday"])

In [15]:
dates_dim_df.cache()
print(f"dates_dim_df Count {dates_dim_df.count()}")

dates_dim_df Count 8685


In [16]:
dates_dim_df.show()

+-------+-------------------+----+-------+---+----+-------+
|date_id|     full_date_time|year|  month|day|hour|weekday|
+-------+-------------------+----+-------+---+----+-------+
|      1|2017-01-01 00:00:00|2017|January| 01|   0| Sunday|
|      2|2017-01-01 01:00:00|2017|January| 01|   1| Sunday|
|      3|2017-01-01 02:00:00|2017|January| 01|   2| Sunday|
|      4|2017-01-01 03:00:00|2017|January| 01|   3| Sunday|
|      5|2017-01-01 04:00:00|2017|January| 01|   4| Sunday|
|      6|2017-01-01 05:00:00|2017|January| 01|   5| Sunday|
|      7|2017-01-01 06:00:00|2017|January| 01|   6| Sunday|
|      8|2017-01-01 07:00:00|2017|January| 01|   7| Sunday|
|      9|2017-01-01 08:00:00|2017|January| 01|   8| Sunday|
|     10|2017-01-01 09:00:00|2017|January| 01|   9| Sunday|
|     11|2017-01-01 10:00:00|2017|January| 01|  10| Sunday|
|     12|2017-01-01 11:00:00|2017|January| 01|  11| Sunday|
|     13|2017-01-01 12:00:00|2017|January| 01|  12| Sunday|
|     14|2017-01-01 13:00:00|2017|Januar

## Create DIM_LOCATION 

In [17]:
location_dim_df = df.select(["atm_location","atm_streetname","atm_street_number",
                             "atm_zipcode","atm_lat","atm_lon"]).distinct()

In [18]:
windowSpec  = Window.partitionBy().orderBy(*[f.asc(c) for c in ["atm_location","atm_streetname","atm_street_number",
                             "atm_zipcode","atm_lat","atm_lon"]])
location_dim_df = location_dim_df.withColumn("location_id", f.row_number().over(windowSpec))
location_dim_df = location_dim_df.withColumnRenamed("atm_location","location")
location_dim_df = location_dim_df.withColumnRenamed("atm_streetname","streetname")
location_dim_df = location_dim_df.withColumnRenamed("atm_street_number","street_number")
location_dim_df = location_dim_df.withColumnRenamed("atm_zipcode","zipcode")
location_dim_df = location_dim_df.withColumnRenamed("atm_lat","lat")
location_dim_df = location_dim_df.withColumnRenamed("atm_lon","lon")

In [19]:
location_dim_df = location_dim_df.select(["location_id","location","streetname",
                                          "street_number","zipcode","lat","lon"])

In [20]:
print(f"location_dim_df Count {location_dim_df.count()}")

location_dim_df Count 109


In [21]:
location_dim_df.show()

+-----------+--------------------+------------------+-------------+-------+------+------+
|location_id|            location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+------------------+-------------+-------+------+------+
|          1|             Aabybro|      ÃƒËœstergade|            6|   9440|57.162|  9.73|
|          2|      Aalborg Hallen|      Europa Plads|            4|   9000|57.044| 9.913|
|          3|Aalborg Storcente...|          Hobrovej|          452|   9200|57.005| 9.876|
|          4|Aalborg Storcente...|          Hobrovej|          452|   9200|57.005| 9.876|
|          5|         Aalborg Syd|          Hobrovej|          440|   9200|57.005| 9.881|
|          6|           AalbÃƒÂ¦k|        Centralvej|            5|   9982|57.593|10.412|
|          7|              Aarhus|        Ceres Byen|           75|   8000|56.157|10.194|
|          8|              Aarhus|    SÃƒÂ¸nder Alle|           11|   8000|56.153|10.206|
|         

## Create DIM_CARD_TYPE

In [22]:
card_type_dim_df = df.select(["card_type"]).distinct()
windowSpec  = Window.partitionBy().orderBy(f.col("card_type").asc())
card_type_dim_df = card_type_dim_df.withColumn("card_type_id", f.row_number().over(windowSpec))
card_type_dim_df = card_type_dim_df.select(["card_type_id","card_type"])

In [23]:
card_type_dim_df.cache()
print(f"card_type_dim_df Count {card_type_dim_df.count()}")

card_type_dim_df Count 12


In [24]:
card_type_dim_df.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|              CIRRUS|
|           2|             Dankort|
|           3|     Dankort - on-us|
|           4|         HÃƒÂ¦vekort|
|           5| HÃƒÂ¦vekort - on-us|
|           6|             Maestro|
|           7|          MasterCard|
|           8|  Mastercard - on-us|
|           9|                VISA|
|          10|        Visa Dankort|
|          11|Visa Dankort - on-us|
|          12|            VisaPlus|
+------------+--------------------+



## Create DIM_ATM

In [25]:
atm_dim_df = df.select(["atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number",
                             "atm_zipcode","atm_lat","atm_lon"]).distinct()

In [26]:
atm_dim_df.count()

                                                                                

113

In [27]:
atm_dim_df = atm_dim_df.withColumnRenamed("atm_id","atm_number")

# rename cols to match location_dim_df
atm_dim_df = atm_dim_df.withColumnRenamed("atm_location","location")
atm_dim_df = atm_dim_df.withColumnRenamed("atm_streetname","streetname")
atm_dim_df = atm_dim_df.withColumnRenamed("atm_street_number","street_number")
atm_dim_df = atm_dim_df.withColumnRenamed("atm_zipcode","zipcode")
atm_dim_df = atm_dim_df.withColumnRenamed("atm_lat","lat")
atm_dim_df = atm_dim_df.withColumnRenamed("atm_lon","lon")

In [28]:
# Join with location_dim_df
atm_dim_df = atm_dim_df.join(location_dim_df,["location","streetname","street_number",
                             "zipcode","lat","lon"],"inner")

In [29]:
# Drop cols used for join, as location_id represents all the other dropped cols
atm_dim_df = atm_dim_df.drop(*["location","streetname","street_number",
                             "zipcode","lat","lon"])

In [30]:
atm_dim_df = atm_dim_df.withColumnRenamed("location_id","atm_location_id")

In [31]:
# Create Primary key
windowSpec  = Window.partitionBy().orderBy(*[f.asc("atm_number"),
                                             f.asc("atm_manufacturer"),
                                             f.asc("atm_location_id")])
atm_dim_df = atm_dim_df.withColumn("atm_id", f.row_number().over(windowSpec))

In [32]:
atm_dim_df = atm_dim_df.select(["atm_id","atm_number","atm_manufacturer","atm_location_id"])

In [33]:
atm_dim_df.show()

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|         1|             NCR|             75|
|     2|        10|             NCR|             76|
|     3|       100|             NCR|             56|
|     4|       101|             NCR|             17|
|     5|       102|             NCR|              3|
|     6|       103| Diebold Nixdorf|            103|
|     7|       104|             NCR|             58|
|     8|       105| Diebold Nixdorf|             76|
|     9|       106|             NCR|             55|
|    10|       107| Diebold Nixdorf|             62|
|    11|       108|             NCR|             47|
|    12|       109| Diebold Nixdorf|              5|
|    13|        11|             NCR|             80|
|    14|       110| Diebold Nixdorf|             41|
|    15|       111| Diebold Nixdorf|              7|
|    16|       112| Diebold Nixdorf|          

In [34]:
print(f"atm_dim_df Count {atm_dim_df.count()}")

atm_dim_df Count 113


## Create FACT_ATM_TRANS

In [35]:
fact_atm_trans_df = df

In [36]:
dim_loc_cols = ["location","streetname","street_number", "zipcode","lat","lon"]
dim_atm_cols = ["atm_number","atm_manufacturer","atm_location_id"]

In [37]:
date_cols = ["year","month","day","weekday","hour"]
atm_location_cols = ["atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon"]
atm_details_cols = ["atm_id","atm_manufacturer"]
weather_cols = ["weather_city_name","weather_lat","weather_lon","weather_city_id"]

### Join with Dim Date to get "date_id"

In [38]:
fact_atm_trans_df = fact_atm_trans_df.join(dates_dim_df,date_cols,"inner")

In [39]:
fact_atm_trans_df = fact_atm_trans_df.drop(*date_cols)

In [40]:
print(f"facts Count after joining with date Count {fact_atm_trans_df.count()}")

facts Count after joining with date Count 2468572


### Join with Dim Location to get "weather_loc_id"

In [41]:
fact_atm_trans_df = fact_atm_trans_df.join(location_dim_df,
                                           (fact_atm_trans_df["weather_city_name"]==location_dim_df["location"]) &
                                           (fact_atm_trans_df["weather_lat"]==location_dim_df["lat"]) &
                                           (fact_atm_trans_df["weather_lon"]==location_dim_df["lon"]),"left")

In [42]:
fact_atm_trans_df = fact_atm_trans_df.drop(*weather_cols)
fact_atm_trans_df = fact_atm_trans_df.drop(*dim_loc_cols)
fact_atm_trans_df = fact_atm_trans_df.withColumnRenamed("location_id","weather_loc_id")

In [43]:
print(f"facts Count after joining with dim_location Count {fact_atm_trans_df.count()}")

                                                                                

facts Count after joining with dim_location Count 2468572


### Join with Dim Location and Dim ATM to get "atm_id"

In [44]:
atm_loc_df = atm_dim_df.join(location_dim_df,atm_dim_df["atm_location_id"]==location_dim_df["location_id"],"inner")
print(f"atm_loc_df Count {atm_loc_df.count()}")

atm_loc_df Count 113


In [45]:
fact_atm_trans_df = fact_atm_trans_df.withColumnRenamed("atm_id","atm_number")
fact_atm_trans_df = fact_atm_trans_df.join(atm_loc_df,
                                           (fact_atm_trans_df["atm_location"]==atm_loc_df["location"]) & 
                                           (fact_atm_trans_df["atm_streetname"]==atm_loc_df["streetname"]) &
                                           (fact_atm_trans_df["atm_street_number"]==atm_loc_df["street_number"]) & 
                                           (fact_atm_trans_df["atm_zipcode"]==atm_loc_df["zipcode"]) &
                                           (fact_atm_trans_df["atm_lat"]==atm_loc_df["lat"]) &
                                           (fact_atm_trans_df["atm_lon"]==atm_loc_df["lon"]) &
                                           (fact_atm_trans_df["atm_number"]==atm_loc_df["atm_number"]) &
                                           (fact_atm_trans_df["atm_manufacturer"]==atm_loc_df["atm_manufacturer"]),
                                          "inner")

In [46]:
print(f"facts Count after joining with atm_loc_df {fact_atm_trans_df.count()}")

                                                                                

facts Count after joining with atm_loc_df 2468572


### Join with Dim Card Type to get card_type_id

In [47]:
fact_atm_trans_df = fact_atm_trans_df.join(card_type_dim_df,["card_type"],"inner")
fact_atm_trans_df = fact_atm_trans_df.drop(*["card_type"])
print(f"facts Count after joining with dim_card_type {fact_atm_trans_df.count()}")

facts Count after joining with dim_card_type 2468572


In [48]:
len(fact_atm_trans_df.columns)

39

In [49]:
fact_atm_trans_df.columns

['atm_status',
 'atm_number',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description',
 'date_id',
 'full_date_time',
 'weather_loc_id',
 'atm_id',
 'atm_number',
 'atm_manufacturer',
 'atm_location_id',
 'location_id',
 'location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon',
 'card_type_id']

In [50]:
fact_cols = ["trans_id","atm_id","weather_loc_id","date_id","card_type_id",
             "atm_status","currency","service","transaction_amount","message_code",
            "message_text","rain_3h","clouds_all","weather_id",
            "weather_main","weather_description"]

In [51]:
windowSpec  = Window.partitionBy().orderBy(*[f.asc(c) for c in ["atm_id","weather_loc_id","date_id","card_type_id",
             "atm_status","currency","service","transaction_amount","message_code",
            "message_text","rain_3h","clouds_all","weather_id",
            "weather_main","weather_description"]])

In [52]:
fact_atm_trans_df = fact_atm_trans_df.withColumn("trans_id", f.row_number().over(windowSpec))

In [53]:
fact_atm_trans_df = fact_atm_trans_df.select(fact_cols)

In [54]:
fact_atm_trans_df.show()

                                                                                

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|       1|     1|          null|      1|           7|    Active|     DKK|Withdrawal|              5643|        null|        null|  0.215|        92|       500|        Rain|         light rain|
|       2|     1|          null|      2|           7|    Active|     DKK|Withdrawal|              4586|        null|        null|  0.215|        92|       500|        Rain|         light rain|
|       3|     1|          null|   

# Save Dataframes to S3

In [55]:
write_to_s3(dates_dim_df, dim_date_path)
write_to_s3(location_dim_df, dim_location_path)
write_to_s3(atm_dim_df, dim_atm_path)
write_to_s3(card_type_dim_df, dim_card_type_path)
write_to_s3(fact_atm_trans_df, fact_atm_trans_path)

# Validating input and Transformed Data

In [56]:
df.select(f.sum("transaction_amount")).show()

+-----------------------+
|sum(transaction_amount)|
+-----------------------+
|            12352186559|
+-----------------------+



In [57]:
fact_atm_trans_df.select(f.sum("transaction_amount")).show()

                                                                                

+-----------------------+
|sum(transaction_amount)|
+-----------------------+
|            12352186559|
+-----------------------+



In [60]:
location_dim_df.filter(f.col("location")=='Vejgaard').show()

+-----------+--------+----------+-------------+-------+------+----+
|location_id|location|streetname|street_number|zipcode|   lat| lon|
+-----------+--------+----------+-------------+-------+------+----+
|        103|Vejgaard|Hadsundvej|           20|   9000|57.043|9.95|
+-----------+--------+----------+-------------+-------+------+----+



In [62]:
atm_dim_df.filter(f.col("atm_location_id")==103).show()

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     6|       103| Diebold Nixdorf|            103|
|    26|         2|             NCR|            103|
+------+----------+----------------+---------------+



In [67]:
df.filter(f.col("atm_id")==40).select("atm_location","atm_manufacturer").distinct().show()

+-------------+----------------+
| atm_location|atm_manufacturer|
+-------------+----------------+
|Frederikshavn| Diebold Nixdorf|
+-------------+----------------+

