In [15]:
import os
import sys
# Point this kernel at the cluster's Anaconda interpreter, JDK and Spark2 parcel,
# then put Spark's bundled py4j and pyspark archives at the front of sys.path so
# `import pyspark` resolves to the cluster's copy (pyspark.zip ends up first).
spark_home = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ.update({
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_232-cloudera/jre",
    "SPARK_HOME": spark_home,
})
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
for archive_suffix in ("/py4j-0.10.6-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + archive_suffix)

In [16]:
from pyspark.sql import SparkSession

# Local-mode Spark session for this ETL run; getOrCreate() reuses an already
# running session in this kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local")
    .appName('SRC_ATM_TRANS')
    .getOrCreate()
)
spark

In [18]:
# Hadoop S3 configuration for the s3a:// writes at the end of this notebook.
# Credentials are set under fs.s3a.* because the writes use the s3a:// scheme and
# the S3A connector does not read the legacy fs.s3n.* properties.
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()

# Never hard-code secrets in a notebook: take them from the standard AWS environment
# variables when both are present. Otherwise leave the keys unset so S3A uses its
# default credential providers (e.g. an EC2 instance profile).
aws_access_key = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
if aws_access_key and aws_secret_key:
    hadoop_conf.set("fs.s3a.access.key", aws_access_key)
    hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)
hadoop_conf.set('fs.s3a.endpoint', 's3.us-east-1.amazonaws.com')

In [19]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, TimestampType
from pyspark.sql.functions import *

In [20]:
# Explicit schema for the headerless source file, as (column name, Spark type)
# pairs in file column order. Every column is declared nullable.
_atm_trans_columns = [
    ('year', IntegerType()),
    ('month', StringType()),
    ('day', IntegerType()),
    ('weekday', StringType()),
    ('hour', IntegerType()),
    ('atm_status', StringType()),
    ('atm_id', IntegerType()),
    ('atm_manufacturer', StringType()),
    ('atm_location', StringType()),
    ('atm_streetname', StringType()),
    ('atm_street_number', IntegerType()),
    ('atm_zipcode', IntegerType()),
    ('atm_lat', DoubleType()),
    ('atm_lon', DoubleType()),
    ('currency', StringType()),
    ('card_type', StringType()),
    ('transaction_amount', IntegerType()),
    ('service', StringType()),
    ('message_code', StringType()),
    ('message_text', StringType()),
    ('weather_lat', DoubleType()),
    ('weather_lon', DoubleType()),
    ('weather_city_id', IntegerType()),
    ('weather_city_name', StringType()),
    ('temp', DoubleType()),
    ('pressure', IntegerType()),
    ('humidity', IntegerType()),
    ('wind_speed', IntegerType()),
    ('wind_deg', IntegerType()),
    ('rain_3h', DoubleType()),
    ('clouds_all', IntegerType()),
    ('weather_id', IntegerType()),
    ('weather_main', StringType()),
    ('weather_description', StringType()),
]
fileSchema = StructType([StructField(name, dtype, True) for name, dtype in _atm_trans_columns])

In [21]:
# Load the raw ATM transaction file from HDFS using the explicit schema above;
# header=False because the file is read without a header row.
# The location can be overridden with the SRC_ATM_TRANS_PATH environment variable
# so the notebook is not tied to one NameNode address; the default is unchanged.
SRC_ATM_TRANS_PATH = os.environ.get(
    "SRC_ATM_TRANS_PATH",
    "hdfs://10.0.0.227:8020/user/root/SRC_ATM_TRANS_to_HDFS/part-m-00000",
)
df = spark.read.csv(SRC_ATM_TRANS_PATH, schema=fileSchema, header=False)
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: s

### Check the row count after importing the data into a DataFrame

In [22]:
# Question: What is the row count after importing the data into a DataFrame?
# Solution: The imported DataFrame holds 2468572 records (output below).

raw_row_count = df.count()
raw_row_count

2468572

### Check the row count of the Location dimension

In [23]:
# Question: What is the row count of the Location dimension?
# Solution: The Location dimension has 109 rows, shown below.

# DIM_LOCATION: one row per distinct ATM address/coordinate combination, with a
# zipWithIndex-generated surrogate key `location_id`.
# (source column, dimension column) pairs, in output column order.
location_renames = [
    ("atm_location", "location"),
    ("atm_streetname", "streetname"),
    ("atm_street_number", "street_number"),
    ("atm_zipcode", "zipcode"),
    ("atm_lat", "lat"),
    ("atm_lon", "lon"),
]
df_loc1 = df.select(*[source for source, _ in location_renames]).distinct()
df_loc2 = df_loc1.rdd.zipWithIndex().toDF()
df_loc3 = df_loc2.select(col("_1.*"), col("_2").alias('location_id'))
df_loc = df_loc3
for source, target in location_renames:
    df_loc = df_loc.withColumnRenamed(source, target)
df_loc.show()
df_loc.count()

+--------------------+----------------+-------------+-------+------+------+-----------+
|            location|      streetname|street_number|zipcode|   lat|   lon|location_id|
+--------------------+----------------+-------------+-------+------+------+-----------+
|               Vadum|  Ellehammersvej|           43|   9430|57.118| 9.861|          0|
|            Slagelse| Mariendals Alle|           29|   4200|55.398|11.342|          1|
|          Fredericia|SjÃƒÂ¦llandsgade|           33|   7000|55.564| 9.757|          2|
|             Kolding|        Vejlevej|          135|   6000|55.505| 9.457|          3|
|   HÃƒÂ¸rning Hallen|        Toftevej|           53|   8362|56.091|10.033|          4|
|                Aars| Himmerlandsgade|           70|   9600|56.803| 9.518|          5|
|     Aarhus Lufthavn| Ny Lufthavnsvej|           24|   8560|56.308|10.627|          6|
|                 Fur|      StenÃƒÂ¸re|           19|   7884|56.805|  9.02|          7|
|            Hasseris|     Hasse

109

### Check the row count of the ATM dimension

In [24]:
# Question: What is the row count of the ATM dimension?
# Solution: The ATM dimension count and sample rows are shown below (the recorded run returned 156 rows).

# DIM_ATM: one row per distinct (ATM number, manufacturer, location) with a
# zipWithIndex-generated surrogate key `atm_id`.
# The location lookup matches on the same six attributes as the FACT_ATM_TRANS join,
# so each ATM is linked only to the location row the fact join will also pick.
# Matching on lat/lon alone would link an ATM to every location row sharing those
# coordinates and create spurious dimension rows.
atm_location_match = (
    (df_loc.location == df.atm_location)
    & (df_loc.streetname == df.atm_streetname)
    & (df_loc.street_number == df.atm_street_number)
    & (df_loc.zipcode == df.atm_zipcode)
    & (df_loc.lat == df.atm_lat)
    & (df_loc.lon == df.atm_lon)
)
left_join = df.join(df_loc, [atm_location_match], how='left').select('atm_id', 'atm_manufacturer', 'location_id')
df_atm1 = left_join.withColumnRenamed("atm_id", "atm_number").withColumnRenamed("location_id", "atm_location_id").distinct()
df_atm2 = df_atm1.rdd.zipWithIndex().toDF()
df_atm = df_atm2.select(col("_1.*"), col("_2").alias('atm_id'))
df_atm.show()
df_atm.count()

+----------+----------------+---------------+------+
|atm_number|atm_manufacturer|atm_location_id|atm_id|
+----------+----------------+---------------+------+
|       100|             NCR|             69|     0|
|        73|             NCR|             84|     1|
|        11|             NCR|             76|     2|
|        97|             NCR|             86|     3|
|        13|             NCR|             13|     4|
|        20|             NCR|             95|     5|
|        64|             NCR|             61|     6|
|       109| Diebold Nixdorf|             34|     7|
|       113| Diebold Nixdorf|              1|     8|
|        88|             NCR|             92|     9|
|        34|             NCR|             42|    10|
|        97|             NCR|             45|    11|
|        63|             NCR|             77|    12|
|        14|             NCR|             94|    13|
|        88|             NCR|             65|    14|
|        17|             NCR|             47| 

156

### Check the row count of the Date dimension

In [25]:
# Question: What is the row count of the Date dimension?
# Solution: The Date dimension has 8685 rows; a sample is shown below.

# DIM_DATE: one row per distinct (year, month, day, hour, weekday) seen in the
# transactions, keyed by a zipWithIndex-generated surrogate `date_id`.
date_columns = ['year', 'month', 'day', 'hour', 'weekday']
df_date1 = df.select(*date_columns).distinct()
indexed_dates = df_date1.rdd.zipWithIndex()
df_date2 = indexed_dates.toDF()
df_date = df_date2.select(col("_1.*"), col("_2").alias('date_id'))
df_date.show()
df_date.count()

+----+--------+---+----+--------+-------+
|year|   month|day|hour| weekday|date_id|
+----+--------+---+----+--------+-------+
|2017| January|  1|   9|  Sunday|      0|
|2017| January|  3|   5| Tuesday|      1|
|2017| January|  8|  19|  Sunday|      2|
|2017| January| 21|   3|Saturday|      3|
|2017| January| 23|  21|  Monday|      4|
|2017|February|  2|  19|Thursday|      5|
|2017|February|  5|  16|  Sunday|      6|
|2017|February| 21|  15| Tuesday|      7|
|2017|   March|  2|   8|Thursday|      8|
|2017|   April|  2|   2|  Sunday|      9|
|2017|   April|  6|   8|Thursday|     10|
|2017|   April| 30|  10|  Sunday|     11|
|2017|     May|  2|   2| Tuesday|     12|
|2017|     May| 20|  16|Saturday|     13|
|2017|     May| 21|  19|  Sunday|     14|
|2017|    June| 27|   0| Tuesday|     15|
|2017|    July| 18|   9| Tuesday|     16|
|2017|    July| 18|  22| Tuesday|     17|
|2017|    July| 20|   0|Thursday|     18|
|2017|    July| 21|  19|  Friday|     19|
+----+--------+---+----+--------+-

8685

### Check the row count of the Card Type dimension

In [26]:
# Question: What is the row count of the Card Type dimension?
# Solution: The Card Type dimension has 12 rows, shown below.

# DIM_CARD_TYPE: one row per distinct card_type, keyed by a zipWithIndex-generated
# surrogate `card_type_id`.
df_ct1 = df.select('card_type').distinct()
indexed_card_types = df_ct1.rdd.zipWithIndex()
df_ct2 = indexed_card_types.toDF()
df_ct = df_ct2.select(col("_1.*"), col("_2").alias('card_type_id'))
df_ct.show()
df_ct.count()

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|     Dankort - on-us|           0|
|              CIRRUS|           1|
|         HÃƒÂ¦vekort|           2|
|                VISA|           3|
|  Mastercard - on-us|           4|
|             Maestro|           5|
|Visa Dankort - on-us|           6|
|        Visa Dankort|           7|
|            VisaPlus|           8|
|          MasterCard|           9|
|             Dankort|          10|
| HÃƒÂ¦vekort - on-us|          11|
+--------------------+------------+



12

### Check the row counts for all stages in the creation of the Transaction fact table

In [27]:
# Question: What are the row counts for the stages in building the Transaction fact table?
# Solution: The Transaction fact table has 2468572 rows, shown below.

# FACT_ATM_TRANS
# Resolve each transaction's surrogate keys by left-joining the dimensions on their
# natural keys; left joins keep every transaction row even when a lookup finds no match.
left_join_fact1 = df.join(df_loc, [(df_loc.location == df.atm_location) & (df_loc.streetname == df.atm_streetname) & (df_loc.street_number == df.atm_street_number) & (df_loc.zipcode == df.atm_zipcode) & (df_loc.lat == df.atm_lat) & (df_loc.lon == df.atm_lon)], how='left')
left_join_fact2 = left_join_fact1.join(df_date, [(df_date.year == df.year) & (df_date.month == df.month) & (df_date.day == df.day) & (df_date.weekday == df.weekday) & (df_date.hour == df.hour)], how='left')
left_join_fact3 = left_join_fact2.join(df_ct, [(df_ct.card_type == df.card_type)], how='left')
left_join_fact4 = left_join_fact3.join(df_atm, [(df_atm.atm_number == df.atm_id) & (df_atm.atm_manufacturer == df.atm_manufacturer) & (df_atm.atm_location_id == df_loc.location_id)], how='left')
left_join_fact5 = left_join_fact4.select('atm_status','currency','service','transaction_amount','message_code','message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description','atm_location_id','date_id','card_type_id',df_atm.atm_id)
left_join_fact6 = left_join_fact5.withColumnRenamed("atm_location_id", "weather_loc_id")

# Attach a sequential trans_id. The (row, index) pairs from zipWithIndex are turned back
# into a DataFrame using the known schema of left_join_fact6 rather than
# toDF(sampleRatio=0.01). Inferring types from an unseeded 1% sample is non-deterministic,
# and it fails ("Some of types cannot be determined ...") if a mostly-null column such as
# message_code is null in every sampled row.
fact_with_index_schema = StructType([
    StructField('_1', left_join_fact6.schema, True),
    StructField('_2', LongType(), False),  # zipWithIndex indices are never null
])
df_fact2 = spark.createDataFrame(left_join_fact6.rdd.zipWithIndex(), fact_with_index_schema)
df_fact = df_fact2.select(col("_1.*"), col("_2").alias('trans_id'))
df_fact.show()
df_fact.count()

+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------------+-------+------------+------+--------+
|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|weather_loc_id|date_id|card_type_id|atm_id|trans_id|
+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------------+-------+------------+------+--------+
|    Active|     DKK|Withdrawal|              2041|        null|        null|    0.0|        44|       802|      Clouds|   scattered clouds|            23|   2235|           0|   143|       0|
|    Active|     DKK|Withdrawal|              7831|        null|        null|    0.0|        44|       802|      Clouds|   scattered clouds|            23|   2235|           0|   143|       1|
|    Active|     DKK|Withdrawal|   

2468572

In [35]:
# Export the card-type dimension to S3 as headerless CSV, replacing any previous output.
(
    df_ct.write
    .mode("overwrite")
    .option("header", "false")
    .csv("s3a://my-redshift-bucket-24/card_type")
)

In [36]:
# Export the location dimension to S3 as headerless CSV, replacing any previous output.
(
    df_loc.write
    .mode("overwrite")
    .option("header", "false")
    .csv("s3a://my-redshift-bucket-24/location")
)

In [37]:
# Export the ATM dimension to S3 as headerless CSV, replacing any previous output.
(
    df_atm.write
    .mode("overwrite")
    .option("header", "false")
    .csv("s3a://my-redshift-bucket-24/atm")
)

In [38]:
# Export the date dimension to S3 as headerless CSV, replacing any previous output.
(
    df_date.write
    .mode("overwrite")
    .option("header", "false")
    .csv("s3a://my-redshift-bucket-24/date")
)

In [28]:
# Export the transaction fact table to S3 as headerless CSV, replacing any previous output.
(
    df_fact.write
    .mode("overwrite")
    .option("header", "false")
    .csv("s3a://my-redshift-bucket-24/trans")
)