In [1]:
import os
import sys 

os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] +"/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
# conf = SparkConf().setAppName('atm_trans').setMaster('local')
# sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName('atm_trans').master('local').getOrCreate()
spark

# Loading Data

In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

In [5]:
table_schema = StructType(
    [
      StructField("year", IntegerType(), True),
      StructField("month", StringType(), True),
      StructField("day", IntegerType(), True),
      StructField("weekday", StringType(), True),
      StructField("hour", IntegerType(), True),
      StructField("atm_status", StringType(), True),
      StructField("atm_id", IntegerType(), True),
      StructField("atm_manufacturer", StringType(), True),
      StructField("atm_location", StringType(), True),
      StructField("atm_streetname", StringType(), True),
      StructField("atm_street_number", IntegerType(), True),
      StructField("atm_zipcode", IntegerType(), True),
      StructField("atm_lat",FloatType(), True),
      StructField("atm_lon", FloatType(), True),
      StructField("currency", StringType(), True),
      StructField("card_type", StringType(), True),
      StructField("transaction_amount", IntegerType(), True),
      StructField("service", StringType(), True),
      StructField("message_code", StringType(), True),
      StructField("message_text", StringType(), True),
      StructField("weather_lat", FloatType(), True),
      StructField("weather_lon", FloatType(), True),
      StructField("weather_city_id", StringType(), True),
      StructField("weather_city_name", StringType(), True),
      StructField("temp", FloatType(), True),
      StructField("pressure", IntegerType(), True),
      StructField("humidity", IntegerType(), True),
      StructField("wind_speed", IntegerType(), True),
      StructField("wind_deg", IntegerType(), True),
      StructField("rain_3h", FloatType(), True),
      StructField("clouds_all", IntegerType(), True),
      StructField("weather_id", IntegerType(), True),
      StructField("weather_main", StringType(), True),
      StructField("weather_description", StringType(), True),
     ]
 )

In [6]:
polished_df = spark.read.csv('hdfs:///user/root/atm_trans', schema=table_schema)

In [7]:
polished_df.where('weather_lon is null').show()

+----+-------+---+-------+----+----------+------+----------------+--------------------+-----------------+-----------------+-----------+-------+-------+--------+--------------------+------------------+----------+------------+--------------------+-----------+-----------+---------------+-----------------+----+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|        atm_location|   atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|           card_type|transaction_amount|   service|message_code|        message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+--------------------+-----------------+-----------------+-----------+-------+-------+--------+--------------------

In [8]:
polished_df.show(10)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|         card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+------------------+------------------+----------+---

In [9]:
polished_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: string (nullable = true)
 |-- weather_city_name: string

In [10]:
polished_df.cache()

DataFrame[year: int, month: string, day: int, weekday: string, hour: int, atm_status: string, atm_id: int, atm_manufacturer: string, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: float, atm_lon: float, currency: string, card_type: string, transaction_amount: int, service: string, message_code: string, message_text: string, weather_lat: float, weather_lon: float, weather_city_id: string, weather_city_name: string, temp: float, pressure: int, humidity: int, wind_speed: int, wind_deg: int, rain_3h: float, clouds_all: int, weather_id: int, weather_main: string, weather_description: string]

# Create dimension schema

In [11]:
polished_df.show()

+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|           card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------

In [12]:
polished_df.select(['year', 'month', 'day', 'weekday', 'hour']).distinct().count()

8685

In [13]:
polished_df.select(['atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon']).distinct().count()

109

In [14]:
polished_df.select(['card_type']).distinct().count()

12

In [15]:
polished_df.select(['atm_manufacturer', 'atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon', 'atm_id']).distinct().count()

113

In [16]:
polished_df.count()

2468572

Create date dimension

In [17]:
dim_date = polished_df.select(['year', 'month', 'day', 'weekday', 'hour']).distinct()

In [18]:
dim_date.show(10)

+----+--------+---+--------+----+
|year|   month|day| weekday|hour|
+----+--------+---+--------+----+
|2017| January|  1|  Sunday|   9|
|2017| January|  3| Tuesday|   5|
|2017| January|  8|  Sunday|  19|
|2017| January| 21|Saturday|   3|
|2017| January| 23|  Monday|  21|
|2017|February|  2|Thursday|  19|
|2017|February|  5|  Sunday|  16|
|2017|February| 21| Tuesday|  15|
|2017|   March|  2|Thursday|   8|
|2017|   April|  2|  Sunday|   2|
+----+--------+---+--------+----+
only showing top 10 rows



In [19]:
from datetime import datetime

# add full date
merge_date = udf(lambda day, month, year, hour: "{}/{}/{} {}".format(day, datetime.strptime(month, '%B').month, year, hour))

dim_date = dim_date.withColumn('full_date', merge_date('day', 'month', 'year', 'hour'))
dim_date = dim_date.withColumn('full_date', to_timestamp("full_date", format="d/M/yyyy H")).orderBy('full_date')

In [20]:
increasing_id = Window.orderBy(monotonically_increasing_id())
dim_date = dim_date.withColumn('date_id', row_number().over(increasing_id))
dim_date = dim_date.select([dim_date.columns[-1]] + dim_date.columns[:-1])

In [21]:
dim_date.show()

+-------+----+-------+---+-------+----+-------------------+
|date_id|year|  month|day|weekday|hour|          full_date|
+-------+----+-------+---+-------+----+-------------------+
|      1|2017|January|  1| Sunday|   0|2017-01-01 00:00:00|
|      2|2017|January|  1| Sunday|   1|2017-01-01 01:00:00|
|      3|2017|January|  1| Sunday|   2|2017-01-01 02:00:00|
|      4|2017|January|  1| Sunday|   3|2017-01-01 03:00:00|
|      5|2017|January|  1| Sunday|   4|2017-01-01 04:00:00|
|      6|2017|January|  1| Sunday|   5|2017-01-01 05:00:00|
|      7|2017|January|  1| Sunday|   6|2017-01-01 06:00:00|
|      8|2017|January|  1| Sunday|   7|2017-01-01 07:00:00|
|      9|2017|January|  1| Sunday|   8|2017-01-01 08:00:00|
|     10|2017|January|  1| Sunday|   9|2017-01-01 09:00:00|
|     11|2017|January|  1| Sunday|  10|2017-01-01 10:00:00|
|     12|2017|January|  1| Sunday|  11|2017-01-01 11:00:00|
|     13|2017|January|  1| Sunday|  12|2017-01-01 12:00:00|
|     14|2017|January|  1| Sunday|  13|2

In [22]:
dim_date.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- full_date: timestamp (nullable = true)



Create location dimension

In [23]:
dim_location = polished_df.select(['atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon']).distinct()

In [24]:
dim_location.show()

+--------------------+--------------------+-----------------+-----------+-------+-------+
|        atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+--------------------+--------------------+-----------------+-----------+-------+-------+
|         SÃƒÂ¦by Syd|Trafikcenter SÃƒÂ...|                1|       9300| 57.313|  10.45|
|         GlyngÃƒÂ¸re|         FÃƒÂ¦rgevej|                1|       7870| 56.762|  8.867|
|      Skelagervej 15|         Skelagervej|               15|       9000| 57.023|  9.891|
|               Durup|              Torvet|                4|       7870| 56.745|  8.949|
|           Svendborg|  Sankt Nicolai Gade|                1|       5700| 55.058| 10.609|
|             Herning|          Dalgasgade|               30|       7400| 56.135|  8.971|
|            Roskilde|    KÃƒÂ¸benhavnsvej|               65|       4000| 55.642| 12.106|
|Aalborg Storcente...|            Hobrovej|              452|       9200| 57.005|  9.876|
|         

In [25]:
# add sequential id to the locaiton dimension
# rename column names
increasing_id = Window.orderBy(monotonically_increasing_id())
dim_location = dim_location.withColumn('location_id', row_number().over(increasing_id))
dim_location = dim_location.select([dim_location.columns[-1]] + dim_location.columns[:-1])
dim_location = dim_location.withColumnRenamed('atm_location', 'location') \
  .withColumnRenamed('atm_streetname', 'streetname') \
  .withColumnRenamed('atm_street_number', 'street_number') \
  .withColumnRenamed('atm_zipcode', 'zipcode') \
  .withColumnRenamed('atm_lat', 'lat') \
  .withColumnRenamed('atm_lon', 'lon')

In [26]:
dim_location.show()

+-----------+--------------------+--------------------+-------------+-------+------+------+
|location_id|            location|          streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+--------------------+-------------+-------+------+------+
|          1|         SÃƒÂ¦by Syd|Trafikcenter SÃƒÂ...|            1|   9300|57.313| 10.45|
|          2|         GlyngÃƒÂ¸re|         FÃƒÂ¦rgevej|            1|   7870|56.762| 8.867|
|          3|      Skelagervej 15|         Skelagervej|           15|   9000|57.023| 9.891|
|          4|               Durup|              Torvet|            4|   7870|56.745| 8.949|
|          5|           Svendborg|  Sankt Nicolai Gade|            1|   5700|55.058|10.609|
|          6|             Herning|          Dalgasgade|           30|   7400|56.135| 8.971|
|          7|            Roskilde|    KÃƒÂ¸benhavnsvej|           65|   4000|55.642|12.106|
|          8|Aalborg Storcente...|            Hobrovej|          452|   9200|57.

In [27]:
dim_location.count()

109

Create ATM dimension

In [28]:
id_and_location = list(dim_location.select(['location_id', 'location']).collect())
id_location_dict = dict()

for location_id, location in id_and_location:
  id_location_dict[location] = location_id

In [29]:
add_location_id = udf(lambda location: id_location_dict[location])

dim_atm = polished_df.select(['atm_id', 'atm_manufacturer', 'atm_location']).withColumn('location_id', add_location_id(polished_df.atm_location)).distinct()
dim_atm = dim_atm.drop('atm_location').orderBy('atm_id')

In [30]:
dim_atm = dim_atm.withColumn('atm_number', dim_atm['atm_id'])
dim_atm = dim_atm.select([dim_atm.columns[0]] + [dim_atm.columns[-1]] + [dim_atm.columns[1]] + [dim_atm.columns[2]])
dim_atm = dim_atm.withColumn('atm_number', col('atm_number').cast(StringType()))

In [31]:
dim_atm.show()

+------+----------+----------------+-----------+
|atm_id|atm_number|atm_manufacturer|location_id|
+------+----------+----------------+-----------+
|     1|         1|             NCR|         81|
|     2|         2|             NCR|         38|
|     3|         3|             NCR|         84|
|     4|         4|             NCR|         18|
|     5|         5|             NCR|         35|
|     6|         6|             NCR|        105|
|     7|         7| Diebold Nixdorf|         16|
|     8|         8|             NCR|          2|
|     9|         9| Diebold Nixdorf|         94|
|    10|        10|             NCR|         46|
|    11|        11|             NCR|         23|
|    12|        12|             NCR|         13|
|    13|        13|             NCR|         88|
|    14|        14|             NCR|         47|
|    15|        15|             NCR|         83|
|    16|        16|             NCR|         58|
|    17|        17|             NCR|        107|
|    18|        18| 

In [32]:
dim_atm.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- location_id: string (nullable = true)



In [33]:
dim_atm.select('atm_number').count()

113

Create card type dimension

In [34]:
dim_card_type = polished_df.select(['card_type']).distinct()
increasing_id = Window.orderBy(monotonically_increasing_id())
dim_card_type = dim_card_type.withColumn('card_type_id', row_number().over(increasing_id))
dim_card_type = dim_card_type.select([dim_card_type.columns[-1]] + dim_card_type.columns[:-1])

In [35]:
dim_card_type.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|         HÃƒÂ¦vekort|
|           4|                VISA|
|           5|  Mastercard - on-us|
|           6|             Maestro|
|           7|Visa Dankort - on-us|
|           8|        Visa Dankort|
|           9|            VisaPlus|
|          10|          MasterCard|
|          11|             Dankort|
|          12| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



In [36]:
dim_card_type.printSchema()

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)



Create transaction fact table

In [37]:
transaction_fact = polished_df

In [38]:
transaction_fact.show()

+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|           card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------

In [39]:
# modify card type column

# get the pairs 
card_type_list = list(dim_card_type.select(['card_type_id', 'card_type']).collect())
card_type_dict = dict()

for card in card_type_list:
  card_type_dict[card.card_type] = card.card_type_id

get_card_type_id = udf(lambda card: card_type_dict[card])
transaction_fact = transaction_fact.withColumn('card_type', get_card_type_id('card_type')) \
  .withColumnRenamed('card_type', 'card_type_id')

In [40]:
# modify weather city name
location_list = list(dim_location.select(['location_id', 'location']).collect())
location_dict = dict()

for loc in location_list:
  location_dict[loc.location] = loc.location_id

get_weather_location_id = udf(lambda location: location_dict[location] if location in location_dict.keys() else None)
transaction_fact = transaction_fact.withColumn('weather_city_name', get_weather_location_id('weather_city_name')) \
  .withColumnRenamed('weather_city_name', 'weather_loc_id')

In [41]:
transaction_fact.show()

+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+------------+------------------+----------+------------+------------+-----------+-----------+---------------+--------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|card_type_id|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_loc_id|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+------------+------------------+----------+------

In [42]:
get_date_id = list(dim_date.select(['date_id', 'full_date']).collect())
date_id_dict = dict()

for date in get_date_id:
  date_id_dict[date.full_date] = date.date_id

In [43]:
transaction_merge_date = udf(lambda day, month, year, hour: "{}/{}/{} {}".format(day, datetime.strptime(month, '%B').month, year, hour))
set_date_id = udf(lambda full_date: date_id_dict[full_date])

transaction_fact = transaction_fact.withColumn('full_date', transaction_merge_date('day', 'month', 'year', 'hour')) \
  .withColumn('full_date', to_timestamp("full_date", format="d/M/yyyy H")) \
  .drop('year', 'month', 'day', 'weekday', 'hour') \
  .withColumn('full_date', set_date_id('full_date')) \
  .withColumnRenamed('full_date', 'date_id')

transaction_fact = transaction_fact.select([transaction_fact.columns[-1]] + transaction_fact.columns[:-1])

In [44]:
transaction_fact.show()

+-------+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+------------+------------------+----------+------------+------------+-----------+-----------+---------------+--------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|date_id|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|card_type_id|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_loc_id|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+-------+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+------------+------------------+----------+------------+------------+-----------+-----------+---------------+------

In [45]:
# 'atm_location', 'atm_streetname', 'atm_manufacturer', 'atm_zipcode', 'atm_street_number', 'atm_lat', 'atm_lon', 'weather_lat', 'weather_lon', 'weather_city_id, 'pressure'
transaction_fact = transaction_fact.drop('atm_location', 'atm_streetname', 'atm_manufacturer', 'atm_zipcode', 'atm_street_number', 'atm_lat', 'atm_lon', 'weather_lat', 'weather_lon', 'weather_city_id', 'temp', 'humidity', 'wind_speed', 'wind_deg', 'pressure')

In [46]:
increasing_id = Window.orderBy(monotonically_increasing_id())
transaction_fact = transaction_fact.withColumn('trans_id', row_number().over(increasing_id))
transaction_fact = transaction_fact.select([transaction_fact.columns[-1]] + transaction_fact.columns[:-1])
transaction_fact = transaction_fact.withColumn('trans_id', col('trans_id').cast(LongType()))

In [47]:
transaction_fact.show()

+--------+-------+----------+------+--------+------------+------------------+----------+------------+------------+--------------+-------+----------+----------+------------+--------------------+
|trans_id|date_id|atm_status|atm_id|currency|card_type_id|transaction_amount|   service|message_code|message_text|weather_loc_id|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+-------+----------+------+--------+------------+------------------+----------+------------+------------+--------------+-------+----------+----------+------------+--------------------+
|       1|      1|    Active|     1|     DKK|          10|              5643|Withdrawal|        null|        null|          null|  0.215|        92|       500|        Rain|          light rain|
|       2|      1|  Inactive|     2|     DKK|          10|              1764|Withdrawal|        null|        null|            46|   0.59|        92|       500|        Rain|          light rain|
|       3|      1|  Inactive| 

In [48]:
transaction_fact.printSchema()

root
 |-- trans_id: long (nullable = true)
 |-- date_id: string (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type_id: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_loc_id: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



# Export dimension tables

In [49]:
dim_date.write.format('csv').save('s3a://atmtransdata/atmtrans_v2/dim_date.csv', mode='overwrite')

In [50]:
dim_location.write.format('csv').save('s3a://atmtransdata/atmtrans_v2/dim_location.csv', mode='overwrite')

In [51]:
dim_card_type.write.format('csv').save('s3a://atmtransdata/atmtrans_v2/dim_card_type.csv', mode='overwrite')

In [52]:
dim_atm.write.format('csv').save('s3a://atmtransdata/atmtrans_v2/dim_atm.csv', mode='overwrite')

In [53]:
transaction_fact.write.format('csv').save('s3a://atmtransdata/atmtrans_v2/transaction_fact.csv', mode='overwrite')

# Conclusion of the dimensions

In [54]:
dim_date.columns

['date_id', 'year', 'month', 'day', 'weekday', 'hour', 'full_date']

In [55]:
dim_location.columns

['location_id',
 'location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon']

In [56]:
dim_card_type.columns

['card_type_id', 'card_type']

In [57]:
dim_atm.columns

['atm_id', 'atm_number', 'atm_manufacturer', 'location_id']

In [58]:
transaction_fact.columns

['trans_id',
 'date_id',
 'atm_status',
 'atm_id',
 'currency',
 'card_type_id',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_loc_id',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']