In [4]:
# Common Packages

import os
import sys
import argparse
import warnings
from contextlib import suppress
import traceback
import logging

# Pyspark Packages
from pyspark.sql import SparkSession, Window
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import functions as F, DataFrameReader
from pyspark.sql.types import (
    StringType,
    ArrayType,
    IntegerType,
    FloatType,
    StructType,
    StructField,
    BooleanType,
    MapType
)

from pyspark.ml.feature import Word2Vec, Tokenizer
from pyspark.ml.pipeline import Pipeline

# NLTK
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from string import punctuation

# Tokens removed during text preprocessing: NLTK's Indonesian stopword list
# followed by every character of string.punctuation (one entry per character).
sw_indo = [*stopwords.words("indonesian"), *punctuation]

In [5]:
def config():
    """Build the SparkConf used to create the notebook's SparkSession.

    Returns:
        SparkConf: a fresh configuration with the following keys set:
        ``spark.sql.caseSensitive``, ``spark.debug.maxToStringFields``,
        ``spark.sql.files.ignoreCorruptFiles`` and
        ``spark.sql.session.timeZone``.
    """
    # COMMON CONFIGURATION
    settings = (
        ("spark.sql.caseSensitive", "true"),
        ("spark.debug.maxToStringFields", 200),
        ("spark.sql.files.ignoreCorruptFiles", "true"),
        ("spark.sql.session.timeZone", "Asia/Jakarta"),
    )

    spark_conf = SparkConf()
    for key, value in settings:
        spark_conf.set(key, value)

    return spark_conf

In [6]:
def read_dataframe(fname):
    """Load a CSV file into a Spark DataFrame and derive helper columns.

    Uses the module-level ``spark`` session, which must already exist when
    this function is called.

    Args:
        fname: Path of the CSV file to read. The file is read with a header
            row, multi-line field support, and ``"`` as both the quote and
            the escape character. It must contain a ``teks`` column.

    Returns:
        DataFrame: the CSV columns plus
            * ``idx``   - value of ``F.monotonically_increasing_id()``.
            * ``title`` - the part of ``teks`` before the first newline
              (null when ``teks`` is null).
    """
    reader = (
        spark.read
        .option("quote", "\"")
        .option("escape", "\"")
    )

    # Read the path the caller asked for instead of a hard-coded file.
    articles = reader.csv(fname, header=True, multiLine=True)

    # Built-in split/getItem instead of a Python UDF: it yields null for a
    # null `teks` rather than raising inside the UDF, and avoids the
    # Python serialization round-trip.
    first_line = F.split(F.col("teks"), "\n").getItem(0)

    return (
        articles
        .withColumn("idx", F.monotonically_increasing_id())
        .withColumn("title", first_line)
    )

In [7]:
def text_preprocessing(text: str) -> str:
    """Lower-case ``text`` and drop stopword / punctuation tokens.

    The text is lower-cased, tokenized with NLTK's ``word_tokenize``, and
    every token found in the module-level ``sw_indo`` list is removed. The
    remaining tokens are joined with single spaces.

    Args:
        text: Raw document text. ``None`` or an empty string is accepted.

    Returns:
        The cleaned text, or ``""`` when ``text`` is ``None`` or empty.
    """
    # When this runs as a Spark UDF, null cells arrive as None; treat a
    # missing document as an empty one instead of failing on `.lower()`.
    if not text:
        return ""

    # Set membership is O(1) per token; scanning the list is O(len(sw_indo)).
    stop_tokens = set(sw_indo)
    tokens = word_tokenize(text.lower())
    return " ".join(token for token in tokens if token not in stop_tokens)

def dataframe_cleaning(df):
    """Return ``df`` with its ``teks`` column replaced by preprocessed text.

    Each ``teks`` value is passed through ``text_preprocessing`` by a Python
    UDF declared to return ``StringType``.

    Args:
        df: DataFrame containing a ``teks`` column.

    Returns:
        DataFrame: same columns as ``df`` with ``teks`` overwritten.
    """
    clean_text = F.udf(lambda raw: text_preprocessing(raw), StringType())
    return df.withColumn("teks", clean_text(F.col("teks")))

In [8]:
if __name__ == "__main__":
    # Build the Spark session, load and clean the articles, then fit a
    # Tokenizer -> Word2Vec pipeline and attach one "word_vector" per row.
    # Leaves `spark`, `sc`, `model` and `df` defined at module level.

    try:

        conf = config()
        spark = (
            SparkSession.builder
            .config(conf=conf)
            .appName("Document Similarity")
            .getOrCreate()
        )
        sc = spark.sparkContext

        df = read_dataframe("./data/kompas.csv")
        df = dataframe_cleaning(df)

        tokenizer = Tokenizer(inputCol="teks", outputCol="word_tokenize")
        word2Vec = Word2Vec(
            vectorSize=200,
            seed=42,
            inputCol="word_tokenize",
            outputCol="word_vector",
        )

        # Keep the unfitted pipeline and the fitted model under separate
        # names so `model` always refers to the fitted PipelineModel.
        pipeline = Pipeline(stages=[tokenizer, word2Vec])
        model = pipeline.fit(df)

        # The intermediate token column is not needed once vectors exist.
        df = model.transform(df).drop("word_tokenize")

        # NOTE: the session is deliberately not stopped here; `df` is lazy
        # and is still evaluated after this block.

    except Exception:
        # logging.exception records the message plus the active traceback;
        # the bare `raise` re-raises the original exception unchanged.
        logging.exception("Document similarity pipeline failed")
        raise

In [9]:
from umap import UMAP
import numpy as np
import pandas as pd
import plotly.express as px

In [10]:
# Collect both columns in ONE Spark job: the lazy pipeline (UDF cleaning +
# Word2Vec transform) is evaluated once instead of twice, and `vec[i]` /
# `title[i]` are guaranteed to come from the same row.
rows = df.select("title", "word_vector").collect()
vec = [row["word_vector"].values for row in rows]
title = [row["title"] for row in rows]

In [11]:
# UMAP is stochastic; pin its seed (same value as the Word2Vec seed) so the
# 2-D projection, and therefore the plot, is reproducible across runs.
X = UMAP(random_state=42).fit_transform(vec)

In [12]:
# Plotting frame: the two UMAP coordinates per document plus its title,
# stored in "text" for use as the hover label.
df_plot = pd.DataFrame(X, columns=["umap1", "umap2"]).assign(text=title)

In [15]:
# Interactive scatter of the UMAP projection; hovering a point shows the
# document title stored in the "text" column.
fig = (
    px.scatter(df_plot, x="umap1", y="umap2", hover_name="text")
    .update_traces(textposition='top center')
    .update_layout(height=800, title_text='Reduced Word2Vec Visualization')
)
fig.show()