# Gradient Boosted Trees 

In [1]:
# Initialise a spark session
import pandas as pd
from collections import Counter
import os
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor



spark = (
    SparkSession.builder.appName("GBT Model")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "16g")  # Increase driver memory
    .config("spark.executor.memory", "16g")  # Increase executor memory
    .config("spark.executor.instances", "4")  # Increase the number of executor instances
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
)

24/09/20 14:36:18 WARN Utils: Your hostname, Melissas-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.3 instead (on interface en0)
24/09/20 14:36:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/20 14:36:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read transaction file
transactions = spark.read.parquet('../data/curated/flagged_fraud')
transactions = transactions.filter(F.col("is_fraud") != True) # Exclude transactions marked as fraud

In [3]:
# Aggregating monthly revenue for each merchant
monthly_revenue_df = transactions.groupBy('merchant_abn', 'order_month_year').agg(
    F.sum('dollar_value').alias('monthly_revenue'),
    F.count('order_id').alias('transaction_count'),
    F.avg('fraud_probability_merchant').alias('avg_fraud_probability_merchant'),
    F.first('tags').alias('merchant_tags')  # Assuming tags are constant per merchant
)
    
# Aggregating consumer-level features (most common state and gender for each merchant)

# Most common consumer state per merchant
consumer_state_mode = transactions.groupBy('merchant_abn', 'state_consumer').count() \
    .withColumn('row_num', F.row_number().over(Window.partitionBy('merchant_abn').orderBy(F.desc('count')))) \
    .filter(F.col('row_num') == 1) \
    .select('merchant_abn', 'state_consumer')

# Most common consumer gender per merchant
consumer_gender_mode = transactions.groupBy('merchant_abn', 'gender_consumer').count() \
    .withColumn('row_num', F.row_number().over(Window.partitionBy('merchant_abn').orderBy(F.desc('count')))) \
    .filter(F.col('row_num') == 1) \
    .select('merchant_abn', 'gender_consumer')

# Average Unemployment Rate per Merchant Month-Year
transactions = transactions.withColumn("unemployment_rate_numeric", F.col("unemployment_rate").cast("float"))

unemployment_agg = transactions.groupBy('merchant_abn', 'order_month_year').agg(
    F.avg('unemployment_rate_numeric').alias('avg_unemployment_rate')
)

In [4]:
# Joining Datasets
monthly_revenue_df = monthly_revenue_df.join(consumer_state_mode, on='merchant_abn', how='left') \
                                      .join(consumer_gender_mode, on='merchant_abn', how='left')

# Join with unemployment data on both 'merchant_abn' and 'order_month_year'
monthly_revenue_df = monthly_revenue_df.join(unemployment_agg, on=['merchant_abn', 'order_month_year'], how='left')

# Show the final dataframe
monthly_revenue_df.show(5)

                                                                                

+------------+----------------+------------------+-----------------+------------------------------+--------------------+--------------+---------------+---------------------+
|merchant_abn|order_month_year|   monthly_revenue|transaction_count|avg_fraud_probability_merchant|       merchant_tags|state_consumer|gender_consumer|avg_unemployment_rate|
+------------+----------------+------------------+-----------------+------------------------------+--------------------+--------------+---------------+---------------------+
| 10023283211|          Mar-21| 9076.307821688919|               40|             56.40749878739966|((furniture, home...|           NSW|           Male|     78.1724992275238|
| 10142254217|          Nov-21| 13097.45235307313|              315|             55.47863229844303|([cable, satellit...|           NSW|           Male|    78.20253993745834|
| 10187291046|          Jul-21| 906.4298127305271|                8|            56.333947346189554|([wAtch, clock, a...|          

In [5]:
# Creating lag features to include previous month's revenue
window_spec = Window.partitionBy('merchant_abn').orderBy('order_month_year')

# Lagging features: Previous month's revenue
monthly_revenue_df = monthly_revenue_df.withColumn(
    'previous_month_revenue', F.lag('monthly_revenue', 1).over(window_spec)
)

# Calculate revenue growth (percentage change)
monthly_revenue_df = monthly_revenue_df.withColumn(
    'revenue_growth',
    F.when(F.col('previous_month_revenue') > 0, 
           (F.col('monthly_revenue') - F.col('previous_month_revenue')) / F.col('previous_month_revenue'))
    .otherwise(F.lit(0))  # Fill with 0 if there is no previous revenue
)

# Fill NA values for first month with 0 (no previous data available)
monthly_revenue_df = monthly_revenue_df.fillna({'previous_month_revenue': 0, 'revenue_growth': 0})


monthly_revenue_df = monthly_revenue_df.fillna(0)  # Filling NA values for first month
monthly_revenue_df.show(5)

24/09/20 14:36:31 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+------------+----------------+------------------+-----------------+------------------------------+--------------------+--------------+---------------+---------------------+----------------------+--------------------+
|merchant_abn|order_month_year|   monthly_revenue|transaction_count|avg_fraud_probability_merchant|       merchant_tags|state_consumer|gender_consumer|avg_unemployment_rate|previous_month_revenue|      revenue_growth|
+------------+----------------+------------------+-----------------+------------------------------+--------------------+--------------+---------------+---------------------+----------------------+--------------------+
| 10023283211|          Apr-21|   9221.4058068711|               47|             56.03849374950703|((furniture, home...|           NSW|           Male|    74.54042625427246|                   0.0|                 0.0|
| 10023283211|          Aug-21|15807.479921460477|               86|              56.2429773174978|((furniture, home...|        

In [6]:
# StringIndexing categorical columns (merchant_tags, consumer_state, gender_consumer)
indexers = [
    StringIndexer(inputCol='merchant_tags', outputCol='merchant_tags_indexed', handleInvalid='keep'),
    StringIndexer(inputCol='state_consumer', outputCol='state_consumer_indexed', handleInvalid='keep'),
    StringIndexer(inputCol='gender_consumer', outputCol='gender_consumer_indexed', handleInvalid='keep')
]

# OneHotEncoding indexed columns
encoders = [
    OneHotEncoder(inputCol='merchant_tags_indexed', outputCol='merchant_tags_encoded'),
    OneHotEncoder(inputCol='state_consumer_indexed', outputCol='state_consumer_encoded'),
    OneHotEncoder(inputCol='gender_consumer_indexed', outputCol='gender_consumer_encoded')
]

# VectorAssembler to combine numeric features into a single feature vector
assembler = VectorAssembler(
    inputCols=[
        'monthly_revenue', 'transaction_count', 'avg_fraud_probability_merchant', 'avg_unemployment_rate',
        'merchant_tags_encoded', 'state_consumer_encoded', 'gender_consumer_encoded', 'revenue_growth'
    ], 
    outputCol='features'
)

# Standardizing the numeric features
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

# Fit the pipeline to the dataset
model_pipeline = pipeline.fit(monthly_revenue_df)

final_df = model_pipeline.transform(monthly_revenue_df)

final_df.select('merchant_abn', 'order_month_year', 'scaled_features').show(5)

24/09/20 14:36:44 WARN DAGScheduler: Broadcasting large task binary with size 1443.5 KiB
24/09/20 14:36:44 WARN DAGScheduler: Broadcasting large task binary with size 1433.0 KiB
                                                                                

+------------+----------------+--------------------+
|merchant_abn|order_month_year|     scaled_features|
+------------+----------------+--------------------+
| 10023283211|          Apr-21|(6698,[0,1,2,3,13...|
| 10023283211|          Aug-21|(6698,[0,1,2,3,13...|
| 10023283211|          Dec-21|(6698,[0,1,2,3,13...|
| 10023283211|          Feb-22|(6698,[0,1,2,3,13...|
| 10023283211|          Jan-22|(6698,[0,1,2,3,13...|
+------------+----------------+--------------------+
only showing top 5 rows



In [7]:
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)

In [8]:
# Define the GBT Regressor
gbt = GBTRegressor(featuresCol='scaled_features', labelCol='monthly_revenue')

# Fit the model on the training data
gbt_model = gbt.fit(train_data)

# Make predictions on the test data
gbt_predictions = gbt_model.transform(test_data)

24/09/20 14:36:54 WARN DAGScheduler: Broadcasting large task binary with size 1871.9 KiB
24/09/20 14:36:54 WARN DAGScheduler: Broadcasting large task binary with size 1872.0 KiB
24/09/20 14:36:54 WARN DAGScheduler: Broadcasting large task binary with size 1942.9 KiB
24/09/20 14:36:57 WARN DAGScheduler: Broadcasting large task binary with size 2014.7 KiB
24/09/20 14:36:59 WARN DAGScheduler: Broadcasting large task binary with size 2015.4 KiB
24/09/20 14:36:59 WARN DAGScheduler: Broadcasting large task binary with size 2016.2 KiB
24/09/20 14:36:59 WARN DAGScheduler: Broadcasting large task binary with size 2017.1 KiB
24/09/20 14:37:00 WARN DAGScheduler: Broadcasting large task binary with size 2019.4 KiB
24/09/20 14:37:00 WARN DAGScheduler: Broadcasting large task binary with size 2026.3 KiB
24/09/20 14:37:01 WARN DAGScheduler: Broadcasting large task binary with size 2026.8 KiB
24/09/20 14:37:01 WARN DAGScheduler: Broadcasting large task binary with size 2027.5 KiB
24/09/20 14:37:01 WAR

In [9]:
# Evaluate the model
evaluator = RegressionEvaluator(labelCol='monthly_revenue', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(gbt_predictions)
print(f"RMSE (GBT): {rmse}")

                                                                                

RMSE (GBT): 40026.50757559944


24/09/20 14:37:37 WARN DAGScheduler: Broadcasting large task binary with size 1855.4 KiB
24/09/20 14:37:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/09/20 14:37:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [10]:
r2_evaluator = RegressionEvaluator(labelCol='monthly_revenue', predictionCol='prediction', metricName='r2')
r2 = r2_evaluator.evaluate(gbt_predictions)
print(f"R-squared: {r2}")

                                                                                

R-squared: 0.8374976778962497


24/09/20 14:38:08 WARN DAGScheduler: Broadcasting large task binary with size 1855.4 KiB


In [11]:
from pyspark.sql.types import DateType
from dateutil.relativedelta import relativedelta
from datetime import datetime

# Step 1: Parse the order_month_year column to a proper date format
monthly_revenue_df = monthly_revenue_df.withColumn(
    'order_month_year_date', F.to_date(F.concat(F.lit('01-'), F.col('order_month_year')), 'dd-MMM-yy')
)

# Get the most recent month per merchant
window_spec = Window.partitionBy('merchant_abn').orderBy(F.desc('order_month_year_date'))
latest_merchant_data = monthly_revenue_df.withColumn('row_num', F.row_number().over(window_spec)) \
                                         .filter(F.col('row_num') == 1) \
                                         .drop('row_num')

In [12]:
next_month = 'Aug-24'
future_month_df = spark.createDataFrame([(next_month,)], ['future_order_month_year'])
future_data = latest_merchant_data.crossJoin(future_month_df)

In [13]:
future_data.show(5)

                                                                                

+------------+----------------+------------------+-----------------+------------------------------+--------------------+--------------+---------------+---------------------+----------------------+--------------------+---------------------+-----------------------+
|merchant_abn|order_month_year|   monthly_revenue|transaction_count|avg_fraud_probability_merchant|       merchant_tags|state_consumer|gender_consumer|avg_unemployment_rate|previous_month_revenue|      revenue_growth|order_month_year_date|future_order_month_year|
+------------+----------------+------------------+-----------------+------------------------------+--------------------+--------------+---------------+---------------------+----------------------+--------------------+---------------------+-----------------------+
| 10023283211|          Feb-22|48572.882608193504|              215|            56.069422789172336|((furniture, home...|           NSW|           Male|    71.28418544059576|     66067.74432316715|-0.264801862

In [14]:
future_data = model_pipeline.transform(future_data)
future_data = gbt_model.transform(future_data)
future_predictions = future_data.select('merchant_abn', 'future_order_month_year', 'prediction')
future_predictions = future_predictions.withColumnRenamed('prediction', 'projected_revenue')
future_predictions.show(5)

                                                                                

+------------+-----------------------+------------------+
|merchant_abn|future_order_month_year| projected_revenue|
+------------+-----------------------+------------------+
| 10023283211|                 Aug-24| 56777.08567987866|
| 10142254217|                 Aug-24| 7437.755257678557|
| 10187291046|                 Aug-24|3563.1115237939453|
| 10192359162|                 Aug-24|11864.291463825513|
| 10206519221|                 Aug-24| 7437.755257678557|
+------------+-----------------------+------------------+
only showing top 5 rows



In [16]:
top_10_predictions = future_predictions.orderBy(F.col('projected_revenue').desc())

# Show the top 10 merchants by predicted revenue
top_10_predictions.select('merchant_abn', 'projected_revenue').show(10)

                                                                                

+------------+-----------------+
|merchant_abn|projected_revenue|
+------------+-----------------+
| 76626119831|995457.1811756655|
| 77590625261|797461.5549158412|
| 80518954462|797461.5549158412|
| 79417999332|721028.5279519026|
| 21439773999|718107.9921709804|
| 86578477987|657879.2032864047|
| 64403598239|596049.7554696605|
| 38090089066|574001.2293418067|
| 32361057556|555841.3133969607|
| 43186523025|541156.3559394979|
+------------+-----------------+
only showing top 10 rows

