## Building environment  

In [1]:
## Importing necessary modules/lib
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, FloatType
from pyspark.sql.functions import col,lit
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import concat
from pyspark.sql.functions import lpad

In [3]:
# Starting/checking session with app name "elt_project"
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ETL').master("local").getOrCreate()
spark


In [4]:
# reading dataframe from hdfs
# naming dataframe as df
df = spark.read.csv("/user/root/etl_atm_project/part-m-00000", inferSchema = True)

In [5]:
# checking first row 
df.show(1)

+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
| _c0|    _c1|_c2|   _c3|_c4|   _c5|_c6|_c7|       _c8|        _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19| _c20|  _c21|   _c22|    _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|_c32|      _c33|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
|2017|January|  1|Sunday|  0|Active|  1|NCR|NÃƒÂ¦stved|Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null|55.23|11.761|2616038|Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|Rain|light rain|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+-

### Inferences 
- As seen above no headers for columns 

In [6]:
# Checking schema
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _c27: integer (nu

In [7]:
# Checking count thus vaildating from the document provided  
df.count()

2468572

In [8]:
# creating new schema as 'dfschema' with respective structypes and naming the coloumns as well
dfschema = StructType([StructField('year', IntegerType(),True),
                                    StructField('month', StringType(),True),
                                    StructField('day', IntegerType(),True),
                                    StructField('weekday', StringType(),True),
                                    StructField('hour', IntegerType(),True),
                                    StructField('atm_status', StringType(),True),
                                    StructField('atm_id', StringType(),True),
                                    StructField('atm_manufacturer', StringType(),True),
                                    StructField('atm_location', StringType(),True),
                                    StructField('atm_streetname', StringType(),True),
                                    StructField('atm_street_number', IntegerType(),True),
                                    StructField('atm_zipcode', IntegerType(),True),
                                    StructField('atm_lat', FloatType(),True),
                                    StructField('atm_lon', FloatType(),True),
                                    StructField('currency', StringType(),True),
                                    StructField('card_type', StringType(),True),
                                    StructField('transaction_amount', IntegerType(),True),
                                    StructField('service', StringType(),True),
                                    StructField('message_code', StringType(),True),
                                    StructField('message_text', StringType(),True),
                                    StructField('weather_lat', FloatType(),True),
                                    StructField('weather_lon', FloatType(),True),
                                    StructField('weather_city_id', IntegerType(),True),
                                    StructField('weather_city_name', StringType(),True),
                                    StructField('temp', FloatType(),True),
                                    StructField('pressure', IntegerType(),True),
                                    StructField('humidity', IntegerType(),True),
                                    StructField('wind_speed', IntegerType(),True),
                                    StructField('wind_deg', IntegerType(),True),
                                    StructField('rain_3h', FloatType(),True),
                                    StructField('clouds_all', IntegerType(),True),
                                    StructField('weather_id', IntegerType(),True),
                                    StructField('weather_main', StringType(),True),
                                    StructField('weather_description', StringType(),True),])

                                    
                                    

In [9]:
# Reading the file with dfschema
input_df = spark.read.csv("/user/root/etl_atm_project/part-m-00000", schema = dfschema)

In [10]:
# checking top 5 rows
input_df.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [11]:
# Checking list of columns
input_df.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [12]:
# Rechecking schema
input_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string

In [13]:
## Checking for null values in weather_main
input_df.filter(input_df.weather_main.isNull()).count()

8087

In [14]:
# craeting a function to check null values in dataframe 
import pyspark.sql.functions as F
def count_missings(spark_df,sort=True):
    """
    Counts number of nulls and nans in each column
    """
    df = spark_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in spark_df.dtypes if c_type not in ('timestamp', 'date')]).toPandas()

    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df

In [15]:
# checking null values 
count_missings(input_df)

Unnamed: 0,count
message_text,2459010
message_code,2459009
pressure,8087
weather_lat,8087
weather_lon,8087
weather_city_id,8087
weather_city_name,8087
temp,8087
weather_description,8087
wind_speed,8087


### Inferences 
- As seen above few columns contains null values but data dict says only message columns will have null values depicting no error was there but apart from that i suppose thats data quality issue beacsue same problem is faced by everyone 

# Creating dimension tables 

### Creating location dimension


In [16]:
# Reading respective columns for location dimension from input_df

dim_location = input_df.select([input_df.atm_location.alias("location"),input_df.atm_streetname.alias("streetname"),input_df.atm_street_number.alias("street_number"),input_df.atm_zipcode.alias("zipcode"),input_df.atm_lat.alias("lat"),input_df.atm_lon.alias("lon")]).distinct()

In [17]:
# Checking top two rows 

dim_location.show(2)

+--------------------+-----------+-------------+-------+------+-----+
|            location| streetname|street_number|zipcode|   lat|  lon|
+--------------------+-----------+-------------+-------+------+-----+
|Aalborg Storcente...|   Hobrovej|          452|   9200|57.005|9.876|
|            Hasseris|Hasserisvej|          113|   9000|57.044|9.898|
+--------------------+-----------+-------------+-------+------+-----+
only showing top 2 rows



In [18]:
# Checking count of location table
dim_location.count()

109

In [19]:
# checking top 5 rows
dim_location.show(5, truncate = False)

+--------------------------+-----------+-------------+-------+------+------+
|location                  |streetname |street_number|zipcode|lat   |lon   |
+--------------------------+-----------+-------------+-------+------+------+
|Aalborg Storcenter indg. D|Hobrovej   |452          |9200   |57.005|9.876 |
|Hasseris                  |Hasserisvej|113          |9000   |57.044|9.898 |
|Bispensgade               |Bispensgade|35           |9800   |57.453|9.996 |
|HolbÃƒÂ¦k                 |Slotsvolden|7            |4300   |55.718|11.704|
|Bindslev                  |NÃƒÂ¸rrebro|18           |9881   |57.541|10.2  |
+--------------------------+-----------+-------------+-------+------+------+
only showing top 5 rows



In [20]:
# checking schema
dim_location.printSchema()

root
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)



In [21]:
# adding column for primary key
dim_location= dim_location.withColumn("new_column",lit("ABC"))

In [22]:
# creating primary key 
w = Window().partitionBy('new_column').orderBy(lit('A'))
dim_location= dim_location.withColumn("atm_location_id", row_number().over(w)).drop("new_column")

In [23]:
# checking schema
dim_location.printSchema()

root
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- atm_location_id: integer (nullable = true)



In [24]:
# Checking rows 
dim_location.show(2)

+--------------------+-----------+-------------+-------+------+-----+---------------+
|            location| streetname|street_number|zipcode|   lat|  lon|atm_location_id|
+--------------------+-----------+-------------+-------+------+-----+---------------+
|Aalborg Storcente...|   Hobrovej|          452|   9200|57.005|9.876|              1|
|            Hasseris|Hasserisvej|          113|   9000|57.044|9.898|              2|
+--------------------+-----------+-------------+-------+------+-----+---------------+
only showing top 2 rows



In [25]:
# Checking count
dim_location.count()

109

In [26]:
dim_location.filter(dim_location['atm_location_id'] > 106).collect()

[Row(location=u'Hj\xc3\u0192\xc2\xb8rring', streetname=u'\xc3\u0192\xcb\u0153stergade', street_number=8, zipcode=9800, lat=57.45899963378906, lon=9.98799991607666, atm_location_id=107),
 Row(location=u'Vodskov', streetname=u'Vodskovvej', street_number=27, zipcode=9310, lat=57.104000091552734, lon=10.027000427246094, atm_location_id=108),
 Row(location=u'Vadum', streetname=u'Ellehammersvej', street_number=43, zipcode=9430, lat=57.11800003051758, lon=9.861000061035156, atm_location_id=109)]

In [27]:
# Rearrange dim_location
location = dim_location.select("atm_location_id","location","streetname","street_number","zipcode","lat","lon")

In [28]:
# #Finding the count for validation hence verified 
location.count()

109

### Creating card_type dimension 

In [29]:
# creating card_type dimension
dim_card_type = input_df.select([input_df.card_type.alias("card_type")]).distinct()

In [30]:
 #checking count for validation hence verified
dim_card_type.count()

12

In [31]:
# creating primary key or card_type_id column
dim_card_type= dim_card_type.withColumn("new_column",lit("ABC"))
w = Window().partitionBy('new_column').orderBy(lit('A'))
dim_card_type= dim_card_type.withColumn("card_type_id", row_number().over(w)).drop("new_column")

In [32]:
 # Checking dataframe's top 5 rows
dim_card_type.show(5)

+------------------+------------+
|         card_type|card_type_id|
+------------------+------------+
|   Dankort - on-us|           1|
|            CIRRUS|           2|
|       HÃƒÂ¦vekort|           3|
|              VISA|           4|
|Mastercard - on-us|           5|
+------------------+------------+
only showing top 5 rows



In [33]:
# creating dim_card_type as card_type
card_type = dim_card_type.select("card_type_id","card_type")

In [34]:
#Finding the count for validation
card_type.count()

12

In [35]:
# checking schema
card_type.printSchema()

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)



### Craeting date dimension

In [36]:
# Creating date data fame
dim_date = input_df.select([input_df.year.alias("year"),input_df.month.alias("month"),input_df.day.alias("day"),input_df.hour.alias("hour"),input_df.weekday.alias("weekday")]).distinct()

In [37]:
#Finding the count for validation hence verified

dim_date.count()

8685

In [38]:
# Checking top 20 rows
dim_date.show()

+----+--------+---+----+--------+
|year|   month|day|hour| weekday|
+----+--------+---+----+--------+
|2017| January|  1|   9|  Sunday|
|2017| January|  3|   5| Tuesday|
|2017| January|  8|  19|  Sunday|
|2017| January| 21|   3|Saturday|
|2017| January| 23|  21|  Monday|
|2017|February|  2|  19|Thursday|
|2017|February|  5|  16|  Sunday|
|2017|February| 21|  15| Tuesday|
|2017|   March|  2|   8|Thursday|
|2017|   April|  2|   2|  Sunday|
|2017|   April|  6|   8|Thursday|
|2017|   April| 30|  10|  Sunday|
|2017|     May|  2|   2| Tuesday|
|2017|     May| 20|  16|Saturday|
|2017|     May| 21|  19|  Sunday|
|2017|    June| 27|   0| Tuesday|
|2017|    July| 18|   9| Tuesday|
|2017|    July| 18|  22| Tuesday|
|2017|    July| 20|   0|Thursday|
|2017|    July| 21|  19|  Friday|
+----+--------+---+----+--------+
only showing top 20 rows



In [39]:
from pyspark.sql.functions  import date_format
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_date

In [40]:
# Creating New Month column with Integer value. 
dim_date=dim_date.withColumn('month_new', date_format(to_date(col('month'),'MMMMM'),'MM').cast(IntegerType()))


In [41]:
## adding new Month, day and hours columns with Zeroes 
dim_date=dim_date.withColumn('month_new', lpad(col('month_new'),2,'0')).withColumn('day_new', lpad(col('day'),2,'0')).withColumn('hour_new', lpad(col('hour'),2,'0'))


In [42]:
# Create a new column Full_Date_time by combining Year, new month, day, hour and "00" value to create timestamp in YYYYMMDDMI24HHMI format.
dim_date_final=dim_date.withColumn("full_date_time",concat(col('year'),col('month_new'),col('day_new'),col('hour_new'),lit('00')))

In [43]:
# # creating full_date by concating other columns
# dim_date_final = dim_date.withColumn("full_date",from_unixtime(unix_timestamp(concat(dim_date.year.cast(StringType()),dim_date.month.cast(StringType()),lpad(dim_date.day.cast(StringType()),2,'0'),lpad(dim_date.hour.cast(StringType()),2,'0')),'yyyyMMMMMddHH'),'YYYY-MM-DD HH:mm:SS'))

In [44]:
# checking newly created columns
dim_date_final.show(10)

+----+--------+---+----+--------+---------+-------+--------+--------------+
|year|   month|day|hour| weekday|month_new|day_new|hour_new|full_date_time|
+----+--------+---+----+--------+---------+-------+--------+--------------+
|2017| January|  1|   9|  Sunday|       01|     01|      09|  201701010900|
|2017| January|  3|   5| Tuesday|       01|     03|      05|  201701030500|
|2017| January|  8|  19|  Sunday|       01|     08|      19|  201701081900|
|2017| January| 21|   3|Saturday|       01|     21|      03|  201701210300|
|2017| January| 23|  21|  Monday|       01|     23|      21|  201701232100|
|2017|February|  2|  19|Thursday|       02|     02|      19|  201702021900|
|2017|February|  5|  16|  Sunday|       02|     05|      16|  201702051600|
|2017|February| 21|  15| Tuesday|       02|     21|      15|  201702211500|
|2017|   March|  2|   8|Thursday|       03|     02|      08|  201703020800|
|2017|   April|  2|   2|  Sunday|       04|     02|      02|  201704020200|
+----+------

In [45]:
#Finding the count for validation hence verified 

dim_date_final.count()

8685

In [46]:
# Creating primary key for the dimension with name date_id
dim_date_final= dim_date_final.withColumn("new_column",lit("ABC"))
w = Window().partitionBy('new_column').orderBy(lit('A'))
dim_date_final= dim_date_final.withColumn("date_id", row_number().over(w)).drop("new_column")

In [47]:
# checking newly created column
dim_date_final.show()

+----+--------+---+----+--------+---------+-------+--------+--------------+-------+
|year|   month|day|hour| weekday|month_new|day_new|hour_new|full_date_time|date_id|
+----+--------+---+----+--------+---------+-------+--------+--------------+-------+
|2017| January|  1|   9|  Sunday|       01|     01|      09|  201701010900|      1|
|2017| January|  3|   5| Tuesday|       01|     03|      05|  201701030500|      2|
|2017| January|  8|  19|  Sunday|       01|     08|      19|  201701081900|      3|
|2017| January| 21|   3|Saturday|       01|     21|      03|  201701210300|      4|
|2017| January| 23|  21|  Monday|       01|     23|      21|  201701232100|      5|
|2017|February|  2|  19|Thursday|       02|     02|      19|  201702021900|      6|
|2017|February|  5|  16|  Sunday|       02|     05|      16|  201702051600|      7|
|2017|February| 21|  15| Tuesday|       02|     21|      15|  201702211500|      8|
|2017|   March|  2|   8|Thursday|       03|     02|      08|  201703020800| 

In [48]:
# creating data from dim_date_final
date = dim_date_final.select("date_id","full_date_time","year","month","day","hour","weekday")

In [49]:
#Finding the count for validation
date.count()

8685

### Creating Atm dimension

In [50]:
#creating new data fame with atm related columns
dim_atm = input_df.select([input_df.atm_id.alias("atm_number"),input_df.atm_manufacturer.alias("atm_manufacturer"),input_df.atm_lat.alias("lat"),input_df.atm_lon.alias("lon")])

In [51]:
# Checking top 3 rows
dim_atm.show(3)

+----------+----------------+------+------+
|atm_number|atm_manufacturer|   lat|   lon|
+----------+----------------+------+------+
|         1|             NCR|55.233|11.763|
|         2|             NCR|57.043|  9.95|
|         2|             NCR|57.043|  9.95|
+----------+----------------+------+------+
only showing top 3 rows



In [52]:
# checking count
dim_atm.count()

2468572

In [53]:
#To add atm_location_id of dim_location df as a foreign key to the atm table, adding left join to the atm table and locatino table.
dim_atm = dim_atm.join(location, on = ["lat","lon"],how = "leftouter")


In [54]:
# Taking distinct values to avoid repeated values 
atm_distinct =dim_atm.distinct()

In [55]:
# Checking dataframe 
atm_distinct.show(4)

+------+------+----------+----------------+---------------+--------------+----------------+-------------+-------+
|   lat|   lon|atm_number|atm_manufacturer|atm_location_id|      location|      streetname|street_number|zipcode|
+------+------+----------+----------------+---------------+--------------+----------------+-------------+-------+
|56.448| 9.401|        18| Diebold Nixdorf|             13|        Viborg|       Toldboden|            3|   8800|
|55.705| 9.532|       101|             NCR|              6|Bryggen  Vejle|SÃƒÂ¸nderbrogade|            2|   7100|
|56.716|10.114|         9| Diebold Nixdorf|             25|       Hadsund|       Storegade|           12|   9560|
|55.859| 9.854|        64|             NCR|             88|       Horsens| GrÃƒÂ¸nlandsvej|            5|   8700|
+------+------+----------+----------------+---------------+--------------+----------------+-------------+-------+
only showing top 4 rows



In [56]:
# Checking count for validation hence verified
atm_distinct.count()

156

In [57]:
#adding our primary key to the 156 sets of data
atm_distinct= atm_distinct.withColumn("new_column",lit("ABC"))
w = Window().partitionBy('new_column').orderBy(lit('A'))
atm_distinct= atm_distinct.withColumn("atm_id", row_number().over(w)).drop("new_column")

In [58]:
# Checking newly created primary key 
atm_distinct.show(2)

+------+-----+----------+----------------+---------------+--------------+----------------+-------------+-------+------+
|   lat|  lon|atm_number|atm_manufacturer|atm_location_id|      location|      streetname|street_number|zipcode|atm_id|
+------+-----+----------+----------------+---------------+--------------+----------------+-------------+-------+------+
|56.448|9.401|        18| Diebold Nixdorf|             13|        Viborg|       Toldboden|            3|   8800|     1|
|55.705|9.532|       101|             NCR|              6|Bryggen  Vejle|SÃƒÂ¸nderbrogade|            2|   7100|     2|
+------+-----+----------+----------------+---------------+--------------+----------------+-------------+-------+------+
only showing top 2 rows



In [59]:
# creating atm from atm_distinct
atm = atm_distinct.select('atm_id','atm_number','atm_manufacturer','atm_location_id')

In [60]:
# Checking top rows of atm
atm.show(3)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|        18| Diebold Nixdorf|             13|
|     2|       101|             NCR|              6|
|     3|         9| Diebold Nixdorf|             25|
+------+----------+----------------+---------------+
only showing top 3 rows



In [61]:
#Rechecking the count for validation

atm.count()

156

###  Creating fact table


In [62]:
# Creating alias 
input_df = input_df.alias('input_df')
date = date.alias('date')
dim_card_type = dim_card_type.alias('dim_card_type')
dim_location = dim_location.alias('dim_location')
atm = atm.alias('atm')


#### - Creating fact table will take 4 steps by outer left joining the input table with dimension tables
#### - Dropping columns as required except primary keys of dimension table as they will act as foreign key

In [65]:
# Creating firts_df by left join of date dimension on input data frame and dropping columns 
first_df =input_df.join(date, on = ['year','month','day','hour','weekday'],how='left').select('input_df.*','date.date_id').drop(*['year','month','day','hour','weekday'])

In [66]:
#  Creating alias for first
first_df = first_df.alias("first_df")

In [67]:
# Checking count for first step for validation 
first_df.count()

2468572

#### Creating second_df

In [68]:
# Creating second_df by joining card_type dimension with first_df
second_df = first_df.join(dim_card_type, on = ['card_type'], how = 'left').select('first_df.*','dim_card_type.card_type_id').drop(*['card_type'])

In [69]:
# Checking count for validation at each step
second_df.count()

2468572

In [70]:
# Checking schema
second_df.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- ra

In [71]:
#  Creating alias
second_df = second_df.alias('second_df')

#### Creating third_df

In [72]:
# Creating third_df by joining location dimension with second_df by performing outer join
third_df = second_df.withColumnRenamed('atm_location','location').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').join(dim_location, on = ['location','lat','lon','streetname','street_number','zipcode'],how = 'left').select('second_df.*','dim_location.atm_location_id').drop(*['location','lat','lon','streetname','street_number','zipcode'])

In [73]:
# checking count again
third_df.count()

2468572

In [74]:
# Creating alias
third_df= third_df.alias('third_df')

In [75]:
# Creating alias 
third_df.show(2)

+----------+------+----------------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+---------------+
|atm_status|atm_id|atm_manufacturer|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_type_id|atm_location_id|
+----------+------+----------------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+---------------+
|    Active|     6|             NCR|     DKK|              5622|Withdrawal|        nu

In [76]:
# Checking schema
third_df.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- car

In [77]:
# Renaming atm_id as atm_number as atm_id imported from input df
third_df = third_df.withColumnRenamed('atm_id',"atm_number")

In [78]:
# Checking schema
third_df.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |--

#### Creating fourth data frame 

In [79]:
# Craeting fourth_df by left joining of third with atm dimension
fourth_df= third_df.join(atm,on =['atm_number','atm_manufacturer','atm_location_id'],how ='left').select('third_df.*','atm.atm_id').drop(*['atm_manufacturer','atm_nummber'])

In [80]:
# Checking count for validation hence verifed 
fourth_df.count()

2468572

In [81]:
# Checking schema
fourth_df.printSchema()

root
 |-- atm_location_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 

In [82]:
# top 3 rows 
fourth_df.show(3)

+---------------+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+------+
|atm_location_id|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_type_id|atm_id|
+---------------+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+------+
|             57|  Inactive|     DKK|              1764|Withdrawal|        null|        null|     57.165|     10.146|        2620275|   

In [83]:
# Rechecking count
fourth_df.count()

2468572

### Final step creating fact table fact_atm_trans 

In [84]:
# creating fact table from fourth_df
fact_atm_trans = fourth_df.alias("fact_atm_trans")

In [85]:
# Checking count and hence verified 
fact_atm_trans.count()

2468572

In [86]:
#adding our primary key to fact table
fact_atm_trans= fact_atm_trans.withColumn("new_column",lit("ABC"))
w = Window().partitionBy('new_column').orderBy(lit('A'))
fact_atm_trans= fact_atm_trans.withColumn("trans_id", row_number().over(w)).drop("new_column")

In [87]:
# checking schema of fact table
fact_atm_trans.printSchema()

root
 |-- atm_location_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 

In [88]:
 # dropping irrelevant columns as per schema 
fact_atm_trans = fact_atm_trans.drop('weather_lat','weather_lon','weather_city_id','weather_city_name','temp','pressure','humidity','wind_speed','wind_deg')

In [89]:
# Renaming atm_location_id as weather_loaction_id as per schema 
fact_atm_trans = fact_atm_trans.withColumnRenamed("atm_location_id","weather_loc_id")

In [90]:
# Checking final schema
fact_atm_trans.printSchema()

root
 |-- weather_loc_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- trans_id: integer (nullable = true)



In [96]:
fact_atm_trans1 = fact_atm_trans.select('trans_id','atm_id','weather_loc_id','date_id','card_type_id','atm_status','currency','service','transaction_amount','message_code','message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')

In [97]:
fact_atm_trans1 = fact_atm_trans1.alias('fact_atm_trans1')


In [98]:
fact_atm_trans1.show(2)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|    28|            57|   2559|           1|  Inactive|     DKK|Withdrawal|              1764|        null|        null|    0.0|        40|       802|      Clouds|    scattered clouds|
|       2|     4|            88|   3357|           8|    Active|     DKK|Withdrawal|               845|        null|        null|    0.0|        92|       300|     Drizzle|light intensity d...|
+--------+------+-------------

In [99]:
fact_atm_trans1.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



## Saving the tables to s3

### Saving fact table as csv

In [100]:
fact_atm_trans1.coalesce(1).write.save("s3a://saavnetl/fact-table",format='csv',header='false')

## dim tables

### 1.dim-atm

In [92]:
#dim-atm/
atm.coalesce(1).write.save("s3a://saavnetl/dim-atm",format='csv',header='false'),

(None,)

### 2 dim-card-type

In [93]:
### dim-card-type/
card_type.coalesce(1).write.save("s3a://saavnetl/dim-card-type",format='csv',header='false'),

(None,)

### 3 dim-date

In [94]:
#dim-date
date.coalesce(1).write.save("s3a://saavnetl/dim-date",format='csv',header='false'),

(None,)

### 4 dim-location

In [95]:
#dim-location
location.coalesce(1).write.save("s3a://saavnetl/dim-location",format='csv',header='false'),

(None,)