In [1]:
# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.38)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (91.189.91.38)] [Connected to cloud.r-pro                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
                                                                               Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [3 InRelease 47.5 kB/88.7 kB 54%] [Connecting to security.ubuntu.com (91.1890% [2 InRelease gpgv 242 kB] [3 InRele

In [9]:
# I have yelp 100k dataset loaded into my google drive
# this allows me to access that data from inside spark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [10]:
# Start a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("word2vec").getOrCreate()

In [11]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.feature import Word2Vec
import string

schema = StructType([StructField("text", StringType(), True)])

In [14]:
# This URL is on my gdrive only! I loaded it there from a download from the internet
url = "/content/gdrive/MyDrive/Cleaned_Tweets_030821_AGP.csv"
df = spark.read.schema(schema).csv(url, sep=",", header=False)

In [15]:
df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                        |
|Australia  Manufacture Covid19 Vaccine  give it   Citizens for free of cost AFP quotes Prime Minister                       |
|CovidVaccine"                                                                                                               |
|CoronavirusVaccine CoronaVaccine CovidVaccine Australia is doing very good                                                  |
|Deaths due  COVID19 in Affected Countries                                                                     

In [16]:
# punctuation removal due to relatively small datasize
def remove_punctuation(txt):
  return "".join(l if l not in string.punctuation else "" for l in txt)

In [17]:
from pyspark.sql.functions import col, udf

remove_punctuation_udf = udf(remove_punctuation, StringType())
remove_punctuation_udf

<function __main__.remove_punctuation>

In [18]:
df = df.withColumn("clean_text", remove_punctuation_udf(col("text")))
df.show()

+--------------------+--------------------+
|                text|          clean_text|
+--------------------+--------------------+
|                text|                text|
|Australia  Manufa...|Australia  Manufa...|
|       CovidVaccine"|        CovidVaccine|
|CoronavirusVaccin...|CoronavirusVaccin...|
|Deaths due  COVID...|Deaths due  COVID...|
|          Read More |          Read More |
|                   "|                    |
|   Stay safe  di ...|   Stay safe  di ...|
|  This is what pa...|  This is what pa...|
|The Multisystem I...|The Multisystem I...|
|               The "|                The |
| Well lets qualif...| Well lets qualif...|
|Most countries wi...|Most countries wi...|
|DNA  zooms up cha...|DNA  zooms up cha...|
|Biocon Executive ...|Biocon Executive ...|
|            its over|            its over|
|Covid19Millionare...|Covid19Millionare...|
|corona CovidVaccine"| corona CovidVaccine|
|Great news s vacc...|Great news s vacc...|
|    Pharmaceutical "|     Pharm

In [21]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline

# "Creating pipeline..."
tokenizer = Tokenizer(inputCol="clean_text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text', outputCol='features')

pipeline = Pipeline(stages=[tokenizer, stopremove])

# "Training model..."
pipeline_stg = pipeline.fit(df)
final_df = pipeline_stg.transform(df)
final_df.show()

+--------------------+--------------------+--------------------+--------------------+
|                text|          clean_text|          token_text|            features|
+--------------------+--------------------+--------------------+--------------------+
|                text|                text|              [text]|              [text]|
|Australia  Manufa...|Australia  Manufa...|[australia, , man...|[australia, , man...|
|       CovidVaccine"|        CovidVaccine|      [covidvaccine]|      [covidvaccine]|
|CoronavirusVaccin...|CoronavirusVaccin...|[coronavirusvacci...|[coronavirusvacci...|
|Deaths due  COVID...|Deaths due  COVID...|[deaths, due, , c...|[deaths, due, , c...|
|          Read More |          Read More |        [read, more]|              [read]|
|                   "|                    |                  []|                  []|
|   Stay safe  di ...|   Stay safe  di ...|[, , , stay, safe...|[, , , stay, safe...|
|  This is what pa...|  This is what pa...|[, , this, 

In [22]:
# in class I had used vectorsize of 200 and max iterations of 2
word2vec = Word2Vec(
    vectorSize=200,
    seed=42,
    inputCol="features",
    outputCol="model"
).setMaxIter(2)
model = word2vec.fit(final_df)

In [23]:
model.getVectors().show(truncate=False)

+--------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [25]:
vecs = model.getVectors()

In [27]:
vecs.filter(vecs["word"] == "incident").show()

+--------+--------------------+
|    word|              vector|
+--------+--------------------+
|incident|[-0.0535993762314...|
+--------+--------------------+



In [28]:
model.findSynonymsArray("seating", 5)

[('closes', 0.5633795857429504),
 ('comorbid', 0.5575944185256958),
 ('bee', 0.5399981141090393),
 ('22000', 0.5351476073265076),
 ('disadvantaged', 0.532470166683197)]

In [29]:
w2v = model.transform(final_df)

In [30]:
w2v.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|          clean_text|          token_text|            features|               model|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|                text|              [text]|              [text]|[-0.1460146307945...|
|Australia  Manufa...|Australia  Manufa...|[australia, , man...|[australia, , man...|[0.02286556831677...|
|       CovidVaccine"|        CovidVaccine|      [covidvaccine]|      [covidvaccine]|[0.20178277790546...|
|CoronavirusVaccin...|CoronavirusVaccin...|[coronavirusvacci...|[coronavirusvacci...|[0.10084529034793...|
|Deaths due  COVID...|Deaths due  COVID...|[deaths, due, , c...|[deaths, due, , c...|[0.04071370341504...|
|          Read More |          Read More |        [read, more]|              [read]|[-0.1857418268918...|
|                   "|               

In [31]:
w2v_clustering = w2v.select(
    "text", "model"
).withColumnRenamed("model", "features")

In [32]:
w2v_clustering.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [33]:
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors

pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(w2v_clustering)
result = model.transform(w2v_clustering)
result.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [34]:
result = result.select("text", "pcaFeatures")
w2v_clustering = result.withColumnRenamed("pcaFeatures", "features")
w2v_clustering.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                        |features                                                                                                                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [35]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans().setK(7).setSeed(1)
km_model = kmeans.fit(w2v_clustering)

In [36]:
# Make predictions
predictions = km_model.transform(w2v_clustering)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator(distanceMeasure="cosine")


# silhouette score goes between -1, 1. 1 is better. Negative is bad
silhouette = evaluator.evaluate(predictions)
print("Silhouette with cosine distance = " + str(silhouette))

Silhouette with cosine distance = 0.19556750326572875


In [37]:
predictions.show()

+--------------------+--------------------+----------+
|                text|            features|prediction|
+--------------------+--------------------+----------+
|                text|[-0.0683398760337...|         5|
|Australia  Manufa...|[-0.1131544416264...|         6|
|       CovidVaccine"|[-2.0639402492577...|         0|
|CoronavirusVaccin...|[-0.6788646572264...|         4|
|Deaths due  COVID...|[-0.2003311369994...|         6|
|          Read More |[-0.0300359870641...|         3|
|                   "|[0.0,0.0,0.0,0.0,...|         3|
|   Stay safe  di ...|[-0.1520104577447...|         2|
|  This is what pa...|[0.03607136263704...|         3|
|The Multisystem I...|[-0.0444452823359...|         6|
|               The "|[0.0,0.0,0.0,0.0,...|         3|
| Well lets qualif...|[-0.2002768540696...|         3|
|Most countries wi...|[-0.0276365910645...|         3|
|DNA  zooms up cha...|[-0.2893426264864...|         2|
|Biocon Executive ...|[0.07157846578159...|         3|
|         

In [44]:
predictions.filter("prediction=5").show(100, truncate=False)

+----------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|text                                                                                                            |features                                                                                                                                                                                                            |prediction|
+----------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------