In [2]:
import numpy as np
import pandas as pd
import time
import math
from nltk.corpus import stopwords

from pyspark import SparkContext
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.ml.feature import Word2Vec
from pyspark.ml.clustering import KMeans

# Read Twitter Data as a Spark DataFrame

In [3]:
t0 = time.time()
datapath = '/Users/jorgecastanon/Documents/github/w2v/data/tweets.gz'
tweets = sqlContext.read.json(datapath)
tweets.registerTempTable("tweets")
#print "Number of tweets read: ", tweets.count() # this line add ~7 seconds (from ~24.5 seconds to ~31.5 seconds)
print "Elapsed time (seconds): ", time.time() - t0

Elapsed time (seconds):  29.2087728977


# Read Words to Filter

In [5]:
filterPath = '/Users/jorgecastanon/Documents/github/w2v/data/filter.txt'
filter = pd.read_csv(filterPath,header=None)

# Use Spark SQL to Filter Tweets:
### + In english
### + And containing at least one of the keywords

In [6]:
# Construct SQL Command
t0 = time.time()
sqlString = "("
for substr in filter[0]: #iteration on the list of words to filter (at most 50-100 words)
    sqlString = sqlString+"text LIKE '%"+substr+"%' OR "
    sqlString = sqlString+"text LIKE '%"+substr.upper()+"%' OR "
sqlString=sqlString[:-4]+")"
sqlFilterCommand = "SELECT lang, text FROM tweets WHERE (lang = 'en') AND "+sqlString

# Query tweets in english that contain at least one of the keywords
tweetsDF = sqlContext.sql(sqlFilterCommand)
#print type(tweetsDF), tweetsDF.count() # this line add ~9 seconds (from ~0.72 seconds to ~9.42 seconds)
print "Elapsed time (seconds): ", time.time() - t0

Elapsed time (seconds):  1.01377296448


# Parse and Remove Stop Words 

In [7]:
tweetsRDD = tweetsDF.select('text').rdd

def parseAndRemoveStopWords(text):
    t = text[0].replace(";"," ").replace(":"," ").replace('"',' ').replace('-',' ')
    t = t.replace(',',' ').replace('.',' ')
    t = t.lower().split(" ")
    stop = stopwords.words('english')
    return [i for i in t if i not in stop]

tw = tweetsRDD.map(parseAndRemoveStopWords)

# Word2Vec: returns a dataframe with words and vectors

In [9]:
# map to df
twDF = tw.map(lambda p: Row(text=p)).toDF()

# default minCount = 5 (we may need to try something larger: 20-100 to reduce cost)
# default vectorSize = 100 (we may want to keep default)
t0 = time.time()
word2Vec = Word2Vec(vectorSize=100, minCount=25, stepSize=0.1, inputCol="text", outputCol="result")
modelW2V = word2Vec.fit(twDF)
wordVectorsDF = modelW2V.getVectors()
print "Elapsed time(seconds) :", time.time() - t0

Elapsed time(seconds) : 22.9976389408


In [10]:
print "Vocabulary Size: ", wordVectorsDF.count() 

Vocabulary Size:  811


# To feed force graph: findSynonyms used to find top N closest words 

In [11]:
synonymsDF = modelW2V.findSynonyms("christmas", 20)
synonymsDF.show()

+--------------+------------------+
|          word|        similarity|
+--------------+------------------+
|           eve|0.8034414351704501|
|          you!|0.7697983397627831|
|    #christmas| 0.754777686709501|
|          hope|0.7069539180475077|
|       present|0.6931874739179668|
|      midnight|0.6822633433563502|
|            🎁|0.6784892791326919|
|          babe|0.6613937562729849|
|@niallofficial|0.6544346950755877|
|          send|0.6445908439723395|
|            🎄|0.6360973777773882|
|          card|0.6330151491688999|
|         louis|0.6310315485515334|
|         would|0.6053057866691136|
|             e|0.5979345863213892|
|      presents|0.5733158987239777|
|            🎅|0.5598128859776278|
|          safe|0.5548107728343319|
|          wish|0.5500323573645335|
|          sony| 0.549303664868333|
+--------------+------------------+



# K-means on top of Word2Vec using DF (spark.ml)

In [13]:
t0=time.time()
vocabSize=wordVectorsDF.count()
K = int(math.floor(math.sqrt(float(vocabSize)/2)))
         # K ~ sqrt(n/2) this is a rule of thumb for choosing K,
         # where n is the number of words in the model
         # feel free to choose K with a fancier algorithm
           
dfW2V = wordVectorsDF.select('vector').withColumnRenamed('vector','features')
kmeans = KMeans(k=K, seed=1)
modelK = kmeans.fit(dfW2V)
labelsDF = modelK.transform(dfW2V).select('prediction').withColumnRenamed('prediction','labels')
print "Elapsed time(seconds) :", time.time() - t0

Elapsed time(seconds) : 1.18997311592


# Save Models

In [14]:
def toNumpy(df,colName):
    '''
    In .- spark dataframe with an specific column selected
    Out .- numpy array of that column
    '''
    dfCol = df.select(colName)
    return dfCol.map(lambda e: e[0]).collect()

In [15]:
w2vMatrix = toNumpy(wordVectorsDF,'vector')
np.save('w2vMatrix.npy',w2vMatrix)

words = toNumpy(wordVectorsDF,'word')
np.save('words.npy',words)

lables = toNumpy(labelsDF,'labels')
np.save('labels.npy',words)