## Task 1: Spark SQL (15m)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
sales_file_location = "/FileStore/tables/Sales_table.csv"
products_file_location = "/FileStore/tables/Products_table.csv"
sellers_file_location = "/FileStore/tables/Sellers_table.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
products_table = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(products_file_location)

sales_table = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(sales_file_location)

sellers_table = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(sellers_file_location)

In [0]:
## JUST PREVIEWING FOR MY OWN CLARITY ##

display(products_table.limit(5))
display(sales_table.limit(5))
display(sellers_table.limit(5))

product_id,product_name,price
1,product_1,24
2,product_2,173
3,product_3,147
4,product_4,116
5,product_5,13


order_id,product_id,seller_id,num_of_items_sold
1,1841,35172,432
2,14496,6362,854
3,76927,23608,850
4,17982,21413,392
5,74388,23888,227


seller_id,seller_name,rating
1,seller_1,2
2,seller_2,5
3,seller_3,1
4,seller_4,5
5,seller_5,2


In [0]:
print("No. rows:", sales_table.count())
print("Columns:", sales_table.columns, "\n")
sales_table.printSchema()

No. rows: 300000
Columns: ['order_id', 'product_id', 'seller_id', 'num_of_items_sold'] 

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- num_of_items_sold: integer (nullable = true)



In [0]:
# Checking for null values... (there are none)
sales_table.select([count(when(col(c).isNotNull() , c)).alias(c) for c in sales_table.columns]).show()

+--------+----------+---------+-----------------+
|order_id|product_id|seller_id|num_of_items_sold|
+--------+----------+---------+-----------------+
|  300000|    300000|   300000|           300000|
+--------+----------+---------+-----------------+



In [0]:
# (a) Output the top 3 most popular products sold among all sellers [2m]
# Your table should have 1 column(s): [product_name]
##########################################################################################################################

(sales_table
    .groupBy("product_id")
    .agg(sum("num_of_items_sold").alias("total_product_sales"))
    .join(products_table, "product_id")
    # NOTE: If we know we only want the top 3, we could optimize by filtering out the unnecessary entries BEFORE the join, 
    # however for general flexibility, I have deliberately done the join first, and selected the top 3 later
    .orderBy("total_product_sales", ascending=False)
    .select("product_name")
    .show(n=3, truncate=False))

+-------------+
|product_name |
+-------------+
|product_51270|
|product_18759|
|product_59652|
+-------------+
only showing top 3 rows



In [0]:
# (b) Find out the total sales of the products sold by sellers 1 to 10 and output the top most sold product [2m]
# Your table should have 1 column(s): [product_name]  
##########################################################################################################################

(sales_table
    .filter(col("seller_id").between(1,10))
    .groupBy("product_id")
    .agg(sum("num_of_items_sold").alias("total_product_sales"))
    .join(products_table, "product_id")
    .orderBy("total_product_sales", ascending=False)
    .select("product_name")
    .show(n=1, truncate=False))

+-------------+
|product_name |
+-------------+
|product_36658|
+-------------+
only showing top 1 row



In [0]:
# (c) Compute the combined revenue earned from sellers where seller_id ranges from 1 to 500 inclusive. [3m]
# Your table should have 1 column(s): [total_revenue]
##########################################################################################################################

(sales_table
    .filter(col("seller_id").between(1,500))
    .groupBy("product_id")
    .agg(sum("num_of_items_sold").alias("total_product_sales"))
    .join(products_table, "product_id")
    .withColumn("product_revenue", col("total_product_sales") * col("price"))
    .agg(sum("product_revenue").alias("total_revenue"))
    .show(truncate=False))

+-------------+
|total_revenue|
+-------------+
|160916699    |
+-------------+



In [0]:
# (d) Among sellers with rating >= 4 who have achieved a combined number of products sold >= 3000, find out the top 10 most expensive product sold by any of the sellers. (If there are multiple products at the same price, please sort them in ascending order of product_id) [8m]
# Your table should have 1 column(s): [product_name]
# To get the full mark, your query should not run for more than 1 min
##########################################################################################################################

(sales_table
    .groupBy("seller_id")
    .agg(sum("num_of_items_sold").alias("seller_items_sold"))
    .join(sellers_table, "seller_id")
    .select("seller_id").where((col("rating") >= 4) & (col("seller_items_sold") >= 3000))
    .join(sales_table, "seller_id")
    .join(products_table, "product_id")
    .dropDuplicates(subset=["product_id"])
    .orderBy(["price", "product_id"], ascending=[False, True])
    .select("product_name")
    .show(10, truncate=False))

+------------+
|product_name|
+------------+
|product_106 |
|product_117 |
|product_363 |
|product_712 |
|product_843 |
|product_897 |
|product_923 |
|product_1466|
|product_1507|
|product_1514|
+------------+
only showing top 10 rows



## Task 2: Spark ML (10m)

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
bank_train_location = "/FileStore/tables/bank_train.csv"
bank_test_location = "/FileStore/tables/bank_test.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bank_train = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(bank_train_location)

bank_test = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(bank_test_location)

In [0]:
display(bank_train.limit(5))
display(bank_test.limit(5))

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,label
45,admin.,married,unknown,no,2033,no,no,cellular,28,may,48,4,-1,0,unknown,0
56,admin.,married,primary,no,202,yes,no,unknown,9,may,178,2,-1,0,unknown,0
50,housemaid,single,secondary,no,799,no,no,telephone,28,jan,63,1,-1,0,unknown,0
58,admin.,married,secondary,no,1464,yes,yes,unknown,5,jun,53,29,-1,0,unknown,0
43,management,single,tertiary,no,11891,no,no,cellular,4,dec,821,5,242,1,success,1


age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,label
45,management,married,tertiary,no,2220,yes,no,cellular,11,jul,128,2,-1,0,unknown,0
36,blue-collar,single,secondary,no,3623,no,no,unknown,12,nov,71,1,378,1,success,1
37,management,married,primary,no,1506,no,no,cellular,2,nov,101,3,80,3,success,0
65,admin.,married,secondary,no,952,no,no,cellular,6,sep,255,1,96,1,success,1
37,management,married,tertiary,no,40,no,no,cellular,27,aug,1033,4,-1,0,unknown,1


In [0]:
print("No. rows (train):", bank_train.count())
print("No. rows (test):", bank_test.count())
print("Columns:", bank_train.columns, "\n")
bank_train.printSchema()

No. rows (train): 8929
No. rows (test): 2233
Columns: ['age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'day', 'month', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'label'] 

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- label: integer (nullable = true)



In [0]:
bank_train.select([count(when(col(c).isNotNull() , c)).alias(c) for c in bank_train.columns]).show()

bank_test.select([count(when(col(c).isNotNull() , c)).alias(c) for c in bank_test.columns]).show()

## NO MISSING VALUES - Nice :^) 

+----+----+-------+---------+-------+-------+-------+----+-------+----+-----+--------+--------+-----+--------+--------+-----+
| age| job|marital|education|default|balance|housing|loan|contact| day|month|duration|campaign|pdays|previous|poutcome|label|
+----+----+-------+---------+-------+-------+-------+----+-------+----+-----+--------+--------+-----+--------+--------+-----+
|8929|8929|   8929|     8929|   8929|   8929|   8929|8929|   8929|8929| 8929|    8929|    8929| 8929|    8929|    8929| 8929|
+----+----+-------+---------+-------+-------+-------+----+-------+----+-----+--------+--------+-----+--------+--------+-----+

+----+----+-------+---------+-------+-------+-------+----+-------+----+-----+--------+--------+-----+--------+--------+-----+
| age| job|marital|education|default|balance|housing|loan|contact| day|month|duration|campaign|pdays|previous|poutcome|label|
+----+----+-------+---------+-------+-------+-------+----+-------+----+-----+--------+--------+-----+--------+-------

In [0]:
## Cell just for testing
bank_train.select("education").distinct().show()

+---------+
|education|
+---------+
|  unknown|
| tertiary|
|secondary|
|  primary|
+---------+



Build ML model to predict whether the customer will subscribe bank deposit service or not. Train the model using training set and evaluate the model performance (e.g. accuracy) using testing set. 
* You can explore different methods to pre-process the data and select proper features
* You can utilize different machine learning models and tune model hyperparameters
* Present the final testing accuracy.

In [0]:
# data preparation (4m)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

num_vars = ['age','balance','duration','campaign','pdays','previous']
cat_vars = ['job', 'marital', 'education','default', 'housing', 'loan', 'contact', 'day','month', 'poutcome']

In [0]:
# model building (4m)
indexer = StringIndexer(inputCols=cat_vars, outputCols=[_+'_indexed' for _ in cat_vars])
encoder = OneHotEncoder(inputCols=indexer.getOutputCols(), outputCols=[_+'_encoded' for _ in cat_vars])
assembler = VectorAssembler(inputCols=num_vars+encoder.getOutputCols(), outputCol='features')
scaler = StandardScaler(inputCol=assembler.getOutputCol(), outputCol='scaled_features')
lr = LogisticRegression(featuresCol=scaler.getOutputCol(), labelCol='label', maxIter=10)
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler, lr])
model = pipeline.fit(bank_train)


In [0]:
# model evaluation (2m)
predictions = model.transform(bank_test)
predictionAndLabels = predictions.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

evaluator.setMetricName("f1")
print("Test set F1 score = " + str(evaluator.evaluate(predictionAndLabels)))

evaluator2 = BinaryClassificationEvaluator(metricName="areaUnderROC", rawPredictionCol="prediction", labelCol="label")
print("Test set AUC = " + str(evaluator2.evaluate(predictionAndLabels)))


Test set accuracy = 0.8293775190326914
Test set F1 score = 0.8292477475065775
Test set AUC = 0.8280415141083859
