In [1]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = 6

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .config("spark.yarn.queue", "master_teams")\
        .config("spark.executor.instances", "10")\
        .config("spark.executor.cores", "10")\
        .enableHiveSupport()\
        .getOrCreate()


## Read the hive tables as dataframes.

In [2]:
items = spark.read.format("avro").table('team6_projectdb.items')
items.createOrReplaceTempView('items')

In [3]:
items.printSchema()

root
 |-- itemid: integer (nullable = true)
 |-- shopid: integer (nullable = true)
 |-- item_name: string (nullable = true)
 |-- item_description: string (nullable = true)
 |-- item_variation: string (nullable = true)
 |-- price: float (nullable = true)
 |-- stock: integer (nullable = true)
 |-- cb_option: boolean (nullable = true)
 |-- is_preferred: boolean (nullable = true)
 |-- sold_count: integer (nullable = true)
 |-- item_creation_date: timestamp (nullable = true)
 |-- category: string (nullable = true)



In [4]:
items.show(2)

+---------+--------+--------------------+--------------------+--------------+-----+-----+---------+------------+----------+-------------------+--------------+
|   itemid|  shopid|           item_name|    item_description|item_variation|price|stock|cb_option|is_preferred|sold_count| item_creation_date|      category|
+---------+--------+--------------------+--------------------+--------------+-----+-----+---------+------------+----------+-------------------+--------------+
|682899825|16174997|Christmas Sexy Sl...|Christmas Sexy Sl...|{Default: 4.0}|  4.0| 1000|     true|       false|         0|2017-11-14 14:22:00|Miscellaneous |
|682899784|16174997|Christmas Sexy Sl...|Christmas Sexy Sl...|{Default: 4.0}|  4.0| 1000|     true|       false|         0|2017-11-14 14:22:00|Miscellaneous |
+---------+--------+--------------------+--------------------+--------------+-----+-----+---------+------------+----------+-------------------+--------------+
only showing top 2 rows



## Build and fit a feature extraction pipeline.

### Vectorize Item Description

In [5]:
input_col = "item_description"
tokens_col = "desc_tokens"
output_col = "desc_enc"

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer


pipeline = Pipeline(stages=[
    Tokenizer(
        inputCol=input_col,
        outputCol=tokens_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+---------+-------+--------------------+--------------------+-----+-----+---------+------------+----------+-------------------+----------------+--------------------+
|   itemid| shopid|           item_name|      item_variation|price|stock|cb_option|is_preferred|sold_count| item_creation_date|        category|         desc_tokens|
+---------+-------+--------------------+--------------------+-----+-----+---------+------------+----------+-------------------+----------------+--------------------+
|821115857|3344977|Minute Maid Pulph...|                  {}| 1.25|   20|    false|       false|         0|2018-01-09 18:08:00|Food & Beverages|[free, delivery, ...|
|780592157|3344977|Minute Maid Pulph...|{Orange: 1.25, Ma...| 1.25|   90|    false|       false|         0|2017-12-23 07:59:00|Food & Beverages|[ingredients:wate...|
+---------+-------+--------------------+--------------------+-----+-----+---------+------------+----------+-------------------+----------------+--------------------+
only

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import Word2Vec


pipeline = Pipeline(stages=[
    Word2Vec(
        vectorSize=5,
        seed=42,
        minCount=10,
        inputCol=tokens_col,
        outputCol=output_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(tokens_col)

items.show(2)

+---------+--------+--------------------+---------------+-----+-----+---------+------------+----------+-------------------+--------------------+--------------------+
|   itemid|  shopid|           item_name| item_variation|price|stock|cb_option|is_preferred|sold_count| item_creation_date|            category|            desc_enc|
+---------+--------+--------------------+---------------+-----+-----+---------+------------+----------+-------------------+--------------------+--------------------+
|534623211|16174997|2.4G Air Mouse Wi...|   {Black: 7.8}|  7.8| 1000|     true|       false|         1|2017-09-30 07:22:00|Computers & Perip...|[-0.1520559997172...|
|455041448|16174997|2.4G Air Mouse Wi...|{DEFAULT: 12.0}| 12.0| 1000|     true|       false|         0|2017-09-01 08:55:00|Computers & Perip...|[-0.1403763069035...|
+---------+--------+--------------------+---------------+-----+-----+---------+------------+----------+-------------------+--------------------+--------------------+
only

### Vectorize Item Variation

In [8]:
input_col = "item_variation"
tokens_col = "var_tokens"
output_col = "var_enc"

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer


pipeline = Pipeline(stages=[
    Tokenizer(
        inputCol=input_col,
        outputCol=tokens_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+---------+--------+--------------------+-----+-----+---------+------------+----------+-------------------+---------------+--------------------+-----------------+
|   itemid|  shopid|           item_name|price|stock|cb_option|is_preferred|sold_count| item_creation_date|       category|            desc_enc|       var_tokens|
+---------+--------+--------------------+-----+-----+---------+------------+----------+-------------------+---------------+--------------------+-----------------+
|372745192|16174997|1/4 3/8 1/2 8Pcs ...|  2.7| 1000|     true|       false|         0|2017-07-21 09:58:00|Home Appliances|[-0.1936963422348...|[{default:, 2.7}]|
|372743622|16174997|1/4 3/8 1/2 8Pcs ...|  3.9| 1000|     true|       false|         0|2017-07-21 09:57:00|Home Appliances|[-0.1218324403499...|[{default:, 3.9}]|
+---------+--------+--------------------+-----+-----+---------+------------+----------+-------------------+---------------+--------------------+-----------------+
only showing top 2 row

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import Word2Vec


pipeline = Pipeline(stages=[
    Word2Vec(
        vectorSize=5,
        seed=42,
        minCount=1,
        inputCol=tokens_col,
        outputCol=output_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(tokens_col)

items.show(2)

+----------+--------+--------------------+-----+-----+---------+------------+----------+-------------------+-----------------+--------------------+--------------------+
|    itemid|  shopid|           item_name|price|stock|cb_option|is_preferred|sold_count| item_creation_date|         category|            desc_enc|             var_enc|
+----------+--------+--------------------+-----+-----+---------+------------+----------+-------------------+-----------------+--------------------+--------------------+
|1015202788|16174997|HW Fishing Tackle...| 2.24| 1000|     true|       false|         0|2018-03-26 07:58:00|Sports & Outdoors|[-0.3909013677956...|[0.13756516203284...|
|1015202786|16174997|HW Fishing Tackle...| 2.58| 1000|     true|       false|         0|2018-03-26 07:58:00|Sports & Outdoors|[-0.4445349556409...|[0.01176970079541...|
+----------+--------+--------------------+-----+-----+---------+------------+----------+-------------------+-----------------+--------------------+--------

### Vectorize Item Name

In [11]:
input_col = "item_name"
tokens_col = "name_tokens"
output_col = "name_enc"

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer


pipeline = Pipeline(stages=[
    Tokenizer(
        inputCol=input_col,
        outputCol=tokens_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+---------+--------+-----+-----+---------+------------+----------+-------------------+---------------+--------------------+--------------------+--------------------+
|   itemid|  shopid|price|stock|cb_option|is_preferred|sold_count| item_creation_date|       category|            desc_enc|             var_enc|         name_tokens|
+---------+--------+-----+-----+---------+------------+----------+-------------------+---------------+--------------------+--------------------+--------------------+
|839669074|16174997| 2.52| 2000|     true|       false|         0|2018-01-16 13:32:00|Pet Accessories|[-0.3230378506332...|[-0.0945925083942...|[handle, shedding...|
|839669045|16174997|  2.5| 2000|     true|       false|         0|2018-01-16 13:32:00|Pet Accessories|[-0.3198394560725...|[-0.2202361963689...|[handle, shedding...|
+---------+--------+-----+-----+---------+------------+----------+-------------------+---------------+--------------------+--------------------+--------------------+
only

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import Word2Vec


pipeline = Pipeline(stages=[
    Word2Vec(
        vectorSize=5,
        seed=42,
        minCount=1,
        inputCol=tokens_col,
        outputCol=output_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(tokens_col)

items.show(2)

+---------+--------+-----+-----+---------+------------+----------+-------------------+-------------+--------------------+--------------------+--------------------+
|   itemid|  shopid|price|stock|cb_option|is_preferred|sold_count| item_creation_date|     category|            desc_enc|             var_enc|            name_enc|
+---------+--------+-----+-----+---------+------------+----------+-------------------+-------------+--------------------+--------------------+--------------------+
|455044645|16174997| 45.7| 1000|     true|       false|         0|2017-09-01 08:57:00|Home & Living|[-0.0499512929065...|[0.00447766948491...|[0.58400923599089...|
|455044643|16174997| 45.7| 3000|     true|       false|         0|2017-09-01 08:57:00|Home & Living|[-0.0482056512954...|[0.00447766948491...|[0.58400923599089...|
+---------+--------+-----+-----+---------+------------+----------+-------------------+-------------+--------------------+--------------------+--------------------+
only showing top

### Encode timestamp

In [14]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute

# there is no sense in "seconds" column because it is always zero
items = items.withColumn("year", year("item_creation_date")) \
        .withColumn("month", month("item_creation_date")) \
        .withColumn("day", dayofmonth("item_creation_date")) \
        .withColumn("hour", hour("item_creation_date")) \
        .withColumn("minute", minute("item_creation_date")) \
        .drop("item_creation_date")

items.show(2)

+---------+--------+-----+-----+---------+------------+----------+---------------+--------------------+--------------------+--------------------+----+-----+---+----+------+
|   itemid|  shopid|price|stock|cb_option|is_preferred|sold_count|       category|            desc_enc|             var_enc|            name_enc|year|month|day|hour|minute|
+---------+--------+-----+-----+---------+------------+----------+---------------+--------------------+--------------------+--------------------+----+-----+---+----+------+
|839669074|16174997| 2.52| 2000|     true|       false|         0|Pet Accessories|[-0.3230378506332...|[-0.0945925083942...|[0.33843006609151...|2018|    1| 16|  13|    32|
|839669045|16174997|  2.5| 2000|     true|       false|         0|Pet Accessories|[-0.3198394560725...|[-0.2202361963689...|[0.33843006609151...|2018|    1| 16|  13|    32|
+---------+--------+-----+-----+---------+------------+----------+---------------+--------------------+--------------------+-----------

In [15]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import math


class TimeEncoderTransformer(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable
):
    input_col = Param(
        Params._dummy(),
        "input_col",
        "input column name.",
        typeConverter=TypeConverters.toString
    )
    output_col_sin = Param(
        Params._dummy(),
        "output_col_sin",
        "output column name for sin wave.",
        typeConverter=TypeConverters.toString
    )
    output_col_cos = Param(
        Params._dummy(),
        "output_col_cos",
        "output column name for cos wave.",
        typeConverter=TypeConverters.toString
    )
    timestamp_part = Param(
        Params._dummy(),
        "timestamp_part",
        "part of the timestamp like month, day, hour, minute",
        typeConverter=TypeConverters.toString
    )

    @keyword_only
    def __init__(
        self,
        input_col: str = "input",
        output_col_sin: str = "sin",
        output_col_cos: str = "cos",
        timestamp_part: str = "month"
    ):
        super(TimeEncoderTransformer, self).__init__()
        self._setDefault(
            input_col=None,
            output_col_sin=None,
            output_col_cos=None,
            timestamp_part=None
        )
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(
        self,
        input_col: str = "input",
        output_col_sin: str = "sin",
        output_col_cos: str = "cos",
        timestamp_part: str = "month"
    ):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col_sin(self):
        return self.getOrDefault(self.output_col_sin)

    def get_output_col_cos(self):
        return self.getOrDefault(self.output_col_cos)

    def get_timestamp_part(self):
        return self.getOrDefault(self.timestamp_part)

    def _transform(self, df: DataFrame):
        input_col = self.get_input_col()
        output_col_sin = self.get_output_col_sin()
        output_col_cos = self.get_output_col_cos()
        timestamp_part = self.get_timestamp_part()

        if timestamp_part == 'month':
            denominator = 12
        elif timestamp_part == 'day':
            denominator = 31
        elif timestamp_part in ['hour', 'minute']:
            denominator = 60
        else:
            raise Exception()

        sin_udf = F.udf(
            lambda x: math.sin(2 * math.pi * x / denominator),
            DoubleType()
        )
        cos_udf = F.udf(
            lambda x: math.cos(2 * math.pi * x / denominator),
            DoubleType()
        )

        df = df.withColumn(output_col_sin, sin_udf(F.col(input_col)))
        df = df.withColumn(output_col_cos, cos_udf(F.col(input_col)))

        return df

In [16]:
from pyspark.ml import Pipeline

input_col = "month"

pipeline = Pipeline(stages=[
    TimeEncoderTransformer(
        input_col=input_col,
        output_col_sin=f"{input_col}_sin",
        output_col_cos=f"{input_col}_cos",
        timestamp_part=input_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+---------+--------+-----+-----+---------+------------+----------+---------------+--------------------+--------------------+--------------------+----+---+----+------+-------------------+------------------+
|   itemid|  shopid|price|stock|cb_option|is_preferred|sold_count|       category|            desc_enc|             var_enc|            name_enc|year|day|hour|minute|          month_sin|         month_cos|
+---------+--------+-----+-----+---------+------------+----------+---------------+--------------------+--------------------+--------------------+----+---+----+------+-------------------+------------------+
|839669074|16174997| 2.52| 2000|     true|       false|         0|Pet Accessories|[-0.3230378506332...|[-0.0945925083942...|[0.33843006609151...|2018| 16|  13|    32|0.49999999999999994|0.8660254037844387|
|839669045|16174997|  2.5| 2000|     true|       false|         0|Pet Accessories|[-0.3198394560725...|[-0.2202361963689...|[0.33843006609151...|2018| 16|  13|    32|0.49999999

In [17]:
from pyspark.ml import Pipeline

input_col = "day"

pipeline = Pipeline(stages=[
    TimeEncoderTransformer(
        input_col=input_col,
        output_col_sin=f"{input_col}_sin",
        output_col_cos=f"{input_col}_cos",
        timestamp_part=input_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+---------+--------+-----+-----+---------+------------+----------+----------------+--------------------+--------------------+--------------------+----+----+------+---------+--------------------+------------------+-------------------+
|   itemid|  shopid|price|stock|cb_option|is_preferred|sold_count|        category|            desc_enc|             var_enc|            name_enc|year|hour|minute|month_sin|           month_cos|           day_sin|            day_cos|
+---------+--------+-----+-----+---------+------------+----------+----------------+--------------------+--------------------+--------------------+----+----+------+---------+--------------------+------------------+-------------------+
|185120551|16503999|  8.0|    5|    false|       false|         0|Mobile & Gadgets|[0.41284126639366...|[-0.4695885355273...|[0.54840807616710...|2017|  19|    26|      1.0|6.123233995736766...|-0.651372482722222|-0.7587581226927911|
|169964291|16503999|  8.0|    4|    false|       false|         

In [18]:
from pyspark.ml import Pipeline

input_col = "hour"

pipeline = Pipeline(stages=[
    TimeEncoderTransformer(
        input_col=input_col,
        output_col_sin=f"{input_col}_sin",
        output_col_cos=f"{input_col}_cos",
        timestamp_part=input_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+----------+--------+-----+-----+---------+------------+----------+-----------------+--------------------+--------------------+--------------------+----+------+---------+--------------------+------------------+------------------+------------------+------------------+
|    itemid|  shopid|price|stock|cb_option|is_preferred|sold_count|         category|            desc_enc|             var_enc|            name_enc|year|minute|month_sin|           month_cos|           day_sin|           day_cos|          hour_sin|          hour_cos|
+----------+--------+-----+-----+---------+------------+----------+-----------------+--------------------+--------------------+--------------------+----+------+---------+--------------------+------------------+------------------+------------------+------------------+
|1015202788|16174997| 2.24| 1000|     true|       false|         0|Sports & Outdoors|[-0.3909013677956...|[0.13756516203284...|[0.37490432438525...|2018|    58|      1.0|6.123233995736766...|-0.84

In [19]:
from pyspark.ml import Pipeline

input_col = "minute"

pipeline = Pipeline(stages=[
    TimeEncoderTransformer(
        input_col=input_col,
        output_col_sin=f"{input_col}_sin",
        output_col_cos=f"{input_col}_cos",
        timestamp_part=input_col
    )
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+---------+--------+-----+-----+---------+------------+----------+---------------+--------------------+--------------------+--------------------+----+-------------------+------------------+--------------------+-------------------+------------------+-------------------+--------------------+-------------------+
|   itemid|  shopid|price|stock|cb_option|is_preferred|sold_count|       category|            desc_enc|             var_enc|            name_enc|year|          month_sin|         month_cos|             day_sin|            day_cos|          hour_sin|           hour_cos|          minute_sin|         minute_cos|
+---------+--------+-----+-----+---------+------------+----------+---------------+--------------------+--------------------+--------------------+----+-------------------+------------------+--------------------+-------------------+------------------+-------------------+--------------------+-------------------+
|839669074|16174997| 2.52| 2000|     true|       false|         0|P

### Encode Category

In [20]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

input_col = "category"

pipeline = Pipeline(stages=[
    StringIndexer(
        inputCol=input_col,
        outputCol=f"{input_col}_indexed")
])

items = pipeline.fit(items)\
            .transform(items)\
            .drop(input_col)

items.show(2)

+---------+--------+-----+-----+---------+------------+----------+--------------------+--------------------+--------------------+----+------------------+--------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+----------------+
|   itemid|  shopid|price|stock|cb_option|is_preferred|sold_count|            desc_enc|             var_enc|            name_enc|year|         month_sin|           month_cos|           day_sin|            day_cos|          hour_sin|           hour_cos|         minute_sin|         minute_cos|category_indexed|
+---------+--------+-----+-----+---------+------------+----------+--------------------+--------------------+--------------------+----+------------------+--------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+----------------+
|214336190|16581681| 95.0|    5|    false|       false|         0|[-0.

### Assemble Features

In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

inputCols = [
    'itemid',
    'shopid',
    'price',
    'stock',
    'cb_option',
    'is_preferred',
    'sold_count',
    'year',
    'month_sin',
    'month_cos',
    'day_sin',
    'day_cos',
    'hour_sin',
    'hour_cos',
    'minute_sin',
    'minute_cos',
    'name_enc',
    'desc_enc',
    'var_enc'
]


pipeline = Pipeline(stages=[
    VectorAssembler(
        inputCols=inputCols,
        outputCol="features"
    )
])

items = pipeline.fit(items).transform(items)

for col in inputCols:
    items = items.drop(col)

items.show(2)

+----------------+--------------------+
|category_indexed|            features|
+----------------+--------------------+
|            10.0|[7.44783814E8,1.6...|
|            10.0|[7.44783812E8,1.6...|
+----------------+--------------------+
only showing top 2 rows



## Split the input dataset into train and test datasets.

In [22]:
(train_data, test_data) = items.randomSplit([0.8, 0.2], seed=10)

In [24]:
train_data.select("features", "category_indexed")\
    .repartition(4)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from root directory of the repository
!hdfs dfs -get project/data/train/*.json ../data/train

test_data.select("features", "category_indexed")\
    .repartition(4)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
!hdfs dfs -get project/data/test/*.json ../data/test

## Select two types of ML models based on the ML task specified in project.info sheet.

1. Logistic Regression
2. Decision Tree

## First model type

### Build and train the model.

In [26]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="category_indexed", featuresCol="features")
model = lr.fit(train_data)

### Predict for the test data.

In [27]:
# Make predictions on the test set
predictions = model.transform(test_data)

### Evaluate the model.

In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate the model using F1-score
evaluator = MulticlassClassificationEvaluator(
    labelCol="category_indexed",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator.evaluate(predictions)
print(f"Test set F1-score: {f1_score}")

Test set F1-score: 0.6249912365399729


### Specify at least 2 hyperparameters for it and the settings of grid search and cross validation.

1. RegParam
2. ElasticNetParam

### Optimize its hyperparameters using cross validation and grid search on the training data only.

In [31]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="category_indexed",
    predictionCol="prediction",
    metricName="f1"
)

In [33]:
from pyspark.ml.tuning import CrossValidator

# Create the CrossValidator
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

In [34]:
# Fit the model using cross-validation on the training data
cvModel = cv.fit(train_data)

### Select the best model (model1) from grid search.

In [35]:
# Get the best model
bestModel = cvModel.bestModel

### Save the model1 to HDFS in location like project/models/model1 and later put it in models/model1 folder in the repository.

In [36]:
bestModel.write().overwrite().save("project/models/model1")

!hdfs dfs -get project/models/model1 ../models/model1

### Predict for the test data using the model1.

In [37]:
# Make predictions on the test set
predictions = bestModel.transform(test_data)

### Save the prediction results in HDFS in a CSV file like project/output/model1_predictions and later save it in output/model1_predictions.csv folder in the repository…

In [39]:
predictions.show()

+----------------+--------------------+--------------------+--------------------+----------+
|category_indexed|            features|       rawPrediction|         probability|prediction|
+----------------+--------------------+--------------------+--------------------+----------+
|            14.0|[219027.0,48649.0...|[-10.391423429088...|[2.71367267055964...|      14.0|
|            14.0|[516402.0,103034....|[-6.4626236775148...|[4.78846833777391...|      14.0|
|            14.0|[686736.0,227608....|[-2.1462063460509...|[7.48238783668572...|       1.0|
|            14.0|[947712.0,227608....|[-2.0913311206873...|[7.45067917522111...|       6.0|
|            14.0|[1250848.0,10017....|[-0.6827106799718...|[0.00923698282475...|       4.0|
|            14.0|[1498027.0,12755....|[-9.6460551350904...|[3.84640026377883...|       1.0|
|            14.0|[2870492.0,138778...|[0.88468505871031...|[0.03350637985424...|       7.0|
|            14.0|[3274086.0,15575....|[-0.4706152891258...|[1.9717902

In [40]:
# # Select only the "label" and "prediction" columns
# predictions_selected = predictions.select("category_indexed", "prediction")

In [46]:
predictions.select("category_indexed", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .save("project/output/model1_predictions")

# predictions.select("category_indexed", "prediction")\
#             .coalesce(1)\
#             .write\
#             .csv(
#                 "project/output/model1_predictions",
#                 mode="overwrite",
#                 header=True
#             )

Py4JJavaError: An error occurred while calling o7809.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:261)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3992.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3992.0 (TID 136483) (hadoop-04.uni.innopolis.ru executor 5): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:331)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$17(FileFormatWriter.scala:239)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 332 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:158)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:118)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:314)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2224)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:228)
	... 41 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:331)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$17(FileFormatWriter.scala:239)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 332 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:158)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:118)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:314)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
	... 9 more


In [43]:
!hdfs dfs -get project/output/model1_predictions/* ../output/

### Evaluate the best model (model1) on the test data.

In [47]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions on the test set
predictions = bestModel.transform(test_data)

# Evaluate the model using F1-score
evaluator = MulticlassClassificationEvaluator(
    labelCol="category_indexed",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator.evaluate(predictions)
print(f"Test set F1-score: {f1_score}")

Test set F1-score: 0.6267883961740834


## Second model type.

### Build and train the model.

In [48]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="category_indexed", featuresCol="features")
model = dt.fit(train_data)

### Predict for the test data.

In [49]:
# Make predictions on the test set
predictions = model.transform(test_data)

### Evaluate the model.

In [50]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate the model using F1-score
evaluator = MulticlassClassificationEvaluator(
    labelCol="category_indexed",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator.evaluate(predictions)
print(f"Test set F1-score: {f1_score}")

Test set F1-score: 0.47410982613515745


### Specify at least 2 hyperparameters for it and the settings of grid search and cross validation.

1. MaxDepth
2. MaxInfoGain

### Optimize its hyperparameters using cross validation and grid search on the training data only.

In [51]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [2, 5, 10]) \
    .addGrid(dt.minInfoGain, [0.0, 0.01, 0.1]) \
    .build()

In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="category_indexed",
    predictionCol="prediction",
    metricName="f1"
)

In [53]:
from pyspark.ml.tuning import CrossValidator

# Create the CrossValidator
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

In [54]:
# Fit the model using cross-validation on the training data
cvModel = cv.fit(train_data)

### Select the best model (model2) from grid search.

In [55]:
# Get the best model
bestModel = cvModel.bestModel

### Save the model2 to HDFS in location like project/models/model2 and later put it in models/model2 folder in the repository.

In [56]:
bestModel.write().overwrite().save("project/models/model2")

!hdfs dfs -get project/models/model2 ../models/model2

### Predict for the test data using the model2.

In [57]:
# Make predictions on the test set
predictions = bestModel.transform(test_data)

### Save the prediction results in HDFS in a CSV file like project/output/model2_predictions and later save it in output/model2_predictions.csv folder in the repository.

In [58]:
predictions.select("category_indexed", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .save("project/output/model2_predictions")

!hdfs dfs -get project/output/model1_predictions/* ../output/

Py4JJavaError: An error occurred while calling o14180.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:261)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9915.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9915.0 (TID 339556) (hadoop-04.uni.innopolis.ru executor 8): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:331)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$17(FileFormatWriter.scala:239)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 332 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:158)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:118)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:314)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2224)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:228)
	... 41 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:331)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$17(FileFormatWriter.scala:239)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 332 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:158)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:118)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:314)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
	... 9 more


### Evaluate the best model (model2) on the test data.

In [59]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions on the test set
predictions = bestModel.transform(test_data)

# Evaluate the model using F1-score
evaluator = MulticlassClassificationEvaluator(
    labelCol="category_indexed",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator.evaluate(predictions)
print(f"Test set F1-score: {f1_score}")

Test set F1-score: 0.6268764024299281


## Compare the models (model1, model2) on the test data.

In [60]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("model", StringType(), False),
    StructField("f1_score", DoubleType(), False)
])

data = [
    ("Logistic Regression", 0.6267883961740834),
    ("Decision Tree", 0.47410982613515745)
]

df = spark.createDataFrame(data, schema)

In [63]:
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .save("project/output/evaluation")

!hdfs dfs -get project/output/evaluation/* ../output/

get: `../output/_SUCCESS': File exists
