## Task 1: Spark SQL (15m)

In [0]:
from pyspark.sql import SparkSession

# Run Spark locally on all available cores; getOrCreate() returns the already
# active session if the notebook (e.g. Databricks) has one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Input locations for the three Task 1 tables.
sales_file_location = "/FileStore/tables/Sales_table.csv"
products_file_location = "/FileStore/tables/Products_table.csv"
sellers_file_location = "/FileStore/tables/Sellers_table.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Configure one reader with the shared CSV options and reuse it for every table
# (format/option calls mutate and return the same DataFrameReader, and load()
# may be called repeatedly). The options only apply to CSV; other formats ignore them.
csv_reader = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
)

products_table = csv_reader.load(products_file_location)
sales_table = csv_reader.load(sales_file_location)
sellers_table = csv_reader.load(sellers_file_location)

In [0]:
# (a) Output the top 3 most popular products sold among all sellers [2m]
# Your table should have 1 column(s): [product_name]

# Register the DataFrames as SQL views so the query below can reference them.
products_table.createOrReplaceTempView("products_table")
sales_table.createOrReplaceTempView("sales_table")

# product_totals: total items sold per product name (sales joined to products
# to obtain the name). The outer query keeps the 3 names with the largest totals.
spark.sql("""
with product_totals as (
  select p.product_name, sum(s.num_of_items_sold) as total_sold
  from products_table p
  join sales_table s
    on s.product_id = p.product_id
  group by p.product_name
)
select product_name
from product_totals
order by total_sold desc
limit 3
""").show()







+-------------+
| product_name|
+-------------+
|product_51270|
|product_18759|
|product_59652|
+-------------+



In [0]:
# (b) Find out the total sales of the products sold by sellers 1 to 10 and output the top most sold product [2m]
# Your table should have 1 column(s): [product_name]

# create view to use with SQL queries
products_table.createOrReplaceTempView("products_table")
sales_table.createOrReplaceTempView("sales_table")

# Inner query: the product_id with the highest total items sold by sellers 1..10.
#   `between 1 and 10` matches the task's inclusive range exactly; a bare
#   `seller_id <= 10` would also count seller 0 (or any non-positive id) if present.
#   sales_table has no product_name, so the aggregation is keyed on product_id.
# Outer query: look up that product's name in products_table.
spark.sql("""
select product_name
from products_table
where product_id = (
  select product_id
  from sales_table
  where seller_id between 1 and 10
  group by product_id
  order by sum(num_of_items_sold) desc
  limit 1
)
""").show()

+-------------+
| product_name|
+-------------+
|product_36658|
+-------------+



In [0]:
# (c) Compute the combined revenue earned from sellers where seller_id ranges from 1 to 500 inclusive. [3m]
# Your table should have 1 column(s): [total_revenue]

# create view to use with SQL queries
products_table.createOrReplaceTempView("products_table")
sales_table.createOrReplaceTempView("sales_table")

# Revenue of one sale = items sold * unit price (price comes from products_table).
# total_revenue sums that over every sale by sellers 1..500. `between 1 and 500`
# is inclusive on both ends and, unlike `<= 500`, excludes seller 0 / non-positive
# ids if the data contains any.
spark.sql("""
select sum(s.num_of_items_sold * p.price) as total_revenue
from sales_table s
join products_table p
  on s.product_id = p.product_id
where s.seller_id between 1 and 500
""").show()




+--------+----------+---------+-----------------+----------+-------------+-----+
|order_id|product_id|seller_id|num_of_items_sold|product_id| product_name|price|
+--------+----------+---------+-----------------+----------+-------------+-----+
|     126|     68948|       71|              432|     68948|product_68948|  144|
|     203|     76129|      235|              266|     76129|product_76129|   51|
|     269|     82862|      478|              873|     82862|product_82862|   37|
|     304|     50640|      186|              326|     50640|product_50640|  161|
|     353|     58622|      281|              790|     58622|product_58622|  139|
|     387|       560|      120|              821|       560|  product_560|   86|
|     450|     88244|      388|              411|     88244|product_88244|  177|
|     596|     30662|      391|              401|     30662|product_30662|   49|
|     651|     78967|      410|              833|     78967|product_78967|   95|
|     677|     10209|      1

In [0]:
# (d) Among sellers with rating >= 4 who have achieved a combined number of products sold >= 3000, find out the top 10 most expensive product sold by any of the sellers. (If there are multiple products at the same price, please sort them in ascending order of product_id) [8m]
# Your table should have 1 column(s): [product_name]
# To get the full mark, your query should not run for more than 1 min

# Register the DataFrames as SQL views so the query below can reference them.
sellers_table.createOrReplaceTempView("sellers_table")
products_table.createOrReplaceTempView("products_table")
sales_table.createOrReplaceTempView("sales_table")

# qualified_sellers: sellers rated >= 4 whose summed num_of_items_sold is >= 3000
#   (the rating filter is applied before aggregation, so only their sales are summed).
# Main query: every product sold by a qualified seller, one row per product
#   (grouping removes duplicate sales), ordered by price descending with
#   product_id ascending as the tie-breaker, keeping the first 10.
spark.sql("""
with qualified_sellers as (
  select s.seller_id
  from sales_table s
  join sellers_table sl
    on s.seller_id = sl.seller_id
  where sl.rating >= 4
  group by s.seller_id
  having sum(s.num_of_items_sold) >= 3000
)
select p.product_name
from products_table p
join sales_table s
  on p.product_id = s.product_id
where s.seller_id in (select seller_id from qualified_sellers)
group by p.product_id, p.product_name, p.price
order by p.price desc, p.product_id asc
limit 10
""").show()









+---------+
|seller_id|
+---------+
|    41751|
|    18979|
|     1591|
|    40574|
|    49308|
|    12940|
|    45011|
|    49855|
|    40011|
|    47084|
|    21700|
|    18866|
|    13285|
|      471|
|      148|
|    45307|
|    46465|
|    46943|
|    26706|
|      833|
+---------+
only showing top 20 rows

+----------+------------+-----+--------+----------+---------+-----------------+
|product_id|product_name|price|order_id|product_id|seller_id|num_of_items_sold|
+----------+------------+-----+--------+----------+---------+-----------------+
|         3|   product_3|  147|  241776|         3|    38710|              484|
|         3|   product_3|  147|  183426|         3|    42730|              224|
|         5|   product_5|   13|   51484|         5|    46959|              409|
|         5|   product_5|   13|   14481|         5|    45359|              725|
|         5|   product_5|   13|  256866|         5|    47593|              547|
|         9|   product_9|   70|   13981|      

## Task 2: Spark ML (10m)

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the session created for Task 1 if it is still active,
# so re-running this cell does not start a second Spark context.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Input locations for the bank marketing train/test splits.
bank_train_location = "/FileStore/tables/bank_train.csv"
bank_test_location = "/FileStore/tables/bank_test.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# One reader carries the shared CSV options; load() is called once per split.
# The options only apply to CSV; other formats ignore them.
csv_reader = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
)

bank_train = csv_reader.load(bank_train_location)
bank_test = csv_reader.load(bank_test_location)

Build an ML model to predict whether a customer will subscribe to the bank's deposit service. Train the model on the training set and evaluate its performance (e.g. accuracy) on the test set.
* You can explore different methods to pre-process the data and select appropriate features
* You can use different machine learning models and tune their hyperparameters
* Report the final test accuracy.

In [0]:
# TASK: Build ML model to predict whether the customer will subscribe bank deposit service or not.

# Feature selection (from the inferred schema):
#   age, balance, duration, campaign, pdays, previous (integer) -> used as-is
#   marital, education, default, housing, loan, poutcome (string) -> indexed to numeric
#   label (integer)                                          -> target variable
#   job, contact, day, month                                 -> not used

# data preparation (4m)
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

categorical_cols = ["marital", "education", "default", "housing", "loan", "poutcome"]
unused_cols = ["job", "contact", "day", "month"]

# One StringIndexer per categorical column, writing "<col>_index".
# handleInvalid="keep" maps categories that never appear in the training data
# to an extra index instead of raising when the test set is transformed.
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep")
    for col in categorical_cols
]
data_prep_pipeline = Pipeline(stages=indexers)

# Fit the indexers on the TRAINING split only and apply that same fitted model
# to both splits. Fitting a second pipeline on the test split would build an
# independent category->index mapping (the same string could get a different
# index than the model was trained on) and leak test-set statistics.
pipeline_model_train = data_prep_pipeline.fit(bank_train)

updated_bank_train = pipeline_model_train.transform(bank_train).drop(*categorical_cols, *unused_cols)
updated_bank_test = pipeline_model_train.transform(bank_test).drop(*categorical_cols, *unused_cols)

# Every remaining column except the target becomes a model feature.
required_features = [name for name in updated_bank_train.schema.names if name != "label"]
print(required_features)

# VectorAssembler has no fitted state, so a single instance serves both splits.
assembler = VectorAssembler(inputCols=required_features, outputCol='features')

updated_bank_train = assembler.transform(updated_bank_train)
updated_bank_test = assembler.transform(updated_bank_test)

updated_bank_train.show()
updated_bank_test.show()







['age', 'balance', 'duration', 'campaign', 'pdays', 'previous', 'marital_index', 'education_index', 'default_index', 'housing_index', 'loan_index', 'poutcome_index']
+---+-------+--------+--------+-----+--------+-----+-------------+---------------+-------------+-------------+----------+--------------+--------------------+
|age|balance|duration|campaign|pdays|previous|label|marital_index|education_index|default_index|housing_index|loan_index|poutcome_index|            features|
+---+-------+--------+--------+-----+--------+-----+-------------+---------------+-------------+-------------+----------+--------------+--------------------+
| 45|   2033|      48|       4|   -1|       0|    0|          0.0|            3.0|          0.0|          0.0|       0.0|           0.0|(12,[0,1,2,3,4,7]...|
| 56|    202|     178|       2|   -1|       0|    0|          0.0|            2.0|          0.0|          1.0|       0.0|           0.0|[56.0,202.0,178.0...|
| 50|    799|      63|       1|   -1|       

In [0]:
# model building (4m)

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import HashingTF, Tokenizer  # NOTE(review): not used by any visible cell
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Random forests bootstrap rows and sample features at random; a fixed seed
# makes the trained model (and hence the reported test accuracy) reproducible
# on Restart & Run All.
RF_SEED = 42

rf = RandomForestClassifier(labelCol="label",
                            featuresCol="features",
                            maxDepth=5,
                            seed=RF_SEED)

model = rf.fit(updated_bank_train)



In [0]:
# model evaluation (2m)

# Score the held-out test split with the fitted random forest and preview the rows.
predictions = model.transform(updated_bank_test)
predictions.show()

# Accuracy = fraction of test rows whose predicted class equals the true label.
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                              predictionCol='prediction',
                                              metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

print('Test Accuracy = ', accuracy)

+---+-------+--------+--------+-----+--------+-----+-------------+---------------+-------------+-------------+----------+--------------+--------------------+--------------------+--------------------+----------+
|age|balance|duration|campaign|pdays|previous|label|marital_index|education_index|default_index|housing_index|loan_index|poutcome_index|            features|       rawPrediction|         probability|prediction|
+---+-------+--------+--------+-----+--------+-----+-------------+---------------+-------------+-------------+----------+--------------+--------------------+--------------------+--------------------+----------+
| 45|   2220|     128|       2|   -1|       0|    0|          0.0|            1.0|          0.0|          1.0|       0.0|           0.0|[45.0,2220.0,128....|[16.5639948298713...|[0.82819974149356...|       0.0|
| 36|   3623|      71|       1|  378|       1|    1|          1.0|            0.0|          0.0|          0.0|       0.0|           2.0|[36.0,3623.0,71.0...