In [580]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as sqlf 
#col, lit, udf,sum,avg,max,min,mean,count, udf 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, LongType

In [581]:
spark = SparkSession.builder.appName('Stage - Ingest').getOrCreate()
conf = SparkConf().setAppName('Stage - Ingest')
sc = SparkContext.getOrCreate(conf=conf)

In [582]:
import json
import logging
from pandas import DataFrame
import os
import pendulum
from datetime import datetime

def get_subdirectories(path: str) -> list:
    """
    Function that searchs for sub-directories inside a directory 
    then returns a list of all the directories names.
    """
    if path.split('/')[7] in ['customer', 'transaction']:
        subdirectories = [f.path for f in os.scandir(path) if f.is_dir()]
        subdirectories = [x.split('/')[8] for x in subdirectories]
        csv_directories = [directory.split('_')[1] for directory in subdirectories]
        return csv_directories
    else:
        subdirectories = [f.path for f in os.scandir(path) if f.is_dir()]
        subdirectories = [x.split('/')[7] for x in subdirectories]
        return [datetime.strptime(directory, '%Y-%m-%d').date() for directory in subdirectories] 
        

def get_latest_folder(folders: list) -> str:
    """
    Returns the latest date from the list.
    """
    try:
        return str(max(folders))
    except:
        return logging.critical('Couldn\'t find any sub-directory.')


def loadJsonData(json_path: str) -> DataFrame:
    """
    Function that returns a dataframe from a valid directory that contains jsonlines files.
    """
    latest = get_latest_folder(get_subdirectories(json_path))
    if os.path.exists(os.path.dirname(os.path.join(json_path, latest))):
        df = spark.read.json(os.path.join(json_path, latest))
        return df
    else:
        return logging.critical('Path to jsonl files doesn\'t exist')


def loadParquetData(parquet_path: str) -> DataFrame:
    """
    Function that returns a dataframe from a valid directory that contains parquet files.
    """
    latest = get_latest_folder(get_subdirectories(parquet_path))
    if os.path.exists(os.path.dirname(parquet_path)):
        df = spark.read.option('recursiveFileLookup', 'true').option('header', 'true').parquet(parquet_path)
        return df
    else:
        return logging.critical('Path to parquet file deosn\'t exist')


def loadCSVData(csv_path: str) -> DataFrame:
    """
    Function that returns a dataframe from a valid directory that contains csv files.
    """
    latest = get_latest_folder(get_subdirectories(csv_path))
    if os.path.exists(os.path.dirname(csv_path)):
        df = spark.read.option('recursiveFileLookup', 'true').option('header', 'true').csv(csv_path)
        return df
    else:
        return logging.critical('Path to csv file doesn\'t exist')

In [585]:
# Ingesting jsonlines data
jsonDF = loadJsonData(json_path='/Users/gonzo/Desktop/capstone_project/data_storage/json_storage/')
jsonDF = jsonDF.select('id', 'ts', 'customer_first_name', 'customer_last_name', 'amount', 'type')
# jsonDF.select(['id']).distinct().count()
jsonDF = jsonDF.repartition(2)
jsonDF.count()

100

In [586]:
# Ingesting parquet data
parquetDF = loadParquetData(parquet_path='/Users/gonzo/Desktop/capstone_project/data_storage/parquet_storage/')
parquetDF = parquetDF.repartition(2)
parquetDF.count()

100

In [602]:
# Ingesting RDBMS (PostgreSQL) data
customerDF = loadCSVData(csv_path='/Users/gonzo/Desktop/capstone_project/data_storage/pgdata/customer/')
customerDF = customerDF.repartition(2)
customerDF.count()

100

In [603]:
# Ingesting RDBMS (PostgreSQL) data
transactionDF = loadCSVData(csv_path='/Users/gonzo/Desktop/capstone_project/data_storage/pgdata/transaction/')
transactionDF = transactionDF.repartition(2)
transactionDF.count()

100

In [604]:
parquetDF = parquetDF.withColumnRenamed('First_name', 'customer_first_name')
parquetDF = parquetDF.withColumnRenamed('Last_name', 'customer_last_name')
parquetDF = parquetDF.withColumnRenamed('Amount', 'amount')
jsonDF = jsonDF.withColumnRenamed('ts', 'timestamp')
jsonDF = jsonDF.withColumnRenamed('Store_id', 'store_id')

In [605]:
# Parquet & Json
json_parquetDF = jsonDF.join(parquetDF, ['id', 'customer_first_name', 'customer_last_name', 'amount', 'timestamp'], "fullouter")
json_parquetDF = json_parquetDF.select('id', 'type', 'store_id', 'amount', 'customer_first_name', 'customer_last_name', 'timestamp')
json_parquetDF.show()

+---+----+--------+----------+-------------------+------------------+-------------------+
| id|type|store_id|    amount|customer_first_name|customer_last_name|          timestamp|
+---+----+--------+----------+-------------------+------------------+-------------------+
|  2|   1|    null|     $4.18|             Brenda|           Sanchez|2022-10-02T19:27:13|
|  4|   0|    null|$23,199.16|               Dawn|              Hill|2022-10-03T02:55:52|
|  6|   0|    null|$84,143.57|            Jeffrey|          Gonzales|2022-10-04T01:20:01|
|  7|null|      17|   $711.35|             Thomas|            Oliver|2022-10-05T10:52:27|
| 10|null|      20|    $67.40|            William|             Reyes|2022-10-04T10:15:45|
| 11|null|       2| $4,516.14|             Jeremy|             Moody|2022-10-03T13:18:38|
| 19|   0|    null|     $7.75|              Sarah|             Craig|2022-10-04T19:55:02|
| 23|null|      18|    $24.07|             Brandi|             Ortiz|2022-10-03T16:48:06|
| 28|null|

In [607]:
# Both dataframes from postgresql
postgresqlDF = customerDF.join(transactionDF, ['id'], "inner")
postgresqlDF = postgresqlDF.select('id', 'customer_id', 'amount', 'first_name', 'last_name', 'phone_number', 'address', 'transaction_ts')
postgresqlDF.show(100)

+---+-----------+----------+-----------+----------+------------+-----------------+-------------------+
| id|customer_id|    amount| first_name| last_name|phone_number|          address|     transaction_ts|
+---+-----------+----------+-----------+----------+------------+-----------------+-------------------+
|778|         39| $8,278.52|    Cynthia|   Johnson|  3253956605|      Veronicaton|2022-10-03 15:09:37|
| 33|        191| $2,917.29|       John|     Burns|  8671875822|Port Benjaminfurt|2022-10-03 05:58:39|
|482|        106| $7,042.53|      Carol| Rodriguez|  9808399017|      Veronicaton|2022-10-03 03:48:50|
|609|        759|    $41.25|    Cynthia| Hernandez|  1825888381|       Robertland|2022-10-03 22:08:51|
|459|        358|   $709.30|       Leah|      Ward|  2449557321|      Lake Karina|2022-10-03 02:52:32|
|414|         52|    $50.11|      David|      Barr|  2388757413|      Veronicaton|2022-10-04 16:07:52|
|556|        808|     $6.85|      Kelly|     Smith|  3880545614|        S

In [684]:
from tokenize import String


postgresqlDF = postgresqlDF.withColumnRenamed('first_name', 'customer_first_name')
postgresqlDF = postgresqlDF.withColumnRenamed('last_name', 'customer_last_name')
postgresqlDF = postgresqlDF.withColumnRenamed('transaction_ts', 'timestamp')
"""
Change id str -> long
customer_id str -> long
timestamp remove ' ' 
"""
def transform_timestampPSQL(x) -> str:
    return x.split(' ')[0]

timestampPSQLUDF = sqlf.udf(lambda x : transform_timestampPSQL(x), StringType())

postgresqlDF = postgresqlDF.withColumn('id', postgresqlDF['id'].cast(LongType()))
postgresqlDF = postgresqlDF.withColumn('customer_id', postgresqlDF['id'].cast(LongType()))
postgresqlDF = postgresqlDF.withColumn('timestamp', timestampPSQLUDF(postgresqlDF.timestamp))
postgresqlDF.show()

+---+-----------+----------+-------------------+------------------+------------+-----------------+----------+
| id|customer_id|    amount|customer_first_name|customer_last_name|phone_number|          address| timestamp|
+---+-----------+----------+-------------------+------------------+------------+-----------------+----------+
|778|        778| $8,278.52|            Cynthia|           Johnson|  3253956605|      Veronicaton|2022-10-03|
| 33|         33| $2,917.29|               John|             Burns|  8671875822|Port Benjaminfurt|2022-10-03|
|482|        482| $7,042.53|              Carol|         Rodriguez|  9808399017|      Veronicaton|2022-10-03|
|609|        609|    $41.25|            Cynthia|         Hernandez|  1825888381|       Robertland|2022-10-03|
|459|        459|   $709.30|               Leah|              Ward|  2449557321|      Lake Karina|2022-10-03|
|414|        414|    $50.11|              David|              Barr|  2388757413|      Veronicaton|2022-10-04|
|556|     

In [685]:
unified_model = json_parquetDF.join(postgresqlDF, ['id', 'amount', 'customer_first_name', 'customer_last_name', 'timestamp'], 'fullouter')
unified_model = unified_model.select('id', 'customer_id', 'store_id', 'type', 'amount', 'customer_first_name', 'customer_last_name', 'phone_number', 'address', 'timestamp')
unified_model.count()

300

In [611]:

# unified_model.coalesce(1)
# unified_model = unified_model.repartition(2)
# unified_model.write.csv('/Users/gonzo/Desktop/capstone_project/data_storage/storage/test', header=True)

In [686]:
"""
Transform amount str -> float
"""
from hashlib import new


def transform_amount(x) -> list:
    return float(x[1:].replace(',', ''))


def transform_timestamp(x) -> str:
    return x.split('T')[0]

transformUDF = sqlf.udf(lambda x : transform_amount(x), FloatType())
timestampUDF = sqlf.udf(lambda x : transform_timestamp(x), StringType())

dumpDF = unified_model.withColumn('float_amount', transformUDF(unified_model.amount))
dumpDF = dumpDF.withColumn('timestamp', timestampUDF(dumpDF.timestamp))

In [687]:
unified_model = dumpDF.select('id', 'customer_id', 'store_id', 'type', 'float_amount', 'customer_first_name', 'customer_last_name', 'phone_number', 'address', 'timestamp')
unified_model = unified_model.withColumnRenamed('float_amount', 'amount')

In [708]:
unified_model.show()

+---+-----------+--------+----+--------+-------------------+------------------+------------+-----------------+----------+
| id|customer_id|store_id|type|  amount|customer_first_name|customer_last_name|phone_number|          address| timestamp|
+---+-----------+--------+----+--------+-------------------+------------------+------------+-----------------+----------+
|  2|       null|    null|   1|    4.18|             Brenda|           Sanchez|        null|             null|2022-10-02|
|  4|       null|    null|   0|23199.16|               Dawn|              Hill|        null|             null|2022-10-03|
|  6|       null|    null|   0|84143.57|            Jeffrey|          Gonzales|        null|             null|2022-10-04|
|  7|       null|      17|null|  711.35|             Thomas|            Oliver|        null|             null|2022-10-05|
| 10|       null|      20|null|    67.4|            William|             Reyes|        null|             null|2022-10-04|
| 11|       null|       

# Count and sum amount transactions for each type for day - online

In [726]:
online = unified_model.select('type', 'timestamp', 'amount')
online = online.filter(online.type == 1).groupBy(online.type, online.timestamp).agg(sqlf.count('*').alias('total_transactions'), sqlf.sum('amount').alias('total'))
online.sort('timestamp').show()

+----+----------+------------------+------------------+
|type| timestamp|total_transactions|             total|
+----+----------+------------------+------------------+
|   1|2022-10-02|                10|171386.61967229843|
|   1|2022-10-03|                10| 69591.97856712341|
|   1|2022-10-04|                17| 80872.64190080762|
|   1|2022-10-05|                 9|185163.73170089722|
+----+----------+------------------+------------------+



# Count and sum amount transactions for each type for day - offline

In [725]:
offline = unified_model.select('type', 'timestamp', 'amount')
offline = offline.filter(offline.type == 0).groupBy(offline.type, offline.timestamp).agg(sqlf.count('*').alias('total_transactions'), sqlf.sum('amount').alias('total'))
offline.sort('timestamp').show()

+----+----------+------------------+------------------+
|type| timestamp|total_transactions|             total|
+----+----------+------------------+------------------+
|   0|2022-10-02|                 6|1485.7499787807465|
|   0|2022-10-03|                16|179331.11799812317|
|   0|2022-10-04|                25|231126.67954114825|
|   0|2022-10-05|                 7| 4441.390130549669|
+----+----------+------------------+------------------+



# Count and sum transaction for each store for day

In [724]:
store_metrics  = unified_model.groupBy(unified_model.store_id, unified_model.timestamp).agg(sqlf.count(unified_model.timestamp).alias('count'), sqlf.sum(unified_model.amount).alias('total'))
store_metrics.sort('timestamp').show()

+--------+----------+-----+------------------+
|store_id| timestamp|count|             total|
+--------+----------+-----+------------------+
|    null|2022-10-02|   29| 425310.2145335674|
|      18|2022-10-02|    2| 782.8299789428711|
|      12|2022-10-02|    1|  98.4000015258789|
|       8|2022-10-02|    1|  68.2699966430664|
|       3|2022-10-02|    1|     66788.5703125|
|       5|2022-10-02|    1|  1369.56005859375|
|      17|2022-10-02|    3| 718.2699794769287|
|      10|2022-10-02|    1| 435.4100036621094|
|      20|2022-10-02|    1| 58.08000183105469|
|       7|2022-10-02|    1| 1110.449951171875|
|      14|2022-10-02|    2| 831.7099990844727|
|       6|2022-10-02|    2| 9549.819726467133|
|    null|2022-10-03|   63| 613245.9728010893|
|      18|2022-10-03|    3| 7946.419731140137|
|       9|2022-10-03|    2| 10383.47998046875|
|      12|2022-10-03|    4| 90181.04906272888|
|       8|2022-10-03|    3|183019.29092407227|
|       1|2022-10-03|    4| 84645.54937458038|
|       3|202

# Count and sum amount transactions for each city for day

In [731]:
cities_transactions = unified_model.groupBy(unified_model.address, unified_model.timestamp).agg(sqlf.count('*').alias('count'), sqlf.sum(unified_model.amount).alias('total'))
cities_transactions.sort('timestamp').show()

+-----------------+----------+-----+------------------+
|          address| timestamp|count|             total|
+-----------------+----------+-----+------------------+
|        Erikville|2022-10-02|    1|    8222.150390625|
|             null|2022-10-02|   32|254683.73966097832|
|     Brownchester|2022-10-02|    1|382.70001220703125|
|        Smithberg|2022-10-02|    1| 6.599999904632568|
|       Robertland|2022-10-02|    1|       49262.71875|
|Port Benjaminfurt|2022-10-02|    1|    65182.94921875|
|         Lanefurt|2022-10-02|    2|            197.25|
|       Grahamstad|2022-10-02|    1|    52058.44921875|
|         Clayview|2022-10-02|    1|  8151.77978515625|
|       Averymouth|2022-10-02|    2| 2983.210006713867|
|    Michelleburgh|2022-10-02|    2| 65990.03750038147|
|        Hicksview|2022-10-03|    2|  4112.47998046875|
|             null|2022-10-03|   53| 716727.3829708099|
|     Edwardsburgh|2022-10-03|    1|  9.40999984741211|
|      Melissafurt|2022-10-03|    5|  54707.2892

# BACKFILLING ALL THE NULL VALUES FROM THE COLUMNS

In [558]:
from faker import Faker
import random

def backfillType(x) -> int:
    if x is None:
        return random.randint(0,1)
    else:
        return x    

def backfillStoreID(x) -> int:
    if x is None:
        return random.randint(1,20)
    else:
        return x

def backfillPhoneNumber(x) -> int:
    fake = Faker()
    if x is None:
        return fake.msisdn()[3:]
    else:
        return x

def backfillAddress(x) -> str:
    if x is None:
        return random.choice(['Allenton', 'Hicksview', 'Smithberg', 'Robertland', 'Veronicaton', 'Lake Jamesville', 'Port Benjaminfurt', 'Averymouth', 'Erikville', 'Port Loriview', 'Grahamstad', 'Edwardsburgh', 'New Marthaborough', 'Melissafurt', 'Lanefurt', 'Clayview', 'West Nichole', 'Brownchester', 'Lake Karina', 'Michelleburgh'])
    else:
        return x

def backfillCustomerID(x) -> int:
    if x is None:
        return random.randint(1, 900)
    else:
        return x

backfillType_udf = sqlf.udf(lambda x : backfillType(x), LongType())
backfillStoreId_udf = sqlf.udf(lambda x : backfillStoreID(x), IntegerType())
backfillPhoneNumber_udf = sqlf.udf(lambda x: backfillPhoneNumber(x), StringType())
backfillAddress_udf = sqlf.udf(lambda x: backfillAddress(x), StringType())
backfillCustomerID_udf = sqlf.udf(lambda x: backfillCustomerID(x), LongType())

backfilledDF = unified_model
backfilledDF.repartition(2)

backfilledDF = backfilledDF.withColumn('f_type', backfillType_udf(unified_model.type))
backfilledDF = backfilledDF.withColumn('f_store_id', backfillStoreId_udf(backfilledDF.store_id))
backfilledDF = backfilledDF.withColumn('f_phone_number', backfillPhoneNumber_udf(backfilledDF.phone_number))
backfilledDF = backfilledDF.withColumn('f_address', backfillAddress_udf(backfilledDF.address))
backfilledDF = backfilledDF.withColumn('f_customer_id', backfillCustomerID_udf(backfilledDF.customer_id))
backfilledDF = backfilledDF.select('id', 'f_customer_id', 'f_store_id', 'f_type', 'float_amount', 'customer_first_name', 'customer_last_name', 'f_phone_number', 'f_address', 'timestamp')
# backfilledDF.show()

[Stage 6079:>                                                       (0 + 1) / 1]

+---+-------------+----------+------+------------+-------------------+------------------+--------------+-----------------+-------------------+
| id|f_customer_id|f_store_id|f_type|float_amount|customer_first_name|customer_last_name|f_phone_number|        f_address|          timestamp|
+---+-------------+----------+------+------------+-------------------+------------------+--------------+-----------------+-------------------+
|  2|          815|         6|     1|        4.18|             Brenda|           Sanchez|    7904112847|       Grahamstad|2022-10-02T19:27:13|
|  4|          791|        13|     0|    23199.16|               Dawn|              Hill|    8456415789|New Marthaborough|2022-10-03T02:55:52|
|  6|          147|        19|     0|    84143.57|            Jeffrey|          Gonzales|    5167666735|Port Benjaminfurt|2022-10-04T01:20:01|
|  7|          101|        17|     1|      711.35|             Thomas|            Oliver|    3303626529|Port Benjaminfurt|2022-10-05T10:52:27|

                                                                                

In [677]:
x = '2022-10-03 12:56:18'
x = x.split(' ')
print(x[0])

2022-10-03
