### Load Citigroup dataset from json

In [4]:
import json

citi_data = []
with open('webhose_citigroup.json', 'r') as f:
    for line in f.readlines():
        citi_data.append(json.loads(line))

citi_titles = [a['title'] for a in citi_data]

In [5]:
%env PYSPARK_PYTHON=python3

env: PYSPARK_PYTHON=python3


### Set comes with duplicates:

In [6]:
len(citi_titles), len(set(citi_titles))

(11358, 9277)

### We can use spark's MinHash over character n-grams
#### We create a dataframe containing the title as a string in one column and a list of characters in another

In [8]:
#pip install pyspark

In [9]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)
df = spark.createDataFrame([
    (k, t, list(t)) for k, t in enumerate(citi_titles) if len(list(t)) >=3],
    ['id', 'title', 'title_characters'])
df.select('title').show()

+--------------------+
|               title|
+--------------------+
|Amarin (NASDAQ:AM...|
|Amarin (NASDAQ:AM...|
|Hess (NYSE:HES) G...|
|Standard Chartere...|
|Apergy Target of ...|
|WPP (LON:WPP) Sto...|
|XP Inc.’s (NYSE:X...|
|Caci Internationa...|
|Caci Internationa...|
|DA Davidson Reite...|
|State Street Corp...|
|FY2020 EPS Estima...|
|Zoetis Inc (NYSE:...|
|FY2020 EPS Estima...|
|Royal Bank of Can...|
|FY2020 EPS Estima...|
|State Street Corp...|
|ST. LOUIS -- (Bus...|
|BETHLEHEM, Pa., J...|
|Reinsurance Group...|
+--------------------+
only showing top 20 rows



#### Now we use spark to select character n-grams (shingles)

In [10]:
from pyspark.ml.feature import NGram

ngram = NGram(n=3, inputCol='title_characters', outputCol='ngrams')
ngram_df = ngram.transform(df)
ngram_df.select('ngrams').show()

+--------------------+
|              ngrams|
+--------------------+
|[A m a, m a r, a ...|
|[A m a, m a r, a ...|
|[H e s, e s s, s ...|
|[S t a, t a n, a ...|
|[A p e, p e r, e ...|
|[W P P, P P  , P ...|
|[X P  , P   I,   ...|
|[C a c, a c i, c ...|
|[C a c, a c i, c ...|
|[D A  , A   D,   ...|
|[S t a, t a t, a ...|
|[F Y 2, Y 2 0, 2 ...|
|[Z o e, o e t, e ...|
|[F Y 2, Y 2 0, 2 ...|
|[R o y, o y a, y ...|
|[F Y 2, Y 2 0, 2 ...|
|[S t a, t a t, a ...|
|[S T ., T .  , . ...|
|[B E T, E T H, T ...|
|[R e i, e i n, i ...|
+--------------------+
only showing top 20 rows



#### And transform those in-grams into binary vectors we can use in MinHash

In [11]:
from pyspark.ml.feature import CountVectorizer 

count_vectorizer = CountVectorizer(inputCol='ngrams', outputCol='vector', binary=True)
model = count_vectorizer.fit(ngram_df)
cv_df = model.transform(ngram_df)

# the vectors are displayed in 'sparse' format, 
# i.e. the numbers shown are the indices i of vector x where x[i]=1, 
# and x[k]=0 for all other k
cv_df.select('vector').show()

+--------------------+
|              vector|
+--------------------+
|(12947,[0,5,6,8,9...|
|(12947,[0,5,6,8,9...|
|(12947,[0,1,2,3,4...|
|(12947,[5,6,7,8,9...|
|(12947,[0,1,2,3,4...|
|(12947,[8,9,11,12...|
|(12947,[0,1,2,3,4...|
|(12947,[0,1,2,3,4...|
|(12947,[0,1,2,3,4...|
|(12947,[0,7,8,11,...|
|(12947,[0,1,2,3,4...|
|(12947,[1,7,9,12,...|
|(12947,[0,1,2,3,4...|
|(12947,[7,9,11,28...|
|(12947,[0,1,2,3,4...|
|(12947,[1,7,9,12,...|
|(12947,[0,1,2,3,4...|
|(12947,[12,17,20,...|
|(12947,[38,292,32...|
|(12947,[7,8,14,17...|
+--------------------+
only showing top 20 rows



#### Now we can apply MinHash

In [12]:
from pyspark.ml.feature import MinHashLSH

min_hash = MinHashLSH(inputCol='vector', outputCol='minHash', seed=0, numHashTables=10)
model = min_hash.fit(cv_df)
hash_df = model.transform(cv_df)

# We now have the min hash values for the dataset of article titles
hash_df.select('minHash').show()

+--------------------+
|             minHash|
+--------------------+
|[[6498880.0], [95...|
|[[6498880.0], [95...|
|[[1.5733472E7], [...|
|[[4.1623735E7], [...|
|[[6.7513998E7], [...|
|[[6333835.0], [7....|
|[[1.5898517E7], [...|
|[[1.3424824E7], [...|
|[[1.3424824E7], [...|
|[[1.3754914E7], [...|
|[[4.6406076E7], [...|
|[[2.2989506E7], [...|
|[[1.5898517E7], [...|
|[[2.2989506E7], [...|
|[[4.1623735E7], [...|
|[[2.2989506E7], [...|
|[[4.6406076E7], [...|
|[[1.8207165E7], [...|
|[[1.14842273E8], ...|
|[[5.3497065E7], [...|
+--------------------+
only showing top 20 rows



In [13]:
joined_rows = model.approxSimilarityJoin(cv_df, cv_df, threshold=0.05, distCol='jaccard_distance')
# the returned dataframe will be pairs of rows where each pair has an estimated distance of at most the threshold
joined_rows.show()

+--------------------+--------------------+----------------+
|            datasetA|            datasetB|jaccard_distance|
+--------------------+--------------------+----------------+
|{8602, Aston Mart...|{8602, Aston Mart...|             0.0|
|{1167, On The Bea...|{138, On The Beac...|             0.0|
|{7158, EVO Paymen...|{7160, EVO Paymen...|             0.0|
|{10912, Tandem Di...|{10913, Tandem Di...|             0.0|
|{6500, CMS Energy...|{5318, CMS Energy...|             0.0|
|{9437, PhaseBio P...|{9464, PhaseBio P...|             0.0|
|{4768, Facebook (...|{4768, Facebook (...|             0.0|
|{9846, News Highl...|{9576, News Highl...|             0.0|
|{6795, Canary Wha...|{6795, Canary Wha...|             0.0|
|{5285, Brokerages...|{5285, Brokerages...|             0.0|
|{2534, SYSMEX COR...|{2534, SYSMEX COR...|             0.0|
|{3044, Bank of Am...|{2952, Bank of Am...|             0.0|
|{10846, Royal Dut...|{10846, Royal Dut...|             0.0|
|{7291, NVIDIA NVD...|{7

#### We can now see all duplicates of a title

In [14]:
from pyspark.sql.functions import col
joined_rows.filter(joined_rows.datasetA.id == 1561).select(col('datasetA.title'), col('datasetB.title')).show()

+--------------------+--------------------+
|               title|               title|
+--------------------+--------------------+
|Okta (NASDAQ:OKTA...|Okta (NASDAQ:OKTA...|
|Okta (NASDAQ:OKTA...|Okta (NASDAQ:OKTA...|
+--------------------+--------------------+



#### And we can deduplicate the whole dataset if we want to

In [15]:
# this will return the first id for each title in the dataset
deduplicated_df = joined_rows.groupby(col('datasetA.title')).min('datasetA.id')
deduplicated_df.show()

+--------------------+----------------------+
|               title|min(datasetA.id AS id)|
+--------------------+----------------------+
|Hoertkorn Richard...|                 10066|
|AMC Entertainment...|                  1253|
|Citigroup Reaffir...|                  5435|
|ITV (LON:ITV) Sto...|                  7801|
|Cellectis’ (CLLS)...|                  9734|
|Credit Suisse Gro...|                  3201|
|Royal Mail (LON:R...|                  3902|
|LYFT (NASDAQ:LYFT...|                  6689|
|Laureate Educatio...|                 11192|
|Marks and Spencer...|                  2986|
|BidaskClub Upgrad...|                  2384|
|Playa Hotels & Re...|                  7578|
|National Bank Fin...|                   749|
|Avalara (NYSE:AVL...|                  8328|
|Welbilt, Inc to P...|                  9904|
|Green Dot (NYSE:G...|                  3808|
|Brokerages Set Da...|                  8665|
|Xilinx (NASDAQ:XL...|                  8327|
|Warrior Met Coal ...|            

In [16]:
deduplicated_titles = deduplicated_df.toPandas()
deduplicated_titles = deduplicated_titles.rename(columns = {'min(datasetA.id AS id)':'index'})

print("Number of duplicates (MinHash): " + str(len(citi_titles)-len(deduplicated_titles)))
print("Number of deduplicates (MinHash): " + str(len(deduplicated_titles)))

Number of duplicates (MinHash): 2081
Number of deduplicates (MinHash): 9277


In [17]:
import pandas as pd
df_citi_newsfeeds = pd.read_json("webhose_citigroup.json", lines=True)
df_citi_newsfeeds.reset_index(inplace=True)

In [18]:
new_citi_news_df = deduplicated_titles.merge(df_citi_newsfeeds, on='index', how='left')
new_citi_news_df.drop(['title_x', 'index'], axis=1, inplace=True)
new_citi_news_df.to_json('deduplicated_newsfeeds.json', orient = 'split', compression = 'infer', index = 'true')