# Creating word vectors with the Word2Vec implementation in Spark MLlib

### Here we show how to use Spark MLlib Word2Vec to generate word features
#### The data used is the attack-comments text data

Depending on the size of your DSVM, this notebook may take about 1-2 minutes to finish.

MLlib Word2Vec documentation: https://spark.apache.org/docs/2.2.0/mllib-feature-extraction.html#word2vec (the code below uses the DataFrame-based `pyspark.ml.feature.Word2Vec` API)

In [28]:
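# Each `!` command runs in its own subshell, so a standalone `!cd` does not change
# the directory for later commands; chain commands with `&&` instead.
# The notebook itself is assumed to run from /home/remoteuser/notebooks/SparkML/pySpark,
# so that the relative paths ../Data and ../Outputs used below resolve correctly.
!mkdir -p /home/remoteuser/notebooks/SparkML/Outputs
!mkdir -p /home/remoteuser/notebooks/SparkML/Data

# Download the text data into the Data folder and unzip it there (-o: overwrite without prompting)
!cd /home/remoteuser/notebooks/SparkML/Data && wget https://activelearning.blob.core.windows.net/activelearningdemo/text_data.zip && unzip -o text_data.zip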

--2018-03-02 07:50:56--  https://activelearning.blob.core.windows.net/activelearningdemo/text_data.zip
Resolving activelearning.blob.core.windows.net (activelearning.blob.core.windows.net)... 13.77.184.64
Connecting to activelearning.blob.core.windows.net (activelearning.blob.core.windows.net)|13.77.184.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64260679 (61M) [application/x-zip-compressed]
Saving to: ‘text_data.zip’


2018-03-02 07:50:58 (29.3 MB/s) - ‘text_data.zip’ saved [64260679/64260679]

Archive:  text_data.zip
   creating: text_data/
  inflating: text_data/aggression_data.csv  
  inflating: text_data/attack_data.csv  
  inflating: text_data/toxicity_data.csv  
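If shell magics are unavailable (for example, when running this as a plain Python script), the download-and-unzip step can be done with the standard library alone. The following is a minimal sketch; the function name `download_and_extract` is hypothetical and not part of the original notebook.

```python
import urllib.parse
import urllib.request
import zipfile
from pathlib import Path


def download_and_extract(url: str, dest_dir: str) -> list:
    """Download a zip archive into dest_dir and extract it there.

    Returns the sorted list of member names found in the archive.
    """
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)              # like `mkdir -p`
    # Keep the remote file name, e.g. "text_data.zip"
    archive = dest / Path(urllib.parse.urlparse(url).path).name
    urllib.request.urlretrieve(url, str(archive))        # like `wget`
    with zipfile.ZipFile(archive) as zf:
        zf.extractall(dest)                              # like `unzip -o`
        return sorted(zf.namelist())


# Usage (paths taken from the cell above):
# download_and_extract(
#     "https://activelearning.blob.core.windows.net/activelearningdemo/text_data.zip",
#     "/home/remoteuser/notebooks/SparkML/Data")
```

Because the target directory is passed explicitly, nothing depends on the notebook's current working directory.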


## Set directory path for input data 
#### Input data was downloaded locally to the DSVM in the previous step

In [15]:
# 1. Location of the training data on the local DSVM (relative to the notebook's working directory)
text_file = "../Data/text_data/attack_data.csv"

## Set Spark context and import necessary libraries

In [16]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, DataFrame
from pyspark.sql.functions import UserDefinedFunction, regexp_replace, trim, col, lower, lit, udf, monotonically_increasing_id
from pyspark.sql.types import *
from pyspark.ml.feature import Word2Vec, Word2VecModel, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.ml.linalg import Vectors

import numpy as np
import datetime
import pandas as pd

# The PySpark kernel normally predefines `spark` and `sc`; getOrCreate() returns
# that existing session, or builds a new one if the notebook runs elsewhere.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

## Data ingestion: read in the attack text CSV data

In [17]:
## READ IN DATA AND CREATE SPARK DATAFRAME FROM A CSV
text_df = spark.read.csv(path=text_file, header=True, inferSchema=True, sep=",")
text_df.cache(); text_df.count();

## REGISTER DATA-FRAME AS A TEMP-TABLE IN SQL-CONTEXT
text_df.createOrReplaceTempView("text_table")

text_df.head(1)

[Row(rev_id=37675, comment="`-NEWLINE_TOKENThis is not ``creative``.  Those are the dictionary definitions of the terms ``insurance`` and ``ensurance`` as properly applied to ``destruction``.  If you don't understand that, fine, legitimate criticism, I'll write up ``three man cell`` and ``bounty hunter`` and then it will be easy to understand why ``ensured`` and ``insured`` are different - and why both differ from ``assured``.NEWLINE_TOKENNEWLINE_TOKENThe sentence you quote is absolutely neutral.  You just aren't familiar with the underlying theory of strike-back (e.g. submarines as employed in nuclear warfare) guiding the insurance, nor likely the three man cell structure that kept the IRA from being broken by the British.  If that's my fault, fine, I can fix that to explain.  But ther'es nothing ``personal`` or ``creative`` about it.NEWLINE_TOKENNEWLINE_TOKENI'm tired of arguing with you.  Re: the other article, ``multi-party`` turns up plenty, and there is more use of ``mutually`` t

## Select and filter data set

In [18]:
### SELECT ONLY REV_ID AND COMMENT FIELDS, AND FILTER FOR TRAINING DATA AND ARTICLES ONLY
sqlStatement = """ SELECT rev_id, comment 
            FROM text_table 
            where ns = 'article' and split = 'train' """
text_filtered_df = spark.sql(sqlStatement)

## CACHE NEW DATAFRAME IN MEMORY AND CREATE TEMPORARY TABLE
text_filtered_df.cache(); text_filtered_df.count();
text_filtered_df.createOrReplaceTempView("text_filtered_table")

## COUNT NUMBER OF ROWS IN DATAFRAME 
text_filtered_df.count()

31253

## Lowercase, remove newline tokens, and strip punctuation

In [19]:
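## LOWERCASE, REMOVE NEWLINE MARKERS, THEN KEEP ONLY WORD CHARACTERS, HYPHENS AND SPACES
## NOTE: REPLACING THE NEWLINE MARKERS WITH "" JOINS THE WORDS ON EITHER SIDE
## (E.G. "assuredthe" IN THE OUTPUT BELOW); REPLACING THEM WITH " " WOULD KEEP THEM APART
text_filtered_df2 = text_filtered_df.withColumn("comment1", lower(col("comment"))).\
    withColumn("comment2", regexp_replace("comment1", '-newline_token', "")).\
    withColumn("comment3", regexp_replace("comment2", 'newline_token', "")).\
    withColumn("comment_final", regexp_replace("comment3", r'[^\w\- ]', "")).\
    select('rev_id', 'comment_final')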
    
text_filtered_df2.head(1)

[Row(rev_id=37675, comment_final='this is not creative  those are the dictionary definitions of the terms insurance and ensurance as properly applied to destruction  if you dont understand that fine legitimate criticism ill write up three man cell and bounty hunter and then it will be easy to understand why ensured and insured are different - and why both differ from assuredthe sentence you quote is absolutely neutral  you just arent familiar with the underlying theory of strike-back eg submarines as employed in nuclear warfare guiding the insurance nor likely the three man cell structure that kept the ira from being broken by the british  if thats my fault fine i can fix that to explain  but theres nothing personal or creative about itim tired of arguing with you  re the other article multi-party turns up plenty and there is more use of mutually than mutual  if i were to apply your standard id be moving mutual assured destruction to talk for not appealing to a reagan voters biases abo
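To check the cleaning rules on a single string before running them over the whole DataFrame, the same three regex steps can be mirrored in plain Python. This is a sketch, not the Spark code path: Spark uses Java regexes, so `re.ASCII` is passed to make Python's `\w` match `[A-Za-z0-9_]` as Java's default does. The function name `clean_comment` and the `newline_replacement` parameter are hypothetical.

```python
import re


def clean_comment(text: str, newline_replacement: str = "") -> str:
    """Mirror the notebook's cleaning: lowercase, drop newline markers,
    then keep only word characters, hyphens and spaces."""
    text = text.lower()
    text = re.sub("-newline_token", newline_replacement, text)
    text = re.sub("newline_token", newline_replacement, text)
    # re.ASCII: \w means [A-Za-z0-9_], like Java's default \w
    return re.sub(r"[^\w\- ]", "", text, flags=re.ASCII)


sample = "`-NEWLINE_TOKENThis is not ``creative``.NEWLINE_TOKENNEWLINE_TOKENThe end"
print(clean_comment(sample))       # markers removed: "creative" and "the" fuse
print(clean_comment(sample, " "))  # markers become spaces: words stay separate
```

With the default empty replacement the sketch reproduces the word-joining seen in the output above; replacing the markers with a space avoids it at the cost of extra spaces, which whitespace tokenization then has to handle.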

## Tokenize and remove stopwords

In [20]:
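## DEFINE TOKENIZER AND STOPWORD REMOVER
## NOTE: Tokenizer SPLITS ON EVERY SINGLE WHITESPACE CHARACTER, SO RUNS OF SPACES PRODUCE EMPTY
## TOKENS (THE ", ," ENTRIES BELOW). EMPTY STRINGS ARE NOT STOP WORDS, SO THEY ARE PASSED ON TO
## WORD2VEC. RegexTokenizer(inputCol="comment_final", outputCol="words"), WHOSE DEFAULT PATTERN
## IS "\\s+" WITH minTokenLength=1, WOULD DROP THEM.
tokenizer = Tokenizer(inputCol="comment_final", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtWords")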

## TRANSFORM DATASET
text_filtered_df3 = tokenizer.transform(text_filtered_df2)
text_filtered_df4 = remover.transform(text_filtered_df3)
text_filtered_df4.cache(); text_filtered_df4.count();
text_filtered_df4.show(5)

+------+--------------------+--------------------+--------------------+
|rev_id|       comment_final|               words|           filtWords|
+------+--------------------+--------------------+--------------------+
| 37675|this is not creat...|[this, is, not, c...|[creative, , dict...|
| 44816| the term standar...|[, the, term, sta...|[, term, standard...|
| 49851|true or false the...|[true, or, false,...|[true, false, sit...|
| 93890|this page will ne...|[this, page, will...|[page, need, disa...|
|103624|i removed the fol...|[i, removed, the,...|[removed, followi...|
+------+--------------------+--------------------+--------------------+
only showing top 5 rows
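The empty tokens in the `words` and `filtWords` columns come from splitting on single whitespace characters. The sketch below approximates that behaviour in plain Python (lowercase, split on each `\s`, drop trailing empty strings as Java's `String.split` does) and contrasts it with splitting on `\s+` and discarding empty tokens, which is roughly what `RegexTokenizer` does with its defaults. The function names and the tiny `STOP_WORDS` set are hypothetical; Spark's built-in English stop-word list is much larger.

```python
import re

STOP_WORDS = {"this", "is", "not", "the", "of", "a"}  # hypothetical tiny list


def spark_like_tokenize(text: str) -> list:
    """Approximate Spark's Tokenizer: lowercase, then split on each single
    whitespace character. Leading and interior empty strings are kept;
    trailing empty strings are dropped."""
    tokens = re.split(r"\s", text.lower())
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


def regex_tokenize(text: str, min_token_length: int = 1) -> list:
    """Approximate RegexTokenizer with pattern '\\s+': empty tokens vanish."""
    return [t for t in re.split(r"\s+", text.lower()) if len(t) >= min_token_length]


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop stop words; note that "" is not a stop word and survives."""
    return [t for t in tokens if t not in stop_words]


text = " the term  standard "
print(remove_stop_words(spark_like_tokenize(text)))  # empty tokens survive
print(remove_stop_words(regex_tokenize(text)))       # clean token list
```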



## Define and run Word2Vec on comments

In [21]:
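model = None
window_size = 5     # context words considered on each side of the centre word
vector_size = 50    # dimensionality of each learned word vector
min_count = 5       # words occurring fewer times than this are left out of the vocabulary

## numPartitions: PARTITIONS USED DURING TRAINING; THE SPARK DOCS ADVISE A SMALL NUMBER FOR ACCURACY
word2Vec = Word2Vec(windowSize=window_size, vectorSize=vector_size, minCount=min_count,
                    numPartitions=2, inputCol="filtWords", outputCol="result")
model = word2Vec.fit(text_filtered_df4)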
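Spark's Word2Vec trains a skip-gram model, which learns vectors by predicting the words around each centre word. The sketch below shows, in plain Python, how `minCount` and `windowSize` shape the training data: rare words are dropped from the vocabulary, and each remaining word is paired with its neighbours up to `window_size` positions away. It only builds the (centre, context) pairs and does not train anything. As we understand it, Spark's implementation (following the original C word2vec code) also shrinks the window by a random amount at each position; this sketch uses the full window for clarity. The function names are hypothetical.

```python
from collections import Counter


def build_vocab(sentences, min_count):
    """Keep words that occur at least min_count times (Word2Vec's minCount)."""
    counts = Counter(w for s in sentences for w in s)
    return {w for w, c in counts.items() if c >= min_count}


def skipgram_pairs(sentences, window_size, vocab):
    """Return (centre, context) pairs within +/- window_size positions,
    after out-of-vocabulary words have been removed from each sentence."""
    pairs = []
    for sentence in sentences:
        words = [w for w in sentence if w in vocab]
        for i, centre in enumerate(words):
            lo = max(0, i - window_size)
            hi = min(len(words), i + window_size + 1)
            for j in range(lo, hi):
                if j != i:
                    pairs.append((centre, words[j]))
    return pairs


sentences = [["the", "cat", "sat"], ["the", "cat", "ran"]]
vocab = build_vocab(sentences, min_count=2)          # {"the", "cat"}
print(skipgram_pairs(sentences, window_size=1, vocab=vocab))
```

A larger window captures broader, more topical context, while a smaller one tends to emphasise words that are used interchangeably.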

## Examine some words and their nearest neighbors in the vector space

In [22]:
## RETURN THE 20 WORDS WITH THE HIGHEST COSINE SIMILARITY TO "bad", THEN SHOW THE FIRST 5
model.findSynonyms("bad", 20).select("word").head(5)

[Row(word='assume'),
 Row(word='wikipediaassume'),
 Row(word='good'),
 Row(word='laugh'),
 Row(word='luck')]
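`findSynonyms` ranks vocabulary words by cosine similarity to the query word's vector. A plain-Python sketch of that ranking is below, using made-up two-dimensional toy vectors rather than the model's 50-dimensional ones. Words that appear in similar contexts get similar vectors, which is why an antonym such as "good" can rank close to "bad". The function names are hypothetical.

```python
import math


def cosine(u, v):
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(a * b for a, b in zip(u, v))
    nu = math.sqrt(sum(a * a for a in u))
    nv = math.sqrt(sum(b * b for b in v))
    return dot / (nu * nv) if nu and nv else 0.0


def find_synonyms(word, vectors, num):
    """Return the num (word, similarity) pairs most similar to `word`,
    excluding the word itself, highest similarity first."""
    query = vectors[word]
    scored = [(w, cosine(query, v)) for w, v in vectors.items() if w != word]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:num]


toy_vectors = {            # hypothetical 2-D vectors for illustration only
    "bad": [1.0, 0.0],
    "awful": [1.0, 0.05],
    "good": [0.9, 0.1],
    "luck": [0.0, 1.0],
}
print(find_synonyms("bad", toy_vectors, 2))
```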

## Examine what the vector features look like

In [23]:
word2vec_features = model.getVectors().select("*")
word2vec_features.head(1)

[Row(word='quotient', vector=DenseVector([0.011, -0.012, -0.0068, -0.0071, -0.0994, -0.0221, -0.0346, -0.0691, 0.0614, -0.0031, 0.1304, -0.037, 0.0149, 0.0133, -0.0859, 0.0368, -0.0284, 0.0386, -0.0081, 0.0353, -0.1043, -0.0622, -0.0584, 0.019, -0.0571, -0.0851, 0.0035, 0.0542, -0.0637, -0.1261, -0.077, 0.0611, -0.0239, -0.0284, -0.0223, -0.1185, 0.1803, 0.0684, -0.0271, 0.0912, 0.0509, -0.0634, -0.0392, 0.0635, -0.0611, 0.0488, -0.0266, 0.022, -0.0666, -0.0396]))]

## Convert Spark DF to Pandas DF

In [24]:
word2vec_features_pdf = word2vec_features.toPandas()
word2vec_features_pdf.head(4)

|   | word     | vector                                             |
|---|----------|----------------------------------------------------|
| 0 | quotient | [0.011030850932, -0.0120394676924, -0.00679607... |
| 1 | incident | [-0.116916172206, -0.329966247082, 0.197727650... |
| 2 | serious  | [0.190715700388, -0.160169303417, -0.066523671... |
| 3 | wgbh     | [-0.0528572499752, -0.0575158596039, 0.0371261... |


## Save features in CSV for subsequent steps

In [25]:
## THE PANDAS INDEX IS WRITTEN AS AN UNNAMED FIRST COLUMN (PASS index=False TO OMIT IT)
word2vec_features_pdf.to_csv('../Outputs/Word2Vec-Features.csv')
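Subsequent steps need to turn the saved `vector` column back into numbers. The sketch below assumes (unverified; inspect the file to confirm) that each vector is stored as a bracketed, comma-separated list such as `[0.011,-0.012,...]` and that the first column is the unnamed pandas index. With pandas, `pd.read_csv(path, index_col=0)` followed by parsing the `vector` column achieves the same thing; this version uses only the standard library. The function names are hypothetical.

```python
import csv
import io


def parse_vector(text: str) -> list:
    """Parse a bracketed, comma-separated string like '[0.1, -0.2]' into floats."""
    inner = text.strip().strip("[]")
    return [float(x) for x in inner.split(",") if x.strip()]


def load_word_vectors(csv_text: str) -> dict:
    """Map each word to its vector, ignoring the unnamed index column."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row["word"]: parse_vector(row["vector"]) for row in reader}


# Usage (assumed file layout, see above):
# with open('../Outputs/Word2Vec-Features.csv') as f:
#     word_vectors = load_word_vectors(f.read())
```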

## Getting comment-level vectors from word-level vectors (averaging)

In [26]:
comment_vectors_df = model.transform(text_filtered_df4).select('rev_id','result')
comment_vectors_df.head(2)

[Row(rev_id=37675, result=DenseVector([0.0677, -0.1552, 0.0711, 0.029, 0.0307, 0.0032, 0.0882, 0.1106, -0.0373, -0.0323, -0.0269, -0.0713, -0.0212, 0.0284, -0.0144, -0.0149, -0.0382, -0.1468, 0.0987, 0.061, -0.0303, 0.045, 0.0631, -0.1285, -0.0673, 0.13, 0.0246, -0.0101, 0.0606, 0.0399, -0.0319, -0.0204, -0.03, -0.0139, 0.0212, -0.0672, 0.0176, -0.0879, 0.1347, 0.0538, 0.1634, -0.0089, -0.073, 0.0771, 0.0137, -0.1158, 0.0005, 0.0293, -0.0078, 0.0276])),
 Row(rev_id=44816, result=DenseVector([0.0392, -0.1737, 0.0578, 0.0257, 0.0555, 0.0519, 0.0922, 0.088, -0.0385, -0.0383, -0.0115, -0.0948, -0.0115, 0.0985, 0.0139, 0.0053, -0.0204, -0.1383, 0.0846, 0.0542, -0.0723, 0.0532, 0.0468, -0.1334, -0.0481, 0.216, -0.0601, -0.0288, 0.0133, 0.0004, -0.0001, -0.0505, -0.0508, -0.0579, -0.0205, -0.0859, 0.0461, -0.083, 0.0971, 0.1001, 0.1454, 0.0201, -0.0816, 0.1143, 0.0294, -0.0961, 0.0195, 0.0179, 0.0259, -0.0025]))]
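`Word2VecModel.transform` produces one vector per comment by averaging the vectors of its words. The sketch below reproduces that averaging in plain Python. Our reading of the Spark 2.x source is that vectors of in-vocabulary words are summed and the sum is divided by the total number of tokens, so out-of-vocabulary tokens effectively count as zero vectors, and an empty comment yields a zero vector. Treat that detail as an assumption and check it against the Spark version you use. The function name is hypothetical.

```python
def average_word_vectors(tokens, vectors, vector_size):
    """Average the word vectors of one comment.

    In-vocabulary vectors are summed and the sum is divided by the total
    number of tokens (out-of-vocabulary tokens contribute zeros).
    An empty token list returns a zero vector."""
    total = [0.0] * vector_size
    if not tokens:
        return total
    for token in tokens:
        v = vectors.get(token)
        if v is not None:
            total = [a + b for a, b in zip(total, v)]
    return [a / len(tokens) for a in total]


toy_vectors = {"good": [1.0, 0.0], "day": [0.0, 1.0]}   # hypothetical 2-D vectors
print(average_word_vectors(["good", "day"], toy_vectors, 2))
print(average_word_vectors(["good", "day", "zzz", "qqq"], toy_vectors, 2))
```

Because every token counts toward the divisor, comments containing many unknown (or empty) tokens end up with shorter average vectors; normalising the result, or dividing only by the in-vocabulary count, are alternatives worth considering.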