# Assignment 2 DSC 102 FA23

## Introduction

In this assignment, we will conduct data engineering on the Amazon dataset. The assignment is divided into two parts: the features extracted in Part 1 are used in Part 2, where you train a model (or models) to predict user ratings for a product.

We will be using Apache Spark for this assignment. The default Spark API is DataFrame, as it is now recommended over the RDD API. That said, feel free to switch back to the RDD API if you find it a better fit for a task. You can request the input data in RDD format to start with, and you can also convert between DataFrame and RDD within your solution.

Another newer API is Koalas, which is also available. However, it has constraints and is not applicable to most tasks. Refer to the PA statement for details.

### Set the following parameters

In [1]:
PID = 'A16908447' # your pid, for instance: 'a43223333'
INPUT_FORMAT = 'dataframe' # choose a format of your input data, valid options: 'dataframe', 'rdd', 'koalas'

In [2]:
# Boilerplate, do NOT modify
%load_ext autoreload
%autoreload 2
import os
import getpass
from pyspark.sql import SparkSession
from utilities import SEED
from utilities import PA2Test
from utilities import PA2Data
from utilities import data_cat
from pa2_main import PA2Executor
import time
if INPUT_FORMAT == 'dataframe':
    import pyspark.ml as M
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
if INPUT_FORMAT == 'koalas':
    import databricks.koalas as ks
elif INPUT_FORMAT == 'rdd':
    import pyspark.mllib as M
    from pyspark.mllib.feature import Word2Vec
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.linalg.distributed import RowMatrix

os.environ['PYSPARK_SUBMIT_ARGS'] = '--py-files utilities.py,assignment2.py \
--deploy-mode client \
pyspark-shell'

class args:
    review_filename = data_cat.review_filename
    product_filename = data_cat.product_filename
    product_processed_filename = data_cat.product_processed_filename
    ml_features_train_filename = data_cat.ml_features_train_filename
    ml_features_test_filename = data_cat.ml_features_test_filename
    output_root = '/home/{}/{}-pa2/test_results'.format(getpass.getuser(), PID)
    test_results_root = data_cat.test_results_root
    pid = PID

pa2 = PA2Executor(args, input_format=INPUT_FORMAT)
data_io = pa2.data_io
data_dict = pa2.data_dict
begin = time.time()




24/06/10 00:15:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

Loading datasets ...Done


In [3]:
# Import your own dependencies

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

#-----------------------------

# Part 1: Feature Engineering

In [4]:
# Bring the part_1 datasets to memory and de-cache part_2 datasets. 
# Execute this once before you start working on this Part
data_dict, _ = data_io.cache_switch(data_dict, 'part_1')

                                                                                

# Task 0: warm up
This task is provided to help you get familiar with the Spark API. We will use the DataFrame API to demonstrate. The solution is given to you, and this task won't be graded.

Refer to https://spark.apache.org/docs/latest/api/python/pyspark.sql.html for the API guide.

The task is to implement the function below. Given the ```product_data``` table:
1. Take and print five rows.

1. Select only the ```asin``` column, then print five rows of it.

1. Select the row where ```asin = B00I8KEOTM``` and print it.

1. Count the total number of rows.

1. Calculate the mean ```price```.

1. After performing the operations above, extract some statistics from the results and put them in a Python dictionary named ```res```. Its description and schema are as follows:
    ```
    res
     | -- count_total: int -- count of total rows of the entire table after your operations
     | -- mean_price: float -- mean value of column price
    ```

In [5]:
def task_0(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    overall_column = 'overall'
    # Outputs:
    mean_rating_column = 'meanRating'
    count_rating_column = 'countRating'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------

    product_data.show(5)
    product_data[['asin']].show(5)
    product_data.where(F.col('asin') == 'B00I8KEOTM').show()
    count_rows = product_data.count()
    mean_price = product_data.select(F.avg(F.col('price'))).head()[0]
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    # Calculate the values programmatically. Do not change the keys and do not
    # hard-code values in the dict. Your submission will be evaluated with
    # different inputs.
    # Modify the values of the following dictionary accordingly.
    res = {'count_total': None, 'mean_price': None}
    
    # Modify res:

    res['count_total'] = count_rows
    res['mean_price'] = mean_price

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    return res
    # -------------------------------------------------------------------------

In [6]:
if INPUT_FORMAT == 'dataframe':
    res = task_0(data_io, data_dict['product'])
    pa2.tests.test(res, 'task_0')

+----------+--------------------+--------------------+--------------------+-----+--------------------+
|      asin|           salesRank|          categories|               title|price|             related|
+----------+--------------------+--------------------+--------------------+-----+--------------------+
|B00I8HVV6E|{Home &amp; Kitch...|[[Home & Kitchen,...|Intelligent Desig...|27.99|{also_viewed -> [...|
|B00I8KEOTM|                null|[[Apps for Androi...|                null| null|{also_viewed -> [...|
|B00I8KCW4G|{Clothing -> 2233...|[[Clothing, Shoes...|eShakti Women's P...|41.95|{also_viewed -> [...|
|B00I8JKCQW|{Clothing -> 1405...|[[Clothing, Shoes...|Lady Slimming Mid...| null|{also_viewed -> [...|
|B00I8JKI8E|{Home &amp; Kitch...|[[Clothing, Shoes...|3 Tier Bangle Bra...|24.99|{also_viewed -> [...|
+----------+--------------------+--------------------+--------------------+-----+--------------------+
only showing top 5 rows

+----------+
|      asin|
+----------+
|B00I8HVV



tests for task_0 --------------------------------------------------------------
2/2 passed
-------------------------------------------------------------------------------


                                                                                

# Task 1

First, you will aggregate and extract some information from the user review table. For each product, we want to know the mean and the number of ratings it received. Implement a function `task_1` that does the following:
1. For each product ID `asin` in `product`, calculate the average rating it received and put it in the column `meanRating`. The ratings are stored in the column `overall` of `review`.

1. Similarly, put the count of ratings for each product in a new column named `countRating`.

1. You need to conduct the above operations, then extract some statistics out of the generated columns. You need to put the statistics in a python dictionary named `res`. The description and schema of it are as follows:

        res
         | -- count_total: int -- count of rows of the transformed table, including null rows
         | -- mean_meanRating: float -- mean value of meanRating
         | -- variance_meanRating: float -- variance of ...
         | -- numNulls_meanRating: int -- count of nulls of ...
         | -- mean_countRating: float -- mean value of countRating
         | -- variance_countRating: float -- variance of ...
         | -- numNulls_countRating: int -- count of nulls of ...

If a product ID has not a single reference in `review`, meaning it was never reviewed, you should put null in both `meanRating` and `countRating`.

The tables have the following schemas:

1. `product`

        |-- asin: string, the product id, e.g., 'B00I8HVV6E'
        |-- salesRank: map, a map between category and sales rank, e.g., {'Home & Kitchen': 796318}
        |    |-- key: string, category, e.g., 'Home & Kitchen'
        |    |-- value: integer, rank, e.g., 796318
        |-- categories: array, list of list of categories, e.g., [['Home & Kitchen', 'Artwork']]
        |    |-- element: array, list of categories, e.g., ['Home & Kitchen', 'Artwork']
        |    |    |-- element: string, category, e.g., 'Home & Kitchen'
        |-- title: string, title of product, e.g., 'Intelligent Design Cotton Canvas'
        |-- price: float, price of product, e.g., 27.9
        |-- related: map, related information, e.g., {'also_viewed': ['B00I8HW0UK']}
        |    |-- key: string, the attribute name of the information, e.g., 'also_viewed'
        |    |-- value: array, array of product ids, e.g., ['B00I8HW0UK']
        |    |    |-- element: string, product id, e.g., 'B00I8HW0UK'

2. `product_processed`

        |-- asin: string, same as above
        |-- title: string, title column after imputation, e.g., 'Intelligent Design Cotton Canvas'
        |-- category: string, category column after extraction, e.g., 'Home & Kitchen'

3. `review`

        |-- asin: string, same as above
        |-- reviewerID: string, the reviewer id, e.g., 'A1MIP8H7G33SHC'
        |-- overall: float, the rating associated with the review, e.g., 5.0
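
The aggregation rules above, including the null rule for products that were never reviewed, can be checked on a tiny sample before running Spark. The following is a minimal plain-Python sketch, not the assignment's required implementation; the function name `rating_stats` and the sample data are made up for illustration.

```python
from statistics import mean


def rating_stats(product_asins, reviews):
    """Return {asin: (meanRating, countRating)} for every product.

    `reviews` is an iterable of (asin, overall) pairs. Products without any
    review get (None, None), which mirrors the nulls of a left join.
    """
    ratings = {}
    for asin, overall in reviews:
        ratings.setdefault(asin, []).append(overall)

    result = {}
    for asin in product_asins:
        scores = ratings.get(asin)
        result[asin] = (mean(scores), len(scores)) if scores else (None, None)
    return result


# Made-up sample data, for illustration only.
products = ['A1', 'A2', 'A3']
reviews = [('A1', 5.0), ('A1', 3.0), ('A2', 4.0)]
stats = rating_stats(products, reviews)
print(stats)
```

Every product appears in the result exactly once, so `count_total` equals the number of products, and the null counts equal the number of products without reviews.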

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count, variance, col, isnull

In [8]:
# %load -s task_1 assignment2.py
def task_1(data_io, review_data, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    overall_column = 'overall'
    # Outputs:
    mean_rating_column = 'meanRating'
    count_rating_column = 'countRating'

    # ---------------------- Your implementation begins------------------------

    # Per-product mean and count of ratings from the review table.
    grouped_reviews = review_data.groupBy(asin_column).agg(
        F.mean(F.col(overall_column)).alias(mean_rating_column),
        F.count(F.col(overall_column)).alias(count_rating_column)
    )

    # Left join keeps every product; never-reviewed products get nulls.
    merged_df = product_data.join(
        grouped_reviews,
        product_data[asin_column] == grouped_reviews[asin_column],
        how='left'
    ).select(product_data[asin_column], mean_rating_column, count_rating_column)

    # Cache once because the aggregations below all reuse this table.
    merged_df.cache()

    count_total = merged_df.count()
    mean_meanRating = merged_df.agg(F.mean(mean_rating_column)).first()[0]
    variance_meanRating = merged_df.agg(F.variance(mean_rating_column)).first()[0]
    numNulls_meanRating = merged_df.filter(F.col(mean_rating_column).isNull()).count()
    mean_countRating = merged_df.agg(F.mean(count_rating_column)).first()[0]
    variance_countRating = merged_df.agg(F.variance(count_rating_column)).first()[0]
    numNulls_countRating = merged_df.filter(F.col(count_rating_column).isNull()).count()

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': count_total,
        'mean_meanRating': mean_meanRating,
        'variance_meanRating': variance_meanRating,
        'numNulls_meanRating': numNulls_meanRating,
        'mean_countRating': mean_countRating,
        'variance_countRating': variance_countRating,
        'numNulls_countRating': numNulls_countRating
    }

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    return res
    # -------------------------------------------------------------------------


In [9]:
import time
start_time = time.time()

In [10]:
res = task_1(data_io, data_dict['review'], data_dict['product'])
pa2.tests.test(res, 'task_1')

                                                                                

tests for task_1 --------------------------------------------------------------
Test 1/7 : count_total ... Pass
Test 2/7 : mean_countRating ... Pass
Test 3/7 : mean_meanRating ... Pass
Test 4/7 : numNulls_countRating ... Pass
Test 5/7 : numNulls_meanRating ... Pass
Test 6/7 : variance_countRating ... Pass
Test 7/7 : variance_meanRating ... Pass
7/7 passed
-------------------------------------------------------------------------------




True

In [11]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

Execution time: 39.823331356048584 seconds



# Task 2
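The flattening rules applied by the implementation below can be stated on plain Python values. This is a minimal sketch inferred from the code in this notebook, not from the PA statement; the helper names `flatten_category` and `flatten_sales_rank` are hypothetical, and taking the first map entry is an assumption carried over from that code.

```python
def flatten_category(categories):
    """First entry of the first category list, or None if missing or empty."""
    if not categories or not categories[0] or categories[0][0] == '':
        return None
    return categories[0][0]


def flatten_sales_rank(sales_rank):
    """(category, rank) of the first map entry, or (None, None) if absent."""
    if not sales_rank:
        return (None, None)
    return next(iter(sales_rank.items()))


# Made-up rows shaped like the `product` schema.
print(flatten_category([['Home & Kitchen', 'Artwork']]))
print(flatten_sales_rank({'Home & Kitchen': 796318}))
```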

In [16]:
def task_2(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    salesRank_column = 'salesRank'
    categories_column = 'categories'
    asin_column = 'asin'
    # Outputs:
    category_column = 'category'
    bestSalesCategory_column = 'bestSalesCategory'
    bestSalesRank_column = 'bestSalesRank'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------

    # Built-in column functions replace the Python UDFs so rows are not
    # serialized to Python workers. Out-of-range indexing yields null under
    # Spark's default (non-ANSI) SQL mode, which covers null and empty inputs.
    first_category = F.col(categories_column)[0][0]
    product_data = product_data.withColumn(
        category_column,
        # An empty-string category is treated as missing.
        F.when(first_category != '', first_category))

    # First key and value of the salesRank map; null when the map is null or empty.
    product_data = product_data.withColumn(
        bestSalesCategory_column, F.map_keys(F.col(salesRank_column))[0])
    product_data = product_data.withColumn(
        bestSalesRank_column, F.map_values(F.col(salesRank_column))[0])


    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'mean_bestSalesRank': None,
        'variance_bestSalesRank': None,
        'numNulls_category': None,
        'countDistinct_category': None,
        'numNulls_bestSalesCategory': None,
        'countDistinct_bestSalesCategory': None
    }
    # Modify res:

    res['count_total'] = product_data.count() 
    res['mean_bestSalesRank'] = product_data.agg(F.mean('bestSalesRank')).collect()[0][0] 
    res['variance_bestSalesRank'] = product_data.agg(F.variance('bestSalesRank')).collect()[0][0] 
    res['numNulls_category'] = product_data.filter(F.col('category').isNull()).count() 
    res['countDistinct_category'] = product_data.agg(F.countDistinct('category')).collect()[0][0] 
    res['numNulls_bestSalesCategory'] = product_data.filter(F.col('bestSalesCategory').isNull()).count() 
    res['countDistinct_bestSalesCategory'] = product_data.agg(F.countDistinct('bestSalesCategory')).collect()[0][0]

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_2')
    return res
    # -------------------------------------------------------------------------


In [17]:
start_time = time.time()

In [18]:
res = task_2(data_io, data_dict['product'])
pa2.tests.test(res, 'task_2')



24/06/10 00:24:11 ERROR TaskSchedulerImpl: Lost executor 1 on 10.34.64.35: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/06/10 00:24:11 WARN TaskSetManager: Lost task 5.0 in stage 116.0 (TID 2326) (10.34.64.35 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/06/10 00:24:11 WARN TaskSetManager: Lost task 4.0 in stage 116.0 (TID 2324) (10.34.64.35 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/06/10 00:24:11 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_290_3 !
24/06/10 00:24:11 WARN BlockManagerMasterEndpoi

Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/bitnami/python/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/bitnami/python/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt

KeyboardInterrupt: 

In [None]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

# Task 3
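The implementation below explodes the `also_viewed` list, looks up each viewed product's price, and aggregates back per product. A plain-Python sketch of the same semantics, inferred from that code rather than from the PA statement, is given here; the function name `also_viewed_stats` and the sample data are hypothetical, and product IDs are assumed to be unique.

```python
def also_viewed_stats(products):
    """Return {asin: (meanPriceAlsoViewed, countAlsoViewed)}.

    `products` maps asin -> {'price': float or None, 'also_viewed': list or None}.
    Products with a missing or empty also_viewed list get (None, None).
    """
    result = {}
    for asin, info in products.items():
        viewed = info.get('also_viewed')
        if not viewed:
            result[asin] = (None, None)
            continue
        # Only viewed products that exist in the table and have a price
        # contribute to the mean, as a SQL average skips nulls.
        prices = [products[v]['price'] for v in viewed
                  if v in products and products[v]['price'] is not None]
        mean_price = sum(prices) / len(prices) if prices else None
        result[asin] = (mean_price, len(viewed))
    return result


# Made-up sample data, for illustration only.
sample = {
    'A1': {'price': 10.0, 'also_viewed': ['A2', 'A3', 'ZZ']},
    'A2': {'price': 20.0, 'also_viewed': []},
    'A3': {'price': None, 'also_viewed': None},
}
viewed_stats = also_viewed_stats(sample)
print(viewed_stats)
```

Note that the count covers every listed ID, while the mean only covers IDs with a known price.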





In [None]:
# %load -s task_3 assignment2.py
def task_3(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    price_column = 'price'
    attribute = 'also_viewed'
    related_column = 'related'
    # Outputs:
    meanPriceAlsoViewed_column = 'meanPriceAlsoViewed'
    countAlsoViewed_column = 'countAlsoViewed'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------

    also_viewed_df = product_data \
        .select(F.col(asin_column), F.explode(F.col(f"{related_column}.{attribute}")).alias(attribute))


    product_data_broadcast = F.broadcast(product_data.select(F.col(asin_column).alias("price_asin"), price_column))


    also_viewed_prices = also_viewed_df \
        .join(product_data_broadcast, also_viewed_df[attribute] == product_data_broadcast["price_asin"], how='left')


    also_viewed_prices.cache()


    aggregated = also_viewed_prices \
        .groupBy(asin_column) \
        .agg(F.avg(price_column).alias(meanPriceAlsoViewed_column),
             F.count(attribute).alias(countAlsoViewed_column))


    result_df = product_data \
        .select(asin_column) \
        .join(aggregated, asin_column, how='left')


    result_df.cache()
    

    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------

    # Modify res:
    res = {
        'count_total': result_df.count(),
        'mean_meanPriceAlsoViewed': result_df.agg(F.avg(meanPriceAlsoViewed_column)).first()[0],
        'variance_meanPriceAlsoViewed': result_df.agg(F.variance(meanPriceAlsoViewed_column)).first()[0],
        'numNulls_meanPriceAlsoViewed': result_df.filter(F.col(meanPriceAlsoViewed_column).isNull()).count(),
        'mean_countAlsoViewed': result_df.agg(F.avg(countAlsoViewed_column)).first()[0],
        'variance_countAlsoViewed': result_df.agg(F.variance(countAlsoViewed_column)).first()[0],
        'numNulls_countAlsoViewed': result_df.filter(F.col(countAlsoViewed_column).isNull()).count()
    }




    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_3')
    return res
    # -------------------------------------------------------------------------


In [None]:
start_time = time.time()

In [None]:
res = task_3(data_io, data_dict['product'])
pa2.tests.test(res, 'task_3')

In [None]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

# Task 4
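The implementation below imputes missing prices with the mean and the median, and missing or empty titles with the string 'unknown'. The following plain-Python sketch shows the same idea on a small list; it is inferred from the code in this notebook, the function name `impute` is hypothetical, and it uses an exact median, whereas `approxQuantile` in Spark is approximate and may return a slightly different value.

```python
from statistics import mean, median


def impute(prices, titles):
    """Return (meanImputed, medianImputed, unknownImputedTitles)."""
    known = [p for p in prices if p is not None]
    mean_price, median_price = mean(known), median(known)
    mean_imputed = [mean_price if p is None else p for p in prices]
    median_imputed = [median_price if p is None else p for p in prices]
    unknown_titles = ['unknown' if t is None or t == '' else t for t in titles]
    return mean_imputed, median_imputed, unknown_titles


# Made-up sample data, for illustration only.
prices = [10.0, None, 20.0, 60.0, None]
titles = ['Lamp', None, '', 'Desk', 'Chair']
mean_imputed, median_imputed, unknown_titles = impute(prices, titles)
print(mean_imputed, median_imputed, unknown_titles)
```

After imputation neither price column contains nulls, so the corresponding null counts should be zero.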

In [None]:
# %load -s task_4 assignment2.py
def task_4(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    price_column = 'price'
    title_column = 'title'
    # Outputs:
    meanImputedPrice_column = 'meanImputedPrice'
    medianImputedPrice_column = 'medianImputedPrice'
    unknownImputedTitle_column = 'unknownImputedTitle'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------

    product_data = product_data.withColumn(price_column, F.col(price_column).cast("float"))

    # Compute mean and median of 'price' column
    mean_price = product_data.select(F.mean(price_column)).first()[0]
    median_price = product_data.approxQuantile(price_column, [0.5], 0.01)[0]

    # Impute null values with the mean and median
    product_data = product_data.withColumn(meanImputedPrice_column, F.when(F.col(price_column).isNull(), mean_price).otherwise(F.col(price_column)))
    product_data = product_data.withColumn(medianImputedPrice_column, F.when(F.col(price_column).isNull(), median_price).otherwise(F.col(price_column)))

    # Impute null and empty string values in 'title' column
    product_data = product_data.withColumn(unknownImputedTitle_column, F.when(F.col(title_column).isNull(), 'unknown').otherwise(F.col(title_column)))
    product_data = product_data.withColumn(unknownImputedTitle_column, F.when(F.col(unknownImputedTitle_column) == '', 'unknown').otherwise(F.col(unknownImputedTitle_column)))

    # Aggregate results
    count_total = product_data.count()
    mean_meanImputedPrice = product_data.agg(F.mean(meanImputedPrice_column)).first()[0]
    variance_meanImputedPrice = product_data.agg(F.variance(meanImputedPrice_column)).first()[0]
    numNulls_meanImputedPrice = product_data.filter(F.col(meanImputedPrice_column).isNull()).count()
    mean_medianImputedPrice = product_data.agg(F.mean(medianImputedPrice_column)).first()[0]
    variance_medianImputedPrice = product_data.agg(F.variance(medianImputedPrice_column)).first()[0]
    numNulls_medianImputedPrice = product_data.filter(F.col(medianImputedPrice_column).isNull()).count()
    numUnknowns_unknownImputedTitle = product_data.filter(F.col(unknownImputedTitle_column) == 'unknown').count()

    # Store results in a dictionary
    res = {
        'count_total': count_total,
        'mean_meanImputedPrice': mean_meanImputedPrice,
        'variance_meanImputedPrice': variance_meanImputedPrice,
        'numNulls_meanImputedPrice': numNulls_meanImputedPrice,
        'mean_medianImputedPrice': mean_medianImputedPrice,
        'variance_medianImputedPrice': variance_medianImputedPrice,
        'numNulls_medianImputedPrice': numNulls_medianImputedPrice,
        'numUnknowns_unknownImputedTitle': numUnknowns_unknownImputedTitle
    }
    




    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_4')
    return res
    # -------------------------------------------------------------------------


In [None]:
start_time = time.time()

In [None]:
res = task_4(data_io, data_dict['product'])
pa2.tests.test(res, 'task_4')

In [None]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

# Task 5
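Before Word2Vec is trained, the implementation below lowercases each title and splits it on single spaces; the `minCount` parameter then drops rare tokens from the vocabulary. A rough plain-Python sketch of those two steps follows. The names `tokenize` and `vocabulary` are hypothetical, and the sample uses a threshold of 2 instead of 100 to keep the data small.

```python
from collections import Counter


def tokenize(title):
    """Lowercase a title and split it on single spaces."""
    return title.lower().split(' ')


def vocabulary(titles, min_count):
    """Tokens that appear at least `min_count` times across all titles."""
    counts = Counter(token for title in titles for token in tokenize(title))
    return {token for token, n in counts.items() if n >= min_count}


# Made-up titles, for illustration only.
titles = ['Digital Piano', 'piano bench', 'Rice Cooker']
vocab = vocabulary(titles, min_count=2)
print(tokenize('Digital Piano'), vocab)
```

Only tokens in the vocabulary receive a vector, so the vocabulary size is the number of rows returned by `model.getVectors()`.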

In [None]:
# %load -s task_5 assignment2.py
def task_5(data_io, product_processed_data, word_0, word_1, word_2):
    # -----------------------------Column names--------------------------------
    # Inputs:
    title_column = 'title'
    # Outputs:
    titleArray_column = 'titleArray'
    titleVector_column = 'titleVector'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    product_processed_data_output = product_processed_data.withColumn(
        titleArray_column, F.split(F.lower(F.col(title_column)), ' ')
    )


    word2vec = M.feature.Word2Vec(
        minCount=100,
        vectorSize=16,
        seed=SEED,
        numPartitions=4,
        inputCol=titleArray_column,
        # Write vectors to the declared output column.
        outputCol=titleVector_column
    )

    # Fit the Word2Vec model to the data
    model = word2vec.fit(product_processed_data_output)




    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'size_vocabulary': None,
        'word_0_synonyms': [(None, None), ],
        'word_1_synonyms': [(None, None), ],
        'word_2_synonyms': [(None, None), ]
    }
    # Modify res:
    res['count_total'] = product_processed_data_output.count()
    res['size_vocabulary'] = model.getVectors().count()
    for name, word in zip(
        ['word_0_synonyms', 'word_1_synonyms', 'word_2_synonyms'],
        [word_0, word_1, word_2]
    ):
        res[name] = model.findSynonymsArray(word, 10)
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_5')
    return res
    # -------------------------------------------------------------------------


In [None]:
import time
start_time = time.time()

In [None]:
res = task_5(data_io, data_dict['product_processed'], 'piano', 'rice', 'laptop')
pa2.tests.test(res, 'task_5')

In [None]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

# Task 6
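The pipeline below indexes categories by descending frequency, one-hot encodes the index, and then applies PCA. A useful sanity check is that the mean of the one-hot vectors equals the relative frequency of each category. The plain-Python sketch below illustrates this on a small list; the function name `one_hot_mean` is hypothetical, and the alphabetical tie-breaking is an assumption of this sketch.

```python
from collections import Counter


def one_hot_mean(categories):
    """Return (category -> index, mean of the one-hot vectors)."""
    counts = Counter(categories)
    # Most frequent category gets index 0; ties are broken alphabetically
    # here, which is an assumption of this sketch.
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    index = {c: i for i, c in enumerate(ordered)}
    vectors = [[1.0 if index[c] == i else 0.0 for i in range(len(ordered))]
               for c in categories]
    mean_vector = [sum(column) / len(vectors) for column in zip(*vectors)]
    return index, mean_vector


# Made-up categories, for illustration only.
index, mean_vector = one_hot_mean(['Books', 'Toys', 'Books', 'Music'])
print(index, mean_vector)
```

With `dropLast=False` every category keeps its own slot, so the entries of the mean vector sum to 1.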

In [None]:
# %load -s task_6 assignment2.py
def task_6(data_io, product_processed_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    category_column = 'category'
    # Outputs:
    categoryIndex_column = 'categoryIndex'
    categoryOneHot_column = 'categoryOneHot'
    categoryPCA_column = 'categoryPCA'
    # -------------------------------------------------------------------------    

    # ---------------------- Your implementation begins------------------------


    # Use the declared column-name variables instead of hard-coded strings.
    indexer = M.feature.StringIndexer(inputCol=category_column, outputCol=categoryIndex_column, stringOrderType="frequencyDesc")
    encoder = M.feature.OneHotEncoder(inputCol=categoryIndex_column, outputCol=categoryOneHot_column, dropLast=False)
    pca = M.feature.PCA(k=15, inputCol=categoryOneHot_column, outputCol=categoryPCA_column)

    pipeline = M.Pipeline(stages=[indexer, encoder, pca])
    model = pipeline.fit(product_processed_data)
    final = model.transform(product_processed_data)
    
    summarizer = M.stat.Summarizer.metrics("mean")




    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'meanVector_categoryOneHot': [None, ],
        'meanVector_categoryPCA': [None, ]
    }
    # Modify res:
    res['count_total'] = final.count()
    res['meanVector_categoryOneHot'] = final.select(summarizer.summary(final.categoryOneHot)).collect()[0][0][0]
    res['meanVector_categoryPCA'] = final.select(summarizer.summary(final.categoryPCA)).collect()[0][0][0]
    

    

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_6')
    return res
    # -------------------------------------------------------------------------


In [None]:
start_time = time.time()

In [None]:
res = task_6(data_io, data_dict['product_processed'])
pa2.tests.test(res, 'task_6')

In [None]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

In [None]:
print ("End to end time: {}".format(time.time()-begin))

# Part 2: Model Selection

In [None]:
# Bring the part_2 datasets to memory and de-cache part_1 datasets.
# Execute this once before you start working on this Part
data_dict, _ = data_io.cache_switch(data_dict, 'part_2')

# Task 7
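The implementation below scores the decision tree with root mean squared error (RMSE). As a reminder of what that metric computes, here is a minimal plain-Python version; the function name `rmse` and the sample values are made up for illustration.

```python
import math


def rmse(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up ratings and predictions, for illustration only.
score = rmse([5.0, 3.0, 4.0], [4.0, 3.0, 2.0])
print(score)
```

Lower values are better, and a perfect model scores 0.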

In [None]:
def task_7(data_io, train_data, test_data):
    
    # ---------------------- Your implementation begins------------------------
    
    dt = M.regression.DecisionTreeRegressor(maxDepth=5)

    model = dt.fit(train_data.withColumnRenamed('overall', 'label'))

    predictions = model.transform(test_data)
    
    evaluator = M.evaluation.RegressionEvaluator(
        labelCol="overall",
        predictionCol="prediction",
        metricName="rmse"
    )

    rmse = evaluator.evaluate(predictions)
    
    
    
    # -------------------------------------------------------------------------
    
    
    # ---------------------- Put results in res dict --------------------------
    res = {
        'test_rmse': None
    }
    # Modify res:
    res['test_rmse'] = rmse
    

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_7')
    return res
    # -------------------------------------------------------------------------

In [None]:
start_time = time.time()

In [None]:
res = task_7(data_io, data_dict['ml_features_train'], data_dict['ml_features_test'])
pa2.tests.test(res, 'task_7')

In [None]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

# Task 8
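Hyperparameter selection with a held-out validation split usually works as follows: fit one model per candidate value, compare the candidates on the validation split, and evaluate only the chosen model on the test set. The selection step can be sketched in plain Python; the function name `select_depth` and the scores below are hypothetical and are not results from this assignment.

```python
def select_depth(valid_rmse_by_depth):
    """Return the depth whose validation RMSE is lowest."""
    return min(valid_rmse_by_depth, key=valid_rmse_by_depth.get)


# Hypothetical validation scores, not results from this assignment.
valid_rmse = {5: 0.95, 7: 0.93, 9: 0.94, 12: 0.97}
best_depth = select_depth(valid_rmse)
print(best_depth)
```

Because RMSE is an error measure, the best candidate is the minimum, not the maximum.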

In [None]:
def task_8(data_io, train_data, test_data):
    
    # ---------------------- Your implementation begins------------------------
    # Hold out 25% of the training data for validation.
    train_data, val_data = train_data.randomSplit([0.75, 0.25], seed=42)
    train_data = train_data.withColumnRenamed('overall', 'label')

    evaluator = M.evaluation.RegressionEvaluator(
        labelCol="overall",
        predictionCol="prediction",
        metricName="rmse"
    )

    # Fit one tree per depth and keep (validation RMSE, fitted model) pairs.
    results = []
    for max_depth in [5, 7, 9, 12]:
        dt = M.regression.DecisionTreeRegressor(maxDepth=max_depth)
        model = dt.fit(train_data)
        results.append((evaluator.evaluate(model.transform(val_data)), model))

    # Pick the model with the lowest validation RMSE and score it on the test set.
    _, best_model = min(results, key=lambda pair: pair[0])
    test_rmse = evaluator.evaluate(best_model.transform(test_data))

    # -------------------------------------------------------------------------


    # ---------------------- Put results in res dict --------------------------
    res = {
        'test_rmse': None,
        'valid_rmse_depth_5': None,
        'valid_rmse_depth_7': None,
        'valid_rmse_depth_9': None,
        'valid_rmse_depth_12': None,
    }
    # Modify res:
    res['test_rmse'] = test_rmse
    res['valid_rmse_depth_5'] = results[0][0]
    res['valid_rmse_depth_7'] = results[1][0]
    res['valid_rmse_depth_9'] = results[2][0]
    res['valid_rmse_depth_12'] = results[3][0]
    
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_8')
    return res
    # -------------------------------------------------------------------------

In [None]:
start_time = time.time()

In [None]:
res = task_8(data_io, data_dict['ml_features_train'], data_dict['ml_features_test'])
pa2.tests.test(res, 'task_8')

In [None]:
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

In [None]:
print ("End to end time: {}".format(time.time()-begin))