In [1]:
import numpy as np
from sklearn.linear_model import SGDRegressor
from sklearn.model_selection import train_test_split
import pandas as pd
from pyspark.sql import SparkSession
from sklearn.metrics import r2_score, mean_absolute_error
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import math


In [2]:
data_directory = "../data/curated/"
merchants = pd.read_parquet(data_directory + "merchants.parquet")
transactions = pd.read_parquet(data_directory + "transactions.parquet")
consumers = pd.read_parquet(data_directory + "consumers.parquet")
census = pd.read_csv(data_directory + "census.csv")

# spark = (SparkSession.builder.appName("MAST30034 Ass2 Preprocess")
#     .config("spark.sql.parquet.cacheMetadata", "true")
#     .config("spark.sql.session.timeZone", "Etc/UTC")
#     .config('spark.executor.memory', '8g')
#     .config("spark.sql.execution.arrow.pyspark.enabled", "true")
#     .config("spark.sql.repl.eagerEval.enabled", True) 
#     .getOrCreate()
# )

# pd.set_option('compute.ops_on_diff_frames', True)

In [3]:
transactions.head()

Unnamed: 0_level_0,user_id,merchant_abn,dollar_value,order_id,order_datetime
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,1,28000487688,133.226894,0c37b3f7-c7f1-48cb-bcc7-0a58e76608ea,2021-02-28
1,18485,62191208634,79.1314,9e18b913-0465-4fd4-92fd-66d15e65d93c,2021-02-28
2,1,83690644458,30.441348,40a2ff69-ea34-4657-8429-df7ca957d6a1,2021-02-28
3,18488,39649557865,962.813341,f4c1a5ae-5b76-40d0-ae0f-cb9730ac325a,2021-02-28
4,2,80779820715,48.123977,cd09bdd6-f56d-489f-81ea-440f4bda933c,2021-02-28


In [4]:
user_counts = transactions.merge(merchants).groupby(["merchant_abn", "user_id"]).agg(count=("order_id", "count")).reset_index()

In [5]:
user_counts

Unnamed: 0,merchant_abn,user_id,count
0,10023283211,8,2
1,10023283211,15,1
2,10023283211,19,1
3,10023283211,22,1
4,10023283211,31,1
...,...,...,...
7955340,99990536339,21308,1
7955341,99990536339,21352,1
7955342,99990536339,23299,1
7955343,99990536339,23730,1


In [6]:
#Total number of customers returning
len(user_counts[user_counts["count"] > 1])/len(user_counts)

0.26885647322649114

Here is an indicator of simple customer retention

In [7]:
merchant_counts = user_counts.groupby("merchant_abn").agg(total_transactions=("count", "sum"))
merchant_counts["repeat_transactions"] = user_counts.groupby("merchant_abn").apply(lambda x: x[x["count"] > 1]["count"].sum())
merchant_counts["repeat_percent"] = merchant_counts["repeat_transactions"]/merchant_counts["total_transactions"]
retention = merchant_counts.merge(merchants, left_index=True, right_on="merchant_abn").sort_values(["revenue_band", "repeat_percent", "take_rate"], ascending=[True,False,True]).drop("tags", axis=1)
retention

Unnamed: 0,total_transactions,repeat_transactions,repeat_percent,name,merchant_abn,sector_tags,revenue_band,take_rate
3443,263923,263918,0.999981,Leo In Consulting,86578477987,"watch, clock, and jewelry repair shops",a,6.43
1827,247526,247520,0.999976,Non Vestibulum Industries,49891706470,tent and awning shops,a,5.80
1637,217201,217169,0.999853,Lacus Consulting,45629217853,"gift, card, novelty, and souvenir shops",a,6.98
3590,210531,210498,0.999843,Est Nunc Consulting,89726005175,tent and awning shops,a,6.01
2391,173045,172914,0.999243,Vehicula Pellentesque Corporation,63290521567,artist supply and craft shops,a,6.48
...,...,...,...,...,...,...,...,...
1677,17,0,0.000000,Libero Nec Ligula LLP,46391946761,"stationery, office supplies and printing and w...",e,0.37
2331,100,0,0.000000,Ante Ipsum Ltd,61968317984,motor vehicle supplies and new parts,e,0.40
1773,87,0,0.000000,Aliquam Nec Enim LLP,48666632255,"florists supplies, nursery stock, and flowers",e,0.42
2829,37,0,0.000000,Mattis Velit Justo Company,72780061668,artist supply and craft shops,e,0.45


In [8]:
transactions.head()

Unnamed: 0_level_0,user_id,merchant_abn,dollar_value,order_id,order_datetime
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,1,28000487688,133.226894,0c37b3f7-c7f1-48cb-bcc7-0a58e76608ea,2021-02-28
1,18485,62191208634,79.1314,9e18b913-0465-4fd4-92fd-66d15e65d93c,2021-02-28
2,1,83690644458,30.441348,40a2ff69-ea34-4657-8429-df7ca957d6a1,2021-02-28
3,18488,39649557865,962.813341,f4c1a5ae-5b76-40d0-ae0f-cb9730ac325a,2021-02-28
4,2,80779820715,48.123977,cd09bdd6-f56d-489f-81ea-440f4bda933c,2021-02-28


Momemtum calculations are SMA(n) - SMA(n-1)

In [9]:

transactions["month"] = pd.DatetimeIndex(transactions['order_datetime']).year * 12 + pd.DatetimeIndex(transactions['order_datetime']).month
transaction_momentum = transactions.groupby(["merchant_abn", "month"]).agg(monthly_revenue=("dollar_value", sum)).reset_index()
transactions_groupby = transaction_momentum.groupby("merchant_abn")


In [10]:
sma_periods = [3, 5, 9]
col_names = [f"{sma_period}-month-sma" for sma_period in sma_periods]
diff_col_names = [f"{sma_period}-month-sma-diff" for sma_period in sma_periods]




for i, sma_period in enumerate(sma_periods):
    transaction_momentum[col_names[i]] = transactions_groupby.rolling(window=sma_period, on="month").mean().reset_index(drop=True).fillna(0)["monthly_revenue"]

transaction_momentum[diff_col_names] = transaction_momentum.sort_values(["merchant_abn", "month"]).groupby("merchant_abn").diff().fillna(0)[col_names]

transaction_momentum[transaction_momentum["merchant_abn"] == 64403598239]

Unnamed: 0,merchant_abn,month,monthly_revenue,3-month-sma,5-month-sma,9-month-sma,3-month-sma-diff,5-month-sma-diff,9-month-sma-diff
46487,64403598239,24254,11042.422345,0.0,0.0,0.0,0.0,0.0,0.0
46488,64403598239,24255,339465.404055,0.0,0.0,0.0,0.0,0.0,0.0
46489,64403598239,24256,355747.939681,235418.588694,0.0,0.0,235418.588694,0.0,0.0
46490,64403598239,24257,391604.829288,362272.724341,0.0,0.0,126854.135648,0.0,0.0
46491,64403598239,24258,394151.985224,380501.584731,298402.516119,0.0,18228.86039,298402.516119,0.0
46492,64403598239,24259,405513.537081,397090.117198,377296.739066,0.0,16588.532467,78894.222947,0.0
46493,64403598239,24260,437598.794039,412421.438781,396923.417063,0.0,15331.321584,19626.677997,0.0
46494,64403598239,24261,422908.985351,422007.10549,410355.626197,0.0,9585.666709,13432.209134,0.0
46495,64403598239,24262,470540.392246,443682.723879,426142.738788,358730.47659,21675.618388,15787.112592,358730.47659
46496,64403598239,24263,612391.118737,501946.832112,469790.565491,425546.998411,58264.108233,43647.826703,66816.521821


In [11]:
revenue_and_momentum = transaction_momentum.groupby("merchant_abn").last().reset_index()

Next up, a transaction volume/total revenue model. We assume that the distribution of transaction amount remains relatively constant over time for each merchant for this model.

In [12]:

counts = transactions.groupby(["merchant_abn", "order_datetime"]).agg(count=("dollar_value", "count")).reset_index().groupby("merchant_abn").agg(volume_mean=("count", "mean"), volume_stdev=("count", "std"))
counts = pd.concat([counts, transactions.groupby(["merchant_abn"]).agg(amount_mean=("dollar_value", "mean"), amount_stdev=("dollar_value", "std"))], axis=1).reset_index()
counts

Unnamed: 0,merchant_abn,volume_mean,volume_stdev,amount_mean,amount_stdev
0,10023283211,5.288079,2.508980,205.910623,120.210061
1,10142254217,4.850671,2.428026,33.204582,28.081540
2,10165489824,1.000000,0.000000,11236.094771,7014.839896
3,10187291046,1.342975,0.671077,108.935240,67.429536
4,10192359162,1.351648,0.642381,414.075375,270.169091
...,...,...,...,...,...
4021,99938978285,27.391089,8.079186,25.274287,21.254829
4022,99974311662,1.125000,0.382618,272.166216,179.558785
4023,99976658299,37.229373,10.061556,142.557537,74.952056
4024,99987905597,1.221477,0.504649,325.909933,151.785759


Here's what we've got to rank people now. The coefficients were somewhat arbitrarily decided such that they had a sufficient effect on the ranking, they should be tuned. We need to introduce some more features as well.

In [13]:
ranking = retention.merge(counts).merge(revenue_and_momentum).fillna(0)
ranking["score"] = ranking["9-month-sma"].multiply(ranking["take_rate"].astype(float))/100 + 0.5 * (ranking["9-month-sma-diff"] + ranking["5-month-sma-diff"] + ranking["3-month-sma-diff"]) + 50 * (ranking["volume_mean"] - ranking["volume_stdev"]) + ranking["repeat_percent"] * 10000
ranking.sort_values("score", ascending=False)

Unnamed: 0,total_transactions,repeat_transactions,repeat_percent,name,merchant_abn,sector_tags,revenue_band,take_rate,volume_mean,volume_stdev,...,amount_stdev,month,monthly_revenue,3-month-sma,5-month-sma,9-month-sma,3-month-sma-diff,5-month-sma-diff,9-month-sma-diff,score
3,210531,210498,0.999843,Est Nunc Consulting,89726005175,tent and awning shops,a,6.01,347.410891,76.271467,...,20.611374,24274,439892.053653,467700.104590,460120.264195,425714.419969,-6526.344029,-645.259662,13551.040188,52330.558629
0,263923,263918,0.999981,Leo In Consulting,86578477987,"watch, clock, and jewelry repair shops",a,6.43,435.516502,94.246848,...,20.378793,24274,438301.521566,478310.148950,475046.843647,440216.770192,-13855.996691,-4136.720865,11471.184061,52108.464819
6,113541,112518,0.990990,Lobortis Ultrices Company,64403598239,"music shops - musical instruments, pianos, and...",a,6.31,187.361386,41.652991,...,41.592244,24274,467609.334319,499344.096173,490517.461192,453936.232619,-7471.985241,-350.879275,13457.881764,48656.205045
1,247526,247520,0.999976,Non Vestibulum Industries,49891706470,tent and awning shops,a,5.80,408.458746,89.701327,...,15.014905,24274,379946.375635,402341.846799,399610.869597,369534.849433,-8903.081737,-2067.526396,11877.525341,47824.108421
5,116938,116059,0.992483,Mauris Non Institute,21439773999,"cable, satellite, and other pay television and...",a,6.10,192.966997,43.475030,...,45.536326,24274,449627.223633,474262.158965,471611.447193,436286.846671,-8806.035751,-13.410568,13809.394678,46507.902115
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
510,1308,80,0.061162,At Pretium Corp.,12771097467,motor vehicle supplies and new parts,a,6.95,2.440299,1.302829,...,1965.346769,24274,184693.565256,240380.759064,256964.822735,241331.202224,-42651.288360,-15281.529090,1993.775308,-10528.508251
2920,44,0,0.000000,Lacinia Orci Incorporated,33364563448,"antique shops - sales, repairs, and restoratio...",b,4.99,1.100000,0.303822,...,6768.578146,24274,10484.900706,20089.267390,24389.044549,26885.234404,-10445.219717,-9731.509270,-4989.829123,-11201.896948
1312,81,0,0.000000,Phasellus Nulla LLC,44345785419,"jewelry, watch, clock, and silverware shops",a,6.07,1.051948,0.223377,...,5362.687648,24274,24954.182976,18947.575730,30502.086645,45552.483971,-15055.386208,-11048.248032,-2617.976356,-11554.340990
2650,82,0,0.000000,Diam Nunc Associates,58495294020,"jewelry, watch, clock, and silverware shops",b,3.98,1.078947,0.271448,...,7518.782873,24274,8792.961322,57866.862714,61289.888572,57020.832194,-21902.284881,-9555.146585,-1781.539323,-14309.681323


In [14]:
ranking.dtypes

total_transactions       int64
repeat_transactions      int64
repeat_percent         float64
name                    object
merchant_abn             int64
sector_tags             object
revenue_band            object
take_rate               object
volume_mean            float64
volume_stdev           float64
amount_mean            float64
amount_stdev           float64
month                    int64
monthly_revenue        float64
3-month-sma            float64
5-month-sma            float64
9-month-sma            float64
3-month-sma-diff       float64
5-month-sma-diff       float64
9-month-sma-diff       float64
score                  float64
dtype: object

In [15]:
transactions["month"] = pd.DatetimeIndex(transactions['order_datetime']).year * 12 + pd.DatetimeIndex(transactions['order_datetime']).month/3

transactions["quarter"] = transactions["month"].apply(math.ceil)


df = transactions.merge(consumers, on="user_id").drop(["month", "consumer_id", "user_id"], axis=1).groupby(["postcode", "quarter", "merchant_abn"]).sum().reset_index()

In [16]:
transactions.head()

Unnamed: 0_level_0,user_id,merchant_abn,dollar_value,order_id,order_datetime,month,quarter
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,1,28000487688,133.226894,0c37b3f7-c7f1-48cb-bcc7-0a58e76608ea,2021-02-28,24252.666667,24253
1,18485,62191208634,79.1314,9e18b913-0465-4fd4-92fd-66d15e65d93c,2021-02-28,24252.666667,24253
2,1,83690644458,30.441348,40a2ff69-ea34-4657-8429-df7ca957d6a1,2021-02-28,24252.666667,24253
3,18488,39649557865,962.813341,f4c1a5ae-5b76-40d0-ae0f-cb9730ac325a,2021-02-28,24252.666667,24253
4,2,80779820715,48.123977,cd09bdd6-f56d-489f-81ea-440f4bda933c,2021-02-28,24252.666667,24253


In [17]:
df.count()


postcode        7686159
quarter         7686159
merchant_abn    7686159
dollar_value    7686159
dtype: int64

In [18]:
# census_consumers["yr 12 completion"] = census_consumers[["gender", "comp_Yr_12_eq_percent_M", "comp_Yr_12_eq_percent_F"]].apply(lambda x: x["comp_Yr_12_eq_percent_" + ("M", "F")[["gender"] == "Female"]], axis=1)

In [19]:
len(df)

7686159

My sad attempts at trying to fix the model below

In [20]:




# def train(df, census_consumers, merchants):
#     ENCODED_COLS = ["gender", "sector_tags", "revenue_band"]
#     SCALED_COLS = ["yr 12 completion", "house_repay_to_income", "Median_age_persons", "Median_tot_prsnl_inc_weekly", "Median_mortgage_repay_monthly", "take_rate"]
#     model = SGDRegressor()
#     encoder = OneHotEncoder()
#     scaler = StandardScaler()
#     i = 0
#     df = df.sample(frac=1)
#     interval = int(1e5)
#     test_size = int(1e5)
#     while i < len(df):
#         print(f"Training Sample: {i}-{min(i+interval, len(df)-test_size)}")
#         df_merged = df[i:min(i+interval, len(df)-test_size)].merge(census_consumers, on=["postcode"]).merge(merchants, on=["merchant_abn"]).replace([np.inf, -np.inf], np.nan).dropna()
#         i += interval
#         X, y = get_features_and_labels(df_merged, encoder, scaler, ENCODED_COLS, SCALED_COLS)
#         model.partial_fit(X, y)
#     test_df = df[-test_size:].merge(census_consumers, on=["postcode"]).merge(merchants, on=["merchant_abn"]).replace([np.inf, -np.inf], np.nan).dropna()
#     print(f"Test Score: {model.score(*get_features_and_labels(test_df, encoder, scaler, ENCODED_COLS, SCALED_COLS, train=False))}")

# def get_features_and_labels(df, encoder, scaler, ENCODED_COLS, SCALED_COLS, train=True):
#         X = df[["gender", "yr 12 completion", "house_repay_to_income", "Median_age_persons", "Median_tot_prsnl_inc_weekly", "Median_mortgage_repay_monthly", "sector_tags", "revenue_band", "take_rate"]]
#         X["take_rate"] = pd.to_numeric(X["take_rate"])
#         X["house_repay_to_income"] = pd.to_numeric(X["house_repay_to_income"])
#         scaled_columns =  scaler.fit_transform(X[SCALED_COLS]) if train else scaler.transform(X[SCALED_COLS])
#         encoded_columns =  encoder.fit_transform(X[ENCODED_COLS]) if train else encoder.transform(X[ENCODED_COLS])
#         X = np.concatenate([scaled_columns, encoded_columns], axis=1)
#         y = df["dollar_value"]
#         return X, y





In [21]:
# train(df, census_consumers, merchants)

In [22]:
df_merged.count()

NameError: name 'df_merged' is not defined

22/10/06 02:02:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:02:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:02:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:02:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:02:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:02:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 0

In [None]:
# len(X)

22/10/06 02:03:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:03:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 0

819047

22/10/06 02:04:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:04:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:04:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:04:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:04:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:04:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 0

In [None]:
# X = X.to_pandas()

22/10/06 02:05:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:05:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:05:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:05:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 02:05:07 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/10/06 02:05:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradat

In [None]:
# y = y.to_pandas()

NameError: name 'y' is not defined

In [None]:
# type(X)

pyspark.pandas.frame.DataFrame

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

22/10/06 01:53:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 01:53:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 01:53:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 01:53:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 01:53:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 01:53:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/10/06 0

KeyboardInterrupt: 

In [None]:
# model = ElasticNet()
# model.fit(X_train, y_train)

In [None]:
# model.score(X_test, y_test)

0.010071754222644724