In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col,isnan, when, count
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import date_format
from pyspark.sql.functions import year, month
import pandas as pd
import lbl2vec

In [2]:
# Create a spark session
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "10g")
    .getOrCreate()
)

22/10/06 13:40:22 WARN Utils: Your hostname, MacBook-Air-3.local resolves to a loopback address: 127.0.0.1; using 192.168.0.66 instead (on interface en0)
22/10/06 13:40:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/06 13:40:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/06 13:40:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/06 13:40:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Read in data from ETL.py file
%run '../scripts/ETL.py' '../scripts/paths.json'



22/10/06 13:40:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


                                                                                

In [4]:
tagged_merchants_sdf = spark.read.parquet("../data/curated/tagged_merchants.parquet")

In [5]:
tagged_merchants_sdf = tagged_merchants_sdf.withColumnRenamed('merchant_abn',

    'tagged_merchant_abn'
)

In [6]:
final_join3.createOrReplaceTempView("join")
tagged_merchants_sdf.createOrReplaceTempView("tagged")

joint = spark.sql(""" 

SELECT *
FROM join
INNER JOIN tagged
ON join.merchant_abn = tagged.tagged_merchant_abn
""")

joint = joint.drop('tagged_merchant_abn')

22/10/06 13:41:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
joint.createOrReplaceTempView("group")

main_data = spark.sql(""" 

SELECT *, ((take_rate/100)*dollar_value) AS percent
FROM group
""")

In [8]:
# Extracting the year, month, day from the timestamp


main_data = main_data.withColumn('Year', year(main_data.order_datetime))
main_data = main_data.withColumn('Month',month(main_data.order_datetime))


In [9]:
main_data = main_data.drop('merchant_abn', 'categories','name', 'address', 'trans_merchant_abn', 'order_id','order_datetime','user_id',
'consumer_id','int_sa2','SA2_name','state_code','state_name','population_2020', 'population_2021')

In [10]:
 # Find Count of Null, None, NaN of All DataFrame Columns
main_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in main_data.columns])

                                                                                

merchant_name,take_rate,revenue_levels,state,gender,dollar_value,postcodes,SA2_code,income_2018-2019,total_males,total_females,total_persons,category,percent,Year,Month
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [11]:
main_data.createOrReplaceTempView("agg")

male = spark.sql(""" 

SELECT CONCAT(merchant_name, SA2_code, Year, Month) AS m_name, COUNT(gender) as males
FROM agg
WHERE gender = 'Male'
GROUP BY merchant_name, SA2_code, Year, Month
""")


female = spark.sql(""" 

SELECT CONCAT(merchant_name, SA2_code, Year, Month) AS f_name, COUNT(gender) as females
FROM agg
WHERE gender = 'Female'
GROUP BY merchant_name, SA2_code, Year, Month
""")


In [12]:
main_data.createOrReplaceTempView("agg")

main_agg_data = spark.sql(""" 

SELECT merchant_name, COUNT(merchant_name) AS no_of_transactions, SA2_code, Year, Month, SUM(dollar_value - percent) AS total_earnings,
    CONCAT(merchant_name, SA2_code, Year, Month) AS join_col
FROM agg
GROUP BY merchant_name, SA2_code, Year, Month
""")



In [13]:
main_agg_data.createOrReplaceTempView("gender_join")
male.createOrReplaceTempView("male_agg")
female.createOrReplaceTempView("female_agg")

temp = spark.sql(""" 

SELECT *
FROM gender_join
INNER JOIN male_agg
ON gender_join.join_col = male_agg.m_name
""")

temp.createOrReplaceTempView("temp")

gender_agg = spark.sql(""" 

SELECT *
FROM temp
INNER JOIN female_agg
ON temp.join_col = female_agg.f_name
""")


In [14]:
main_data = main_data.withColumnRenamed('income_2018-2019',

    'income_2018_2019'    
)

main_data = main_data.withColumn('income_per_persons',
    (F.col('income_2018_2019')/F.col('total_persons'))
)


In [15]:
main_data.createOrReplaceTempView("features")

other_agg = spark.sql(""" 

SELECT merchant_name AS drop_name, FIRST(take_rate) AS take_rate, FIRST(revenue_levels) AS revenue_levels, FIRST(category) AS category,
    FIRST(total_males) AS males_in_SA2, FIRST(total_females) AS females_in_SA2, FIRST(income_per_persons) AS income_per_person
FROM features
GROUP BY merchant_name
""")


In [16]:
gender_agg.createOrReplaceTempView("edit")
other_agg.createOrReplaceTempView("rates")

other_cols = spark.sql(""" 

SELECT *
FROM edit
INNER JOIN rates
ON edit.merchant_name = rates.drop_name
""")

train = other_cols.drop('m_name', 'f_name', 'drop_name','join_col')

train.limit(5)

                                                                                

merchant_name,no_of_transactions,SA2_code,Year,Month,total_earnings,males,females,take_rate,revenue_levels,category,males_in_SA2,females_in_SA2,income_per_person
A Aliquet Ltd,2,312021351,2021,6,298.8537411028336,1,1,3.87,b,Furniture,3292,3206,28693.71558221812
A Aliquet Ltd,2,401021010,2021,4,670.4857203238805,1,1,3.87,b,Furniture,3292,3206,28693.71558221812
A Aliquet Ltd,2,603011065,2021,12,346.5799250465364,1,1,3.87,b,Furniture,3292,3206,28693.71558221812
A Arcu Industries,2,124011453,2021,8,203.5414742977921,1,1,3.0,c,Furniture,4821,4683,25816.03452631579
A Arcu Industries,2,211051282,2022,3,655.1195924003883,1,1,3.0,c,Furniture,4821,4683,25816.03452631579


In [17]:
train_projection = train.select("merchant_name", "SA2_code", "Year", "Month", 'total_earnings')

In [18]:
train_projection = train_projection.withColumn("prev_year", \
              when(train_projection["Month"] == 1, train_projection['Year'] - 1).otherwise(train_projection['Year']))
train_projection = train_projection.withColumn("prev_month", \
              when(train_projection["Month"] == 1, 12).otherwise(train_projection['Month'] - 1))
train_projection = train_projection.drop("Year", "Month")
train_projection = train_projection.withColumnRenamed("total_earnings", "future_earnings") \
                            .withColumnRenamed("merchant_name", "p_merchant_name") \
                            .withColumnRenamed("SA2_code", "p_SA2_code")

In [19]:
final_data= train.join(train_projection, (train.merchant_name == train_projection.p_merchant_name) & 
                           (train.SA2_code == train_projection.p_SA2_code) & 
                           (train.Year == train_projection.prev_year) & 
                           (train.Month == train_projection.prev_month), how = 'inner')

final_data = final_data.drop("p_merchant_name", "p_SA2_code","prev_year", "prev_month")

In [20]:
field_str = ['Year', 'Month', 'SA2_code']

for cols in field_str:
    final_data = final_data.withColumn(cols,

    F.col(cols).cast('STRING')

)


field_int = ['no_of_transactions', 'males', 'females', 'males_in_SA2', 'females_in_SA2']

for col in field_int:
    final_data = final_data.withColumn(col,

    F.col(col).cast('INT')

)

In [21]:
# String indexing the categorical columns

indexer = StringIndexer(inputCols = ['merchant_name', 'SA2_code', 'Year', 'Month', 'revenue_levels','category'],
outputCols = ['merchant_name_num', 'SA2_code_num', 'Year_num', 'Month_num', 'revenue_levels_num','category_num'], handleInvalid="keep")

indexd_data = indexer.fit(final_data).transform(final_data)


# Applying onehot encoding to the categorical data that is string indexed above
encoder = OneHotEncoder(inputCols = ['merchant_name_num', 'SA2_code_num', 'Year_num', 'Month_num', 'revenue_levels_num','category_num'],
outputCols = ['merchant_name_vec', 'SA2_code_vec', 'Year_vec', 'Month_vec', 'revenue_levels_vec','category_vec'])

onehotdata = encoder.fit(indexd_data).transform(indexd_data)


# Assembling the training data as a vector of features 
assembler1 = VectorAssembler(
inputCols=['merchant_name_vec', 'SA2_code_vec', 'Year_vec', 'Month_vec', 'revenue_levels_vec','category_vec','males_in_SA2','females_in_SA2', 'income_per_person', 'no_of_transactions','take_rate', 'total_earnings'],
outputCol= "features" )

outdata1 = assembler1.transform(onehotdata)

                                                                                

In [22]:
# Renaming the target column as label

outdata1 = outdata1.withColumnRenamed(
    "future_earnings",
    "label"
)

In [23]:
# Assembling the features as a feature vector 

featureIndexer =\
    VectorIndexer(inputCol="features", 
    outputCol="indexedFeatures").fit(outdata1)

outdata1 = featureIndexer.transform(outdata1)

                                                                                

In [24]:
# Split the data into training and validation sets (30% held out for testing)

trainingData, testData = outdata1.randomSplit([0.7, 0.3], seed = 20)

In [25]:
# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")


# Train model.  
model = rf.fit(trainingData)

# Make predictions.
predictions_validation = model.transform(testData)



22/10/06 13:46:37 WARN DAGScheduler: Broadcasting large task binary with size 1337.5 KiB


                                                                                

22/10/06 13:46:38 WARN DAGScheduler: Broadcasting large task binary with size 1337.6 KiB


                                                                                

22/10/06 13:46:40 WARN DAGScheduler: Broadcasting large task binary with size 1341.6 KiB


                                                                                

22/10/06 13:46:42 WARN DAGScheduler: Broadcasting large task binary with size 1454.2 KiB


                                                                                

22/10/06 13:46:48 WARN DAGScheduler: Broadcasting large task binary with size 1515.9 KiB


                                                                                

22/10/06 13:46:51 WARN DAGScheduler: Broadcasting large task binary with size 1636.7 KiB


                                                                                

22/10/06 13:46:56 WARN DAGScheduler: Broadcasting large task binary with size 1876.4 KiB


                                                                                

22/10/06 13:46:59 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB


                                                                                

In [26]:
# Evaluate the validation set 

predictions_validation.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error

evaluator_train_rmse = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse_train = evaluator_train_rmse.evaluate(predictions_validation)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse_train)

evaluator_train_mae = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mae")
mae_train = evaluator_train_mae.evaluate(predictions_validation)
print("Mean Absolutee Error (MAE) on train data = %g" % mae_train)



22/10/06 13:48:08 WARN DAGScheduler: Broadcasting large task binary with size 1349.7 KiB


                                                                                

+------------------+------------------+--------------------+
|        prediction|             label|            features|
+------------------+------------------+--------------------+
|342.99433179557315| 175.5058926208255|(2085,[209,1093,2...|
|342.99433179557315| 184.5112457341748|(2085,[209,988,20...|
|342.99433179557315| 273.6255392020826|(2085,[209,976,20...|
|342.99433179557315| 421.7745544721037|(2085,[209,989,20...|
|342.99433179557315|152.81812360693857|(2085,[209,1039,2...|
+------------------+------------------+--------------------+
only showing top 5 rows





22/10/06 13:49:25 WARN DAGScheduler: Broadcasting large task binary with size 1342.8 KiB


[Stage 882:>                                                        (0 + 8) / 9]

22/10/06 13:49:27 WARN DAGScheduler: Broadcasting large task binary with size 1343.9 KiB
Root Mean Squared Error (RMSE) on train data = 339.98




22/10/06 13:50:42 WARN DAGScheduler: Broadcasting large task binary with size 1342.8 KiB




22/10/06 13:50:45 WARN DAGScheduler: Broadcasting large task binary with size 1343.9 KiB
Mean Absolutee Error (MAE) on train data = 208.749


                                                                                

In [27]:
def ExtractFeatureImportance(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))
  
  
#ExtractFeatureImportance(model.stages[-1].featureImportances, dataset, "features")
dataset_fi = ExtractFeatureImportance(model.featureImportances, predictions_validation, "features")
dataset_fi = spark.createDataFrame(dataset_fi)
display(dataset_fi)

idx,name,score
2084,total_earnings,0.6636336838909377
2082,no_of_transactions,0.15877433581190895
2078,category_vec_Beau...,0.03629700211840285
2074,category_vec_Book...,0.032823799638621705
2076,category_vec_Furn...,0.011698635257700485
110,merchant_name_vec...,0.011309588712838327
2083,take_rate,0.009958495074833078
975,SA2_code_vec_4060...,0.006728057018102661
2079,males_in_SA2,0.006252293498002...
274,merchant_name_vec...,0.004555797073978013


## Future predictions

In [29]:
latest_year = train.select(max('Year')).collect()[0][0]
agg_month_1 = train.filter(train.Year == latest_year)
latest_month = agg_month_1.select(max('Month')).collect()[0][0]
predicting_data = agg_month_1.filter(train.Month == latest_month)
predicting_data = predicting_data.withColumn("future_earnings", lit(0))

                                                                                

In [30]:
# String indexing the categorical columns

indexer = StringIndexer(inputCols = ['merchant_name', 'SA2_code', 'Year', 'Month', 'revenue_levels','category'],
outputCols = ['merchant_name_num', 'SA2_code_num', 'Year_num', 'Month_num', 'revenue_levels_num','category_num'], handleInvalid="keep")

indexd_data = indexer.fit(predicting_data).transform(predicting_data)


# Applying onehot encoding to the categorical data that is string indexed above
encoder = OneHotEncoder(inputCols = ['merchant_name_num', 'SA2_code_num', 'Year_num', 'Month_num', 'revenue_levels_num','category_num'],
outputCols = ['merchant_name_vec', 'SA2_code_vec', 'Year_vec', 'Month_vec', 'revenue_levels_vec','category_vec'])

onehotdata = encoder.fit(indexd_data).transform(indexd_data)


# Assembling the training data as a vector of features 
assembler1 = VectorAssembler(
inputCols=['merchant_name_vec', 'SA2_code_vec', 'Year_vec', 'Month_vec', 'revenue_levels_vec','category_vec','males_in_SA2','females_in_SA2', 'income_per_person', 'no_of_transactions','take_rate', 'total_earnings'],
outputCol= "features" )

outdata1 = assembler1.transform(onehotdata)

# Renaming the target column as label

outdata1 = outdata1.withColumnRenamed(
    "future_earnings",
    "label"
)


# Assembling the features as a feature vector 

featureIndexer =\
    VectorIndexer(inputCol="features", 
    outputCol="indexedFeatures").fit(outdata1)

outdata1 = featureIndexer.transform(outdata1)

                                                                                

In [32]:
predictions_test = model.transform(outdata1)

predictions_test.createOrReplaceTempView("preds")

pred = spark.sql(""" 

SELECT merchant_name, SUM(prediction) AS total_revenue
FROM preds
GROUP BY merchant_name

""")

pred.limit(5)

                                                                                

merchant_name,total_revenue
Dictum Mi Incorpo...,637.2824011283511
Dictum Mi Limited,17206.62483046548
Donec Luctus Indu...,4460.976807898458
Elit Sed Consequa...,14657.495225952076
Hendrerit Consect...,2867.77080507758


                                                                                

In [34]:
pred_df = pred.toPandas()

pred_df.to_csv("../data/curated/revenue.csv")