<a href="https://colab.research.google.com/github/arva29/AMDproject/blob/main/UkraineWarTweetsProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **AMD PROJECT**

## *Project 1 - Finding similar items*


> *The task is to implement from scratch a detector of pairs of similar tweets, considering the text column of the dataset and selecting tweets written in a given language.*


## **Environment preparation**

Run these cells at the start of every session to set up the working environment.

In [7]:
# this is meant to be run on google colab

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [8]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init(os.environ["SPARK_HOME"])  # use the absolute SPARK_HOME set above
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

sc = spark.sparkContext


## **Getting dataset**

I download the dataset directly with the Kaggle API. It is a collection of tweets about the war in Ukraine, split into CSV files, one per day.

The dataset can be found at the following link:
[Ukraine War Tweets dataset](https://www.kaggle.com/datasets/bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows?resource=download) 

In [None]:
from google.colab import files

#Upload the file kaggle.json from your disk
uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))
  
# Then move kaggle.json into the folder where the API expects to find it.
!mkdir -p ~/.kaggle/ && mv kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json
User uploaded file "kaggle.json" with length 70 bytes


In [None]:
!kaggle datasets download -d bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows

Downloading ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip to /content
100% 12.7G/12.7G [01:42<00:00, 146MB/s]
100% 12.7G/12.7G [01:42<00:00, 134MB/s]


In [None]:
!unzip ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip

## **Datasets extraction**

Extract the data from a chosen daily file and turn it into an RDD.

### *Single day dataset*

In [None]:
import csv
import gzip

data_list = []

with gzip.open('1008_UkraineCombinedTweetsDeduped.csv.gzip', 'rt') as f:
    reader = csv.DictReader(f)
    for row in reader:
        data_list.append(row)

rdd = sc.parallelize(data_list)
rdd.cache()

ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195

### *Single day dataset (5000 rows)* 


In [None]:
import csv
import gzip

data_list = []
count = 0

with gzip.open('1008_UkraineCombinedTweetsDeduped.csv.gzip', 'rt') as f:
    reader = csv.DictReader(f)
    for row in reader:
        count += 1
        data_list.append(row)
        if count >= 5000:
            break

rdd = sc.parallelize(data_list)
rdd.cache()

### Test rdd

In [None]:
rdd.count()

In [None]:
rdd.first()

OrderedDict([('', '130404'),
             ('userid', '1603709857'),
             ('username', 'trajanpublisher'),
             ('acctdesc',
              "We are Trajan Media, owner of Canada's premier numismatic and philatelic publications –\xa0Canadian Coin News and Canadian Stamp News –\xa0plus Coin & Stamp Supplies."),
             ('location', 'Canada'),
             ('following', '190'),
             ('followers', '916'),
             ('totaltweets', '7215'),
             ('usercreatedts', '2013-07-18 15:23:57'),
             ('tweetid', '1578535646146199554'),
             ('tweetcreatedts', '2022-10-08 00:00:00'),
             ('retweetcount', '1'),
             ('text',
              'Canada joined 15 countries whose flags grace the obverse of Ukraine’s first commemorative coin issued since the beginning of Russia’s renewed invasion this February.\n\nRead more: https://t.co/vRhROceH9x\n\n#numismatics #Ukraine️ #collectcoins'),
             ('hashtags',
              "[{'text':

Each tweet is tokenized by lowercasing the text and removing punctuation and links (and, optionally, stop words), so that only the words that carry the meaning of the tweet remain.

## **Tokenization**

To get the tweets ready for processing, I take the following steps:


1.   *Filter* only English tweets
2.   *Clean* the tweets by removing emojis, tags, URLs and punctuation (hashtags are kept because they give important information about the context and help in finding similar tweets)
3.   *Tokenize* the cleaned tweets



Clean-text is a library for removing undesired substrings such as emails, tags, URLs and line breaks from a string.

In [None]:
!pip install clean-text

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting clean-text
  Downloading clean_text-0.6.0-py3-none-any.whl (11 kB)
Collecting emoji<2.0.0,>=1.0.0
  Downloading emoji-1.7.0.tar.gz (175 kB)
Collecting ftfy<7.0,>=6.0
  Downloading ftfy-6.1.1-py3-none-any.whl (53 kB)
Building wheels for collected packages: emoji
  Building wheel for emoji (setup.py) ... done
  Created wheel for emoji: filename=emoji-1.7.0-py3-none-any.whl size=171046 sha256=30f3f5cffcc16086a214e4eda40fdb9614569ce0298702ed30d42470bf0275a9
  Stored in directory: /root/.cache/pip/wheels/8a/4e/b6/57b01db010d17ef6ea9b40300af725ef3e210cb1acfb7ac8b6
Successfully built emoji
Installing collected packages: ftfy, emoji, clean-text
Successfully installed clean-text-0.6.0 emoji-1.7.0 ftfy-6.1.1


Two versions of the tokenize function:


1.   Without stopwords
2.   With stopwords

I decided to work with the one with stopwords because, as stated in the textbook: 

> " *for the problem of finding similar news articles, it was found that defining a shingle to be a stop word followed by the next two words, regardless of whether or not they were stop words, formed a useful set of shingles* "

and finding similar tweets seemed close enough to finding similar news articles.
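
The stop-word shingling described in the quote can be sketched as follows. This is only an illustration: the helper `stopword_shingles` and the tiny `TOY_STOPWORDS` set are assumptions made for the example and are not used elsewhere in this notebook, which builds plain K-shingles over all tokens.

```python
# Hypothetical helper, for illustration only.
# A shingle is a stop word followed by the next two tokens,
# whether or not those two tokens are stop words themselves.
TOY_STOPWORDS = {"the", "of", "a", "and", "to", "in"}  # assumed tiny list for the demo

def stopword_shingles(tokens, stop_words=TOY_STOPWORDS):
    shingles = []
    # Only positions that still have two tokens after them can start a shingle
    for i, tok in enumerate(tokens[:-2]):
        if tok in stop_words:
            shingles.append(" ".join(tokens[i:i + 3]))
    return shingles

tokens = ["canada", "joined", "the", "list", "of", "supporting", "countries"]
print(stopword_shingles(tokens))
# ['the list of', 'of supporting countries']
```

With this scheme a tweet without stop words yields no shingles at all, which is one reason to treat it as an option rather than a replacement for plain K-shingles.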



In [None]:
import nltk
from nltk.tokenize import TweetTokenizer
from nltk.corpus import stopwords
from string import punctuation
from cleantext import clean

nltk.download('stopwords')
stopw = set(stopwords.words('english'))

tokenizer = TweetTokenizer()

# Clean the text (URLs, emails, emojis and line breaks removed), split it into
# tokens and drop punctuation tokens. Stop words are kept by default; pass
# remove_stopwords=True for the version without stop words.
def tokenize(text, remove_stopwords=False):
  cleaned = clean(text, no_urls=True, no_emails=True, no_emoji=True, no_line_breaks=True, replace_with_email="", replace_with_url="")
  tokens = [word for word in tokenizer.tokenize(cleaned) if word not in punctuation]
  if remove_stopwords:
    tokens = [word for word in tokens if word not in stopw]
  return tokens

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


After filtering, cleaning and tokenizing, the RDD has the form:

> *(tweet_id, tokenized_array)*

In [None]:
import numpy as np

def sort(items):
  # Sort in place and return the same list, so it can be used inside a lambda
  items.sort()
  return items

en_tweets = rdd.filter(lambda x: x.get("language")=='en').map(lambda x: (x, x.get("text")))

tweets_tokenized = en_tweets.map(lambda x: (x[0].get("tweetid"), tokenize(x[1])))

tweets_tokenized.first()

('1578535646146199554',
 ['canada',
  'joined',
  '15',
  'countries',
  'whose',
  'flags',
  'grace',
  'the',
  'obverse',
  'of',
  "ukraine's",
  'first',
  'commemorative',
  'coin',
  'issued',
  'since',
  'the',
  'beginning',
  'of',
  "russia's",
  'renewed',
  'invasion',
  'this',
  'february',
  'read',
  'more',
  '#numismatics',
  '#ukraine',
  '#collectcoins'])

## ***K*-Shingles from words**

From each tokenized tweet I build the list of shingles of the size chosen by the user. I also sort each list of shingles to speed up the following steps.

In [None]:
K = int(input("Enter the number K for k-grams dimension: "))

def build_shingles(tokens, K):
  # Slide a window of K consecutive tokens over the tweet and join each window
  # into one string; tweets with fewer than K tokens give an empty list
  return [" ".join(tokens[i:i + K]) for i in range(len(tokens) - K + 1)]

shingles_rdd = tweets_tokenized.map(lambda x: (x[0], sort(build_shingles(x[1], K))))
shingles_rdd.count()

Enter the number K for k-grams dimension: 3


40613

## **Building Bag of Shingles**

I build a sorted bag of the distinct shingles found across all the tweets.

In [None]:
shingles_bag = sort(shingles_rdd.flatMap(lambda x: x[1]).distinct().collect())
shingles_bag[:20]

['##biggerthanme #osimhen #dazn',
 '##crimea #tma2022 #thefactmusicawards2022',
 '##crimea as #ukraine',
 '##dpr allied forces',
 '##droz #democraticviolence #gpexplorer',
 '##iranianlivesmatter #iranprotests2022 #tv3newday',
 '##kimjongun and #danielortega',
 '##kpworldtoursingapore #kerch #weverse',
 '##marvel #ironman #thor',
 '##nctdream #pakistanzindabad #case143',
 '##osce #un #ukrainerussiawar',
 '##pcr #communist regime',
 '##putin our beloved',
 "##putin's #dictatorship #collapse",
 '##raska appointed the',
 '##russia #ukraine is',
 '##russia and #putin',
 '##seonghwa #seonghwa_closet #tma2022',
 '##transform the #battle',
 '##trump wanted the']

I use the integers from 0 to *n* - 1 (with *n* equal to the length of shingles_bag) as identifiers for the shingles.

In [None]:
shingles_bag_id = list(range(0, len(shingles_bag)))
len(shingles_bag_id)

572145

### **Converting shingles**

For each tweet I convert its shingles into integers, each one being the position of the shingle in the Bag of Shingles array. In this way all the shingles are encoded as integers rather than strings.
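
The same encoding can be illustrated on toy data with a dictionary lookup. This is a sketch under assumptions: `toy_bag`, `shingle_to_id` and `encode_shingles` are hypothetical names, and the dictionary is an alternative to scanning the sorted bag, not the method used in the cell below.

```python
# Toy bag of shingles; in the notebook this role is played by `shingles_bag`
toy_bag = sorted({"a b c", "b c d", "c d e", "x y z"})

# Map each shingle to its position in the sorted bag (built once)
shingle_to_id = {s: i for i, s in enumerate(toy_bag)}

def encode_shingles(shingles, lookup):
    # Hypothetical helper: replace each shingle by its integer id
    return [lookup[s] for s in shingles]

print(encode_shingles(["b c d", "x y z"], shingle_to_id))
# [1, 3]
```

The ids are the same positions a scan of the sorted bag would return; the dictionary only changes how each position is found.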

In [None]:
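def convertShingles(shingles, BoS):
  listToReturn = list()
  # Both lists are sorted, so the scan can resume from the last match
  # instead of restarting from the beginning of the bag
  start_index = 0

  for s in shingles:
    for i in range(start_index, len(BoS)):
      if s == BoS[i]:
        listToReturn.append(i)
        start_index = i
        break

  return listToReturn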

shingles_int_rdd = shingles_rdd.map(lambda x: (x[0], convertShingles(x[1], shingles_bag)))
#shingles_int_rdd.collect()
shingles_int_rdd.cache()
shingles_int_rdd.first()

('1578535646146199554',
 [33966,
  73051,
  154698,
  177094,
  185133,
  186882,
  192327,
  228680,
  232078,
  232769,
  253842,
  292272,
  300909,
  307414,
  338727,
  358963,
  366613,
  369433,
  407541,
  411981,
  420906,
  437407,
  469760,
  478355,
  493942,
  522597,
  552387])

## **Permutation matrix**

Now that I have the shingles encoded as integers I can take the Bag of Shingles and generate *n* permutations of it.

The permutations are generated by applying the following equation to every element of the array: 

\begin{equation}
\mathrm{element}_{\mathrm{permuted}} = (a \cdot \mathrm{element}_{\mathrm{before\_permutation}} + b) \bmod \mathrm{length}_{\mathrm{bag\_of\_shingles}}
\end{equation}

Here *a* and *b* are two integers drawn at random between 0 and the length of the Bag of Shingles. In this way I can generate *n* permutations just by changing the values of *a* and *b*. The modulo guarantees that every resulting integer lies between 0 and the length of the array minus 1. The result is a true permutation of the initial array only when *a* is coprime with that length; otherwise different shingles can map to the same value and the mapping acts as an ordinary hash function.
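
A small check on a toy array shows when the equation above really produces a permutation. This is a minimal sketch; `affine_map`, `is_permutation` and `n_toy` are names introduced only for this example.

```python
from math import gcd

def affine_map(a, b, n):
    # Apply x -> (a*x + b) % n to every index 0..n-1
    return [(a * x + b) % n for x in range(n)]

def is_permutation(values):
    # True when every integer 0..len-1 appears exactly once
    return sorted(values) == list(range(len(values)))

n_toy = 10
good = affine_map(3, 4, n_toy)   # gcd(3, 10) == 1
bad = affine_map(4, 4, n_toy)    # gcd(4, 10) == 2

print(gcd(3, n_toy), good, is_permutation(good))
# 1 [4, 7, 0, 3, 6, 9, 2, 5, 8, 1] True
print(gcd(4, n_toy), bad, is_permutation(bad))
# 2 [4, 8, 2, 6, 0, 4, 8, 2, 6, 0] False
```

When *a* shares a factor with the length, several positions collapse onto the same value, so some shingles can never be told apart by that mapping.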

In [None]:
def generatePermutation(arrayToHash, a, b):
  arrayToReturn = arrayToHash.copy()
  
  for i in range(len(arrayToHash)):
    arrayToReturn[i] = (a * arrayToHash[i] + b) % len(arrayToHash)

  return arrayToReturn

n = int(input("Insert number n of permutation desired: "))
permutation_matrix = list()

from math import gcd

N = len(shingles_bag_id)

for i in range(n):
  # Draw two random coefficients so that each iteration gives a different mapping.
  # a is redrawn until it is coprime with N: only then is (a*x + b) % N a true
  # permutation (no two shingles get the same value)
  a = int(np.random.randint(1, N))
  while gcd(a, N) != 1:
    a = int(np.random.randint(1, N))
  b = int(np.random.randint(N))

  permutation_matrix.append(generatePermutation(shingles_bag_id, a, b))

Insert number n of permutation desired: 40


## **Minhashing**

For each tweet I compute the minhash signature based on the *n* permutations generated before
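
The idea behind the signature can be tried on toy sets: the fraction of positions where two signatures agree estimates the Jaccard similarity of the underlying sets. The sketch below is a variant, not the notebook's method: it applies hash functions of the form (a*x + b) % PRIME on the fly instead of storing a permutation matrix, and `PRIME`, `make_hash_params`, `minhash_signature` and `signature_agreement` are hypothetical names.

```python
import random

PRIME = 2_147_483_647  # a prime larger than every shingle id used here (assumption)

def make_hash_params(num_hashes, seed=0):
    # One (a, b) pair per hash function h(x) = (a*x + b) % PRIME, with a != 0
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(PRIME)) for _ in range(num_hashes)]

def minhash_signature(shingle_ids, params):
    # For each hash function keep the smallest hashed value (set must be non-empty)
    return [min((a * x + b) % PRIME for x in shingle_ids) for a, b in params]

def signature_agreement(sig1, sig2):
    # Fraction of positions where the two signatures coincide
    return sum(u == v for u, v in zip(sig1, sig2)) / len(sig1)

ids = random.Random(1).sample(range(500_000), 100)
set_a, set_b = set(ids[:80]), set(ids[20:])   # exact Jaccard = 60 / 100 = 0.6

params = make_hash_params(200)
sig_a = minhash_signature(set_a, params)
sig_b = minhash_signature(set_b, params)
print(signature_agreement(sig_a, sig_b))   # an estimate of 0.6, not exactly 0.6
```

More hash functions make the estimate tighter, at the cost of longer signatures.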

In [None]:
def calculateMinhashSignature(perm_mtx, shingle_array):
  sig_column = [np.inf] * len(perm_mtx)

  for i in shingle_array:
    for j in range(len(sig_column)):
      if(perm_mtx[j][i] < sig_column[j]):
        sig_column[j] = perm_mtx[j][i]

  return sig_column

minhash_sig_rdd = shingles_int_rdd.map(lambda x: (x[0], calculateMinhashSignature(permutation_matrix, x[1])))
minhash_sig_rdd.collect()
minhash_sig_rdd.persist()

PythonRDD[10] at collect at <ipython-input-15-e87837df734b>:12

## **LSH**

I divide each tweet's minhash signature (*n* values) into *b* bands of *r* rows each. I also calculate the threshold *t* based on the values of *b* and *r*.

Every row will be a tuple composed as follows:


> *(tweet_information, band_information)*

Here *tweet_information* consists of the *tweet_id* and the full minhash signature of the tweet, whereas *band_information* consists of the *band_id* and the slice of the signature that falls in that band.
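
To see what a choice of *b* and *r* implies, the threshold can be computed together with the probability that two tweets with a given similarity share at least one band. A minimal sketch; `lsh_threshold`, `candidate_probability`, `b_toy` and `r_toy` are names introduced for this example only.

```python
def lsh_threshold(b, r):
    # Approximate similarity at which the S-curve rises most steeply
    return (1 / b) ** (1 / r)

def candidate_probability(s, b, r):
    # Probability that two items with similarity s agree on all r rows
    # of at least one of the b bands
    return 1 - (1 - s ** r) ** b

b_toy, r_toy = 8, 5   # 40 signature values split as 8 bands of 5 rows
print(lsh_threshold(b_toy, r_toy))
for s in (0.3, 0.5, 0.66, 0.8, 0.9):
    print(s, round(candidate_probability(s, b_toy, r_toy), 3))
```

More bands with fewer rows lower the threshold and produce more candidate pairs; fewer bands with more rows do the opposite.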





In [None]:
b = int(input("Enter into how many bands you want to divide the " + str(n) +" permutations: "))
r = n / b

#THRESHOLD
t = (1/b) ** (1/r)
print(t)

def band_splitting(tweet_id, arrayToSplit, b, n):
  # Split the signature into b consecutive bands of r = n // b values each.
  # If n is not a multiple of b, the trailing values are left out
  r = n // b
  arrayToReturn = list()

  for band_id in range(1, b + 1):
    start = (band_id - 1) * r
    band = arrayToSplit[start:start + r]
    arrayToReturn.append(((tweet_id, arrayToSplit), (band_id, band)))

  return arrayToReturn

sig_bands_rdd = minhash_sig_rdd.flatMap(lambda x: band_splitting(x[0], x[1], b, n))
sig_bands_rdd.take(10)

Enter into how many bands you want to divide the 40 permutations: 8
0.6597539553864471


[(('1578535646146199554',
   [6261,
    12954,
    15077,
    13942,
    42549,
    5118,
    25944,
    12844,
    34043,
    27712,
    4599,
    8592,
    1600,
    2018,
    9951,
    61563,
    12164,
    6551,
    12187,
    3105,
    52854,
    5788,
    12806,
    29925,
    7713,
    17214,
    49696,
    22082,
    5944,
    48733,
    62109,
    14721,
    52805,
    1891,
    86400,
    12554,
    7762,
    84,
    5083,
    16493]),
  (1, [6261, 12954, 15077, 13942, 42549])),
 (('1578535646146199554',
   [6261,
    12954,
    15077,
    13942,
    42549,
    5118,
    25944,
    12844,
    34043,
    27712,
    4599,
    8592,
    1600,
    2018,
    9951,
    61563,
    12164,
    6551,
    12187,
    3105,
    52854,
    5788,
    12806,
    29925,
    7713,
    17214,
    49696,
    22082,
    5944,
    48733,
    62109,
    14721,
    52805,
    1891,
    86400,
    12554,
    7762,
    84,
    5083,
    16493]),
  (2, [5118, 25944, 12844, 34043, 27712])),
 (('15785356

### **Hashing bands**

I hash every band to get a single integer from a list of *r* integers. To do this, I turn the list of integers into a string made of the integers separated by a blank space (e.g. [ 1, 3, 45 ] ---> "1 3 45").

From that, I can hash the string to an integer value. In this way, strings that are equal hash to the same number.

Having bands encoded as integers allows me to get the rdd in the form of:

> ( ( *hash_value*, *band_id* ), (*tweet_id*, *signatures_array*) )

From that I can perform a *groupByKey* to get the list of tweets that hash to the same bucket in the same band.
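
The grouping step can be sketched without Spark on a few toy signatures. This is only an illustration under assumptions: the data and the names `toy_signatures`, `rows` and `buckets` are invented, and the band itself (as a tuple) is used as the bucket key instead of a hashed string.

```python
from collections import defaultdict

# Toy signatures (tweet_id -> signature of 4 values), split into 2 bands of 2 rows
toy_signatures = {
    "t1": [5, 9, 3, 7],
    "t2": [5, 9, 4, 8],
    "t3": [1, 2, 3, 7],
    "t4": [6, 6, 6, 6],
}
rows = 2

buckets = defaultdict(list)
for tweet_id, sig in toy_signatures.items():
    for band_id, start in enumerate(range(0, len(sig), rows), start=1):
        # A tuple is hashable, so the band itself can serve as the bucket key
        key = (band_id, tuple(sig[start:start + rows]))
        buckets[key].append(tweet_id)

# Keep only the buckets shared by at least two tweets
shared = {k: v for k, v in buckets.items() if len(v) > 1}
print(shared)
# {(1, (5, 9)): ['t1', 't2'], (2, (3, 7)): ['t1', 't3']}
```

Keeping the band id in the key matters: equal values in different bands must not end up in the same bucket.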

In [None]:
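import hashlib

def hashArray(array):
  col_str = ' '.join(str(x) for x in array)

  # hashlib gives a digest that is stable across processes and runs,
  # unlike the built-in hash(), which is salted per interpreter for strings
  return int.from_bytes(hashlib.md5(col_str.encode('utf8')).digest()[:8], 'big')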

sig_grouped_rdd = sig_bands_rdd.map(lambda x: ((hashArray(x[1][1]), x[1][0]), x[0])).groupByKey().map(lambda x: (x[0], list(x[1])))
sig_grouped_rdd.collect()
sig_grouped_rdd.persist()
sig_grouped_rdd.take(10)

[((635658310934313577, 1),
  [('1578535646146199554',
    [6261,
     12954,
     15077,
     13942,
     42549,
     5118,
     25944,
     12844,
     34043,
     27712,
     4599,
     8592,
     1600,
     2018,
     9951,
     61563,
     12164,
     6551,
     12187,
     3105,
     52854,
     5788,
     12806,
     29925,
     7713,
     17214,
     49696,
     22082,
     5944,
     48733,
     62109,
     14721,
     52805,
     1891,
     86400,
     12554,
     7762,
     84,
     5083,
     16493])]),
 ((7711896064319513945, 2),
  [('1578535646146199554',
    [6261,
     12954,
     15077,
     13942,
     42549,
     5118,
     25944,
     12844,
     34043,
     27712,
     4599,
     8592,
     1600,
     2018,
     9951,
     61563,
     12164,
     6551,
     12187,
     3105,
     52854,
     5788,
     12806,
     29925,
     7713,
     17214,
     49696,
     22082,
     5944,
     48733,
     62109,
     14721,
     52805,
     1891,
     86400,
     12554,
     7

## **Group LSH results**

To get for each tweet the list of others that hash to the same bucket, I do the following:


1.   *Filter* to keep only the (hash value, band) keys that contain more than one tweet
2.   Apply a *flatMap* to get an RDD of tuples of the form (tweet_id, [ list_of_similar_tweets ])
3. Apply a *reduceByKey* to merge the lists of similar tweets that belong to the same key tweet, removing the duplicates generated by the previous step.

After these 3 steps, the RDD contains only tweets that have at least one candidate similar tweet: each key is a tweet and its value is the list of tweets that share a bucket with it.
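
The three steps can be mirrored in plain Python on toy buckets. A sketch with invented data; `toy_buckets` and `candidates` are hypothetical names, and sets take the place of the duplicate-removal logic used with *reduceByKey*.

```python
from collections import defaultdict
from itertools import combinations

# Toy buckets: (band_id, bucket_key) -> tweet ids that landed there
toy_buckets = {
    (1, "k1"): ["t1", "t2"],
    (2, "k2"): ["t1", "t3"],
    (2, "k3"): ["t4"],
    (3, "k4"): ["t1", "t2"],
}

candidates = defaultdict(set)
for ids in toy_buckets.values():
    if len(ids) < 2:
        continue               # step 1: skip buckets with a single tweet
    for x, y in combinations(ids, 2):
        candidates[x].add(y)   # steps 2 and 3: sets merge and deduplicate
        candidates[y].add(x)

print({k: sorted(v) for k, v in sorted(candidates.items())})
# {'t1': ['t2', 't3'], 't2': ['t1'], 't3': ['t1']}
```

The pair (t1, t2) meets in two different bands but is recorded only once, which is the effect the deduplication step is after.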


In [None]:
# For each list of tweets, return a list of tuples with keys the tweets of the list 
# and as values the list of tweets given as parameter
def array_splitting(id_array):
  listToReturn = list()
  

  for id in id_array:
    listToReturn.append((id[0], id_array))

  return listToReturn

def removeDuplicate(x, y):
  listToReturn = x.copy()
  x_id = [i[0] for i in x]
  y_id = [i[0] for i in y]
  
  for i, id in enumerate(y_id):
    if id not in x_id:
      listToReturn.append(y[i]) 

  return listToReturn
  

similar_tweets_rdd = sig_grouped_rdd.filter(lambda x: len(x[1]) > 1).flatMap(lambda x: array_splitting(x[1])).reduceByKey(lambda x,y: removeDuplicate(x , y))

In [None]:
similar_tweets_rdd.take(2)

[('1578554508342272000',
  [('1578535891491954688',
    [6065,
     2447,
     1155,
     1139,
     65379,
     63903,
     19764,
     3808,
     9683,
     1531,
     20157,
     12177,
     11716,
     6680,
     596,
     22323,
     22273,
     3796,
     10245,
     26618,
     15,
     23038,
     2299,
     31546,
     18290,
     2864,
     15126,
     92026,
     34124,
     46466,
     22869,
     22106,
     17414,
     36947,
     20424,
     8558,
     100263,
     17750,
     10177,
     2457]),
   ('1578554508342272000',
    [6065,
     2447,
     1155,
     1139,
     65379,
     63903,
     19764,
     3808,
     9683,
     1531,
     20157,
     12177,
     11716,
     6680,
     596,
     22323,
     22273,
     3796,
     10245,
     26618,
     15,
     23038,
     2299,
     31546,
     18290,
     2864,
     15126,
     92026,
     34124,
     46466,
     22869,
     22106,
     17414,
     36947,
     20424,
     8558,
     100263,
     17750,
     10177,
    

## **Calculate Jaccard Similarity**

Thanks to LSH, tweets have been hashed to buckets, and I consider tweets that share a bucket as *candidate pairs* of similar tweets. For each candidate pair I estimate the Jaccard similarity as the fraction of minhash signature positions on which the two tweets agree.
From this, I filter the RDD, keeping only the pairs whose estimated similarity is greater than the threshold calculated above.
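
The cells below compare minhash signatures, which gives an estimate of the similarity. Candidate pairs could also be checked against the exact Jaccard similarity of their integer shingle sets. The sketch below shows that check on toy data; `exact_jaccard`, `toy_shingles`, `candidate_pairs` and `threshold` are hypothetical names and values, not part of the pipeline.

```python
def exact_jaccard(ids_a, ids_b):
    # |A ∩ B| / |A ∪ B| on the sets of shingle ids; 0.0 when both are empty
    a, b = set(ids_a), set(ids_b)
    union = a | b
    return len(a & b) / len(union) if union else 0.0

toy_shingles = {
    "t1": [1, 2, 3, 4, 5],
    "t2": [1, 2, 3, 4, 6],
    "t3": [7, 8, 9],
}
candidate_pairs = [("t1", "t2"), ("t1", "t3")]
threshold = 0.6  # assumed value for the example

scored = [(x, y, exact_jaccard(toy_shingles[x], toy_shingles[y]))
          for x, y in candidate_pairs]
confirmed = [p for p in scored if p[2] >= threshold]
print(confirmed)
# [('t1', 't2', 0.6666666666666666)]
```

The exact check costs more per pair, but it only runs on the candidates that LSH has already selected.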

In [None]:
# Reconstruct as key the tuple (tweet_id, list_of_signatures)
def getSignature(id, tweet_list):
  sig = list()
  for t in tweet_list:
    if(t[0] == id):
      sig = t[1].copy()
      break
  return sig

# From (tweet, list_of_tweets) return a list of couples (tweet, tweet_in_the_list)
def flatFunct(tweet, tweet_list):
  listToReturn = list()
  for t in tweet_list:
    listToReturn.append((tweet, t))

  return listToReturn

# Estimate the Jaccard similarity of two tweets as the fraction of
# minhash signature positions on which they agree
def calculateJaccardSimilarity(tweet_1, tweet_2):
  d = len(tweet_1)
  sim_count = 0

  for i in range(d):
    if (tweet_1[i] == tweet_2[i]):
      sim_count += 1

  return sim_count/d

jacc_sim_rdd = similar_tweets_rdd.map(lambda x: ((x[0], getSignature(x[0], x[1])), x[1])).flatMap(lambda x: flatFunct(x[0], x[1])).map(lambda x: ((x[0][0], x[1][0]), calculateJaccardSimilarity(x[0][1], x[1][1])))

Finally, I remove each tweet itself from its own list of similar tweets.

In [None]:
def removeElement(id, array):
  array.remove(id)
  return array

similar_tweets_rdd = jacc_sim_rdd.filter(lambda x: x[1] > t).map(lambda x: x[0]).groupByKey().map(lambda x: (x[0], removeElement(x[0], list(x[1]))))
similar_tweets = similar_tweets_rdd.collect()

## **Print results**

Print for each tweet the text of the similar tweets associated to it.

In [None]:
count = 0

for tweet, listSimilarTweets in similar_tweets:
  count += 1
  print("--- TWEET ID: " + tweet + " ---")
  listSimilarTweets.insert(0, tweet)
  texts = rdd.filter(lambda x: x.get("tweetid") in listSimilarTweets).map(lambda x: x.get("text").replace('\n', '')).collect()
  for text in texts:
    print(" -  " + text)
  print("\n")

  if count == 100:
    break


--- TWEET ID: 1578554508342272000 ---
 -  #weekendvibes #Putin and #Zelenskyy should sit down and smoke a spliff together. 💨💭✌️80,s Tunes.🚀🔥💃Aptapogens triangle from TCM.😇👇🍻AstragalusGinsengAshwaghanda #RETWEEET https://t.co/moHjn1XrHC
 -  #weekendvibes #Putin and #Zelenskyy should sit down and smoke a spliff together. 💨💭✌️80,s Tunes.🚀🔥💃Aptapogens triangle from TCM.😇👇🍻AstragalusGinsengAshwaghanda #RETWEEET https://t.co/moHjn1XrHC


--- TWEET ID: 1578535911444254720 ---
 -  #odessa #odesa #ukraine #одессаNow: 14.1°CToday's Min: 14.2°C at 02:33:24Today's Max: 15.3°C at 00:00:00Month's Min: 10.7°C at 2022-10-04 06:33:41Month's Max: 24.2°C at 2022-10-01 13:01:39
 -  #odessa #odesa #ukraine #одессаNow: 13.9°CToday's Min: 13.9°C at 03:43:42Today's Max: 15.3°C at 00:00:00Month's Min: 10.7°C at 2022-10-04 06:33:41Month's Max: 24.2°C at 2022-10-01 13:01:39
 -  #odessa #odesa #ukraine #одессаNow: 13.6°CToday's Min: 13.6°C at 04:54:02Today's Max: 15.3°C at 00:00:00Month's Min: 10.7°C at 2022-10-0

# **Comparing results**

## **LSH from datasketch**

I implement the same procedure as above using the [datasketch library](https://pypi.org/project/datasketch/).

In [2]:
!pip install datasketch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting datasketch
  Downloading datasketch-1.5.8-py2.py3-none-any.whl (76 kB)
Installing collected packages: datasketch
Successfully installed datasketch-1.5.8


In [None]:
tweets_shingled = shingles_rdd.collect()
tweets_shingled[:10]

In [None]:
from datasketch import MinHash, MinHashLSH

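def buildMinHash(shingles, num_perm):
  # Build the datasketch MinHash of one tweet from its list of shingles
  m = MinHash(num_perm=num_perm)
  for s in shingles:
    m.update(s.encode('utf8'))
  return m

# One (tweet_id, shingles, MinHash) triple per tweet, built on the driver
# from the shingles collected in the previous cell
minhash_array = [(tweet_id, shingles, buildMinHash(shingles, n)) for tweet_id, shingles in tweets_shingled]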

In [None]:
lsh = MinHashLSH(threshold=t, num_perm=n)

for e in minhash_array:
  lsh.insert(str(e[0]), e[2])

result = lsh.query(minhash_array[0][2])

In [None]:
from datasketch import MinHash, MinHashLSH

set1 = set(['minhash', 'is', 'a', 'probabilistic', 'data', 'structure', 'for',
            'estimating', 'the', 'similarity', 'between', 'datasets'])
set2 = set(['minhash', 'is', 'a', 'probability', 'data', 'structure', 'for',
            'estimating', 'the', 'similarity', 'between', 'documents'])
set3 = set(['minhash', 'is', 'probability', 'data', 'structure', 'for',
            'estimating', 'the', 'similarity', 'between', 'documents'])
set4 = set(['a','b','c'])

m1 = MinHash(num_perm=128)
m2 = MinHash(num_perm=128)
m3 = MinHash(num_perm=128)
for d in set1:
    m1.update(d.encode('utf8'))
for d in set2:
    m2.update(d.encode('utf8'))
for d in set3:
    m3.update(d.encode('utf8'))

# Create LSH index
lsh = MinHashLSH(threshold=0.5, num_perm=128)
lsh.insert("m2", m2)
lsh.insert("m3", m3)
result = lsh.query(m1)
print("Approximate neighbours with Jaccard similarity > 0.5", result)