# Installing required modules and mounting Google Drive

The following Python modules need to be installed every time a runtime is initialized. Each installation may take a while (up to a few minutes) to run.

In [None]:
# !pip install pyspark

In [None]:
# !pip install -q pyspark==3.4.0 spark-nlp

In [None]:
# !pip install --upgrade beautifulsoup4

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; the dataset path used later lives there
drive.mount('/content/drive')

In [None]:
import sparknlp

# Start a Spark session through Spark NLP and report the versions in use
spark = sparknlp.start()

for label, version in (("Spark NLP version", sparknlp.version()),
                       ("Apache Spark version:", spark.version)):
    print(label, version)


# Importing and Cleaning data

In this section we first load the data and then we perform some cleaning actions as well as some feature creation actions. Detailed explanations are given wherever necessary.

In [None]:
# Dataset location (Thod's Drive layout)
project_path = r'/content/drive/MyDrive/Colab Notebooks/MSc 2.0/DWS-Mining From Massive Datasets/MiningFromMassiveDatasets-project'
file_path = project_path + r'/training_data.tsv'
# # Sof file path
# file_path = r'/content/drive/MyDrive/MiningFromMassiveDatasets-project/training_data.tsv'

# Reader settings: tab-separated file without a header row, schema inferred
file_type = "csv"
infer_schema = "true"
first_row_is_header = "false"
delimiter = r"\t"

# Read the file with all options passed in one call
df_raw = (spark.read
          .format(file_type)
          .options(inferSchema=infer_schema,
                   header=first_row_is_header,
                   sep=delimiter)
          .load(file_path))

# Give the four positional columns meaningful names
column_renames = (('_c0', 'id'), ('_c1', 'title'), ('_c2', 'body'), ('_c3', 'tags'))
df_raw = df_raw.select(*[df_raw[old].alias(new) for old, new in column_renames])

In [None]:
# Work on a small subset of the raw data while developing
n = 1000
df = df_raw.limit(n)

# Preview the first 20 rows without truncating long cells
df.show(20, False)

The first thing we want to do is clean our data so that our ML models can have better performance. Upon a quick look at the data, we see the following:

* Each entry in the `body` column contains HTML elements like `<p>`, `</p>` for the beginning and ending of paragraphs, `<a href="URL">` for hyperlinks, etc. It would be a good idea to remove these as much as possible so that we keep the pure, clean body of each question and our tokenizer works better.

* Many entries include actual code that the OP has added to their question. It would probably be a good feature to know if we have code in a question, and even an estimation of what language it is based on some criteria. If not that, then just the information about having code or not in a question could be a feature since maybe some questions on certain languages can contain more code than other languages (e.g. people asking about Javascript may have code in their questions more frequently than people asking about HTML).
  * Since code on Stack Overflow is almost always included in specialized code snippets, we should be able to separate those snippets. After a quick Google search, it appears that the `<code>` HTML tag is used for the piece of code, and this tag is usually nested inside a `<pre>` tag to tell the browser that the enclosed block is code we want to display, not render.
    * A potential problem we may have here is that it may be difficult to distinguish between the HTML elements in the question string and the HTML elements that are part of the code that someone has included in their question. We will have to see how well the approach with the `<pre>` and `<code>` tags works, and if it's not working very well we should think of something else.
  * Another thing we may want to take into account is the fact that most of the time a code block is preceded by a paragraph ending (i.e. a `</p>`). This is not necessarily always the case though.

Therefore, for now, the idea is that we want to extract the code maybe as a separate column and remove it from the actual body of the question. Then we want to remove any HTML elements from the body of the question without the code.

In [None]:
def get_elements(elements):
  """
  Input:
    - elements: Result of soup.find_all('search_element') where 'search_element'
                can be 'pre', 'code' or any HTML element. Any iterable whose
                items can be converted with str() works.
  Output:
    - code_snippets <list of str>: The cleaned string form of every element
                                   that is at least 5 characters long, in
                                   input order.

  The purpose of this function is to turn the HTML elements in the 'elements'
  input into clean strings. The wrapping <pre>/<code> tags (with or without
  attributes, e.g. <pre class="...">) are removed so that the same snippet
  found through a <pre> search and through a <code> search yields the same
  string. Any other markup inside the element is kept as-is.
  """
  import re  # stdlib; imported locally so the helper works regardless of cell order

  min_length = 5
  # Opening or closing <pre>/<code> tag, optionally carrying attributes
  wrapper_tag = re.compile(r'</?(?:pre|code)\b[^>]*>')

  code_snippets = []
  for element in elements:
    # str() is used instead of .get_text() because .get_text() would also
    # remove any other HTML tags, which may not be desired
    s = wrapper_tag.sub('', str(element)).strip()
    # Avoid adding empty or very small strings
    if len(s) >= min_length:
      code_snippets.append(s)

  return code_snippets

def extract_pre_elements(soup):
  """
  Collect the cleaned contents of every <pre> element of a parsed document.

  Input:
    - soup <BeautifulSoup obj>: Defined on some HTML text.
  Output:
    - <list of str>: What get_elements() returns for all <pre> matches.
  """
  return get_elements(soup.find_all('pre'))

def extract_code_elements(soup):
  """
  Collect the cleaned contents of every <code> element of a parsed document.

  Input:
    - soup <BeautifulSoup obj>: Defined on some HTML text.
  Output:
    - <list of str>: What get_elements() returns for all <code> matches.
  """
  return get_elements(soup.find_all('code'))

def extract_code_from_html(html_text, which='both'):
  """
  Input:
    - html_text <str>: String variable containing the body of a question.
                       None is accepted and yields an empty list.
    - which <str>: Which HTML elements to extract. Can be 'pre', 'code' or
                    'both'.
  Output:
    - code_snippets <list of str>: Contains all the code in html_text that is
                                    enclosed in <pre>, <code> or both (depending
                                    on the value of 'which'). For 'both' the
                                    <pre> results come first and exact
                                    duplicates are removed, keeping first
                                    occurrences.
  Raises:
    - ValueError: If 'which' is not one of 'pre', 'code' or 'both' (only
                  checked when html_text is not None).
  """
  if html_text is None:
    return []

  # Reject a bad argument before spending time on parsing
  if which not in ('pre', 'code', 'both'):
    raise ValueError("'which' argument should be one of 'pre', 'code' or 'both'.")

  # Define parser
  soup = BeautifulSoup(html_text, 'lxml')

  if which == 'pre':
    return extract_pre_elements(soup)
  if which == 'code':
    return extract_code_elements(soup)

  # Keep both 'pre' and 'code' code snippets w/out duplicates;
  # dict.fromkeys preserves the order of first appearance
  code_snippets_raw = extract_pre_elements(soup) + extract_code_elements(soup)
  return list(dict.fromkeys(code_snippets_raw))

Now we create a column named `body_no_html`, which contains the contents of the `body` column with the HTML tags removed and with anything between `<code>` and `</code>` removed (i.e. with any code blocks removed).

**IMPROVEMENT:** This could be further improved by also removing code that is not enclosed in `<code>` and `</code>` tags.

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import regex as re
from bs4 import BeautifulSoup

def remove_html_code_tags(input_string):
  """
  Strip code blocks and HTML markup from a question body.

  Input:
    - input_string <str>: Raw HTML of a question body. May be None (e.g. a
                          null cell), in which case None is returned.
  Output:
    - <str or None>: The text with every <code>...</code> section removed and
                     all remaining HTML tags stripped.
  """
  # A null body would make re.sub raise; pass the null through instead
  if input_string is None:
    return None

  # Remove code between <code> and </code> tags. The opening tag may carry
  # attributes (e.g. <code class="...">); DOTALL lets a block span lines.
  text = re.sub(r'<code\b[^>]*>.*?</code>', '', input_string, flags=re.DOTALL)

  # Remove the remaining HTML tags using Beautiful Soup
  soup = BeautifulSoup(text, 'lxml')
  return soup.get_text()

# Wrap the cleaning function as a Spark UDF that returns strings
remove_html_code_tags_udf = udf(f=remove_html_code_tags, returnType=StringType())

# Add 'body_no_html': the question body after running it through the UDF
body_without_html = remove_html_code_tags_udf(df.body)
df = df.withColumn("body_no_html", body_without_html)

In [None]:
# Drop the columns that are no longer useful
df = df.drop('body', 'body_code')

In [None]:
# Show the first 10 rows of the resulting DataFrame without truncating cells
df.show(n = 10, truncate=False)

In [None]:
# Distinct values of the comma-separated 'tags' column, pulled to the driver
tags_list = [record[0] for record in df.select("tags").distinct().collect()]

# Split every combination and gather the individual tags without duplicates
unique_tags = set()
for tags in tags_list:
  unique_tags.update(tags.split(','))
ALL_TAGS = list(unique_tags)
ALL_TAGS

# Feature engineering - NLP

In [None]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
import nltk
from nltk.tokenize import ToktokTokenizer
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords

In [None]:
# Download NLTK's stop-word corpus and keep the English words, de-duplicated
nltk.download('stopwords')
english_stop_words = stopwords.words("english")
STOP_WORDS = list(set(english_stop_words))

In [None]:
# Peek at the first five stop words
STOP_WORDS[:5]

In [None]:
from sparknlp.annotator import DocumentAssembler, Tokenizer, Normalizer, StopWordsCleaner, LemmatizerModel
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.functions import concat_ws

# Join the title and the HTML-free body into one text column, separated by '. '
df = df.withColumn('merged', concat_ws('. ', df['title'], df['body_no_html']))

col_name = 'merged'

# Stage 1: turn the raw text column into a Spark NLP document
document_assembler = (DocumentAssembler()
                      .setInputCol(col_name)
                      .setOutputCol(f"{col_name}_document")
                      .setCleanupMode("shrink"))

# Stage 2: split the document into tokens
tokenizer = (Tokenizer()
             .setInputCols([f"{col_name}_document"])
             .setOutputCol(f"{col_name}_token"))

# Stage 3: clean unwanted characters and lowercase the tokens
normalizer = (Normalizer()
              .setInputCols([f"{col_name}_token"])
              .setOutputCol(f"{col_name}_normalized")
              .setLowercase(True))

# Stage 4: drop stop words (case-insensitive match against STOP_WORDS)
stopWordsCleaner = (StopWordsCleaner()
                    .setInputCols([f"{col_name}_normalized"])
                    .setOutputCol(f"{col_name}_tokens_noStop")
                    .setStopWords(STOP_WORDS)
                    .setCaseSensitive(False))

# Stage 5: lemmatize, i.e. reduce words to their 'base'/dictionary form
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols([f"{col_name}_tokens_noStop"]) \
    .setOutputCol(f"{col_name}_lemmatized")

# Stage 6: output a plain array of strings instead of Spark NLP annotations;
# annotations from previous stages are not preserved in the output
finisher = Finisher() \
    .setInputCols([f"{col_name}_lemmatized"]) \
    .setOutputCols([f"cleaned_{col_name}"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(True)

# Assemble the stages in order and run them over the data
nlp_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    stopWordsCleaner,
    lemmatizer,
    finisher,
]
pipeline = Pipeline(stages=nlp_stages)

tokens_df = pipeline.fit(df).transform(df)


In [None]:
# Inspect the output of the NLP pipeline without truncating cells
tokens_df.show(truncate = False)

In [None]:
from pyspark.ml.feature import HashingTF, IDF, CountVectorizer

# CountVectorizer: token arrays -> sparse term-count vectors.
# The column name is kept in 'tokens_col' rather than 'col' so that the
# pyspark.sql.functions.col function imported earlier is not overwritten.
tokens_col = 'cleaned_merged'
count_vectorizer = CountVectorizer(
    inputCol = tokens_col,
    outputCol = f"{tokens_col}_rawFeatures")
featurizedData = count_vectorizer.fit(tokens_df).transform(tokens_df)

# IDF: rescale the raw counts into the final feature column
idf = IDF(inputCol = f"{tokens_col}_rawFeatures", outputCol = f"{tokens_col}_feature")
featureData = idf.fit(featurizedData).transform(featurizedData)

In [None]:
# Target column next to the final feature vectors
featureData.select('tags', 'cleaned_merged_feature')

Let us explain the form of the complete features a bit. Their form is the following:

(`vector length`, `list of vector indices`, `list of vector values`)

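The `vector length` equals the number of features produced by the method used (here, the vocabulary size of the `CountVectorizer`). Each term in each document is mapped to an index, so the list of vector indices contains the indices of the terms that appear in a specific document. The list of vector values holds the corresponding weight of each of those terms: the raw term count in `cleaned_merged_rawFeatures`, and that count rescaled by IDF (i.e. the TF-IDF weight) in `cleaned_merged_feature`.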

The datatype of each vector is: `SparseVector()`

# Machine Learning



In [None]:
# (column name, type string) pairs of the feature DataFrame
data_types = featureData.dtypes

# Report every column together with its Spark type
for name, dtype in data_types:
    print(f"Column '{name}' has data type: {dtype}")

In [None]:
# Reminder of the individual tags found in the data
ALL_TAGS

In [None]:
# Columns used for modelling: the target tag string and the feature vector(s)
label_col = 'tags'
feature_cols = ['cleaned_merged_feature']

In [None]:
# Preview the first 10 rows of the feature DataFrame
featureData.show(n = 10)

In [None]:
# featureData.write.parquet("/content/drive/MyDrive/train-test-stackoverflow/featureData.parquet")

In [None]:
# Seeded 80/20 random split of the feature data into train and test sets
seed = 42
train_ratio = 0.8
test_ratio = 0.2
split_weights = [train_ratio, test_ratio]
train_data, test_data = featureData.randomSplit(split_weights, seed=seed)

### Method 1: Label powerset 

i.e. each labelset as a different tag.

In this method, each combination of tags in the train set is considered a separate label. For example, 'html,css' is one label, 'html,javascript' is another, 'html' is another, etc.

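Since we have 4 different tags, we expect to see at most $2^4 - 1 = 15$ distinct labels, i.e. one for every non-empty combination of tags (assuming every question has at least one tag).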

One issue that may arise is that the model might encounter labels in the test set (i.e. when making a prediction) that were not present in the train set. To overcome this we set `handleInvalid='keep'` on the `StringIndexer` we use, so that any label unseen during fitting is assigned a special extra index instead of raising an error.

Models that support multiclass classification: https://spark.apache.org/docs/1.6.0/mllib-classification-regression.html

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
import time

# Label indexer: most frequent tag string gets index 0; labels unseen during
# fitting are kept (handleInvalid='keep') instead of raising an error
indexer = StringIndexer(
    inputCol=label_col,
    outputCol="label",
    handleInvalid="keep",
    stringOrderType="frequencyDesc",
)
# Assembles the feature column(s) into the single vector the models read
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features_vec')

# Models that support Multiclass classification innately
logistic_regression = LogisticRegression(featuresCol='features_vec', labelCol='label')
random_forest = RandomForestClassifier(featuresCol='features_vec', labelCol='label', seed=seed)
multiclass_models = [logistic_regression, random_forest]

# Models that only support Binary classification
gbt = GBTClassifier(featuresCol='features_vec', labelCol='label', seed=seed)
binary_models = [gbt]

# Wrap every binary model in a OneVsRest classifier
ovr_models = []
for binary_classifier in binary_models:
    ovr_models.append(
        OneVsRest(classifier=binary_classifier, featuresCol='features_vec', labelCol='label')
    )

# All models to train, in one list
models = multiclass_models + ovr_models

# Results are collected here, keyed by model class name
model_results = dict()

# One evaluator serves all models; the metric is chosen per evaluate() call
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                              predictionCol='prediction',
                                              metricName='f1')

# Loop over the models
for model in models:
    # Create the pipeline for each model
    model_pipeline = Pipeline(stages=[indexer, assembler, model])

    # Fit the pipeline on the training data
    start_time = time.time()
    model_fit = model_pipeline.fit(train_data)
    training_time = time.time() - start_time

    # Make predictions on the test data. transform() is lazy, so the result is
    # cached and counted to make the timing cover the actual inference and to
    # avoid recomputing the predictions for every metric below.
    start_time = time.time()
    predictions = model_fit.transform(test_data).cache()
    predictions.count()
    prediction_time = time.time() - start_time

    # Evaluate the model
    start_time = time.time()
    # Every row has exactly one (powerset) label, so micro-averaged F1 equals
    # accuracy.
    micro_f1 = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    # Macro F1: unweighted mean of the per-label F1 over the labels that occur
    # in the test set (Spark's built-in "f1" metric is the *weighted* mean).
    true_labels = [row['label'] for row in predictions.select('label').distinct().collect()]
    per_label_f1 = [
        evaluator.evaluate(predictions, {evaluator.metricName: "fMeasureByLabel",
                                         evaluator.metricLabel: float(label)})
        for label in true_labels
    ]
    macro_f1 = sum(per_label_f1) / len(per_label_f1) if per_label_f1 else 0.0
    evaluation_time = time.time() - start_time

    # Release the cached predictions before the next model
    predictions.unpersist()

    # Store the model results
    model_name = model.__class__.__name__
    model_results[model_name] = {
        'training_time': training_time,
        'prediction_time': prediction_time,
        'evaluation_time': evaluation_time,
        'micro_f1': micro_f1,
        'macro_f1': macro_f1
    }

# Report template: (format string, key in the result dict, display scale)
report_lines = (
    ("Training time: {:.2f} seconds", 'training_time', 1),
    ("Prediction time: {:.2f} seconds", 'prediction_time', 1),
    ("Evaluation time: {:.2f} seconds", 'evaluation_time', 1),
    ("Micro F1 Score: {:.2f}%", 'micro_f1', 100),
    ("Macro F1 Score: {:.2f}%", 'macro_f1', 100),
)

# Print the timings and scores of every model, separated by a ruler line
for model_name, result in model_results.items():
    print("Model:", model_name)
    for template, key, scale in report_lines:
        print(template.format(result[key] * scale))
    print('-'*200)

### Method 2: Approximate k Nearest Neighbour Search

For each of the “unknown” data points, find the top-k most similar instances among the “known” data and assign the most frequent labels (also known as k-NN classification).

https://spark.apache.org/docs/latest/ml-classification-regression.html

In [None]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
import time
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer

# Make the cell re-runnable: the transformers below refuse to write an output
# column that already exists, so drop anything a previous run left behind.
# (Dropping a column that does not exist is a no-op.)
derived_cols = ['tag_list', 'binary_tags', 'features_vec']
train_data = train_data.drop(*derived_cols)
test_data = test_data.drop(*derived_cols)

# Preprocess target variable: 'tags' string -> list of individual tags
train_data = train_data.withColumn("tag_list",
                                   split(train_data["tags"], ","))
test_data = test_data.withColumn("tag_list",
                                 split(test_data["tags"], ","))

# Binary (multi-hot) vector over the tags seen in the training data
tag_cv = CountVectorizer(
    inputCol="tag_list",
    outputCol="binary_tags",
    binary=True
)
tag_cv_model = tag_cv.fit(train_data)
train_data = tag_cv_model.transform(train_data)
test_data = tag_cv_model.transform(test_data)

# Get vocabulary to be used later (vector index -> tag string)
tag_vocabulary = tag_cv_model.vocabulary

# Define a features assembler
assembler = VectorAssembler(
    inputCols = feature_cols,
    outputCol = 'features_vec'
)

# Intermediate columns that are no longer needed.
# NOTE: every entry needs its trailing comma; without it Python silently
# concatenates adjacent string literals into one (non-existent) column name.
cols_to_drop = [
    'body_no_html',
    'merged',
    'cleaned_merged',
    'cleaned_merged_rawFeatures',
    'title',
]

train_data = assembler.transform(train_data).drop(*cols_to_drop)
test_data = assembler.transform(test_data).drop(*cols_to_drop)

# Create a BucketedRandomProjectionLSH model
brp_lsh = BucketedRandomProjectionLSH(
    inputCol='features_vec',
    outputCol='features_vec_hashed',
    numHashTables=5,
    bucketLength=1.0
)

# Get hashed data (cached because it is queried once per test row later)
start_time = time.time()
brp_lsh_model = brp_lsh.fit(train_data)
train_data_hashed = brp_lsh_model.transform(train_data).cache()
training_time = time.time() - start_time


train_data_hashed.show(n = 10)

In [None]:
# Bring the whole test set to the driver as a list of Rows (not ideal)
df_rows = test_data.collect()

In [None]:
# Approximate Nearest Neighbour Search
from pyspark.sql.functions import col, when
from pyspark.ml.linalg import SparseVector
from pyspark.ml.stat import Summarizer
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import lit

from pyspark.ml.linalg import VectorUDT  # return type of the lookup UDF below

k = 5 # Try for k = 10 as well if time permits
freq_threshold = 0.3 # Try 0.3, 0.5, 0.7 for k = 5 and 10
prediction_vals_dict = {}

# Loop invariants: the aggregation used to sum the neighbours' tag vectors and
# the length of every prediction vector
summarizer = Summarizer.metrics("sum")
num_tags = len(tag_vocabulary)

start_time = time.time()
# Iterate over each test row (one approximate k-NN query per row)
for row in df_rows:
  # Extract the id and features from the row
  row_id = row['id']
  key = row['features_vec']

  # Find the nearest neighbors for the key
  knn_df = brp_lsh_model.approxNearestNeighbors(train_data_hashed, key, k)

  # Sum the neighbours' binary tag vectors: entry i = number of neighbours
  # carrying tag i
  total_counts = knn_df.select(
      summarizer.summary(knn_df['binary_tags']).alias("total_counts")
      ).collect()[0][0]
  total_counts_list = total_counts['sum'].tolist()

  # Keep the tags whose share among the k requested neighbours reaches the
  # threshold
  indices = [idx for idx, count in enumerate(total_counts_list)
             if round(count / k, 2) >= freq_threshold]

  # Save the prediction for the current row as a binary sparse vector that is
  # laid out like 'binary_tags'
  prediction_vals_dict[row_id] = SparseVector(num_tags, indices, [1.0]*len(indices))

# Broadcast dict containing the values
broadcast_dict = spark.sparkContext.broadcast(prediction_vals_dict)
# Create a UDF to access the dictionary values. The return type must be given
# explicitly: without it the UDF is declared as returning strings and the
# vectors cannot be read back as vectors.
lookup_value = udf(lambda row_id: broadcast_dict.value.get(row_id), VectorUDT())

# Create prediction column
test_data_with_predictions = test_data.withColumn('predicted_tags', lookup_value(col('id')))
prediction_time = time.time() - start_time

In [None]:
# Show the test data together with the new 'predicted_tags' column
test_data_with_predictions.show()

In [None]:
# Get f1 score
from pyspark.sql.types import ArrayType, IntegerType, DoubleType
from pyspark.sql.functions import array_intersect

pred_col = col('predicted_tags')
true_tags = col('binary_tags')

# Sparse vector -> list of the indices of its stored entries (= tag indices)
extract_indices = udf(lambda sv: sv.indices.tolist(), ArrayType(IntegerType()))
# Per-row F1 = 2|Pi ∩ Yi| / (|Pi| + |Yi|). When both sets are empty the ratio
# is undefined; 0.0 is returned instead of failing with a division by zero.
getf1i_udf = udf(
    lambda pi, yi, piyi: 2*len(piyi) / (len(pi) + len(yi)) if (pi or yi) else 0.0,
    DoubleType())

# Create columns containing the predicted, actual tags (indices) and their intersection
test_data_with_predictions = test_data_with_predictions.withColumn('Pi', extract_indices(pred_col))
test_data_with_predictions = test_data_with_predictions.withColumn('Yi', extract_indices(true_tags))
test_data_with_predictions = test_data_with_predictions.withColumn('PiYi', array_intersect("Pi", "Yi"))
# Get F1 for each row (F1i)
df_with_f1 = test_data_with_predictions.withColumn('F1i', getf1i_udf(col('Pi'), col('Yi'), col('PiYi')))

# Get complete F1 score: the mean of the per-row scores
f1 = df_with_f1.agg({'F1i': 'mean'}).collect()[0][0]


In [None]:
# Timings and score of the approximate k-NN method
for template, value in (
    ("Training time: {:.2f} seconds", training_time),
    ("Prediction time: {:.2f} seconds", prediction_time),
    ("Modified F1 Score: {:.2f}%", f1 * 100),
):
    print(template.format(value))

### Method 3: Clustering

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.feature import VectorAssembler
import time
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer

# Make the cell runnable after the k-NN section (or after itself): the
# transformers below refuse to write an output column that already exists, so
# drop anything a previous run left behind. (Dropping a column that does not
# exist is a no-op.)
derived_cols = ['tag_list', 'binary_tags', 'features_vec']
train_data = train_data.drop(*derived_cols)
test_data = test_data.drop(*derived_cols)

# Preprocess target variable: 'tags' string -> list of individual tags
train_data = train_data.withColumn("tag_list",
                                   split(train_data["tags"], ","))
test_data = test_data.withColumn("tag_list",
                                 split(test_data["tags"], ","))

# Binary (multi-hot) vector over the tags seen in the training data
tag_cv = CountVectorizer(
    inputCol="tag_list",
    outputCol="binary_tags",
    binary=True
)
tag_cv_model = tag_cv.fit(train_data)
train_data = tag_cv_model.transform(train_data)
test_data = tag_cv_model.transform(test_data)

# Get vocabulary to be used later (vector index -> tag string)
tag_vocabulary = tag_cv_model.vocabulary

# Define a features assembler
assembler = VectorAssembler(
    inputCols = feature_cols,
    outputCol = 'features_vec'
)

# Intermediate columns that are no longer needed.
# NOTE: every entry needs its trailing comma; without it Python silently
# concatenates adjacent string literals into one (non-existent) column name.
cols_to_drop = [
    'body_no_html',
    'merged',
    'cleaned_merged',
    'cleaned_merged_rawFeatures',
    'title',
]

train_data = assembler.transform(train_data).drop(*cols_to_drop)
test_data = assembler.transform(test_data).drop(*cols_to_drop)


train_data.show(n = 10)

In [None]:
# Cluster the training points with K-Means and label each point with its cluster
k = 20  # Specify the number of clusters

# Configure the estimator through its setters
kmeans = KMeans().setFeaturesCol('features_vec').setK(k).setSeed(seed)

# Time the fit together with the cluster assignment of the training data
start_time = time.time()
kmeans_model = kmeans.fit(train_data)
transformed_data = kmeans_model.transform(train_data)  # adds the cluster label column
training_time = time.time() - start_time

In [None]:
# Training data with the cluster label assigned by K-Means
transformed_data.show()

In [None]:
# Cluster centres of the fitted K-Means model; used later to find the closest
# cluster for each test point
centroids = kmeans_model.clusterCenters()
centroids

In [None]:
# Get prediction for each cluster (i.e. most common labels in each cluster)
from pyspark.ml.stat import Summarizer
from pyspark.ml.linalg import SparseVector
import time

start_time = time.time()
freq_threshold = 0.3 # Try 0.3, 0.5, 0.7
# Loop invariants: the aggregation used to sum the tag vectors and the length
# of every prediction vector
summarizer = Summarizer.metrics("sum")
num_tags = len(tag_vocabulary)
# Cluster predictions is a list with index = cluster label and value = label
# predictions for that cluster
cluster_predictions = []
for cluster_label in range(k):
  # Per-cluster timer; kept separate so the overall start_time is not overwritten
  cluster_start = time.time()
  # Get a df of points in cluster with label 'cluster_label'
  cluster_df = transformed_data.filter(
      transformed_data['prediction'] == cluster_label
      )

  # Get number of points in current cluster
  n_rows = cluster_df.count()

  if n_rows == 0:
    # An empty cluster has no tag frequencies (and would divide by zero):
    # predict no tags for it
    indices = []
  else:
    # Sum the binary tag vectors: entry i = number of points carrying tag i
    total_counts = cluster_df.select(
        summarizer.summary(cluster_df['binary_tags']).alias("total_counts")
        ).collect()[0][0]
    total_counts_list = total_counts['sum'].tolist()

    # Keep the tags whose relative frequency in the cluster reaches the threshold
    indices = [idx for idx, count in enumerate(total_counts_list)
               if round(count / n_rows, 2) >= freq_threshold]

  # Create the prediction sparse vector (binary, laid out like 'binary_tags')
  prediction_vec = SparseVector(num_tags, indices, [1.0]*len(indices))
  cluster_predictions.append(prediction_vec)
  construction_time = time.time() - cluster_start
  print(f" Cluster {cluster_label} done. Prediction Vector: {prediction_vec}. Time: {construction_time}")

print('-'*200)
print('Predictions for each cluster:')
cluster_predictions

In [None]:
# Bring the whole test set to the driver as a list of Rows
df_rows = test_data.collect()

In [None]:
# Get predictions by giving each new point the most frequent labels that occur
# in the cluster that has as centroid the centroid that is closer to the point
import numpy as np

from pyspark.sql.functions import col, when
from pyspark.ml.linalg import SparseVector
from pyspark.ml.stat import Summarizer
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import lit

def find_closest_centroid(point, centroids):
  """
  Return the position in 'centroids' of the centre closest to 'point'.

  Closeness is the Euclidean norm of (centroid - point), taken along axis 1,
  so the difference must be 2-D. The caller in this notebook passes 'point'
  with shape (1, n_features). Ties go to the first minimum (np.argmin).
  The result is a NumPy integer.
  """
  offsets = centroids - point
  nearest = np.argmin(np.linalg.norm(offsets, axis=1))
  return nearest.astype(int)



from pyspark.ml.linalg import VectorUDT  # return type of the lookup UDF below

# Time only this prediction step; without resetting the timer the value left
# over from an earlier cell would be used
start_time = time.time()

prediction_vals_dict = {}
for row in df_rows:
  row_id = row['id']
  # Dense copy of the feature vector as a single-row 2-D array
  point = row['features_vec'].toArray().reshape(1, -1)

  # Get the index of the cluster whose centroid is closest to the point
  closest_centroid_idx = find_closest_centroid(point, centroids)
  # Save prediction for current row in dictionary
  prediction_vals_dict[row_id] = cluster_predictions[closest_centroid_idx]

# Broadcast dict containing the values
broadcast_dict = spark.sparkContext.broadcast(prediction_vals_dict)
# Create a UDF to access the dictionary values. The return type must be given
# explicitly: without it the UDF is declared as returning strings and the
# vectors cannot be read back as vectors.
lookup_value = udf(lambda row_id: broadcast_dict.value.get(row_id), VectorUDT())

# Create prediction column
test_data_with_predictions = test_data.withColumn('predicted_tags', lookup_value(col('id')))
prediction_time = time.time() - start_time

In [None]:
# Show the test data together with the new 'predicted_tags' column
test_data_with_predictions.show()

In [None]:
# Get f1 score
from pyspark.sql.types import ArrayType, IntegerType, DoubleType
from pyspark.sql.functions import array_intersect

pred_col = col('predicted_tags')
true_tags = col('binary_tags')

# Sparse vector -> list of the indices of its stored entries (= tag indices)
extract_indices = udf(lambda sv: sv.indices.tolist(), ArrayType(IntegerType()))
# Per-row F1 = 2|Pi ∩ Yi| / (|Pi| + |Yi|). When both sets are empty the ratio
# is undefined; 0.0 is returned instead of failing with a division by zero.
getf1i_udf = udf(
    lambda pi, yi, piyi: 2*len(piyi) / (len(pi) + len(yi)) if (pi or yi) else 0.0,
    DoubleType())

# Create columns containing the predicted, actual tags (indices) and their intersection
test_data_with_predictions = test_data_with_predictions.withColumn('Pi', extract_indices(pred_col))
test_data_with_predictions = test_data_with_predictions.withColumn('Yi', extract_indices(true_tags))
test_data_with_predictions = test_data_with_predictions.withColumn('PiYi', array_intersect("Pi", "Yi"))
# Get F1 for each row (F1i)
df_with_f1 = test_data_with_predictions.withColumn('F1i', getf1i_udf(col('Pi'), col('Yi'), col('PiYi')))

# Get complete F1 score: the mean of the per-row scores
f1 = df_with_f1.agg({'F1i': 'mean'}).collect()[0][0]

In [None]:
# Timings and score of the clustering method
for template, value in (
    ("Training time: {:.2f} seconds", training_time),
    ("Prediction time: {:.2f} seconds", prediction_time),
    ("Modified F1 Score: {:.2f}%", f1 * 100),
):
    print(template.format(value))

In [None]:
# Summary record of the K-Means run (displayed as the cell output)
dict(
    classifier='KMeansFreq',
    train_points=1000,
    frequency_threshold=freq_threshold,
    num_clusters=k,
    results=dict(
        modified_f1=f1,
        train_time=training_time,
        prediction_time=prediction_time,
    ),
)