In [39]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [42]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType , TimestampType

In [43]:
from pyspark import SparkContext, SparkConf


In [44]:
spark = SparkSession.builder.appName('demo').master("local").enableHiveSupport().getOrCreate()
spark

In [45]:
transformation_schema1 = StructType([StructField('year',IntegerType(),True),
                        StructField('month',StringType(),True),
                        StructField('day',IntegerType(),True),
                        StructField('weekday',StringType(),True),
                        StructField('hour',IntegerType(),True),
                        StructField('atm_status',StringType(),True),
                        StructField('atm_id',StringType(),True),
                        StructField('atm_manufacturer',StringType(),True),
                        StructField('atm_location',StringType(),True),
                        StructField('atm_streetname',StringType(),True),
                        StructField('atm_street_number',IntegerType(),True),
                        StructField('atm_zipcode',IntegerType(),True),
                        StructField('atm_lat',DoubleType(),True),
                        StructField('atm_lon',DoubleType(),True),
                        StructField('currency',StringType(),True),
                        StructField('card_type',StringType(),True),
                        StructField('transaction_amount',IntegerType(),True), 
                        StructField('service',StringType(),True),
                        StructField('message_code',StringType(),True),
                        StructField('message_text',StringType(),True),
                        StructField('weather_lat',DoubleType(),True),
                        StructField('weather_lon',DoubleType(),True),
                        StructField('weather_city_id',IntegerType(),True),
                        StructField('weather_city_name',StringType(),True), 
                        StructField('temp',DoubleType(),True),
                        StructField('pressure',IntegerType(),True),
                        StructField('humidity',IntegerType(),True),
                        StructField('wind_speed',IntegerType(),True),
                        StructField('wind_deg',IntegerType(),True),
                        StructField('rain_3h',DoubleType(),True),
                        StructField('clouds_all',IntegerType(),True),
                        StructField('weather_id',IntegerType(),True),
                        StructField('weather_main',StringType(),True),
                        StructField('weather_description',StringType(),True)])

In [46]:
df=spark.read.format("csv").option("quote", "").option("ignoreLeadingWhiteSpace","true").load("/user/root/SRC_ATM_TRANS/part-m-00000",schema=transformation_schema1)

In [47]:
df

DataFrame[year: int, month: string, day: int, weekday: string, hour: int, atm_status: string, atm_id: string, atm_manufacturer: string, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, currency: string, card_type: string, transaction_amount: int, service: string, message_code: string, message_text: string, weather_lat: double, weather_lon: double, weather_city_id: int, weather_city_name: string, temp: double, pressure: int, humidity: int, wind_speed: int, wind_deg: int, rain_3h: double, clouds_all: int, weather_id: int, weather_main: string, weather_description: string]

In [48]:
df.show()


+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|           card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------

In [49]:
df.count()

2468572

In [50]:
df.distinct()

DataFrame[year: int, month: string, day: int, weekday: string, hour: int, atm_status: string, atm_id: string, atm_manufacturer: string, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, currency: string, card_type: string, transaction_amount: int, service: string, message_code: string, message_text: string, weather_lat: double, weather_lon: double, weather_city_id: int, weather_city_name: string, temp: double, pressure: int, humidity: int, wind_speed: int, wind_deg: int, rain_3h: double, clouds_all: int, weather_id: int, weather_main: string, weather_description: string]

In [51]:
# Create dimen_location df
dimen_location = df[df['atm_location'],df['atm_streetname'],df['atm_street_number'],df['atm_zipcode'],df['atm_lat'],df['atm_lon']]
dimen_location = dimen_location.withColumnRenamed('atm_location', 'location')\
.withColumnRenamed('atm_streetname', 'streetname').withColumnRenamed('atm_street_number', 'street_number')\
.withColumnRenamed('atm_zipcode', 'zipcode').withColumnRenamed('atm_lat', 'lat').withColumnRenamed('atm_lon', 'lon')

In [52]:
#drop the duplicates
from pyspark.sql.functions import monotonically_increasing_id
dimen_location = dimen_location.dropDuplicates(['location', 'streetname', 'street_number', 'zipcode'])
dimen_location = dimen_location.withColumn('location_id', monotonically_increasing_id())

In [53]:
dimen_location.head()

Row(location=u'Horsens', streetname=u'Gr\xc3\u0192\xc2\xb8nlandsvej', street_number=5, zipcode=8700, lat=55.859, lon=9.854, location_id=0)

In [54]:
#check the count of records
dimen_location.count()

109

In [55]:
# Create dimen_card_type df
dimen_card_type = df[['card_type']]
dimen_card_type = dimen_card_type.dropDuplicates()

In [56]:
#adding the primary key
from pyspark.sql.functions import monotonically_increasing_id
dimen_card_type = dimen_card_type.withColumn('card_type_id',monotonically_increasing_id())

In [57]:
dimen_card_type.head()

Row(card_type=u'Dankort - on-us', card_type_id=17179869184)

In [58]:
#check the count for dimen_card_type
dimen_card_type.count()

12

In [59]:
#create dimen_atm df
atm_temp = df[df['atm_id'],df['atm_manufacturer']]
cond = [df.atm_lat == dimen_location.lat, df.atm_lon == dimen_location.lon]
atm_temp = df.join(dimen_location, cond)
dimen_atm = atm_temp[atm_temp['atm_id'],atm_temp['atm_manufacturer'], atm_temp['location_id']]
dimen_atm = dimen_atm.withColumn('atm_number', dimen_atm['atm_id'])
dimen_atm = dimen_atm.withColumnRenamed('location_id', 'atm_location_id')

In [60]:
dimen_atm = dimen_atm.drop_duplicates()

In [61]:
#check count for dimen_atm
dimen_atm.count()

156

In [62]:
dimen_atm.head()

Row(atm_id=u'104', atm_manufacturer=u'NCR', atm_location_id=309237645312, atm_number=u'104')

In [63]:
#create dimen_date df
from pyspark.sql.functions import *
dimen_date = df[df['year'], df['month'], df['day'], df['hour'], df['weekday']]
dimen_date = dimen_date.dropDuplicates()
dimen_date = dimen_date.withColumn('full_date_time',to_date(concat_ws('-', df.year, df.month,df.day)))
dimen_date = dimen_date.withColumn('date_id', monotonically_increasing_id())

In [64]:
dimen_date.count()

8685

In [65]:
dimen_date.head()

Row(year=2017, month=u'January', day=1, hour=9, weekday=u'Sunday', full_date_time=None, date_id=0)

In [66]:
# Create FACT_ATM_TRANS
fact_atm_trans = df
fact_atm_trans = fact_atm_trans.withColumn('trans_id', monotonically_increasing_id())

In [67]:
# Join with Dimen_location
dim_cond = [df.atm_lat == dimen_location.lat, df.atm_lon == dimen_location.lon]
fact_atm_trans = fact_atm_trans.join(dimen_location, dim_cond, 'left')


In [68]:
# Rename location_id to weather_loc_id
fact_atm_trans = fact_atm_trans.withColumnRenamed('location_id', 'weather_loc_id')

In [69]:
fact_atm_trans = fact_atm_trans.dropDuplicates(['trans_id'])

In [70]:
# Join with Dimen_atm
fact_atm_trans = fact_atm_trans.join(dimen_atm[['atm_number', 'atm_manufacturer', 'atm_location_id']], fact_atm_trans.atm_id == dimen_atm.atm_number, 'left')

In [71]:
fact_atm_trans = fact_atm_trans.dropDuplicates(['trans_id'])

In [72]:
# Join with dimen_date
date_cond = [fact_atm_trans.year == dimen_date.year, fact_atm_trans.month == dimen_date.month, \
            fact_atm_trans.day == dimen_date.day, fact_atm_trans.hour == dimen_date.hour, \
            fact_atm_trans.weekday == dimen_date.weekday]
fact_atm_trans = fact_atm_trans.join(dimen_date, date_cond, 'left')

In [73]:
fact_atm_trans = fact_atm_trans.dropDuplicates(['trans_id'])

In [74]:
# Join with Dimen_card_type
fact_atm_trans = fact_atm_trans.join(dimen_card_type, fact_atm_trans.card_type == dimen_card_type.card_type, 'left')

In [75]:
fact_atm_trans = fact_atm_trans.dropDuplicates(['trans_id'])

In [76]:
# Remove unnecessary columns
fact_atm_trans_temp = fact_atm_trans[['trans_id','atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 'atm_status', \
                                'currency', 'service', 'transaction_amount', 'message_code', 'message_text', \
                                'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']]

In [77]:
fact_atm_trans_temp.head(5)

[Row(trans_id=26, atm_id=u'20', weather_loc_id=841813590016, date_id=1649267441664, card_type_id=1056561954816, atm_status=u'Active', currency=u'DKK', service=u'Withdrawal', transaction_amount=7573, message_code=None, message_text=None, rain_3h=1.26, clouds_all=92, weather_id=500, weather_main=u'Rain', weather_description=u'light rain'),
 Row(trans_id=29, atm_id=u'21', weather_loc_id=309237645312, date_id=1649267441664, card_type_id=326417514496, atm_status=u'Active', currency=u'DKK', service=u'Withdrawal', transaction_amount=4607, message_code=None, message_text=None, rain_3h=0.59, clouds_all=92, weather_id=500, weather_main=u'Rain', weather_description=u'light rain'),
 Row(trans_id=474, atm_id=u'27', weather_loc_id=609885356032, date_id=1494648619008, card_type_id=566935683072, atm_status=u'Active', currency=u'DKK', service=u'Withdrawal', transaction_amount=3124, message_code=None, message_text=None, rain_3h=0.0, clouds_all=75, weather_id=301, weather_main=u'Drizzle', weather_descrip

# LOADING TABLES TO S3

In [96]:
#loading dimen_location to S3
dimen_location.write.csv("s3a://malus.bucket/dimen_location", sep="|",mode='overwrite')

In [97]:
#loading dimen_card_type to S3
dimen_card_type.write.csv("s3a://malus.bucket/dimen_card_type", sep="|",mode='overwrite')

In [99]:
#loading dimen_atm to S3
dimen_atm.write.csv("s3a://malus.bucket/dimen_atm", sep="|",mode='overwrite')

In [100]:
#loading dimen_date to S3
dimen_date.write.csv("s3a://malus.bucket/dimen_date", sep="|",mode='overwrite')

In [101]:
#loading fact_atm_trans table to S3
fact_atm_trans_temp.write.csv("s3a://malus.bucket/fact_atm_trans", sep="|",mode='overwrite')