In [1]:
# Duplicate Detection

#Minhash/LSH algorithm to detect 10 most similar entries in articles1.csv to a reference article. 
#The reference article is in the dataset, identified as Article ID 69716, 
#The Minhash/LSH algorithm relies on the concept of distances to define similarity. 
#For this project, Jaccard similarity is used.

In [2]:
#Install tools
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install datasketch

Note: you may need to restart the kernel to use updated packages.


In [4]:
#Create spark context
import os
import pyspark
conf = pyspark.SparkConf()

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/02 17:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/02 17:59:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/02 17:59:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/04/02 17:59:28 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/04/02 17:59:28 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [5]:
#Load data
data = spark.read.format("csv")\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .load("articles1.csv") 

                                                                                

In [6]:
data.createOrReplaceTempView("dfTable")

In [7]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publication: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- url: string (nullable = true)
 |-- content: string (nullable = true)



In [8]:
from datasketch import MinHash
import pandas as pd

In [9]:
data.limit(1).toPandas()

23/04/02 17:59:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , id, title, publication, author, date, year, month, url, content
 Schema: _c0, id, title, publication, author, date, year, month, url, content
Expected: _c0 but found: 
CSV file: file:///Users/seungpang/Desktop/Big%20Data%20Midterm/articles1.csv


Unnamed: 0,_c0,id,title,publication,author,date,year,month,url,content
0,0,17283,House Republicans Fret About Winning Their Hea...,New York Times,Carl Hulse,2016-12-31,2016.0,12.0,,WASHINGTON — Congressional Republicans have...


In [10]:
# View reference data

# Article ID 69716, 
# “California lifted its mandatory water restrictions - 
# that could be a huge mistake”
spark.sql("SELECT id, title, content \
        FROM dfTable \
        WHERE id = 69716").show()

+-----+--------------------+--------------------+
|   id|               title|             content|
+-----+--------------------+--------------------+
|69716|California lifted...|’’ ’On Wednesday,...|
+-----+--------------------+--------------------+



In [11]:
#Install NLP package for text preprocessing
pip install spark-nlp

Note: you may need to restart the kernel to use updated packages.


In [12]:
pip install nltk

Note: you may need to restart the kernel to use updated packages.


In [13]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import regexp_replace

In [14]:
#Select relevant columns
data1 = data.select('id', 'title', 'content')
data1.limit(3).show()

+-----+--------------------+--------------------+
|   id|               title|             content|
+-----+--------------------+--------------------+
|17283|House Republicans...|WASHINGTON  —   C...|
|17284|Rift Between Offi...|After the bullet ...|
|17285|Tyrus Wong, ‘Bamb...|When Walt Disney’...|
+-----+--------------------+--------------------+



In [15]:
data1.createOrReplaceTempView("dfTable1")
data1.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)



In [16]:
# Total 50004 rows
data1.count()

50004

In [17]:
# Check for null values
from pyspark.sql.functions import col
data1.filter(data1.content.isNull()).show()

+--------------------+--------------------+-------+
|                  id|               title|content|
+--------------------+--------------------+-------+
|               28110|"UK Jobs Market C...|   null|
|               30419|"Cohn: Trump Beco...|   null|
|               33070|"Facebook ""Fake ...|   null|
|               46670|"DePaul Protester...|   null|
|               48580|Politico Scoop: D...|   null|
|               66261|                   "|   null|
| our Whole Foods ...| after we account...|   null|
|               71949|                   "|   null|
|               72340|                   "|   null|
+--------------------+--------------------+-------+



In [18]:
# Drop null rows - 49995 rows after 
data1 = data1.na.drop()
data1.count()

49995

In [19]:
data1.createOrReplaceTempView("dfTable1")

In [20]:
# Preprocess Text
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import regexp_replace

# regexTokenizer
regexTokenizer = RegexTokenizer(inputCol="content", outputCol="regex", pattern="\\W")

regexTokenized = regexTokenizer.transform(data1)
# regexTokenized.select("regex") \
#     .withColumn("tokens", countTokens(col("regex"))).show(1)

In [21]:
# Remove stop words
# https://spark.apache.org/docs/latest/ml-features#tokenizer
remover = StopWordsRemover(inputCol="regex", outputCol="stop_words")
clean_df = remover.transform(regexTokenized)

In [22]:
# clean_df contains preprossed text
clean_df.show(2)

+-----+--------------------+--------------------+--------------------+--------------------+
|   id|               title|             content|               regex|          stop_words|
+-----+--------------------+--------------------+--------------------+--------------------+
|17283|House Republicans...|WASHINGTON  —   C...|[washington, cong...|[washington, cong...|
|17284|Rift Between Offi...|After the bullet ...|[after, the, bull...|[bullet, shells, ...|
+-----+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [26]:
# Only get id, and stop_words column
final_df = clean_df.select('id', 'stop_words')

In [27]:
final_df.limit(2).show()

+-----+--------------------+
|   id|          stop_words|
+-----+--------------------+
|17283|[washington, cong...|
|17284|[bullet, shells, ...|
+-----+--------------------+



In [28]:
spark.sql("SELECT id, content, title \
        FROM dfTable \
        WHERE content = ' '").show()

+-----+-------+--------------------+
|   id|content|               title|
+-----+-------+--------------------+
|60381|       |Wonders of the un...|
|60745|       |The week in 32 ph...|
|63359|       |Enchanting waterf...|
+-----+-------+--------------------+



In [30]:
from pyspark.sql import *
from pyspark.sql.functions import *

In [32]:
# https://www.geeksforgeeks.org/converting-a-pyspark-dataframe-column-to-a-python-list/
# ref - is the reference article
ref = clean_df.select('stop_words').where(clean_df.id == 69716).rdd.flatMap(lambda x: x).collect()

# final_df to flatMap
d = final_df.rdd.flatMap(lambda x: [x]).collect()

                                                                                

In [33]:
from datasketch import MinHash
from datasketch import MinHashLSH
from pyspark.sql.functions import encode

In [37]:
#Use MinHash and Compute Jaccard Similarty score
m1, m2 = MinHash(), MinHash()
jacard = []

# Reference article
# for j in ref[0]: 
#     m1.update(j.encode('utf8'))
s1 = set(ref[0])

jaccard = []
# ind 0 to 49994
for i in range(0, len(d)):
    # for d in d[i][1]:
    #     m2.update(d.encode('utf8'))
        
    s2 = set(d[i][1])
    actual_jaccard = float(len(s1.intersection(s2)))/float(len(s1.union(s2)))
    # print("Actual Jaccard for ref_article and data is", actual_jaccard)
    jaccard.append(actual_jaccard)

In [39]:
# Convert jaccard list to dataframe
jcrd = spark.createDataFrame([(l,) for l in jaccard], ['Jaccard'])
jcrd.show()

+--------------------+
|             Jaccard|
+--------------------+
| 0.06276747503566334|
| 0.07883026064844247|
|               0.075|
|  0.0585480093676815|
| 0.05901116427432217|
|               0.025|
|0.052547770700636945|
|  0.0922165820642978|
|  0.0673076923076923|
| 0.05611510791366906|
| 0.09114249037227215|
|  0.0859538784067086|
| 0.05646036916395222|
| 0.08774834437086093|
| 0.07119741100323625|
| 0.07622298065984073|
| 0.07158590308370044|
| 0.07328072153325817|
| 0.05132192846034215|
| 0.03177570093457944|
+--------------------+
only showing top 20 rows



In [42]:
data_df = final_df.withColumn("id", final_df["id"].cast(IntegerType()))

In [43]:
data_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- stop_words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [44]:
# ID and pre-processed text, indexed
data_df = data_df.rdd.zipWithIndex()
data_df = data_df.toDF()
data_df.show()

                                                                                

+--------------------+---+
|                  _1| _2|
+--------------------+---+
|{17283, [washingt...|  0|
|{17284, [bullet, ...|  1|
|{17285, [walt, di...|  2|
|{17286, [death, m...|  3|
|{17287, [seoul, s...|  4|
|{17288, [london, ...|  5|
|{17289, [beijing,...|  6|
|{17290, [danny, c...|  7|
|{17291, [hillary,...|  8|
|{17292, [angels, ...|  9|
|{17293, [donald, ...| 10|
|{17294, [thompson...| 11|
|{17295, [west, pa...| 12|
|{17296, [article,...| 13|
|{17297, [season, ...| 14|
|{17298, [finally,...| 15|
|{17300, [pages, j...| 16|
|{17301, [mumbai, ...| 17|
|{17302, [baghdad,...| 18|
|{17303, [sydney, ...| 19|
+--------------------+---+
only showing top 20 rows



In [45]:
data_df = data_df.select(data_df._1.getItem('id').alias('ID'), data_df._1.getItem('stop_words').alias('Content'), data_df._2.alias('Ind'))

In [46]:
data_df.show(3)

+-----+--------------------+---+
|   ID|             Content|Ind|
+-----+--------------------+---+
|17283|[washington, cong...|  0|
|17284|[bullet, shells, ...|  1|
|17285|[walt, disney, ba...|  2|
+-----+--------------------+---+
only showing top 3 rows



In [48]:
#User Defined Function
#New dataframe shows the 10 most similar articles to the reference article
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def add_labels(indx):
    return jaccard[indx] # indx here will start from zero

labels_udf = udf(add_labels, FloatType())

new_df = data_df.withColumn('Jaccard', labels_udf('Ind'))
new_df.show()

[Stage 39:>                                                         (0 + 1) / 1]

+-----+--------------------+---+-----------+
|   ID|             Content|Ind|    Jaccard|
+-----+--------------------+---+-----------+
|17283|[washington, cong...|  0|0.062767476|
|17284|[bullet, shells, ...|  1| 0.07883026|
|17285|[walt, disney, ba...|  2|      0.075|
|17286|[death, may, grea...|  3| 0.05854801|
|17287|[seoul, south, ko...|  4|0.059011165|
|17288|[london, queen, e...|  5|      0.025|
|17289|[beijing, preside...|  6| 0.05254777|
|17290|[danny, cahill, s...|  7| 0.09221658|
|17291|[hillary, kerr, f...|  8|0.067307696|
|17292|[angels, everywhe...|  9| 0.05611511|
|17293|[donald, j, trump...| 10| 0.09114249|
|17294|[thompsons, tex, ...| 11| 0.08595388|
|17295|[west, palm, beac...| 12| 0.05646037|
|17296|[article, part, s...| 13| 0.08774834|
|17297|[season, family, ...| 14| 0.07119741|
|17298|[finally, second,...| 15| 0.07622298|
|17300|[pages, journal, ...| 16|  0.0715859|
|17301|[mumbai, india, b...| 17| 0.07328072|
|17302|[baghdad, suicide...| 18|0.051321927|
|17303|[sy

                                                                                

In [50]:
new_df.createOrReplaceTempView("df")

In [51]:
# 10 Most Similar Articles
spark.sql("SELECT ID, Content, Jaccard \
        FROM df \
        WHERE ID != 69716 \
        ORDER BY Jaccard DESC LIMIT 10").show()



+-----+--------------------+----------+
|   ID|             Content|   Jaccard|
+-----+--------------------+----------+
|19176|[los, angeles, ho...|0.15965167|
|18716|[raining, califor...|0.14714715|
|69080|[el, ni, o, wane,...|0.13731825|
|72730|[ve, written, qui...|0.13376835|
|25667|[spate, extreme, ...|0.13364056|
|46483|[storms, pummelin...|0.13297872|
|26524|[nate, cohn, upsh...|0.13284133|
|57158|[cnn, decade, ago...|0.13219285|
|73373|[americans, conce...|0.13029316|
|62617|[cnn, re, 2, degr...|0.12547052|
+-----+--------------------+----------+



                                                                                