## ETL Pipelines ##
***

Define the path to the raw Transaction.csv file and the base output path.

In [70]:
# Raw input CSV, given relative to the notebook's working directory.
RAW_FILE_PATH = '../../data/raw/Transaction.csv'
# Base output folder. Later cells build paths by plain string concatenation
# (e.g. OUTPUT_PATH + 'bronze/'), so the trailing slash is required.
OUTPUT_PATH = '../../data/'

Run schema definitions

In [71]:
# Execute schema.py inside this notebook's namespace so the objects it defines
# become available here (presumably including bronze_schema used by the Bronze step — confirm).
%run '../../schema_definition/schema.py'

Import schema definitions

In [72]:
import sys  
# Put the directory two levels up on the module search path so that the
# schema_definition package can be imported from this notebook.
sys.path.insert(0, '../../')

# NOTE(review): star import pulls every public name from schema.py into the
# notebook namespace; consider importing the needed names explicitly.
from schema_definition.schema import *

Import Pyspark related libraries

In [101]:
from pyspark.shell import spark
from pyspark.sql.types import *
from pyspark.sql import functions as F

Create a helper function (also registered as a Spark UDF) that returns the processing date used in output folder names.

In [102]:
from datetime import datetime

def file_processed_date():
    """Return the current local date formatted as ``YYYY-MM-DD``.

    The value is used as the suffix of the ``date=`` output folders.
    """
    current_moment = datetime.now()
    return f"{current_moment:%Y-%m-%d}"

# file_processed_date() returns a "YYYY-MM-DD" string, so the SQL UDF is
# declared with a string return type; TimestampType() did not match the str
# the function actually produces.
spark.udf.register("file_processed_date", file_processed_date, StringType())

21/10/16 01:02:27 WARN SimpleFunctionRegistry: The function file_processed_date replaced a previously registered function.


<function __main__.file_processed_date()>

This setting switches Spark to its legacy time parser policy, which is used when parsing the date columns below.

In [103]:
# Set spark.sql.legacy.timeParserPolicy to LEGACY for this session.
# NOTE(review): presumably required so the 'd/MM/yyyy HH:mm' style patterns
# in the Silver step parse the source dates — confirm against the data.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

### BRONZE ###

This paragraph reads the data from the Transaction.csv file, enforces the default 17-column schema, and writes the result as Parquet
into a date-stamped `bronze/date=YYYY-MM-DD` folder.

In [117]:
# Destination folder for today's bronze snapshot: <OUTPUT_PATH>bronze/date=<YYYY-MM-DD>
bronze_target = f"{OUTPUT_PATH}bronze/date={file_processed_date()}"

# Read the raw CSV with the predefined bronze schema instead of inferring one.
raw_df = (
    spark.read
    .schema(bronze_schema)
    .csv(RAW_FILE_PATH, sep=',', header=True, enforceSchema=True)
)

# Persist the untouched rows as Parquet, replacing any earlier run for the same date.
raw_df.write.mode("overwrite").parquet(bronze_target)

21/10/16 01:18:02 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 19, schema size: 17
CSV file: file:///Users/arturogonzalez/DataspellProjects/transactions_notebooks/data/raw/Transaction.csv


### SILVER ###

This paragraph will:
- Augment the data with a hash key.
- Filter out the questionable data.
- Apply DataTypes.


In [119]:
# Load the bronze layer. Reading the parent folder makes Spark expose the
# `date=<...>` folder name as a `date` column (dropped again before writing).
# NOTE(review): this reads every bronze partition, not only today's, so rows from
# earlier runs are written again into today's silver folder — confirm this is intended.
bronze_df = spark.read.parquet(OUTPUT_PATH + "bronze", mergeSchema=True)

# Build the hash key from the record's own columns only. The partition-derived
# `date` column is excluded so the same record hashes identically on every run.
# concat_ws skips NULLs, so each column is coalesced to '' first; otherwise rows
# that differ only in WHICH column is NULL would produce the same hash.
hash_columns = [name for name in bronze_df.columns if name != "date"]
hash_inputs = [F.coalesce(F.col(name).cast("string"), F.lit("")) for name in hash_columns]
augment = bronze_df.withColumn("HashKey", F.sha2(F.concat_ws("||", *hash_inputs), 256))

# Apply the silver data types (to_timestamp already yields a timestamp, so no
# extra cast is needed on those columns).
convert_to_timestamp = (
    augment
    .withColumn("AccountID", F.col("AccountID").cast(IntegerType()))
    .withColumn("CODE", F.col("CODE").cast(IntegerType()))
    .withColumn("ActiveIndicator", F.col("ActiveIndicator").cast(IntegerType()))
    .withColumn("ImplementedDate", F.unix_timestamp("ImplementedDate", "d/MM/yyyy HH:mm").cast(TimestampType()))
    .withColumn("RequestDate", F.to_timestamp("RequestDate", 'dd/MM/yyyy HH:mm'))
    .withColumn("StatusCode", F.col("StatusCode").cast(IntegerType()))
    .withColumn("Amount", F.col("Amount").cast(DoubleType()))
    .withColumn("AgentID", F.col("AgentID").cast(IntegerType()))
    .withColumn("LastUpdatedDate", F.to_timestamp("LastUpdatedDate", 'dd/MM/yyyy HH:mm'))
    .withColumn("PostCode", F.col("PostCode").cast(IntegerType()))
)

# datediff(end, start) returns end - start in days, so Response is
# RequestDate - ImplementedDate.
# NOTE(review): this is negative whenever implementation follows the request;
# together with ORDER BY ... DESC below it lists the quickest responses first — confirm
# the sign convention is the intended one.
calculate_response = convert_to_timestamp.withColumn(
    "Response", F.datediff(F.col("RequestDate"), F.col("ImplementedDate"))
)
calculate_response.createOrReplaceTempView("fastest_response")

fastest_response_query = """
                        SELECT *
                        FROM fastest_response
                        ORDER BY Response DESC
                        """
fastest_response_df = spark.sql(fastest_response_query)

# Filter out questionable rows: missing AccountID, or a Fibre value that starts with '2.67E'.
# NOTE(review): rows whose Fibre is NULL are also removed by this filter because the
# negated predicate evaluates to NULL — confirm that is desired.
drop_nulls = fastest_response_df.na.drop(subset=["AccountID"])
filter_out = drop_nulls.filter(~drop_nulls.Fibre.startswith('2.67E'))

# The partition column is not part of the record; the date goes into the folder name instead.
drop_date_column = filter_out.drop('date')
drop_date_column.write.format("parquet").mode("overwrite").save(OUTPUT_PATH + 'silver/' + 'date' + '=' + file_processed_date())

### GOLD ###

This paragraph will:

- When the number of events reaches 1000, output the events to a JSON file.
- The output filenames should have a batch number e.g. the second 1000 records will go into a file
called 2.json and so on.

In [121]:
from pathlib import Path

# Load the silver layer and drop the partition-derived `date` column so it does
# not appear in the exported events.
silver_df = spark.read.parquet(OUTPUT_PATH + "silver", mergeSchema=True)
drop_date_column = silver_df.drop('date')

# coalesce(1) funnels all rows through a single writer task and maxRecordsPerFile
# starts a new file after every 1000 records, giving consecutive 1000-event batches.
drop_date_column.coalesce(1).write.option("maxRecordsPerFile", 1000).json(OUTPUT_PATH + 'gold', mode='overwrite')

# Spark names its outputs like part-00000-<uuid>-c000.json, not 1.json, 2.json, ...
# Rename them so the file name is the 1-based batch number.
# Assumes OUTPUT_PATH is on the local filesystem (as the relative path suggests);
# if the folder is not locally visible the globs match nothing and this is a no-op.
gold_dir = Path(OUTPUT_PATH + 'gold')
# Names differ only in the trailing file counter, so ordering by (length, name)
# keeps numeric order even if the counter grows past its zero-padded width.
part_files = sorted(gold_dir.glob("part-*.json"), key=lambda part: (len(part.name), part.name))
for batch_number, part_file in enumerate(part_files, start=1):
    part_file.rename(part_file.with_name(f"{batch_number}.json"))

# The hidden checksum sidecars still carry the old part-file names; remove them.
for checksum_file in gold_dir.glob(".part-*.json.crc"):
    checksum_file.unlink()

root
 |-- AccountID: integer (nullable = true)
 |-- CODE: integer (nullable = true)
 |-- ImplementedDate: timestamp (nullable = true)
 |-- ActiveIndicator: integer (nullable = true)
 |-- AccountType: string (nullable = true)
 |-- Service: string (nullable = true)
 |-- BU: string (nullable = true)
 |-- RequestDate: timestamp (nullable = true)
 |-- AccountStatus: string (nullable = true)
 |-- StatusCode: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Version: string (nullable = true)
 |-- AgentID: integer (nullable = true)
 |-- Fibre: string (nullable = true)
 |-- LastUpdatedDate: timestamp (nullable = true)
 |-- PropertyType: string (nullable = true)
 |-- PostCode: integer (nullable = true)
 |-- HashKey: string (nullable = true)
 |-- Response: integer (nullable = true)

