<a href="https://colab.research.google.com/github/desininja/DE1_practice/blob/4-project-etl/ETL_Spark_Job.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [123]:
#!pip install pyspark

In [20]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETL Project").master('local').getOrCreate()

sc = spark.sparkContext

In [21]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,BooleanType, DoubleType,LongType,FloatType


In [22]:
fileSchema = StructType([StructField('year',IntegerType(),True),
                        StructField('month',StringType(),True),
                         StructField('day',IntegerType(),True),
                         StructField('weekday',StringType(),True),
                         StructField('hour',IntegerType(),True),
                         StructField('atm_status',StringType(),True),
                         StructField('atm_id',StringType(),True),
                         StructField('atm_manufacturer',StringType(),True),
                         StructField('atm_location',StringType(),True),
                         StructField('atm_streetname',StringType(),True),
                         StructField('atm_street_number',IntegerType(),True),
                         StructField('atm_zipcode',IntegerType(),True),
                         StructField('atm_lat',DoubleType(),True),
                         StructField('atm_lon',DoubleType(),True),
                         StructField('currency',StringType(),True),
                         StructField('card_type',StringType(),True),
                         StructField('transaction_amount',IntegerType(),True),
                         StructField('service',StringType(),True),
                         StructField('message_code',StringType(),True),
                         StructField('message_text',StringType(),True),
                         StructField('weather_lat',DoubleType(),True),
                         StructField('weather_lon',DoubleType(),True),
                         StructField('weather_city_id',IntegerType(),True),
                         StructField('weather_city_name',StringType(),True),
                         StructField('temp',DoubleType(),True),
                         StructField('pressure',IntegerType(),True),
                         StructField('humidity',IntegerType(),True),
                         StructField('wind_speed',IntegerType(),True),
                         StructField('wind_deg',IntegerType(),True),
                         StructField('rain_3h',DoubleType(),True),
                         StructField('clouds_all',IntegerType(),True),
                         StructField('weather_id',IntegerType(),True),
                         StructField('weather_main',StringType(),True),
                         StructField('weather_description',StringType(),True)])


In [135]:
#df1 = spark.read.csv('/content/atm_data_part1.csv',header=True,schema=fileSchema,sep=',')
#df2 = spark.read.csv('/content/atm_data_part2.csv',header=True,schema=fileSchema,sep=',')

df = spark.read.csv('/content/atm_transaction.csv',schema = fileSchema, sep=',')

In [129]:
df.show()

+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|           card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------

In [130]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [136]:
df.count()

2468572

In [133]:
df.count()

2455866

In [137]:
df.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [138]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.functions import col, concat_ws


In [139]:
#Location dimension - This is a very important dimension containing all the location data including
#location name, street name, street number, zip code and even the latitude and longitude. This
#information will be very important for solving problems related to the particular location at which
#a transaction took place and can help banks in things like pinpointing locations where ATMs where
#demand is higher as compared to other locations. Combined with weather data in the transaction table,
# this can be used to further do analysis such as how weather affects the demand at ATMs
#at a particular location.


dim_location = df.select(col('atm_location').alias('location'),col('atm_streetname').alias('street_name'),col('atm_street_number').alias('street_number'),col('atm_zipcode').alias('zipcode'),col('atm_lat').alias('lat'),col('atm_lon').alias('lon')).distinct()


In [140]:
dim_location = dim_location.select('location','street_name','street_number','zipcode','lat','lon',F.row_number().over(Window.partitionBy().orderBy(dim_location['lon'])).alias('location_id'))
dim_location.show()


+--------------------+-------------------+-------------+-------+------+-----+-----------+
|            location|        street_name|street_number|zipcode|   lat|  lon|location_id|
+--------------------+-------------------+-------------+-------+------+-----+-----------+
|          SÃƒÂ¦dding|        Tarphagevej|           59|   6710|55.498|8.408|          1|
|             Esbjerg|       Strandbygade|           20|   6700|55.468| 8.44|          2|
|           Holstebro|        Hostrupsvej|            6|   7500|56.373|8.625|          3|
|            Vinderup|      SÃƒÂ¸ndergade|            5|   7830|56.481|8.779|          4|
|NykÃƒÂ¸bing Mors ...|        Kirketorvet|            1|   7900|56.795| 8.86|          5|
|    NykÃƒÂ¸bing Mors|        Kirketorvet|            1|   7900|56.795| 8.86|          6|
|         GlyngÃƒÂ¸re|        FÃƒÂ¦rgevej|            1|   7870|56.762|8.867|          7|
|               Durup|             Torvet|            4|   7870|56.745|8.949|          8|
|         

In [141]:
dim_location.count()

109

In [142]:
#ATM dimension - This dimension will have the data related to the various ATMs
#present in the dataset along with the ATM number(ATM ID in the original dataset),
#ATM manufacturer and a reference to the ATM location and is very important for
#solving analytical queries related where ATM data will be used.



dim_atm = df.select(col('atm_location').alias('location'),
                         col('atm_streetname').alias('street_name'),
                         col('atm_street_number').alias('street_number'),
                         col('atm_zipcode').alias('zipcode'),
                         col('atm_lat').alias('lat'),
                         col('atm_lon').alias('lon'),
                         col("atm_id").alias("atm_number"),
                         col("atm_manufacturer").alias("manufacturer")).distinct()


dim_atm = dim_atm.join(dim_location, on=['location','street_name','street_number','zipcode','lat','lon'],how='left').select('atm_number','manufacturer','location_id')

dim_atm.show()

+----------+---------------+-----------+
|atm_number|   manufacturer|location_id|
+----------+---------------+-----------+
|        45|            NCR|         84|
|       105|Diebold Nixdorf|         50|
|        46|Diebold Nixdorf|        109|
|        69|            NCR|         72|
|        67|            NCR|         16|
|        97|            NCR|         73|
|        48|Diebold Nixdorf|         56|
|        77|            NCR|         91|
|        30|            NCR|          6|
|       106|            NCR|        100|
|       101|            NCR|         24|
|        75|            NCR|         53|
|        80|            NCR|         68|
|        78|Diebold Nixdorf|         92|
|        51|            NCR|         61|
|         6|            NCR|         32|
|        85|Diebold Nixdorf|        106|
|        23|Diebold Nixdorf|         64|
|        65|            NCR|         70|
|        86|            NCR|        102|
+----------+---------------+-----------+
only showing top

In [143]:
dim_atm = dim_atm.select('atm_number','manufacturer','location_id',F.row_number().over(Window.partitionBy().orderBy(dim_atm['atm_number'])).alias("atm_id"))
dim_atm.show()
dim_atm.count()


+----------+---------------+-----------+------+
|atm_number|   manufacturer|location_id|atm_id|
+----------+---------------+-----------+------+
|         1|            NCR|         97|     1|
|        10|            NCR|         50|     2|
|       100|            NCR|         14|     3|
|       101|            NCR|         24|     4|
|       102|            NCR|         41|     5|
|       103|Diebold Nixdorf|         57|     6|
|       104|            NCR|         49|     7|
|       105|Diebold Nixdorf|         50|     8|
|       106|            NCR|        100|     9|
|       107|Diebold Nixdorf|         22|    10|
|       108|            NCR|         65|    11|
|       109|Diebold Nixdorf|         43|    12|
|        11|            NCR|         54|    13|
|       110|Diebold Nixdorf|         95|    14|
|       111|Diebold Nixdorf|         76|    15|
|       112|Diebold Nixdorf|        108|    16|
|       113|Diebold Nixdorf|         94|    17|
|        12|            NCR|         52|

113

In [144]:

from pyspark.sql.functions import *

In [145]:
#Date dimension - This is another very important dimension which is almost always present where data
#such as transactional data is being dealt with. This dimension includes fields such as the full date
#and time timestamp, year, month, day, hour as well as the weekday for a transaction. This all can help
#in analysing the transaction behaviour with respect to the time at which the transaction took place
#and also how the transaction activity varies between weekdays and weekends.



def month_to_numeric(month_name):
    # Extract the month name string from the Column object
    month_name_str = str(month_name)
    months = {
        "January": "01",
        "February": "02",
        "March": "03",
        "April": "04",
        "May": "05",
        "June": "06",
        "July": "07",
        "August": "08",
        "September": "09",
        "October": "10",
        "November": "11",
        "December": "12"
    }
    return months.get(month_name_str, "00")  # Return "00" for unknown months


month_to_numeric("January")



'01'

In [146]:
dim_date = df.select('year','month','day','hour','weekday').distinct()

In [147]:
res_date_df = dim_date.withColumn("full_date",from_unixtime(unix_timestamp(concat(dim_date.year.cast(StringType()),dim_date.month.cast(StringType()),lpad(dim_date.day.cast(StringType()),2,'0'),lpad(dim_date.hour.cast(StringType()),2,'0')),'yyyyMMMMddHH'),'yyyy-MM-dd HH:mm:ss'))

res_date_df.show()

+----+--------+---+----+--------+-------------------+
|year|   month|day|hour| weekday|          full_date|
+----+--------+---+----+--------+-------------------+
|2017| January|  1|   9|  Sunday|2017-01-01 09:00:00|
|2017| January|  3|   5| Tuesday|2017-01-03 05:00:00|
|2017| January|  8|  19|  Sunday|2017-01-08 19:00:00|
|2017| January| 21|   3|Saturday|2017-01-21 03:00:00|
|2017| January| 23|  21|  Monday|2017-01-23 21:00:00|
|2017|February|  2|  19|Thursday|2017-02-02 19:00:00|
|2017|February|  5|  16|  Sunday|2017-02-05 16:00:00|
|2017|February| 21|  15| Tuesday|2017-02-21 15:00:00|
|2017|   March|  2|   8|Thursday|2017-03-02 08:00:00|
|2017|   April|  2|   2|  Sunday|2017-04-02 02:00:00|
|2017|   April|  6|   8|Thursday|2017-04-06 08:00:00|
|2017| January|  5|  21|Thursday|2017-01-05 21:00:00|
|2017| January| 17|  17| Tuesday|2017-01-17 17:00:00|
|2017| January| 22|  15|  Sunday|2017-01-22 15:00:00|
|2017| January| 27|  22|  Friday|2017-01-27 22:00:00|
|2017|February| 11|   7|Satu

In [148]:
dim_date = res_date_df.select("full_date","year","month","day","hour",
                                    "weekday",
                                    F.row_number().over(Window.partitionBy().orderBy(res_date_df['full_date'])).alias("date_id"))
#F.row_number().over(Window.partitionBy().orderBy(dim_atm['atm_number'])).alias("atm_id"))
dim_date.show()

+-------------------+----+-------+---+----+-------+-------+
|          full_date|year|  month|day|hour|weekday|date_id|
+-------------------+----+-------+---+----+-------+-------+
|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|      1|
|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|      2|
|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|      3|
|2017-01-01 03:00:00|2017|January|  1|   3| Sunday|      4|
|2017-01-01 04:00:00|2017|January|  1|   4| Sunday|      5|
|2017-01-01 05:00:00|2017|January|  1|   5| Sunday|      6|
|2017-01-01 06:00:00|2017|January|  1|   6| Sunday|      7|
|2017-01-01 07:00:00|2017|January|  1|   7| Sunday|      8|
|2017-01-01 08:00:00|2017|January|  1|   8| Sunday|      9|
|2017-01-01 09:00:00|2017|January|  1|   9| Sunday|     10|
|2017-01-01 10:00:00|2017|January|  1|  10| Sunday|     11|
|2017-01-01 11:00:00|2017|January|  1|  11| Sunday|     12|
|2017-01-01 12:00:00|2017|January|  1|  12| Sunday|     13|
|2017-01-01 13:00:00|2017|January|  1|  

In [149]:
dim_date.count()

8685

In [150]:
#Card type dimension - This dimension has the information about the particular card type with which
#a particular transaction took place. This can help in performing analysis on how the number of
#transactions varies with respect to each different card type.

dim_card_type = df.select('card_type').distinct()

dim_card_type = dim_card_type.select('card_type',F.row_number().over(Window.partitionBy().orderBy(dim_card_type['card_type'])).alias("card_id"))
dim_card_type.show()
dim_card_type.count()

+--------------------+-------+
|           card_type|card_id|
+--------------------+-------+
|              CIRRUS|      1|
|             Dankort|      2|
|     Dankort - on-us|      3|
|         HÃƒÂ¦vekort|      4|
| HÃƒÂ¦vekort - on-us|      5|
|             Maestro|      6|
|          MasterCard|      7|
|  Mastercard - on-us|      8|
|                VISA|      9|
|        Visa Dankort|     10|
|Visa Dankort - on-us|     11|
|            VisaPlus|     12|
+--------------------+-------+



12

In [151]:
#Transaction fact - This is the actual fact table for the data set which contains all of the
#numerical data such as the currency of the transaction, service, transaction amount, message
#code and text as well as weather info such as description, weather id etc.


dim_atm.show(3)

+----------+------------+-----------+------+
|atm_number|manufacturer|location_id|atm_id|
+----------+------------+-----------+------+
|         1|         NCR|         97|     1|
|        10|         NCR|         50|     2|
|       100|         NCR|         14|     3|
+----------+------------+-----------+------+
only showing top 3 rows



In [152]:
dim_location.show(3)

+----------+------------+-------------+-------+------+-----+-----------+
|  location| street_name|street_number|zipcode|   lat|  lon|location_id|
+----------+------------+-------------+-------+------+-----+-----------+
|SÃƒÂ¦dding| Tarphagevej|           59|   6710|55.498|8.408|          1|
|   Esbjerg|Strandbygade|           20|   6700|55.468| 8.44|          2|
| Holstebro| Hostrupsvej|            6|   7500|56.373|8.625|          3|
+----------+------------+-------------+-------+------+-----+-----------+
only showing top 3 rows



In [153]:
dim_date.show(3)

+-------------------+----+-------+---+----+-------+-------+
|          full_date|year|  month|day|hour|weekday|date_id|
+-------------------+----+-------+---+----+-------+-------+
|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|      1|
|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|      2|
|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|      3|
+-------------------+----+-------+---+----+-------+-------+
only showing top 3 rows



In [154]:
dim_card_type.show(3)

+---------------+-------+
|      card_type|card_id|
+---------------+-------+
|         CIRRUS|      1|
|        Dankort|      2|
|Dankort - on-us|      3|
+---------------+-------+
only showing top 3 rows



In [155]:
stg1_df = df.join(dim_date, on=["year","month",'day',"hour","weekday"], how='left').select(*(df.columns+[dim_date.date_id])).drop(*['year','month','day','hour','weekday'])

In [156]:
stg1_df.show(1)

+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+
|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|
+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+---------

In [157]:


stg2_df = stg1_df.join(dim_card_type, on=['card_type'], how ='left').select(*(stg1_df.columns+[dim_card_type.card_id])).drop(*['card_type'])


In [158]:
stg2_df.show(2)
stg2_df.printSchema()

+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+
|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_id|
+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+------

In [159]:
# |location_id|
stg3_df = stg2_df.withColumnsRenamed({"atm_manufacturer":"manufacturer",
                                      "atm_location":"location",
                                      "atm_streetname":"street_name",
                                      "atm_street_number":"street_number",
                                      "atm_zipcode":"zipcode",
                                      "atm_lat":"lat",
                                      "atm_lon":"lon",
                                      "atm_id":"atm_number"})


stg3_df.show(3)

+----------+----------+------------+----------+-----------+-------------+-------+------+------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+
|atm_status|atm_number|manufacturer|  location|street_name|street_number|zipcode|   lat|   lon|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_id|
+----------+----------+------------+----------+-----------+-------------+-------+------+------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------

In [160]:
stg4_df = stg3_df.join(dim_location, on=["location","street_name","street_number","zipcode","lat","lon"], how='left').select(*(stg3_df.columns+[dim_location.location_id])).drop(*["location","street_name","street_number","zipcode","lat","lon"])

stg4_df.show(3)

+----------+----------+------------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+-----------+
|atm_status|atm_number|manufacturer|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_id|location_id|
+----------+----------+------------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+-----------+
|    Active|         1|         NCR|     DKK|              5643|Withdrawal|        NULL|        NULL|      55.23

In [161]:
#|atm_number|manufacturer|location_id|atm_id|

stg5_df = stg4_df.join(dim_atm, on =['atm_number',"manufacturer","location_id"],how='left').select(*(stg4_df.columns+[dim_atm.atm_id])).drop(*['atm_number',"manufacturer"])

stg5_df.show(2)

+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+-----------+------+
|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_id|location_id|atm_id|
+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+-----------+------+
|    Active|     DKK|              5643|Withdrawal|        NULL|        NULL|      55.23|     11.761|        2616038|         Naestved|281.15|    1014|      87|   

In [162]:
stg5_df.count()

2468572

In [163]:
df.count()

2468572

In [164]:

stg5_df.columns

['atm_status',
 'currency',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description',
 'date_id',
 'card_id',
 'location_id',
 'atm_id']

In [165]:
trans_fact = stg5_df.select('atm_status','currency','transaction_amount', 'service',
 'message_code', 'message_text', 'weather_lat', 'weather_lon',
 'weather_city_id', 'weather_city_name', 'temp', 'pressure',
 'humidity', 'wind_speed', 'wind_deg', 'rain_3h',
 'clouds_all', 'weather_id', 'weather_main', 'weather_description',
 'date_id', 'card_id', 'location_id', 'atm_id',F.row_number().over(Window.partitionBy().orderBy(stg5_df['atm_id'])).alias("trans_id"))

trans_fact.show(2)

+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+-----------+------+--------+
|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_id|location_id|atm_id|trans_id|
+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+-------+-----------+------+--------+
|    Active|     DKK|              3975|Withdrawal|        NULL|        NULL|      55.23|     11.761|        2616038|         Naestved|2

In [166]:
trans_fact.count()

2468572

In [None]:
trans_fact.coalesce(10).write.csv("address", mode="overwrite")

In [None]:

dim_location.coalesce(1).write.csv("address", mode="overwrite")
dim_atm.coalesce(1).write.csv("address", mode="overwrite")
dim_date.coalesce(1).write.csv("address",mode="overwrite")
dim_card_type.coalese(1).write.csv("address",mode="overwrite")

In [168]:

# Writing DataFrame to CSV
#trans_fact.write.csv("sample_data/transaction_fact_table.csv", header=True)
trans_fact.coalesce(10).write.csv("sample_data/transaction_fact_table_1.csv", header=True,mode="overwrite")


In [170]:
dim_location.coalesce(1).write.csv("sample_data/dim_location.csv", mode="overwrite")
dim_atm.coalesce(1).write.csv("sample_data/dim_atm.csv", mode="overwrite")
dim_date.coalesce(1).write.csv("sample_data/dim_date.csv",mode="overwrite")
dim_card_type.coalesce(1).write.csv("sample_data/dim_card_type.csv",mode="overwrite")