In [1]:
import os
import sys

spark_path = os.environ['SPARK_HOME']
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.9-src.zip")

import findspark
findspark.init()

import pyspark

number_cores = 4
memory_gb = 8
conf = (pyspark.SparkConf().setMaster('local[{}]'.
                                      format(number_cores)).
        set('spark.driver.memory', '{}g'.format(memory_gb)))
sc = pyspark.SparkContext(conf=conf)

In [2]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
spark = pyspark.sql.SparkSession(sc)

In [3]:
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),),
         (3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]

In [4]:
dfA = spark.createDataFrame(dataA, ["id", "features"])

In [5]:
dfA.take(6)

[Row(id=0, features=SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0})),
 Row(id=1, features=SparseVector(6, {2: 1.0, 3: 1.0, 4: 1.0})),
 Row(id=2, features=SparseVector(6, {0: 1.0, 2: 1.0, 4: 1.0})),
 Row(id=3, features=SparseVector(6, {1: 1.0, 3: 1.0, 5: 1.0})),
 Row(id=4, features=SparseVector(6, {2: 1.0, 3: 1.0, 5: 1.0})),
 Row(id=5, features=SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}))]

In [6]:
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)

In [7]:
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[1.52232126E8], ...|
|  1|(6,[2,3,4],[1.0,1...|[[1.52232126E8], ...|
|  2|(6,[0,2,4],[1.0,1...|[[1.52232126E8], ...|
|  3|(6,[1,3,5],[1.0,1...|[[3.82038881E8], ...|
|  4|(6,[2,3,5],[1.0,1...|[[1.52232126E8], ...|
|  5|(6,[1,2,4],[1.0,1...|[[1.52232126E8], ...|
+---+--------------------+--------------------+



In [8]:
final = model.approxSimilarityJoin(dfA, dfA, 1.0, distCol="distance")

final.createOrReplaceTempView("final")
final.printSchema()

root
 |-- datasetA: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- features: vector (nullable = true)
 |    |-- hashes: array (nullable = true)
 |    |    |-- element: vector (containsNull = true)
 |-- datasetB: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- features: vector (nullable = true)
 |    |-- hashes: array (nullable = true)
 |    |    |-- element: vector (containsNull = true)
 |-- distance: double (nullable = false)



In [9]:
sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from final where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show()

+---+--------------------+---+--------------------+--------+
| id|            features| id|            features|distance|
+---+--------------------+---+--------------------+--------+
|  0|(6,[0,1,2],[1.0,1...|  1|(6,[2,3,4],[1.0,1...|     0.8|
|  0|(6,[0,1,2],[1.0,1...|  2|(6,[0,2,4],[1.0,1...|     0.5|
|  0|(6,[0,1,2],[1.0,1...|  4|(6,[2,3,5],[1.0,1...|     0.8|
|  0|(6,[0,1,2],[1.0,1...|  5|(6,[1,2,4],[1.0,1...|     0.5|
|  1|(6,[2,3,4],[1.0,1...|  0|(6,[0,1,2],[1.0,1...|     0.8|
|  1|(6,[2,3,4],[1.0,1...|  2|(6,[0,2,4],[1.0,1...|     0.5|
|  1|(6,[2,3,4],[1.0,1...|  4|(6,[2,3,5],[1.0,1...|     0.5|
|  1|(6,[2,3,4],[1.0,1...|  5|(6,[1,2,4],[1.0,1...|     0.5|
|  2|(6,[0,2,4],[1.0,1...|  0|(6,[0,1,2],[1.0,1...|     0.5|
|  2|(6,[0,2,4],[1.0,1...|  1|(6,[2,3,4],[1.0,1...|     0.5|
|  2|(6,[0,2,4],[1.0,1...|  4|(6,[2,3,5],[1.0,1...|     0.8|
|  2|(6,[0,2,4],[1.0,1...|  5|(6,[1,2,4],[1.0,1...|     0.5|
|  3|(6,[1,3,5],[1.0,1...|  4|(6,[2,3,5],[1.0,1...|     0.5|
|  4|(6,[2,3,5],[1.0,1..

In [10]:
dataB = [("C1", Vectors.sparse(7, [0, 1, 5, 6], [1.0, 1.0, 1.0, 1.0]),),
         ("C2", Vectors.sparse(7, [2, 3, 4], [1.0, 1.0, 1.0]),),
         ("C3", Vectors.sparse(7, [0, 5, 6], [1.0, 1.0, 1.0]),),
         ("C4", Vectors.sparse(7, [1, 2, 3, 4], [1.0, 1.0, 1.0, 1.0]),)]

In [11]:
dfB = spark.createDataFrame(dataB, ["id", "features"])
mhB = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=100)
modelB = mhB.fit(dfB)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
modelB.transform(dfB).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
| C1|(7,[0,1,5,6],[1.0...|[[3.82038881E8], ...|
| C2|(7,[2,3,4],[1.0,1...|[[1.52232126E8], ...|
| C3|(7,[0,5,6],[1.0,1...|[[6.11845636E8], ...|
| C4|(7,[1,2,3,4],[1.0...|[[1.52232126E8], ...|
+---+--------------------+--------------------+



In [12]:
finalB = model.approxSimilarityJoin(dfB, dfB, 1.0, distCol="distance")
finalB.createOrReplaceTempView("finalB")

In [13]:
sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from finalB where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show()

+---+--------------------+---+--------------------+--------+
| id|            features| id|            features|distance|
+---+--------------------+---+--------------------+--------+
| C1|(7,[0,1,5,6],[1.0...| C3|(7,[0,5,6],[1.0,1...|    0.25|
| C2|(7,[2,3,4],[1.0,1...| C4|(7,[1,2,3,4],[1.0...|    0.25|
| C3|(7,[0,5,6],[1.0,1...| C1|(7,[0,1,5,6],[1.0...|    0.25|
| C4|(7,[1,2,3,4],[1.0...| C2|(7,[2,3,4],[1.0,1...|    0.25|
+---+--------------------+---+--------------------+--------+



In [14]:
dataB = [("C1", Vectors.sparse(7, [0, 5, 1, 6], [1.0, 1.0, 1.0, 1.0]),),
         ("C2", Vectors.sparse(7, [2, 3, 4], [1.0, 1.0, 1.0]),),
         ("C3", Vectors.sparse(7, [0, 5, 6], [1.0, 1.0, 1.0]),),
         ("C4", Vectors.sparse(7, [1, 3, 2, 4], [1.0, 1.0, 1.0, 1.0]),)]

dfB = spark.createDataFrame(dataB, ["id", "features"])
mhB = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
modelB = mhB.fit(dfB)

finalB = model.approxSimilarityJoin(dfB, dfB, 1.0, distCol="distance")
finalB.createOrReplaceTempView("finalB")

sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from finalB where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show()

TypeError: Indices 5 and 1 are not strictly increasing

### Inaugural speeches

In [29]:
raw_data = sc.wholeTextFiles("./data/inaugural_speeches/").cache()

In [31]:
import glob

data_path = "./data/inaugural_speeches"
all_files = glob.glob("./data/inaugural_speeches/*.txt")


file_names = sc.parallelize(all_files)
file_names.collect()

['./data/inaugural_speeches/13_van_buren_1837.txt',
 './data/inaugural_speeches/47_nixon_1973.txt',
 './data/inaugural_speeches/1_washington_1789.txt',
 './data/inaugural_speeches/50_reagan_1985.txt',
 './data/inaugural_speeches/53_clinton_1997.txt',
 './data/inaugural_speeches/17_pierce_1853.txt',
 './data/inaugural_speeches/14_harrison_1841.txt',
 './data/inaugural_speeches/56_obama_2009.txt',
 './data/inaugural_speeches/25_cleveland_1885.txt',
 './data/inaugural_speeches/9_monroe_1821.txt',
 './data/inaugural_speeches/12_jackson_1833.txt',
 './data/inaugural_speeches/11_jackson_1829.txt',
 './data/inaugural_speeches/36_hoover_1929.txt',
 './data/inaugural_speeches/45_johnson_1965.txt',
 './data/inaugural_speeches/51_bush_george_h_w_1989.txt',
 './data/inaugural_speeches/21_grant_1869.txt',
 './data/inaugural_speeches/41_truman_1949.txt',
 './data/inaugural_speeches/33_wilson_1917.txt',
 './data/inaugural_speeches/49_reagan_1981.txt',
 './data/inaugural_speeches/30_roosevelt_theodore

In [32]:
raw_data = file_names.map(lambda f: (f.split("speeches")[1][2:], open(f,mode='r').read()))
raw_data.take(5)

[('3_van_buren_1837.txt',
  'Martin Van Buren\t1837-03-04\tFellow-Citizens: The practice of all my predecessors imposes on me an obligation I cheerfully fulfill--to accompany the first and solemn act of my public trust with an avowal of the principles that will guide me in performing it and an expression of my feelings on assuming a charge so responsible and vast. In imitating their example I tread in the footsteps of illustrious men, whose superiors it is our happiness to believe are not found on the executive calendar of any country. Among them we recognize the earliest and firmest pillars of the Republic--those by whom our national independence was first declared, him who above all others contributed to establish it on the field of battle, and those whose expanded intellect and patriotism constructed, improved, and perfected the inestimable institutions under which we live. If such men in the position I now occupy felt themselves overwhelmed by a sense of gratitude for this the high

In [33]:
%%time
raw_data.count()

CPU times: user 8.87 ms, sys: 4.71 ms, total: 13.6 ms
Wall time: 62.9 ms


57

In [34]:
raw_data.take(5)

[('3_van_buren_1837.txt',
  'Martin Van Buren\t1837-03-04\tFellow-Citizens: The practice of all my predecessors imposes on me an obligation I cheerfully fulfill--to accompany the first and solemn act of my public trust with an avowal of the principles that will guide me in performing it and an expression of my feelings on assuming a charge so responsible and vast. In imitating their example I tread in the footsteps of illustrious men, whose superiors it is our happiness to believe are not found on the executive calendar of any country. Among them we recognize the earliest and firmest pillars of the Republic--those by whom our national independence was first declared, him who above all others contributed to establish it on the field of battle, and those whose expanded intellect and patriotism constructed, improved, and perfected the inestimable institutions under which we live. If such men in the position I now occupy felt themselves overwhelmed by a sense of gratitude for this the high

In [59]:
import string
translator = str.maketrans('', '', string.punctuation)

tokenized_data = raw_data.map(lambda p: (p[0], sorted(list(set((p[1].replace('-', ' ').translate(translator).lower().split(" ")))))))
tokenized_data.take(5)

[('3_van_buren_1837.txt',
  ['',
   '03',
   '04\tfellow',
   'a',
   'abiding',
   'ability',
   'abolish',
   'about',
   'above',
   'abridging',
   'abroad',
   'absent',
   'abundantly',
   'accompany',
   'accordance',
   'account',
   'achieved',
   'act',
   'action',
   'actions',
   'actual',
   'actually',
   'actuated',
   'adapted',
   'add',
   'adequate',
   'adequately',
   'adherence',
   'administered',
   'adopted',
   'advantages',
   'adverse',
   'affairs',
   'affect',
   'affection',
   'again',
   'against',
   'age',
   'aggregate',
   'aggression',
   'agitation',
   'ago',
   'agreeing',
   'aid',
   'aims',
   'alacrity',
   'all',
   'allayed',
   'alleged',
   'alliances',
   'alone',
   'along',
   'already',
   'also',
   'altogether',
   'always',
   'am',
   'america',
   'american',
   'amid',
   'amidst',
   'among',
   'amount',
   'ample',
   'an',
   'ancient',
   'and',
   'anticipate',
   'anticipated',
   'anticipation',
   'anxious',
   'anxi

In [60]:
tokenized_data.flatMap(lambda p: p[1]).map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y).take(10)

[('', 46),
 ('04\tfellow', 13),
 ('above', 18),
 ('abridging', 1),
 ('action', 29),
 ('actual', 8),
 ('actuated', 2),
 ('adapted', 7),
 ('adherence', 6),
 ('advantages', 10)]

In [61]:
''' for each speech
        each word -> word, 1
        reduce -> word, wordcount
        take only word 
        collect
        
    sort

'''

all_words = tokenized_data.flatMap(lambda p: p[1]) \
                        .map(lambda w: (w, 1)) \
                        .reduceByKey(lambda x, y: x + y) \
                        .map(lambda p: p[0]).collect()
all_words = sorted(all_words)

In [62]:
print(all_words[0:20]) # there are still fragment elements
print('...')
print(len(all_words))

['', '03', '04', '04\tabout', '04\tcalled', '04\tcitizens', '04\tfellow', '04\tfriends', '04\ti', '04\tin', '04\tmy', '04\tproceeding', '04\tthe', '04\tunwilling', '04\twhen', '05\telected', '05\tfellow', '1', '100000000', '120000000']
...
9347


In [63]:
total_words = len(all_words)

def buildVector(words):
    
    # build 'C1' : vectors.sparse(7, [0,1,5,6], [1.0, 1.0, 1.0, 1.0])
    
    
    indexList = []
    checkList = []
    
    for w in words:
        # ensure that that word is in these lists
        indexList.append(all_words.index(w))
        checkList.append(1.0)

    # return all    
    return Vectors.sparse(total_words, indexList, checkList)

In [64]:
dataC = tokenized_data.map(lambda p: (p[0], buildVector(p[1])))

In [65]:
dfC = spark.createDataFrame(dataC, ["id", "features"])
mhC = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
modelC = mhC.fit(dfC)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
modelC.transform(dfC).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+--------------------+--------------------+--------------------+
|                  id|            features|              hashes|
+--------------------+--------------------+--------------------+
|3_van_buren_1837.txt|(9347,[0,1,6,100,...|[[3467931.0], [90...|
|    7_nixon_1973.txt|(9347,[0,60,69,81...|[[4399264.0], [15...|
|_washington_1789.txt|(9347,[0,2,22,77,...|[[3265665.0], [40...|
|   0_reagan_1985.txt|(9347,[0,21,33,40...|[[1605265.0], [21...|
|  3_clinton_1997.txt|(9347,[0,53,63,70...|[[1605265.0], [66...|
|   7_pierce_1853.txt|(9347,[0,1,10,41,...|[[1605265.0], [35...|
| 4_harrison_1841.txt|(9347,[0,1,4,100,...|[[2637731.0], [21...|
|    6_obama_2009.txt|(9347,[0,93,100,1...|[[572799.0], [219...|
|5_cleveland_1885.txt|(9347,[0,1,6,100,...|[[1605265.0], [15...|
|    _monroe_1821.txt|(9347,[1,6,19,34,...|[[3467931.0], [26...|
|  2_jackson_1833.txt|(9347,[0,1,12,100...|[[1706398.0], [66...|
|  1_jackson_182

In [68]:
finalC = model.approxSimilarityJoin(dfC, dfC, 0.8, distCol="distance")
finalC.createOrReplaceTempView("finalC")

sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from finalC where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show(50)

+--------------------+--------------------+--------------------+--------------------+------------------+
|                  id|            features|                  id|            features|          distance|
+--------------------+--------------------+--------------------+--------------------+------------------+
|0_adams_john_quin...|(9347,[0,1,9,100,...|     5_polk_1845.txt|(9347,[0,1,6,35,1...|0.7608213096559379|
|0_adams_john_quin...|(9347,[0,1,9,100,...|   7_pierce_1853.txt|(9347,[0,1,10,41,...|0.7754982415005862|
|0_adams_john_quin...|(9347,[0,1,9,100,...| 8_buchanan_1857.txt|(9347,[0,1,6,100,...|0.7941747572815534|
|0_adams_john_quin...|(9347,[0,1,9,100,...|  9_lincoln_1861.txt|(9347,[0,1,6,26,2...|0.7923264311814859|
|0_adams_john_quin...|(9347,[0,1,9,100,...|_adams_john_1797.txt|(9347,[1,14,23,10...|0.7783149171270718|
|0_adams_john_quin...|(9347,[0,1,9,100,...| _jefferson_1801.txt|(9347,[0,1,7,100,...|0.7954876273653566|
|0_adams_john_quin...|(9347,[0,1,9,100,...| _jefferson_