#NLP with model word2vec 

Use of the dataset Jigsaw to implement a NLP model, word2vec, with Spark and NLTK.

Data set in: https://www.kaggle.com/competitions/jigsaw-toxic-comment-classification-challenge/

In [0]:
spark

In [0]:
%pip install --upgrade pip

Python interpreter will be restarted.
Collecting pip
  Downloading pip-23.0.1-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 21.2.4
    Uninstalling pip-21.2.4:
      Successfully uninstalled pip-21.2.4
Successfully installed pip-23.0.1
Python interpreter will be restarted.


In [0]:
%pip install nltk

Python interpreter will be restarted.
Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.5/1.5 MB 7.7 MB/s eta 0:00:00
Collecting tqdm
  Downloading tqdm-4.65.0-py3-none-any.whl (77 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 77.1/77.1 kB 6.3 MB/s eta 0:00:00
Collecting regex>=2021.8.3
  Downloading regex-2022.10.31-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (769 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 770.0/770.0 kB 55.6 MB/s eta 0:00:00
Installing collected packages: tqdm, regex, nltk
Successfully installed nltk-3.8.1 regex-2022.10.31 tqdm-4.65.0
Python interpreter will be restarted.


Import and read of Dataset Jigsaw.

In [0]:
path = "/FileStore/tables/train.csv"

In [0]:
jigsaw_df = spark.read.option("header", True).option("multiline", True).option("escape", "\"").csv(path)

An Pipeline maded with "Tokenizer", "Stop Words Remover", "Numbers Remover" and "Stemmer".

In [0]:
from nltk.stem.snowball import EnglishStemmer
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import col, explode, split, udf
from pyspark.sql.types import ArrayType, StringType, DoubleType

class Stemmer(Transformer):
    def __init__(self, inputCol, outputCol=None):
        super(Stemmer, self).__init__()
        self.input_column = inputCol
        self.output_column = outputCol or inputCol

    def _transform(self, df):
        function = (lambda x: [EnglishStemmer().stem(y) for y in x if len(y) > 1])
        stemmer_udf = udf(function, ArrayType(StringType()))
        return df.withColumn(self.output_column, stemmer_udf(self.input_column))
    
class NumbersRemover(Transformer):
    def __init__(self, inputCol, outputCol=None):
        super(NumbersRemover, self).__init__()
        self.input_column = inputCol
        self.output_column = outputCol or inputCol

    def _transform(self, df):
        numbers_remover = udf(lambda x: [y for y in x if not y.isdigit()], ArrayType(StringType()))
        return df.withColumn(self.output_column, numbers_remover(self.input_column))
                             
tokenizer = RegexTokenizer(inputCol="comment_text",
                           outputCol="comment_text_tokens",
                           pattern=r"\W",
                           toLowercase=True)

sw_remover = StopWordsRemover(inputCol="comment_text_tokens",
                              outputCol="comment_text_sw_removed")

numbers_remover = NumbersRemover(inputCol="comment_text_sw_removed",
                                 outputCol="comment_text_num_removed")

stemmer = Stemmer(inputCol="comment_text_num_removed", 
                  outputCol="comment_text_stem")


nlp_pipeline = Pipeline(stages=[tokenizer, sw_remover, numbers_remover, stemmer])

processed_df = nlp_pipeline.fit(jigsaw_df).transform(jigsaw_df)


In [0]:
processed_df_path = "/dbfs/FileStore/jigsaw_processed"
processed_df.write.parquet(processed_df_path, mode="overwrite")

Model word2vec trained with the processed dataframe.

In [0]:
jigsaw_processed = spark.read.parquet(processed_df_path)

In [0]:
from pyspark.ml.feature import Word2Vec

vector_size = 50
window_size = 10
max_iter = 1
num_partitions = 1

parameters = {
    "vectorSize": vector_size,
    "windowSize": window_size,
    "maxIter": max_iter,
    "numPartitions": num_partitions,
}

w2v_model = Word2Vec(inputCol="comment_text_stem", outputCol="features", minCount=3, **parameters)

model = w2v_model.fit(jigsaw_processed)

In [0]:
# model save
model.write().overwrite().save("/dbfs/FileStore/models/word2vec/")

In [0]:
# model load
from pyspark.ml.feature import Word2VecModel

model = Word2VecModel.load("/dbfs/FileStore/models/word2vec/")

Embeddings calculated with a dataframe from the words of dataset Jigsaw.

In [0]:
# words dataframe: 
from pyspark.sql.functions import col, explode, split

words = jigsaw_processed.withColumnRenamed("comment_text_num_removed", "words").withColumn("words", explode("words")).select("words").distinct().withColumn("words_array", split(col("words"), ","))

output_col_name = "comment_text_stem"
words = Stemmer(inputCol="words_array", outputCol=output_col_name).transform(words)

embeddings = model.transform(words)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number
from pyspark.sql.types import ArrayType, DoubleType

to_array = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

embeddings = embeddings.withColumn("features", to_array("features"))
embeddings = embeddings.withColumn("word_index", row_number().over(Window.orderBy("words")))

In [0]:
# dataframe
embeddings.write.parquet(f"/dbfs/FileStore/embeddings_{vector_size}_{window_size}_{max_iter}_{num_partitions}", mode="overwrite")

Proximity text with the word `finland`.

In [0]:
token1 = "finland"
display(model.findSynonyms(token1, 10))

word,similarity
abkhazia,0.8467004895210266
frontier,0.8410831689834595
austria,0.8396831154823303
imperi,0.8379868268966675
austrian,0.8323290348052979
kaliningrad,0.8281496167182922
romania,0.8276630640029907
bagramyan,0.8269601464271545
tabsmiljan,0.8252956867218018
andorra,0.8251665830612183


Parameters test with two different models word2vec. Test results with the words ``finland`` and ``danmark``, cosine similarity.

In [0]:
#model 1

vector_size = 100
window_size = 7
max_iter = 1
num_partitions = 1

parameters = {
    "vectorSize": vector_size,
    "windowSize": window_size,
    "maxIter": max_iter,
    "numPartitions": num_partitions,
}

w2v_model = Word2Vec(inputCol="comment_text_stem", outputCol="features", minCount=3, **parameters)

model_1 = w2v_model.fit(jigsaw_processed)

model_1.write().overwrite().save("/dbfs/FileStore/models/word2vec/")
model_1 = Word2VecModel.load("/dbfs/FileStore/models/word2vec/")

embeddings_1 = model_1.transform(words)


In [0]:
# model 2

vector_size = 50
window_size = 12
max_iter = 1
num_partitions = 1

parameters = {
    "vectorSize": vector_size,
    "windowSize": window_size,
    "maxIter": max_iter,
    "numPartitions": num_partitions,
}

w2v_model = Word2Vec(inputCol="comment_text_stem", outputCol="features", minCount=3, **parameters)

model_2 = w2v_model.fit(jigsaw_processed)

model_2.write().overwrite().save("/dbfs/FileStore/models/word2vec/")
model_2 = Word2VecModel.load("/dbfs/FileStore/models/word2vec/")

embeddings_2 = model_2.transform(words)

In [0]:
import numpy as np

def cosine_sim(a, b):
    def l2_norm(v):
        return np.sqrt(np.sum(np.array(a)**2))
    cos_sim = np.dot(a, b)/(l2_norm(a)*l2_norm(b))
    return cos_sim

In [0]:
#result - model 1
word1 = "finland"
word2 = "danmark"

word1_embedding1 = embeddings_1.filter(col("words") == word1).select("features").collect()[0][0]
word2_embedding1 = embeddings_1.filter(col("words") == word2).select("features").collect()[0][0]

cosine_sim(word1_embedding1,word2_embedding1)

Out[28]: 0.23997115561315505

In [0]:
#result - model 2
word1_embedding2 = embeddings_2.filter(col("words") == word1).select("features").collect()[0][0]
word2_embedding2 = embeddings_2.filter(col("words") == word2).select("features").collect()[0][0]

cosine_sim(word1_embedding2, word2_embedding2)

Out[29]: 0.3335125051069983

The model 2 gave the best result.