## Task 1: Spark SQL (15m)

In [0]:
from pyspark.sql import SparkSession
# Obtain the Spark session for Task 1, using the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Locations of the three Task 1 input files.
sales_file_location = "/FileStore/tables/Sales_table.csv"
products_file_location = "/FileStore/tables/Products_table.csv"
sellers_file_location = "/FileStore/tables/Sellers_table.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# They are collected once so that every table is read with identical settings.
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

products_table = (
    spark.read.format(file_type)
    .options(**csv_options)
    .load(products_file_location)
)

sales_table = (
    spark.read.format(file_type)
    .options(**csv_options)
    .load(sales_file_location)
)

sellers_table = (
    spark.read.format(file_type)
    .options(**csv_options)
    .load(sellers_file_location)
)

In [0]:
# (a) Output the top 3 most popular products sold among all sellers [2m]
# Your table should have 1 column(s): [product_name]

#Ref: https://www.geeksforgeeks.org/how-to-find-the-sum-of-particular-column-in-pyspark-dataframe/

# Pair every sale with its product, total the units sold per product name,
# rank the totals from largest to smallest and print the first three names.
(
    products_table
    .join(sales_table, ["product_id"])
    .groupBy("product_name")
    .agg({"num_of_items_sold": "sum"})
    .orderBy("sum(num_of_items_sold)", ascending=False)
    .select("product_name")
    .show(n=3, truncate=False)
)

+-------------+
|product_name |
+-------------+
|product_51270|
|product_18759|
|product_59652|
+-------------+
only showing top 3 rows



In [0]:
# (b) Find out the total sales of the products sold by sellers 1 to 10 and output the top most sold product [2m]
# Your table should have 1 column(s): [product_name]
# clarification: Output the top most sold product (in terms of quantity) among sellers with seller_id 1 to 10.

# Same ranking as (a), but only over sales made by seller_id 1..10 (inclusive);
# the single best-selling product name is printed.
(
    products_table
    .join(sales_table, ["product_id"])
    .filter(sales_table["seller_id"].between(1, 10))
    .groupBy("product_name")
    .agg({"num_of_items_sold": "sum"})
    .orderBy("sum(num_of_items_sold)", ascending=False)
    .select("product_name")
    .show(n=1, truncate=False)
)

+-------------+
|product_name |
+-------------+
|product_36658|
+-------------+
only showing top 1 row



In [0]:
# (c) Compute the combined revenue earned from sellers where seller_id ranges from 1 to 500 inclusive. [3m]
# Your table should have 1 column(s): [total_revenue]

# Revenue of a sale = units sold * unit price of the product.
# Sales are limited to seller_id 1..500 (inclusive) and all revenues are summed
# into a single total_revenue value.
(
    sales_table
    .join(products_table, ["product_id"])
    .filter(sales_table["seller_id"].between(1, 500))
    .withColumn(
        "revenue",
        sales_table["num_of_items_sold"] * products_table["price"],
    )
    .agg({"revenue": "sum"})
    .withColumnRenamed("sum(revenue)", "total_revenue")
    .show()
)

+-------------+
|total_revenue|
+-------------+
|    160916699|
+-------------+



In [0]:
# (d) Among sellers with rating >= 4 who have achieved a combined number of products sold >= 3000, find the top 10 most expensive products sold by any of these sellers. (If multiple products have the same price, sort them in ascending order of product_id.) [8m]
# Your table should have 1 column(s): [product_name]
# To get the full mark, your query should not run for more than 1 min

# Step 1: for sellers rated >= 4, total the number of items each one sold.
com1 = (
    sellers_table
    .filter(sellers_table["rating"] >= 4)
    .join(sales_table, ["seller_id"], "inner")
    .groupBy("seller_id")
    .agg({"num_of_items_sold": "sum"})
    .withColumnRenamed("sum(num_of_items_sold)", "total_num")
)

# Step 2: keep the sellers whose total reaches 3000.
# The groupBy above already yields exactly one row per seller_id, so no
# distinct() is needed here (it would only add another aggregation pass).
com2 = com1.filter(com1["total_num"] >= 3000).select("seller_id")

# Step 3: restrict sales to the qualifying sellers, look up the products sold,
# de-duplicate them, order by price (highest first) with product_id ascending
# as the tie-breaker, and print the first ten product names.
# com2 is used purely as a filter, hence the left-semi join: it keeps the
# matching sales rows without adding columns, and is applied before the
# products join so that join only sees the relevant sales.
(
    sales_table
    .join(com2, ["seller_id"], "left_semi")
    .join(products_table, ["product_id"], "inner")
    .select("price", "product_id", "product_name")
    .distinct()
    .orderBy(["price", "product_id"], ascending=[False, True])
    .select("product_name")
    .show(n=10, truncate=False)
)


+------------+
|product_name|
+------------+
|product_106 |
|product_117 |
|product_363 |
|product_712 |
|product_843 |
|product_897 |
|product_923 |
|product_1466|
|product_1507|
|product_1514|
+------------+
only showing top 10 rows



## Task 2: Spark ML (10m)

In [0]:
from pyspark.sql import SparkSession
# Obtain the Spark session for Task 2 (same "local[*]" builder settings as Task 1).
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Locations of the training and testing splits of the bank data.
bank_train_location = "/FileStore/tables/bank_train.csv"
bank_test_location = "/FileStore/tables/bank_test.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# They are collected once so that both splits are read with identical settings.
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

bank_train = (
    spark.read.format(file_type)
    .options(**csv_options)
    .load(bank_train_location)
)

bank_test = (
    spark.read.format(file_type)
    .options(**csv_options)
    .load(bank_test_location)
)

Build an ML model to predict whether a customer will subscribe to the bank deposit service. Train the model on the training set and evaluate its performance (e.g. accuracy) on the testing set.
* You can explore different methods to pre-process the data and select proper features
* You can utilize different machine learning models and tune model hyperparameters
* Present the final testing accuracy.

In [0]:
# data preparation (4m)
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.feature import ChiSqSelector

# Column the model is trained to predict.
target_col = "label"

# All candidate input columns.
input_cols = [
    "age", "job", "marital", "education", "default", "balance",
    "housing", "loan", "contact", "day", "month", "duration",
    "campaign", "pdays", "previous", "poutcome",
]

# Inputs treated as (mainly continuous) numeric values.
numeric_cols = [
    "age", "balance", "day", "duration", "campaign", "pdays", "previous",
]

# Every remaining input is handled as a category
# (some are binary yes/no: default, housing, loan).
non_numeric_cols = [name for name in input_cols if name not in numeric_cols]

#References
#https://masum-math8065.medium.com/how-to-convert-categorical-data-into-numeric-in-pyspark-2202407f5fac
#https://www.analyticsvidhya.com/blog/2019/11/build-machine-learning-pipelines-pyspark/

#string to numeric
def convert_numeric(lst):
    """Build one StringIndexer per column name in ``lst``.

    Each indexer reads the column ``<name>`` and writes its output to
    ``<name>_numeric``.

    Returns:
        list: the indexers, in the same order as ``lst``.
    """
    return [
        StringIndexer(inputCol=name, outputCol="{0}_numeric".format(name))
        for name in lst
    ]
# One StringIndexer stage for each categorical column.
indexed = convert_numeric(non_numeric_cols)

#numeric to vector
def convert_vector(lst2):
    """Build one OneHotEncoder per stage in ``lst2``.

    Every element of ``lst2`` must expose ``getOutputCol()``; that column is
    the encoder's input, and the encoder writes to ``<that column>_encoded``.

    Returns:
        list: the encoders, in the same order as ``lst2``.
    """
    source_cols = [stage.getOutputCol() for stage in lst2]
    return [
        OneHotEncoder(inputCol=name, outputCol="{0}_encoded".format(name))
        for name in source_cols
    ]
# One OneHotEncoder stage for each indexed column.
encoded = convert_vector(indexed)

#combine all cols for getting the features
def featurise(lst3):
    """Return the complete list of feature column names.

    The result is the module-level ``numeric_cols`` followed by the output
    column of every stage in ``lst3`` (each stage must expose
    ``getOutputCol()``). A new list is returned; ``numeric_cols`` is not
    modified.
    """
    return numeric_cols + [stage.getOutputCol() for stage in lst3]
# Names of all model inputs: the numeric columns plus every encoded column.
feature_cols = featurise(encoded)

# Assemble the numeric and encoded columns into the single "features" vector
# that the downstream stages consume.
v_assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Chi-squared feature selection over the assembled vector:
# https://george-jen.gitbook.io/data-science-and-apache-spark/chisqselector
# NOTE(review): numTopFeatures is not set, so the selector runs with its
# library default; if the assembled vector has fewer dimensions than that
# default, nothing is filtered out -- TODO confirm against the vector size.
f_selector = ChiSqSelector(
    labelCol=target_col,
    featuresCol="features",
    outputCol="selectedFeatures",
)

In [0]:
# model building (4m)
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Ref: https://www.silect.is/blog/random-forest-models-in-spark-ml/
# Random forest trained on the selected features. Forest training is
# randomised, so the seed is fixed explicitly: re-running the notebook then
# uses the same configuration instead of relying on an implicit default seed.
rf = RandomForestClassifier(
    labelCol=target_col,
    featuresCol="selectedFeatures",
    numTrees=50,
    maxDepth=25,
    seed=42,
)

# Stage order matters: index the categorical columns, one-hot encode them,
# assemble the feature vector, select features, then fit the classifier.
pipeline = Pipeline(stages=indexed + encoded + [v_assembler, f_selector, rf])

# Fit every stage on the training data only.
model = pipeline.fit(bank_train)

# Apply the fitted pipeline to the held-out test data.
pred_test = model.transform(bank_test)
#pred_test.show(n=1)

In [0]:
# model evaluation (2m)
#like demo 3
# Keep just the predicted and the true class. The label column is referenced
# through target_col (as in the training cells) rather than a repeated
# "label" literal, so the evaluation cannot drift from what the model was
# trained on.
predictionAndLabels = pred_test.select("prediction", target_col)

# Accuracy = fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol=target_col,
    predictionCol="prediction",
    metricName="accuracy",
)
print("Test set accuracy : " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy : 0.8584863412449619
