# Imports

In [1]:
import os
import time
import glob
import pandas as pd
import findspark

findspark.init()
import pyspark

from pyspark import SparkContext as sc, SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, StructType, ArrayType
from pyspark.ml.feature import Word2Vec

import json

# Spark Session

In [2]:
appName = "JsontoDataFrame"
master = "local"
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

# Word2Vec Configuration

In [None]:
word2vec = Word2Vec(
    vectorSize=256,
    minCount=10,
    inputCol="text",
    outputCol="result"
)
word2vec.setNumPartitions(10)

# Functions

In [4]:
def load_data(file_index, number):
    file_path = f"data/data_{file_index}.json"
    data_schema = [
        StructField('title', StringType(), True),
        StructField('text', ArrayType(StringType()), True)
    ]
    final_struc = StructType(fields=data_schema)
    df = spark.read.json(file_path, final_struc)
    while file_index <= number:
        start_time = time.time()
        
        file_index += 1
        file_path = f"data/data_{file_index}.json"
        try:
            new_df = spark.read.json(file_path, final_struc)
            df = df.union(new_df)
            print(f"Row count: {df.count()}")
        except:
            print(f"{file_index} not found at {file_path}; skipping...")
        print(f"Processing {file_path} took {time.time() - start_time}s")
        
    print(f"Finished")
    print(f"Total rows: {df.count()}")
    return df

def load_data_v2(data_dir):
    data_schema = [
        StructField('title', StringType(), True),
        StructField('text', ArrayType(StringType()), True)
    ]
    final_struc = StructType(fields=data_schema)
    
    # Get list of files
    files = sorted(glob.glob(os.path.join(data_dir, '*')))
    df = spark.read.json(files, final_struc)
    return df
                   
def train_model(df):
    start = time.time()
    print(f"Start time: {start}")
    model = word2vec.fit(df)
    print(f"Time elapsed: {time.time() - start}")
    return model

In [24]:
start = time.time()
df = load_data(0, 292)
print(f"Elapsed Time: {time.time() - start}")

Row count: 40000
Processing data/data_1.json took 0.6386759281158447s
Row count: 60000
Processing data/data_2.json took 0.6864230632781982s
Row count: 80000
Processing data/data_3.json took 0.7840969562530518s
Row count: 100000
Processing data/data_4.json took 0.9453299045562744s
Row count: 120000
Processing data/data_5.json took 1.08286452293396s
Row count: 140000
Processing data/data_6.json took 1.2130696773529053s
Row count: 160000
Processing data/data_7.json took 1.4230453968048096s
Row count: 180000
Processing data/data_8.json took 1.3649954795837402s
Row count: 200000
Processing data/data_9.json took 1.5609076023101807s
Row count: 220000
Processing data/data_10.json took 2.0816593170166016s
Row count: 240000
Processing data/data_11.json took 2.134158134460449s
Row count: 260000
Processing data/data_12.json took 2.281038999557495s
Row count: 280000
Processing data/data_13.json took 2.371687412261963s
Row count: 300000
Processing data/data_14.json took 2.680473566055298s
Row count:

Row count: 2320000
Processing data/data_115.json took 14.640623569488525s
Row count: 2340000
Processing data/data_116.json took 16.03971242904663s
Row count: 2360000
Processing data/data_117.json took 15.410370349884033s
Row count: 2380000
Processing data/data_118.json took 14.345799684524536s
Row count: 2400000
Processing data/data_119.json took 14.799275159835815s
Row count: 2420000
Processing data/data_120.json took 14.972877740859985s
Row count: 2440000
Processing data/data_121.json took 14.985008478164673s
Row count: 2460000
Processing data/data_122.json took 14.946365356445312s
Row count: 2480000
Processing data/data_123.json took 15.28468108177185s
Row count: 2500000
Processing data/data_124.json took 15.645419597625732s
Row count: 2520000
Processing data/data_125.json took 16.09997034072876s
Row count: 2540000
Processing data/data_126.json took 16.152238368988037s
Row count: 2560000
Processing data/data_127.json took 17.762072563171387s
Row count: 2580000
Processing data/data_1

Exception ignored in: <function JavaObject.__init__.<locals>.<lambda> at 0x7fa4eb54bdc0>
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1341, in <lambda>
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 645, in _garbage_collect_object
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 964, in garbage_collect_object
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1031, in send_command
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 979, in _get_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 985, in _create_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1119, in start
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1135, in _authenticate_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip

212 not found at data/data_212.json; skipping...
Processing data/data_212.json took 0.3766920566558838s
213 not found at data/data_213.json; skipping...
Processing data/data_213.json took 0.13854575157165527s
214 not found at data/data_214.json; skipping...
Processing data/data_214.json took 0.1796870231628418s
215 not found at data/data_215.json; skipping...
Processing data/data_215.json took 0.16290616989135742s
216 not found at data/data_216.json; skipping...
Processing data/data_216.json took 0.18429780006408691s
217 not found at data/data_217.json; skipping...
Processing data/data_217.json took 0.1699967384338379s
218 not found at data/data_218.json; skipping...
Processing data/data_218.json took 0.18657970428466797s
219 not found at data/data_219.json; skipping...
Processing data/data_219.json took 0.13951802253723145s
220 not found at data/data_220.json; skipping...
Processing data/data_220.json took 0.1657392978668213s
221 not found at data/data_221.json; skipping...
Processing

Exception ignored in: <function JavaObject.__init__.<locals>.<lambda> at 0x7fa4eba96160>
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1341, in <lambda>
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 645, in _garbage_collect_object
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 964, in garbage_collect_object
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1031, in send_command
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 979, in _get_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 985, in _create_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1119, in start
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1135, in _authenticate_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip


Processing data/data_268.json took 0.45403313636779785s
269 not found at data/data_269.json; skipping...
Processing data/data_269.json took 0.029652118682861328s
270 not found at data/data_270.json; skipping...
Processing data/data_270.json took 0.1565413475036621s
271 not found at data/data_271.json; skipping...
Processing data/data_271.json took 0.16143155097961426s
272 not found at data/data_272.json; skipping...
Processing data/data_272.json took 0.14052748680114746s
273 not found at data/data_273.json; skipping...
Processing data/data_273.json took 0.17720389366149902s
274 not found at data/data_274.json; skipping...
Processing data/data_274.json took 0.13234901428222656s
275 not found at data/data_275.json; skipping...
Processing data/data_275.json took 0.17529010772705078s
276 not found at data/data_276.json; skipping...
Processing data/data_276.json took 0.1538712978363037s


Exception ignored in: <function JavaObject.__init__.<locals>.<lambda> at 0x7fa50592ed30>
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1341, in <lambda>
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 645, in _garbage_collect_object
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 964, in garbage_collect_object
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1031, in send_command
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 979, in _get_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 985, in _create_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1119, in start
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1135, in _authenticate_connection
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip

277 not found at data/data_277.json; skipping...
Processing data/data_277.json took 0.4136233329772949s
278 not found at data/data_278.json; skipping...
Processing data/data_278.json took 0.08617496490478516s
279 not found at data/data_279.json; skipping...
Processing data/data_279.json took 0.1530308723449707s
280 not found at data/data_280.json; skipping...
Processing data/data_280.json took 0.1542980670928955s
281 not found at data/data_281.json; skipping...
Processing data/data_281.json took 0.14403963088989258s
282 not found at data/data_282.json; skipping...
Processing data/data_282.json took 0.16292309761047363s
283 not found at data/data_283.json; skipping...
Processing data/data_283.json took 0.17930388450622559s
284 not found at data/data_284.json; skipping...
Processing data/data_284.json took 0.15617609024047852s
285 not found at data/data_285.json; skipping...
Processing data/data_285.json took 0.15742158889770508s
286 not found at data/data_286.json; skipping...
Processin

KeyboardInterrupt: 

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/opt/spark/python/pyspark/shell.py", line 51, in <lambda>
    atexit.register(lambda: sc.stop())
TypeError: stop() missing 1 required positional argument: 'self'
Exception ignored in: <function JavaObject.__init__.<locals>.<lambda> at 0x7fa4eba96040>
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1340, in <lambda>
  File "/opt/spark/python/pyspark/context.py", line 274, in signal_handler
    raise KeyboardInterrupt()
KeyboardInterrupt: 


In [5]:
start = time.time()
df = load_data_v2('data')
print(f"Elapsed Time: {time.time() - start}")

Elapsed Time: 6.31993293762207


## Duration

load_data() => s

load_data_v2() => 4.5s (lazy loading)

In [15]:
print(df.count())

5824609


In [6]:
print(df.tail(20))

[Row(title='Rumisapa District', text=['rumisapa', 'district', 'one', 'eleven', 'districts', 'province', 'lamas', 'peru', 'references']), Row(title='Craig J. Spence', text=['craig', 'j', 'spence', '1941', 'november', '10', '1989', 'republican', 'lobbyist', 'found', 'dead', 'ritz-carlton', 'hotel', 'room', '1989', 'background', 'spence', 'attended', 'syracuse', 'university', 'becoming', 'correspondent', 'abc', 'news', 'vietnam', 'war', 'covered', 'southeast', 'asia', 'eventually', 'left', 'work', 'expelled', 'south', 'vietnam', 'alleged', 'black', 'market', 'currency', 'transactions', '--', 'unknown', 'practice', 'removing', 'troublesome', 'reporters', 'relocated', 'tokyo', 'stringer', 'britain', 'daily', 'mail', 'began', 'public', 'relations', 'consulting', 'government-supported', 'japan', 'external', 'trade', 'organization', 'japanese', 'corporations', 'january', '1985', 'spence', 'registered', 'us', 'state', 'department', 'foreign', 'agent', 'japan', 'began', 'lobbying', 'japanese', '

# Train

In [7]:
model = train_model(df.limit(1000000))

Start time: 1598481424.4315293
Time elapsed: 11642.238344669342


In [8]:
model.save("one_million_model")

In [10]:
result = model.transform(df.limit(1000000))
print(result)

DataFrame[title: string, text: array<string>, result: vector]


In [11]:
result.show()

+--------------------+--------------------+--------------------+
|               title|                text|              result|
+--------------------+--------------------+--------------------+
|    Lesser shortwing|[lesser, shortwin...|[0.13181779829218...|
|Shanghai Tobacco ...|[shanghai, tobacc...|[-0.0167293419171...|
|         Niki Massey|[niki, massey, oc...|[-0.0304032912153...|
|        Harry Leitch|[dr, harry, leitc...|[-0.0138400557823...|
|    Ljupčo Kmetovski|[ljupčo, kmetovsk...|[-0.0479290529569...|
|March 1979 lunar ...|[partial, lunar, ...|[0.00862733636429...|
|              Mansun|[mansun, english,...|[-0.0354783898851...|
|      Ladislav Klíma|[ladislav, klíma,...|[0.00286052224060...|
|         Sex linkage|[sex, linkage, de...|[-0.0374448909977...|
|         Mary Rowell|[mary, rowell, am...|[-0.0082153989390...|
|             Sieneke|[sieneke, born, s...|[-0.0301110380857...|
|Maltese First Div...|[maltese, first, ...|[0.01311316324092...|
|St. John Baptist ...|[st

In [12]:
model.save("one_million_two")

In [13]:
model.getMinCount()

10

In [None]:
cluster = 

In [15]:
model.getVectors().show()

+-------------+--------------------+
|         word|              vector|
+-------------+--------------------+
|     cristino|[-0.1160977184772...|
|    hoshizora|[-0.1109780073165...|
|        mells|[-0.0570699460804...|
|     17901791|[-0.0185965653508...|
|        frane|[-0.1329791694879...|
|        gawar|[-0.0898746177554...|
|          kuş|[-0.0321204923093...|
|        toppo|[0.00813362747430...|
|        shuvo|[-0.1562451720237...|
|       hilyer|[-0.0437848530709...|
|        cetti|[0.13472637534141...|
|         koel|[-0.0291677471250...|
|        10292|[0.03037112019956...|
|rhynchostylis|[0.09231655299663...|
|        1910s|[0.11900905519723...|
|        19125|[0.00824300106614...|
|         onam|[-0.0484162457287...|
|      rahmani|[-0.2499719411134...|
|      prinzip|[-0.0082170348614...|
|       197077|[-0.0509997271001...|
+-------------+--------------------+
only showing top 20 rows



In [17]:
result.write.format('json').option('header',True).mode('overwrite').option('sep',',').save('result_output.csv')

In [29]:
def cosine_similarity(x, y):
     return x.dot(y)/(x.norm()*y.norm())


In [30]:
x = model.transform('man')
y = model.transform('woman')
cosine_similarity(x, y)


AttributeError: 'str' object has no attribute '_jdf'

In [20]:
from pyspark.sql.functions import format_number as fmt
model.findSynonyms("politics", 2).select("word", fmt("similarity", 5).alias("similarity")).show()

+------------+----------+
|        word|similarity|
+------------+----------+
|   political|   0.72774|
|conservatism|   0.63732|
+------------+----------+



In [22]:
model.findSynonymsArray("leadership", 6)

[('fractievoorzitter', 0.7178506851196289),
 ('entryist', 0.6289839744567871),
 ('nfp', 0.6286091208457947),
 ('partys', 0.6178027987480164),
 ('neo-destour', 0.6160869002342224),
 ('leader', 0.6095247268676758)]

In [37]:
model.findSynonyms("doctor", 7).select("word", fmt("similarity", 5).alias("similarity")).show()

+---------+----------+
|     word|similarity|
+---------+----------+
| dolittle|   0.59873|
|silurians|   0.59751|
| mid-nite|   0.57907|
|chronotis|   0.56858|
|      sjd|   0.56078|
|doctorate|   0.56057|
|    juris|   0.55439|
+---------+----------+



In [23]:
model.findSynonymsArray("man", 6)

[('woman', 0.7367258071899414),
 ('spider-plant', 0.6604211926460266),
 ('girl', 0.6590791940689087),
 ('handsome', 0.6530117988586426),
 ('meanest', 0.6431087851524353),
 ('thief', 0.6402087211608887)]

In [25]:
model.findSynonyms("testosterone", 6).select("word", fmt("similarity", 5).alias("similarity")).show()

+------------+----------+
|        word|similarity|
+------------+----------+
|    estrogen|   0.83971|
|    androgen|   0.82857|
|   androgens|   0.82131|
|   estradiol|   0.81995|
|   prolactin|   0.81540|
|progesterone|   0.81341|
+------------+----------+



In [26]:
os.getcwd()

'/home/schen/Desktop/Laura'

In [27]:
test = ["trump is so great. I love him", "Joe Biden for the Win"]

In [28]:
model.findSynonyms("president", 6).select("word", fmt("similarity", 5).alias("similarity")).show()

+---------------+----------+
|           word|similarity|
+---------------+----------+
|president-elect|   0.82420|
| vice-president|   0.79402|
|     presidency|   0.77758|
| then-president|   0.73219|
|     presidents|   0.72987|
|           vice|   0.70748|
+---------------+----------+



In [34]:
from sklearn.metrics.pairwise import cosine_similarity

cosine_similarity([[1,0,-1]],[[-1, -1, 0]])

ModuleNotFoundError: No module named 'sklearn'

In [33]:
pip install sklearn

Collecting sklearn
  Downloading sklearn-0.0.tar.gz (1.1 kB)
Collecting scikit-learn
  Downloading scikit_learn-0.23.2-cp38-cp38-manylinux1_x86_64.whl (6.8 MB)
[K     |████████████████████████████████| 6.8 MB 4.1 MB/s eta 0:00:01
Collecting threadpoolctl>=2.0.0
  Downloading threadpoolctl-2.1.0-py3-none-any.whl (12 kB)
Collecting joblib>=0.11
  Downloading joblib-0.16.0-py3-none-any.whl (300 kB)
[K     |████████████████████████████████| 300 kB 8.2 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: sklearn
  Building wheel for sklearn (setup.py) ... [?25ldone
[?25h  Created wheel for sklearn: filename=sklearn-0.0-py2.py3-none-any.whl size=1315 sha256=b93f2fd6dd83e185ca27110f6eee718a847dab01edfdf5d92898c322a05e96b9
  Stored in directory: /home/schen/.cache/pip/wheels/22/0b/40/fd3f795caaa1fb4c6cb738bc1f56100be1e57da95849bfc897
Successfully built sklearn
Installing collected packages: threadpoolctl, joblib, scikit-learn, sklearn
Successfully installed joblib-0.16.0 scikit-l

In [35]:
model.save("one_million_twoptone")