#### Configure Spark Context

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Reuse the already-running SparkContext (if any) so this cell can be
# re-executed without "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel
from nltk.stem.wordnet import WordNetLemmatizer
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec

# Print the version string reported by the SparkContext `sc`.
print("Using Apache Spark Version", sc.version)

Using Apache Spark Version 2.4.4


In [2]:
import spacy
import en_core_web_sm
# Load the small English spaCy model with the 'parser', 'tagger' and 'ner'
# pipeline components disabled; `nlp` is used by text_cleanup for lemmas.
nlp = en_core_web_sm.load( disable=['parser', 'tagger','ner'] )

def cleanup_pretokenize(text):
    """Normalise raw text before it is handed to the tokenizer.

    Steps, in order:
      1. remove every run of non-whitespace characters starting with ``http``;
      2. expand a handful of English contractions;
      3. replace ``-``, ``/``, ``(`` and ``)`` with a space;
      4. spell ``%`` out as `` percent ``.

    Whitespace is not collapsed, so the result may contain repeated spaces.

    Args:
        text: the string to clean.

    Returns:
        The cleaned string.
    """
    # Imported locally: the notebook has no top-level `import re`, so the
    # re.sub call below would otherwise raise NameError.
    import re

    text = re.sub(r'http\S+', '', text)

    # Applied sequentially, so order matters.
    # "'re" already covers "you're"/"You're"; separate rules for those could
    # never match once "'re" has been replaced, so they are omitted.
    # "i'm" is handled next to "I'm" because text_cleanup lower-cases the
    # text before calling this function.
    replacements = (
        ("'s", " "),
        ("n't", " not "),
        ("'ve", " have "),
        ("'re", " are "),
        ("I'm", " I am "),
        ("i'm", " i am "),
        ("-", " "),
        ("/", " "),
        ("(", " "),
        (")", " "),
        ("%", " percent "),
    )
    for old, new in replacements:
        text = text.replace(old, new)
    return text

lmtzr = WordNetLemmatizer()
def text_cleanup(row):
    """Replace the description in ``row[2]`` with its cleaned, lemmatised form.

    The description is stripped, lower-cased, passed through
    ``cleanup_pretokenize`` and then through the spaCy pipeline ``nlp``.
    Lemmas that are purely alphabetic and longer than three characters are
    kept and joined with single spaces.

    Args:
        row: indexable, mutable record whose element 2 is the description.

    Returns:
        The same ``row`` object, modified in place.
    """
    description = row[2].strip().lower()
    doc = nlp(cleanup_pretokenize(description))
    # One pass applying both filters: alphabetic only, more than 3 characters.
    # The WordNet lemmatizer `lmtzr` is not applied here; lemmas come from spaCy.
    kept = [
        lemma
        for lemma in (word.lemma_ for word in doc)
        if lemma.isalpha() and len(lemma) > 3
    ]
    row[2] = ' '.join(kept)
    return row

# Tokenizer over the 'description' column, followed by a stop-word filter on
# its output. The pattern is a raw string so that `\w` is not treated as an
# (invalid) Python string escape.
regexTokenizer = RegexTokenizer(gaps = False, pattern = r'\w+', inputCol = 'description', outputCol = 'tokens')
swr = StopWordsRemover(inputCol = 'tokens', outputCol = 'tokens_sw_removed')

In [3]:
# Read the Crunchbase CSV export (header row, comma-delimited, inferred types).
# NOTE(review): absolute, machine-specific path — consider making it configurable.
crunchbase_df = (
    sqlContext.read
    .options(header="true", delimiter=",", inferSchema="true")
    .csv("/Users/javid/projects/enaibl/data/cb_odm_092419.csv")
)


In [4]:
# Keep only the identifier, name and short-description columns.
crunchbase_data = crunchbase_df.select('crunchbase_uuid', 'name', 'short_description')


In [5]:
crunchbase_columns = [0, 1, 2]

# Turn each Row into a plain list of the selected positions, then drop the
# records whose third field (the description) is missing.
crunchbase_rdd = (
    crunchbase_data.select('*').rdd
    .map(lambda record: [record[idx] for idx in crunchbase_columns])
    .filter(lambda fields: fields[2] is not None)
)

# Rebuild a DataFrame from the filtered records, naming the third column
# 'description' (the input column the tokenizer is configured with).
crunchbase_df = sqlContext.createDataFrame(
    crunchbase_rdd,
    schema=['crunchbase_uuid', 'name', 'description'],
)
crunchbase_df.show(n=5)


+--------------------+--------+--------------------+
|     crunchbase_uuid|    name|         description|
+--------------------+--------+--------------------+
|e1393508-30ea-8a3...|Wetpaint|Wetpaint offers a...|
|bf4d7b0e-b34d-2fd...|    Zoho|Zoho offers a sui...|
|5f2b40b8-d1b3-d32...|    Digg|Digg Inc. operate...|
|df662812-7f97-0b4...|Facebook|Facebook is an on...|
|b08efc27-da40-505...|   Accel|Accel is an early...|
+--------------------+--------+--------------------+
only showing top 5 rows



In [6]:
def cossim(v1, v2, eps=.1):
    """Smoothed cosine similarity between two vectors.

    Computes ``dot(v1, v2) / ||v1|| / (||v2|| + eps)``. With the default
    ``eps=.1`` the result matches the original formula exactly; pass
    ``eps=0`` for the plain cosine similarity.

    Args:
        v1: first vector (anything ``np.dot`` accepts).
        v2: second vector, same length as ``v1``.
        eps: constant added to the norm of ``v2`` only; it keeps the division
            finite when ``v2`` is all zeros.

    Returns:
        The similarity score, or ``0.0`` when ``v1`` has zero norm or the
        smoothed norm of ``v2`` is zero (instead of NaN / a division error).
    """
    norm1 = np.sqrt(np.dot(v1, v1))
    denom2 = np.sqrt(np.dot(v2, v2)) + eps
    # A zero vector has no direction; report "no similarity" rather than NaN,
    # which would otherwise propagate into the ranking.
    if norm1 == 0 or denom2 == 0:
        return 0.0
    return np.dot(v1, v2) / norm1 / denom2



In [7]:
# Tokenise the descriptions, then remove stop words from the token lists.
df_tokens = regexTokenizer.transform(crunchbase_df)
desc_swr = swr.transform(df_tokens)
# Preview the first three rows with both token columns.
desc_swr.show(n=3)



+--------------------+--------+--------------------+--------------------+--------------------+
|     crunchbase_uuid|    name|         description|              tokens|   tokens_sw_removed|
+--------------------+--------+--------------------+--------------------+--------------------+
|e1393508-30ea-8a3...|Wetpaint|Wetpaint offers a...|[wetpaint, offers...|[wetpaint, offers...|
|bf4d7b0e-b34d-2fd...|    Zoho|Zoho offers a sui...|[zoho, offers, a,...|[zoho, offers, su...|
|5f2b40b8-d1b3-d32...|    Digg|Digg Inc. operate...|[digg, inc, opera...|[digg, inc, opera...|
+--------------------+--------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [8]:
# Fit a Word2Vec model (vectorSize=300, minCount=5) on the stop-word-filtered
# tokens and add a 'wordvectors' column to every description row.
word2vec = Word2Vec(vectorSize = 300, minCount = 5, inputCol = 'tokens_sw_removed', outputCol = 'wordvectors')
model = word2vec.fit(desc_swr)
wordvectors = model.transform(desc_swr)
# select() already returns a DataFrame; the previous `.rdd.toDF()` round-trip
# only pushed every row through Python and re-inferred the same schema.
crunchbase_desc = wordvectors.select('crunchbase_uuid','name','wordvectors')
crunchbase_desc.show(5)


+--------------------+--------+--------------------+
|     crunchbase_uuid|    name|         wordvectors|
+--------------------+--------+--------------------+
|e1393508-30ea-8a3...|Wetpaint|[-0.0655311768253...|
|bf4d7b0e-b34d-2fd...|    Zoho|[-0.0572017064717...|
|5f2b40b8-d1b3-d32...|    Digg|[-0.0059538231446...|
|df662812-7f97-0b4...|Facebook|[0.04988502562046...|
|b08efc27-da40-505...|   Accel|[-0.0774579850787...|
+--------------------+--------+--------------------+
only showing top 5 rows



In [9]:
# Sanity check of the embedding: list the 20 words the model ranks as most
# similar to "facebook".
synonyms = model.findSynonyms("facebook", 20)
synonyms.show(n=20)

+---------+------------------+
|     word|        similarity|
+---------+------------------+
|instagram|0.7966665625572205|
|  twitter|0.7834885120391846|
| linkedin|0.7453723549842834|
|pinterest|0.6593936085700989|
|followers|0.6581313610076904|
|     bing|0.6570589542388916|
|   google|0.6495094299316406|
|       fb|0.6432822346687317|
| whatsapp|0.6388134956359863|
|    pages|0.6143272519111633|
| snapchat|0.6061036586761475|
|    slack|0.6059338450431824|
|    yahoo|0.6058914065361023|
|  youtube|0.6052976250648499|
|  hotmail|0.6032532453536987|
|    likes|0.6031017899513245|
|   tumblr|0.5892382264137268|
|messenger|0.5810031890869141|
|    gmail|0.5780516266822815|
|  landing|0.5775303840637207|
+---------+------------------+



In [10]:
# Bring at most 50000 (uuid, name, vector) rows to the driver as a list of Rows.
chunk = crunchbase_desc.limit(50000).collect()


In [26]:
# Free-text query; it is tokenised and vectorised in the cell below.
SEARCH_QUERY = "I like eating pizza"

In [27]:
# Run the query through the same tokenizer -> stop-word remover -> Word2Vec
# stages that were applied to the descriptions.
query_df = sc.parallelize([(1, SEARCH_QUERY)]).toDF(['index', 'description'])
query_tok = regexTokenizer.transform(query_df)
query_swr = swr.transform(query_tok)
query_swr.show()
# Single-row frame: take the 'wordvectors' value of the first (only) row.
query_vec = model.transform(query_swr).select('wordvectors').collect()[0][0]
#query_vec

+-----+-------------------+--------------------+--------------------+
|index|        description|              tokens|   tokens_sw_removed|
+-----+-------------------+--------------------+--------------------+
|    1|I like eating pizza|[i, like, eating,...|[like, eating, pi...|
+-----+-------------------+--------------------+--------------------+



In [28]:
import numpy as np

# Score every collected (uuid, name, vector) row against the query vector.
sim_rdd = sc.parallelize(
    [(rec[0], rec[1], float(cossim(query_vec, rec[2]))) for rec in chunk]
)
# Name the columns while building the frame, then rank by similarity,
# best match first.
sim_df = (
    sqlContext.createDataFrame(sim_rdd, ['crunchbase_uuid', 'name', 'similarity'])
    .orderBy("similarity", ascending = False)
)
sim_df.show(20, truncate = False)

+------------------------------------+----------------------------------------+------------------+
|crunchbase_uuid                     |name                                    |similarity        |
+------------------------------------+----------------------------------------+------------------+
|7a3f6326-269d-fcd8-26b4-16cf8653a7dc|GoTime                                  |0.6140712711248749|
|58e75fb8-524f-aad9-bc48-672703352691|Foodberry                               |0.6033352705245958|
|ecde11d3-bd73-ff9e-3648-c4dcd388f95c|Zipityzap                               |0.6014549195836659|
|5b818e0b-b9d5-44a0-0e4f-7557ea9626d1|Partins Jamaican Bakery & Grill         |0.5849417402210417|
|d625185e-a39d-facd-808a-90b70b49c8f3|Attune Foods                            |0.584905530393183 |
|25efac40-046d-26cb-7a7b-91c9b8909afb|Natural Balance Foods                   |0.5804848461050937|
|380a50b2-e4bd-299c-10f9-ad1953639ba7|HealthySnacksGuide.com                  |0.5754325066092547|
|819d117d-

In [None]:
from pyspark.ml.feature import Word2VecModel
