In [1]:
import os
import sys
from pathlib import Path
curr_path = str(Path(os.getcwd()).parent)
sys.path.append(curr_path)
from scripts.sa2_age_allocation import *
from scripts.constants import *
from scripts.load import *
from scripts.transform import *
from scripts.read import *
from scripts.misc_changes import *
from scripts.external_etl import *
from scripts.join import *
from scripts.plotting import *
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql.functions import *
from pyspark.sql.column import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.stat import Correlation
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import geopandas as gpd
import pandas as pd 
import numpy as np
import math
import re
import random
import json

# Relative prefix that is passed to the read_* helpers and prepended to path constants.
PREFIX = "."
# Shared SparkSession used by every cell below.
spark = create_spark()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/01 15:10:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Curated Spark inputs. NOTE: `all_transactions` is re-bound later in the notebook
# to the recombined fraud-probability frame.
all_transactions = read_curated_transactions_all(spark, PREFIX)
consumer_external = spark.read.parquet("../data/curated/consumer_external_join.parquet/")

# The external joined data is loaded with pandas rather than Spark.
external_data = pd.read_csv("../data/curated/external_joined_data.csv")

# Alternative loaders, kept for reference:
# external_data = read_curated_external_join(spark, PREFIX)
# external_data = external_data.astype({SA2_CODE: "str", POSTCODE: "str"})
# external_data[POSTCODE] = external_data[POSTCODE].astype(int)
# sa2_2021_mapping = pd.read_csv("../data/raw/CG_SA2_2016_SA2_2021.csv")
# postcodes_to_sa2_2016 = pd.read_csv("../data/raw/australian_postcodes.csv")
# consumer_external = read_curated_consumer_external_join(spark, PREFIX)
# consumer_data = read_curated_consumer_join(spark, PREFIX)

                                                                                

In [3]:
# Curated merchant fraud probabilities, inspected in the next cell.
# NOTE: `merchant_fraud` is re-bound to the raw table (read_raw_merchant_fraud) in the
# Fraud Probability section, and that raw version is what feeds the model.
merchant_fraud = read_curated_merchant_fraud(spark, PREFIX)

In [4]:
# Display the curated merchant fraud table loaded above.
# NOTE(review): the saved output of this cell is raw Parquet bytes (it starts with "PAR1")
# rather than a rendered table — confirm what `read_curated_merchant_fraud` returns.
merchant_fraud

PAR1�\a�\a����<z����C��Ӊ�����K\a����� S����W%+���;�׃���zu�s�����ȶ����_�����`�(�W�����U��\a
���M��G���to���...
C�(�ײ����lT...
���<;!g\b5Z�N...
���2021-11-28\f...
8�1�1T.*�\f1-21
*\f1-102b��0.8�0...
*T.���02��6T�\b2...
b\v~\b2-0T...
.&�2��1T5�'...
��oo�A@��#\aw�E@�...


In [None]:
# Raw transactions, with dollar values passed through `round_dollar_values`.
# NOTE: the Fraud Probability section re-reads raw transactions from a literal path.
raw_transactions = spark.read.parquet(PREFIX + RAW_TRANSACTIONS_PATH)
raw_transactions = round_dollar_values(raw_transactions)

In [None]:
# Total dollar value per merchant per order timestamp. GroupedData.sum avoids depending
# on pyspark's `sum` shadowing the builtin through the star import.
raw_transactions.groupBy(MERCHANT_ABN, ORDER_DATETIME).sum(DOLLAR_VALUE)

### Fraud Probability

In [None]:
PREFIX = "."

# Fraud tables and the curated consumer join.
consumer_fraud = read_raw_consumer_fraud(spark, PREFIX)
consumer_joined = read_curated_consumer_join(spark, PREFIX)

# Unrounded raw transactions; rounding and date features are applied in the next cell.
raw_transactions = spark.read.parquet("../data/raw/raw_transactions/")
merchant_fraud = read_raw_merchant_fraud(spark, PREFIX)

# Merchant lookup: revenue level via encode_revenue_level, take rate cast to float.
tbl_merchants = (
    encode_revenue_level(read_mapped_industry_data(spark, PREFIX))
    .withColumn(TAKE_RATE, col(TAKE_RATE).cast(FloatType()))
)

In [None]:
def extract_date_features(df: DataFrame) -> DataFrame:
    """Add day-of-month, month and year columns derived from ORDER_DATETIME.

    Args:
        df: frame containing an ORDER_DATETIME column.
    Returns:
        ``df`` with ORDER_DAY_OF_MONTH, ORDER_MONTH and ORDER_YEAR set from that timestamp.
    """
    order_ts = col(ORDER_DATETIME)
    date_parts = {
        ORDER_DAY_OF_MONTH: dayofmonth(order_ts),
        ORDER_MONTH: month(order_ts),
        ORDER_YEAR: year(order_ts),
    }
    return df.withColumns(date_parts)

def rename_fraud_prob_column(df:DataFrame, new_column: str) -> DataFrame:
    """Rename the generic FRAUD_PROBABILITY column of ``df`` to ``new_column``.

    Used to give each fraud table's probability a dataset-specific name before joining.
    """
    renamed = df.withColumnRenamed(existing=FRAUD_PROBABILITY, new=new_column)
    return renamed

# For each fraud table: log-transform the probability, give it a dataset-specific
# column name, then add day/month/year columns from ORDER_DATETIME.
merchant_fraud = extract_date_features(
    rename_fraud_prob_column(
        merchant_fraud.withColumn(FRAUD_PROBABILITY, log(col(FRAUD_PROBABILITY))),
        MERCHANT_FRAUD_PROB,
    )
)
consumer_fraud = extract_date_features(
    rename_fraud_prob_column(
        consumer_fraud.withColumn(FRAUD_PROBABILITY, log(col(FRAUD_PROBABILITY))),
        CONSUMER_FRAUD_PROB,
    )
)

# Round transaction dollar values (to 2 dp) and add the same date columns.
# raw_transactions = raw_transactions.groupBy([MERCHANT_ABN, ORDER_DATETIME]).agg(avg(DOLLAR_VALUE).alias(DOLLAR_VALUE))
raw_transactions = extract_date_features(round_dollar_values(raw_transactions))

In [None]:
# spark.read.csv("../data/curated/postcode_to_sa2_map.csv", header=True)

In [None]:
# Modelling column names.
LABEL = "label"
FEATURES = "features"
# Labelling thresholds; currently only referenced by commented-out labelling code.
LOWER_BOUND = 50.0
UPPER_BOUND = 80.0

# Identifier/timestamp columns dropped before modelling, and categorical columns.
drop_cols = (NAME, ORDER_ID, USER_ID, ORDER_DATETIME)
cat_cols = (MERCHANT_ABN, ORDER_YEAR, ORDER_MONTH, ORDER_DAY_OF_MONTH, INDUSTRY_TAGS)
# cat_cols = (MERCHANT_ABN, INDUSTRY_TAGS)

# Merchant fraud records are matched to transactions by merchant and calendar day.
MERCHANT_JOIN_COLS = [MERCHANT_ABN, ORDER_YEAR, ORDER_MONTH, ORDER_DAY_OF_MONTH]
# MERCHANT_JOIN_COLS = [MERCHANT_ABN, ORDER_DATETIME]

In [None]:
# Attach merchant fraud probabilities to each transaction (matched on merchant and
# order date), then the merchant attributes. A left join keeps transactions as the base.
# A full outer join would also emit fraud-only rows whose transaction columns
# (e.g. DOLLAR_VALUE) are null, and VectorAssembler rejects those downstream under its
# default handleInvalid="error".
merchant_transactions = (
    raw_transactions
    .join(merchant_fraud, on=MERCHANT_JOIN_COLS, how="left")
    .join(tbl_merchants, on=[MERCHANT_ABN], how=INNER_JOIN)
)

# drop identifiers and the raw timestamp (present on both sides of the first join)
merchant_transactions = merchant_transactions.drop(*drop_cols)

In [None]:
# Cast the merchant ABN and the extracted day/month/year columns to strings for
# string indexing (ORDER_DATETIME and USER_ID were already dropped above).
string_cast_cols = (MERCHANT_ABN, ORDER_DAY_OF_MONTH, ORDER_MONTH, ORDER_YEAR)
merchant_transactions = merchant_transactions.withColumns(
    {column_name: col(column_name).cast(StringType()) for column_name in string_cast_cols}
)

# merchant_transactions = merchant_transactions.withColumn(LABEL,
#                         when(col(MERCHANT_FRAUD_PROB) <= LOWER_BOUND, NO)\
#                         .when(col(MERCHANT_FRAUD_PROB) >= UPPER_BOUND, YES)\
#                         .when(col(MERCHANT_FRAUD_PROB).isNull(), None)\
#                         .otherwise(MAYBE))


In [None]:
# merchant_transactions.show(5) 

In [None]:
# Schema check after the casts above: the ABN and date-part columns should now be strings.
merchant_transactions.printSchema()

In [None]:
def predict_merchant_fraud_probability(merchant_transactions: DataFrame):
    """
    Predict the unknown merchant fraud probabilities from transaction features.

    Date parts and industry tags are string-indexed and combined with the numeric
    columns into a FEATURES vector. The rows with a known MERCHANT_FRAUD_PROB are
    then split 90/10 (seed 42) to fit and test a Random Forest regressor.

    Args:
        merchant_transactions (DataFrame): transaction-level rows with DOLLAR_VALUE,
            TAKE_RATE, REVENUE_LEVEL, ORDER_YEAR, ORDER_MONTH, ORDER_DAY_OF_MONTH,
            INDUSTRY_TAGS and MERCHANT_FRAUD_PROB (null where it must be predicted).
            The notebook log-transforms MERCHANT_FRAUD_PROB beforehand, which is
            why the test-set values are passed through exp() below.
    Returns:
        rf: the unfitted RandomForestRegressor estimator.
        rf_model: the model fitted on the training split.
        pred_df: test-split predictions with PREDICTION and MERCHANT_FRAUD_PROB
            mapped back through exp(), plus DIFFERENCE = prediction - label.
        to_predict_merchants: rows whose MERCHANT_FRAUD_PROB is null. They already
            hold the indexed columns and the FEATURES vector, so they are ready for
            rf_model.transform.
    """
    INDEXED_COL = "_indexed"
    DIFFERENCE = "difference"
    PREDICTION = "prediction"

    cat_cols = (ORDER_YEAR, ORDER_MONTH, ORDER_DAY_OF_MONTH, INDUSTRY_TAGS)
    # Derive the indexed names from the column constants instead of hard-coding them.
    # Feature slot order: numeric columns, then day/month/year/industry indices.
    input_assembler_cols = [DOLLAR_VALUE, TAKE_RATE, REVENUE_LEVEL] + [
        column + INDEXED_COL
        for column in (ORDER_DAY_OF_MONTH, ORDER_MONTH, ORDER_YEAR, INDUSTRY_TAGS)
    ]

    print("PERFORM STRING INDEXING")
    for column in cat_cols:
        col_indexer = StringIndexer(inputCol=column, outputCol=column + INDEXED_COL)
        merchant_transactions = col_indexer.fit(merchant_transactions).transform(merchant_transactions)

    print("PERFORM VECTOR ASSEMBLING")
    # NOTE(review): VectorAssembler's default handleInvalid="error" fails on null
    # feature values, so every input column must be non-null at this point.
    assembler = VectorAssembler(inputCols=input_assembler_cols, outputCol=FEATURES)
    merchant_fraud_transactions = assembler.transform(merchant_transactions)

    print("SPLIT DATA INTO KNOWN AND UNKNOWN FRAUD PROBABILITIES")
    train_test_merchants = merchant_fraud_transactions.where(col(MERCHANT_FRAUD_PROB).isNotNull())
    to_predict_merchants = merchant_fraud_transactions.where(col(MERCHANT_FRAUD_PROB).isNull())

    print("SPLIT KNOWN PROBABILITIES INTO TRAIN-TEST SET")
    train_merchants, test_merchants = train_test_merchants.randomSplit([0.9, 0.1], seed=42)

    print("INITIALISE RFR MODEL")
    rf = RandomForestRegressor(featuresCol=FEATURES, labelCol=MERCHANT_FRAUD_PROB)
    print("FIT RFR MODEL WITH TRAIN SET")
    rf_model = rf.fit(train_merchants.select(FEATURES, MERCHANT_FRAUD_PROB))
    print("TRANSFORM AND PREDICT RFR MODEL WITH TEST SET")
    predictions = rf_model.transform(test_merchants.select(FEATURES, MERCHANT_FRAUD_PROB))

    print("TRANSFORM THE PREDICTIONS TO EXP OF PREDICTIONS")
    # Undo the upstream log-transform so the difference is on the probability scale.
    pred_df = predictions.withColumns({
        PREDICTION: exp(col(PREDICTION)),
        MERCHANT_FRAUD_PROB: exp(col(MERCHANT_FRAUD_PROB))})
    pred_df = pred_df.withColumn(DIFFERENCE, col(PREDICTION) - col(MERCHANT_FRAUD_PROB))

    print("RETURN ALL VALUES NEEDED")
    return rf, rf_model, pred_df, to_predict_merchants

In [None]:
# Fit and test the random forest. `diff` holds the exp-scale test-set comparison and
# `predicting_merchants` the feature-ready rows whose fraud probability is unknown.
(
    rf,
    rf_model,
    diff,
    predicting_merchants,
) = predict_merchant_fraud_probability(merchant_transactions)

In [None]:
# Name of the model prediction column (the same "prediction" name used inside
# predict_merchant_fraud_probability); used by the evaluator and renaming cells below.
PREDICTION = "prediction"

In [None]:
# Test-set comparison from predict_merchant_fraud_probability: exp-scale prediction,
# label, and their difference.
diff

In [None]:
mae_evaluator = RegressionEvaluator(
    metricName="mae",
    predictionCol=PREDICTION,
    labelCol=MERCHANT_FRAUD_PROB,
)
# Mean absolute error of the exp-scale test predictions in `diff`.
mae_evaluator.evaluate(diff)

In [None]:
r2_evaluator = RegressionEvaluator(
    metricName="r2",
    predictionCol=PREDICTION,
    labelCol=MERCHANT_FRAUD_PROB,
)
# Coefficient of determination of the exp-scale test predictions in `diff`.
r2_evaluator.evaluate(diff)

In [None]:
# Score the rows with an unknown fraud probability. Predictions are on the log scale
# the model was trained on; the next cell exponentiates them.
pred_df = rf_model.transform(predicting_merchants)
pred_df

In [None]:
# Map the log-scale predictions back through exp(), drop the (null) label column and
# give the prediction the label's name.
# NOTE(review): not idempotent — `prediction` no longer exists after the rename, so
# re-running this cell fails; re-run the previous cell first.
pred_df = (
    pred_df
    .withColumn(PREDICTION, exp(col(PREDICTION)))
    .drop(MERCHANT_FRAUD_PROB)
    .withColumnRenamed(existing=PREDICTION, new=MERCHANT_FRAUD_PROB)
)
pred_df

In [None]:
# input_assembler_cols = [DOLLAR_VALUE, TAKE_RATE, REVENUE_LEVEL, f"{MERCHANT_ABN}_indexed", "order_day_of_month_indexed", "order_month_indexed", "order_year_indexed", "industry_tags_indexed"]
# input_assembler_cols = [DOLLAR_VALUE, TAKE_RATE, REVENUE_LEVEL, "merchant_abn_indexed", "industry_tags_indexed"]

In [None]:
# StringIndexer output columns (names containing "_indexed"), stripped before recombining.
# NOTE: this re-binds `drop_cols`, which earlier held the identifier columns to drop.
drop_cols = list(filter(lambda column_name: "_indexed" in column_name, pred_df.columns))
drop_cols

In [None]:
# Remove the helper indexed columns and the assembled feature vector in one drop.
new_predictions = pred_df.drop(*drop_cols, FEATURES)
new_predictions

In [None]:
# Re-check the schema: predict_merchant_fraud_probability only re-binds its local
# parameter, so merchant_transactions has no indexed/feature columns here.
merchant_transactions.printSchema()

In [None]:
# Transactions with a known fraud probability, mapped back from the log scale.
fraud_transactions = (
    merchant_transactions
    .where(col(MERCHANT_FRAUD_PROB).isNotNull())
    .withColumn(MERCHANT_FRAUD_PROB, exp(col(MERCHANT_FRAUD_PROB)))
)

In [None]:
# Recombine known and predicted merchant fraud probabilities. In `new_predictions`
# MERCHANT_FRAUD_PROB was re-created from the appended `prediction` column, so it sits
# in a different position than in `fraud_transactions`. A positional `union` would
# silently misalign the columns; `unionByName` matches them by name.
all_transactions = fraud_transactions.unionByName(new_predictions)
all_transactions

In [None]:
# Total rows after recombining known and predicted probabilities.
row_count = all_transactions.count()
print(row_count)

In [None]:
# Keep only transactions whose merchant fraud probability is below this cut-off
# (the column is back on the original scale after the exp() in the cells above).
MAX_MERCHANT_FRAUD_PROB = 60.0
all_transactions = all_transactions.where(col(MERCHANT_FRAUD_PROB) < MAX_MERCHANT_FRAUD_PROB)

#### Begin Modelling Merchant Fraud Probability

In [None]:
# `merchant_fraud_transactions` exists only as a local inside
# predict_merchant_fraud_probability, so this cell raised NameError on a fresh kernel.
# Rebuild it with the same preprocessing: string-index the date/industry columns,
# then assemble the numeric and indexed columns into the FEATURES vector.
merchant_fraud_transactions = merchant_transactions
for cat_col in (ORDER_YEAR, ORDER_MONTH, ORDER_DAY_OF_MONTH, INDUSTRY_TAGS):
    indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + "_indexed")
    merchant_fraud_transactions = indexer.fit(merchant_fraud_transactions).transform(merchant_fraud_transactions)
merchant_fraud_transactions = VectorAssembler(
    inputCols=[DOLLAR_VALUE, TAKE_RATE, REVENUE_LEVEL,
               ORDER_DAY_OF_MONTH + "_indexed", ORDER_MONTH + "_indexed",
               ORDER_YEAR + "_indexed", INDUSTRY_TAGS + "_indexed"],
    outputCol=FEATURES,
).transform(merchant_fraud_transactions)

# Rows with a known (log-scale) fraud probability are modelled; the rest are predicted.
train_test_merchants = merchant_fraud_transactions.where(col(MERCHANT_FRAUD_PROB).isNotNull())
to_predict_merchants = merchant_fraud_transactions.where(col(MERCHANT_FRAUD_PROB).isNull())

In [None]:
# Seeded 90/10 train/test split of the rows with a known fraud probability.
train_merchants, test_merchants = train_test_merchants.randomSplit(weights=[0.9, 0.1], seed=42)

In [None]:
# Inspect the training split.
train_merchants

In [None]:
# # train and test using logistic regression model
# lr = LogisticRegression(featuresCol=FEATURES, labelCol=MERCHANT_FRAUD_PROB)
# lr_model = lr.fit(train_merchants.select(FEATURES, MERCHANT_FRAUD_PROB))
# predictions = lr.transform(test_merchants(FEATURES, MERCHANT_FRAUD_PROB))
# predictions

In [None]:
# Train and test a decision tree regressor. The label is the continuous log-transformed
# fraud probability, so the unused DecisionTreeClassifier() assignment that was
# immediately overwritten has been removed.
dt = DecisionTreeRegressor(featuresCol=FEATURES, labelCol=MERCHANT_FRAUD_PROB)
dt_model = dt.fit(train_merchants.select(FEATURES, MERCHANT_FRAUD_PROB))
dt_predictions = dt_model.transform(test_merchants.select(FEATURES, MERCHANT_FRAUD_PROB))
dt_predictions

In [None]:
# Random forest on the same split. NOTE: this re-binds the `rf` / `rf_model` returned
# by predict_merchant_fraud_probability earlier.
model_io_cols = (FEATURES, MERCHANT_FRAUD_PROB)
rf = RandomForestRegressor(labelCol=MERCHANT_FRAUD_PROB, featuresCol=FEATURES)
rf_model = rf.fit(train_merchants.select(*model_io_cols))
predictions = rf_model.transform(test_merchants.select(*model_io_cols))
predictions

In [None]:
# print(math.exp(3.3171763572147634))
# print(math.exp(3.3656943036441676))

In [None]:
# Feature importances of the decision tree fitted above, one slot per FEATURES input.
dt_model.featureImportances

In [None]:
# Residuals of the random-forest test predictions, in the same log space as the label.
# The unfinished `diff.withColumn("")` call (missing its column expression, so it raised
# TypeError) has been removed. NOTE: this re-binds `diff` from the earlier cell.
diff = predictions.withColumn("difference", col("prediction") - col(MERCHANT_FRAUD_PROB))
diff

In [None]:
# diff = dt_predictions.withColumn("difference", col("prediction") - col(MERCHANT_FRAUD_PROB))
# diff