## Task 1: Spark SQL (15m)

In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by every Task 1 cell, with a local master
# that uses all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
sales_file_location = "/FileStore/tables/Sales_table.csv"
products_file_location = "/FileStore/tables/Products_table.csv"
sellers_file_location = "/FileStore/tables/Sellers_table.csv"

# Reader settings shared by the three CSV tables.
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


def _read_table(location):
    """Load the file at `location` using the shared reader settings above."""
    reader = spark.read.format(file_type)
    reader = reader.option("inferSchema", infer_schema)
    reader = reader.option("header", first_row_is_header)
    reader = reader.option("sep", delimiter)
    return reader.load(location)


df_pt = _read_table(products_file_location)
df_sales = _read_table(sales_file_location)
df_seller = _read_table(sellers_file_location)


In [0]:
# (a) Output the top 3 most popular products sold among all sellers [2m]

# Alternative Spark SQL formulation of the same query, kept for reference:
# df_pt.createOrReplaceTempView("PT")
# df_sales.createOrReplaceTempView("SALES")
# sql_text ="""
# SELECT PT.product_name
# FROM PT INNER JOIN SALES ON PT.product_id = SALES.product_id
# GROUP BY PT.product_name
# ORDER BY SUM(SALES.num_of_items_sold) DESC
# LIMIT 3;
# """
# result = spark.sql(sql_text)

from pyspark.sql.functions import expr, desc

# Step 1: total number of items sold per product name, across all sales rows.
units_per_product = (
    df_pt.join(df_sales, "product_id")
    .groupBy("product_name")
    .agg(expr("sum(num_of_items_sold)").alias("total_number_of_items"))
)

# Step 2: keep the three products with the largest totals; only the name is reported.
result = (
    units_per_product
    .orderBy(desc("total_number_of_items"))
    .limit(3)
    .select("product_name")
)

result.show()

# +-------------+
# | product_name|
# +-------------+
# |product_51270|
# |product_18759|
# |product_59652|
# +-------------+


+-------------+
| product_name|
+-------------+
|product_51270|
|product_18759|
|product_59652|
+-------------+



In [0]:
# (b) Output the top most sold product (in terms of quantity) among sellers with seller_id 1 to 10 [2m]
# Your table should have 1 column(s): [product_name]

from pyspark.sql.functions import expr, col, desc

# Restrict the sales to sellers 1..10 (inclusive) *before* aggregating, then
# total the quantity per product across those sellers. Grouping by seller_id as
# well would rank individual (seller, product) pairs, so a product sold by
# several of these sellers would never have its quantities combined.
# NOTE(review): any previously recorded output for this cell came from the
# per-(seller, product) grouping; re-run the cell to refresh it.
result = (
    df_pt.join(df_sales, "product_id")
    .filter(col("seller_id").between(1, 10))
    .groupBy("product_name")
    .agg(expr("sum(num_of_items_sold)").alias("total_quantity"))
    .orderBy(desc("total_quantity"))
    .limit(1)
    .select("product_name")
)

result.show()

# +-------------+
# | product_name|
# +-------------+
# |product_36658|
# +-------------+


+-------------+
| product_name|
+-------------+
|product_36658|
+-------------+



In [0]:
# (c) Compute the combined revenue earned from sellers where seller_id ranges from 1 to 500 inclusive. [3m]
# Your table should have 1 column(s): [total_revenue]
from pyspark.sql.functions import expr, col, sum as _sum

# Revenue of a sale row is num_of_items_sold * price. Only one grand total is
# required, so the rows of sellers 1..500 are filtered first and summed in a
# single aggregate. The per-seller grouping and the descending sort used
# before were discarded by the final sum anyway, and the sort relied on `desc`
# having been imported by an earlier cell.
result = (
    df_pt.join(df_sales, "product_id")
    .filter(col("seller_id").between(1, 500))
    .agg(_sum(col("num_of_items_sold") * col("price")).alias("total_revenue"))
)

result.show()

# +-------------+
# |total_revenue|
# +-------------+
# |    160916699|
# +-------------+


+-------------+
|total_revenue|
+-------------+
|    160916699|
+-------------+



In [0]:
# (d) Among sellers with rating >= 4 who have achieved a combined number of products sold >= 3000, find out the top 10 most expensive product sold by any of the sellers. (If there are multiple products at the same price, please sort them in ascending order of product_id) [8m]
# Your table should have 1 column(s): [product_name]
# To get the full mark, your query should not run for more than 1 min

from pyspark.sql.functions import expr, desc, asc, col, sum as _sum

# Sellers rated >= 4 whose total number of items sold is at least 3000.
# The intermediate frame is only used as a join input below, so it is not
# sorted here.
df_max_num_by_seller = (
    df_sales.join(
        df_seller.filter(col("rating") >= 4),
        on="seller_id",
        how="inner",
    )
    .groupBy("seller_id")
    .agg(expr("sum(num_of_items_sold)").alias("total_num_of_items_per_seller"))
    .filter(col("total_num_of_items_per_seller") >= 3000)
)

# Products sold by any qualifying seller, de-duplicated, most expensive first.
# product_id is carried through select()/distinct() so that the tie-break sort
# key is an actual column of the de-duplicated rows; previously it was dropped
# before distinct() and then referenced in orderBy(), which depends on how
# Spark resolves a sort column that is missing from the projection.
# NOTE(review): `price` is kept in the output alongside product_name, as in the
# recorded result, although the task statement asks for a single column.
df_result = (
    df_max_num_by_seller.join(df_sales, on=["seller_id"], how="left")
    .join(df_pt, on=["product_id"], how="left")
    .select("product_id", "product_name", "price")
    .distinct()
    .orderBy(desc("price"), asc("product_id"))
    .limit(10)
    .select("product_name", "price")
)

df_result.show()

# +------------+-----+
# |product_name|price|
# +------------+-----+
# | product_106|  200|
# | product_117|  200|
# | product_363|  200|
# | product_712|  200|
# | product_843|  200|
# | product_897|  200|
# | product_923|  200|
# |product_1466|  200|
# |product_1507|  200|
# |product_1514|  200|
# +------------+-----+


+------------+-----+
|product_name|price|
+------------+-----+
| product_106|  200|
| product_117|  200|
| product_363|  200|
| product_712|  200|
| product_843|  200|
| product_897|  200|
| product_923|  200|
|product_1466|  200|
|product_1507|  200|
|product_1514|  200|
+------------+-----+



## Task 2: Spark ML (10m)

In [0]:
from pyspark.sql import SparkSession

# Task 2 obtains its session with the same builder settings as Task 1
# (local master on all cores).
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
bank_train_location = "/FileStore/tables/bank_train.csv"
bank_test_location = "/FileStore/tables/bank_test.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
csv_reader_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

# Both splits are read with identical reader settings.
bank_train = spark.read.format(file_type).options(**csv_reader_options).load(bank_train_location)
bank_test = spark.read.format(file_type).options(**csv_reader_options).load(bank_test_location)

# Preview both splits.
bank_train.show()
bank_test.show()

+---+----------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-----+
|age|       job| marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|label|
+---+----------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-----+
| 45|    admin.| married|  unknown|     no|   2033|     no|  no| cellular| 28|  may|      48|       4|   -1|       0| unknown|    0|
| 56|    admin.| married|  primary|     no|    202|    yes|  no|  unknown|  9|  may|     178|       2|   -1|       0| unknown|    0|
| 50| housemaid|  single|secondary|     no|    799|     no|  no|telephone| 28|  jan|      63|       1|   -1|       0| unknown|    0|
| 58|    admin.| married|secondary|     no|   1464|    yes| yes|  unknown|  5|  jun|      53|      29|   -1|       0| unknown|    0|
| 43|management|  single| tertiary|     no|  11891|     no|  no| cell

Build an ML model to predict whether a customer will subscribe to the bank deposit service. Train the model on the training set and evaluate its performance (e.g. accuracy) on the testing set.
* You can explore different methods to pre-process the data and select proper features
* You can utilize different machine learning models and tune model hyperparameters
* Present the final testing accuracy.

In [0]:
# data preparation (4m)
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.feature import ChiSqSelector

# String-valued input columns; each one is indexed and then one-hot encoded.
categorical_columns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome']

# Every input column, in the order in which it enters the assembled vector.
feature_columns = [
    'age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan',
    'contact', 'day', 'month', 'duration', 'campaign', 'pdays', 'previous', 'poutcome',
]

# Stage 1: map each categorical column to "<name>_index".
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in categorical_columns
]

# Stage 2: one-hot encode each "<name>_index" into "<name>_vec".
encoders = [
    OneHotEncoder(inputCol=name + "_index", outputCol=name + "_vec")
    for name in categorical_columns
]

# Stage 3: assemble all features, substituting the encoded vector for each
# categorical column and passing the remaining columns through unchanged.
assembler_inputs = [
    name + "_vec" if name in categorical_columns else name
    for name in feature_columns
]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="all_features")

# Stage 4: chi-squared feature selection against the label.
selector = ChiSqSelector(
    numTopFeatures=45,
    featuresCol="all_features",
    outputCol="selected_features",
    labelCol="label",
)


In [0]:
# model building (4m)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Random forest trained on the chi-squared-selected features; numTrees and
# maxDepth given here are overridden by the grid search below.
rf = RandomForestClassifier(
    featuresCol="selected_features",
    labelCol="label",
    numTrees=10,
    maxDepth=4,
    maxBins=32,
    impurity="gini",
    featureSubsetStrategy="auto",
    seed=42,
)

# Full pipeline: index -> one-hot encode -> assemble -> select -> classify.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, selector, rf])

# Hyperparameter grid: forest size, tree depth and number of selected features.
# Original note: a first search over 30-50 features reported 45 as optimal.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 15, 20])
    .addGrid(rf.maxDepth, [4, 6, 8])
    .addGrid(selector.numTopFeatures, [42, 44, 46, 48, 50])
    .build()
)

accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# 3-fold cross-validation over the grid, scored by accuracy.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=accuracy_evaluator,
    numFolds=3,
    seed=42,
)

cv_model = crossval.fit(bank_train)

# The fitted selector is the second-to-last stage of the best pipeline;
# report how many features it was configured to keep.
best_selector = cv_model.bestModel.stages[-2]
optimal_num_top_features = best_selector.getNumTopFeatures()
print("Optimal number of top features: {}".format(optimal_num_top_features))


Optimal number of top features: 42


In [0]:
# model evaluation (2m)
# Apply the cross-validated model to the test split.
predictions = cv_model.transform(bank_test)

# Score the test predictions with the same accuracy evaluator that was passed
# to the CrossValidator.
accuracy = accuracy_evaluator.evaluate(predictions)

# Final testing accuracy.
print(accuracy)


0.8356471115091805
