## Task 1: Spark SQL (15m)

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
sales_file_location = "/FileStore/tables/Sales_table.csv"
products_file_location = "/FileStore/tables/Products_table.csv"
sellers_file_location = "/FileStore/tables/Sellers_table.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
products_table = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(products_file_location)

sales_table = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(sales_file_location)

sellers_table = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(sellers_file_location)

In [0]:
# (a) Output the top 3 most popular products sold among all sellers [2m]
# Your table should have 1 column(s): [product_name]
sales_table.createOrReplaceTempView("sales")
products_table.createOrReplaceTempView("products")

popSQL = spark.sql("""
select products.product_name, products.product_id
from sales 
inner join products on products.product_id=sales.product_id
group by products.product_name, products.product_id
order by sum(sales.num_of_items_sold) desc, products.product_id
""")

popSQL.select('product_name').show(3)

+-------------+
| product_name|
+-------------+
|product_51270|
|product_18759|
|product_59652|
+-------------+
only showing top 3 rows



In [0]:
# (b) Find out the total sales of the products sold by sellers 1 to 10 and output the top most sold product [2m]
# Your table should have 1 column(s): [product_name]
sales_table.createOrReplaceTempView("sales")
products_table.createOrReplaceTempView("products")
sellers_table.createOrReplaceTempView("sellers")

popSQL = spark.sql("""
select products.product_name
from sales 
inner join products on products.product_id=sales.product_id
inner join sellers on sellers.seller_id=sales.seller_id
where sellers.seller_id >= 1 and sellers.seller_id <= 10
group by products.product_name
order by sum(sales.num_of_items_sold) desc
""")

popSQL.show(1)

+-------------+
| product_name|
+-------------+
|product_36658|
+-------------+
only showing top 1 row



In [0]:
# (c) Compute the combined revenue earned from sellers where seller_id ranges from 1 to 500 inclusive. [3m]
# Your table should have 1 column(s): [total_revenue]
sales_table.createOrReplaceTempView("sales")
products_table.createOrReplaceTempView("products")
sellers_table.createOrReplaceTempView("sellers")

popSQL = spark.sql("""
select sum(sales.num_of_items_sold * products.price) as total_revenue
from sales 
inner join products on products.product_id=sales.product_id
inner join sellers on sellers.seller_id=sales.seller_id
where sellers.seller_id >= 1 and sellers.seller_id <= 500
""")

popSQL.show()

+-------------+
|total_revenue|
+-------------+
|    160916699|
+-------------+



In [0]:
# (d) Among sellers with rating >= 4 who have achieved a combined number of products sold >= 3000, find out the top 10 most expensive product sold by any of the sellers. (If there are multiple products at the same price, please sort them in ascending order of product_id) [8m]
# Your table should have 1 column(s): [product_name]
# To get the full mark, your query should not run for more than 1 min

sales_table.createOrReplaceTempView("sales")
products_table.createOrReplaceTempView("products")
sellers_table.createOrReplaceTempView("sellers")

first = spark.sql("""
select sellers.seller_id, sum(num_of_items_sold) as total
from sales 
inner join sellers on sellers.seller_id=sales.seller_id
where rating >= 4 
group by sellers.seller_id
""")
first.createOrReplaceTempView("first")

second = spark.sql("""
select seller_id
from first
where total >= 3000
""")
second.createOrReplaceTempView("second")

third = spark.sql("""
select distinct product_name, price, products.product_id
from sales 
inner join products on products.product_id=sales.product_id
inner join second on sales.seller_id=second.seller_id
order by price desc, products.product_id
""")

third.select('product_name').show(10)

+------------+
|product_name|
+------------+
| product_106|
| product_117|
| product_363|
| product_712|
| product_843|
| product_897|
| product_923|
|product_1466|
|product_1507|
|product_1514|
+------------+
only showing top 10 rows



## Task 2: Spark ML (10m)

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
bank_train_location = "/FileStore/tables/bank_train.csv"
bank_test_location = "/FileStore/tables/bank_test.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bank_train = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(bank_train_location)

bank_test = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(bank_test_location)

Build ML model to predict whether the customer will subscribe bank deposit service or not. Train the model using training set and evaluate the model performance (e.g. accuracy) using testing set. 
* You can explore different methods to pre-process the data and select proper features
* You can utilize different machine learning models and tune model hyperparameters
* Present the final testing accuracy.

In [0]:
# data preparation (4m)
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import DenseVector

# remove contact, day, month
unecessary_cols = ('contact', 'day', 'month')
bank_train = bank_train.drop(*unecessary_cols)
bank_test = bank_test.drop(*unecessary_cols)
bank_train.show(5)

# convert categorical values to numerical features
CONTI_FEATURES  = ['age', 'balance','duration', 'campaign', 'pdays', 'previous']
CATE_FEATURES = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'poutcome']
stages = [] # stages in our Pipeline

for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    stages.append(stringIndexer)
    
assemblerInputs = [c + "Index" for c in CATE_FEATURES] + CONTI_FEATURES
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

pipeline = Pipeline(stages=stages)

# To make the computation faster, convert model to a DataFrame.
pipelineModel_train = pipeline.fit(bank_train)
model_train = pipelineModel_train.transform(bank_train)
input_data = model_train.rdd.map(lambda x: (x["label"], DenseVector(x["features"])))
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
df_train.show(2)

pipelineModel_test = pipeline.fit(bank_test)
model_test = pipelineModel_test.transform(bank_test)
input_data = model_test.rdd.map(lambda x: (x["label"], DenseVector(x["features"])))
df_test = sqlContext.createDataFrame(input_data, ["label", "features"])

+---+----------+-------+---------+-------+-------+-------+----+--------+--------+-----+--------+--------+-----+
|age|       job|marital|education|default|balance|housing|loan|duration|campaign|pdays|previous|poutcome|label|
+---+----------+-------+---------+-------+-------+-------+----+--------+--------+-----+--------+--------+-----+
| 45|    admin.|married|  unknown|     no|   2033|     no|  no|      48|       4|   -1|       0| unknown|    0|
| 56|    admin.|married|  primary|     no|    202|    yes|  no|     178|       2|   -1|       0| unknown|    0|
| 50| housemaid| single|secondary|     no|    799|     no|  no|      63|       1|   -1|       0| unknown|    0|
| 58|    admin.|married|secondary|     no|   1464|    yes| yes|      53|      29|   -1|       0| unknown|    0|
| 43|management| single| tertiary|     no|  11891|     no|  no|     821|       5|  242|       1| success|    1|
+---+----------+-------+---------+-------+-------+-------+----+--------+--------+-----+--------+--------

In [0]:
# model building (4m)
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=42, maxDepth=10)
rfModel = rf.fit(df_train)

In [0]:
# model evaluation (2m)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = rfModel.transform(df_test)
predictions.select("label", "prediction").show(5)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy*100))

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 5 rows

Accuracy = 81.92149317088963
