In [1]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext

In [2]:
spark = SparkSession.builder.appName("t1").getOrCreate()
sc = SparkContext.getOrCreate()

In [3]:
articles = spark.read.format('csv').options(header='true',inferschema='true').load("C:/Users/poona/Downloads/articles1.csv/articles1.csv")
articles.createOrReplaceTempView("articles")
articles.count()

50004

In [4]:
articles = articles.withColumn("content", lower(articles["content"]))
articles.createOrReplaceTempView("articles")

In [5]:
articles = articles.withColumn("content", regexp_replace(col("content"), "[^a-zA-Z0-9]", " "))
articles.createOrReplaceTempView("articles")

In [6]:
query = "SELECT * FROM articles WHERE id=69716" #Testing to make sure the amounts from zero and fail add up
ref = spark.sql(query)
ref.createOrReplaceTempView("ref")
ref.count()

1

In [7]:
ref.show()

+-----+-----+--------------------+----------------+------+----------+------+-----+----+--------------------+
|  _c0|   id|               title|     publication|author|      date|  year|month| url|             content|
+-----+-----+--------------------+----------------+------+----------+------+-----+----+--------------------+
|50097|69716|California lifted...|Business Insider|  null|2016-05-21|2016.0|  5.0|null|    on wednesday ...|
+-----+-----+--------------------+----------------+------+----------+------+-----+----+--------------------+



In [8]:
query = "SELECT * FROM articles WHERE id NOT IN (SELECT id FROM ref)" #Testing to make sure the amounts from zero and fail add up
noref = spark.sql(query)
noref.createOrReplaceTempView("noref")
noref.count()

50003

In [9]:
rlist = ref.collect()[0][9]

In [10]:
from pyspark.sql.functions import lit
noref=noref.withColumn("contr", lit(rlist))

In [11]:
split_weights = [1.0] * 6 #Memory became an issue so splitting up data into parts
splits = noref.randomSplit(split_weights)
splits

[DataFrame[_c0: string, id: string, title: string, publication: string, author: string, date: string, year: string, month: string, url: string, content: string, contr: string],
 DataFrame[_c0: string, id: string, title: string, publication: string, author: string, date: string, year: string, month: string, url: string, content: string, contr: string],
 DataFrame[_c0: string, id: string, title: string, publication: string, author: string, date: string, year: string, month: string, url: string, content: string, contr: string],
 DataFrame[_c0: string, id: string, title: string, publication: string, author: string, date: string, year: string, month: string, url: string, content: string, contr: string],
 DataFrame[_c0: string, id: string, title: string, publication: string, author: string, date: string, year: string, month: string, url: string, content: string, contr: string],
 DataFrame[_c0: string, id: string, title: string, publication: string, author: string, date: string, year: string,

In [12]:
from datasketch import MinHash

In [13]:
def mh(y,x):
    m1, m2 = MinHash(), MinHash()
    for i in x.split(" "):
        m1.update(i.encode('utf8'))
    if y is None: 
        return 0
    else:
        for j in y.split(" "):
            m2.update(j.encode('utf8'))
        return m1.jaccard(m2)

In [14]:
mh_udf = udf(lambda x,y: mh(x,y))

In [15]:
splits[0]=splits[0].withColumn("dist", mh_udf('content','contr'))
splits[0].createOrReplaceTempView("s1")
splits[1]=splits[1].withColumn("dist", mh_udf('content','contr'))
splits[1].createOrReplaceTempView("s2")
splits[2]=splits[2].withColumn("dist", mh_udf('content','contr'))
splits[2].createOrReplaceTempView("s3")
splits[3]=splits[3].withColumn("dist", mh_udf('content','contr'))
splits[3].createOrReplaceTempView("s4")
splits[4]=splits[4].withColumn("dist", mh_udf('content','contr'))
splits[4].createOrReplaceTempView("s5")
splits[5]=splits[5].withColumn("dist", mh_udf('content','contr'))
splits[5].createOrReplaceTempView("s6")

In [16]:
query = "SELECT id,dist FROM s1 ORDER BY dist DESC LIMIT 10"
F1 = spark.sql(query)
F1.createOrReplaceTempView("F1")
F1.count()

10

In [17]:
query = "SELECT id,dist FROM s2 ORDER BY dist DESC LIMIT 10"
F2 = spark.sql(query)
F2.createOrReplaceTempView("F2")
F2.count()

10

In [18]:
query = "SELECT id,dist FROM s3 ORDER BY dist DESC LIMIT 10"
F3 = spark.sql(query)
F3.createOrReplaceTempView("F3")
F3.count()

10

In [19]:
query = "SELECT id,dist FROM s4 ORDER BY dist DESC LIMIT 10"
F4 = spark.sql(query)
F4.createOrReplaceTempView("F4")
F4.count()

10

In [20]:
query = "SELECT id,dist FROM s5 ORDER BY dist DESC LIMIT 10"
F5 = spark.sql(query)
F5.createOrReplaceTempView("F5")
F5.count()

10

In [21]:
query = "SELECT id,dist FROM s6 ORDER BY dist DESC LIMIT 10"
F6 = spark.sql(query)
F6.createOrReplaceTempView("F6")
F6.count()

10

In [22]:
G1=F1.union(F2)

In [23]:
G2=F3.union(F4)

In [24]:
G3=F5.union(F6)

In [25]:
G4=G1.union(G2)

In [26]:
G5=G3.union(G4)
G5.createOrReplaceTempView("G5")
G5.count()

60

In [27]:
query = "SELECT id,dist FROM G5 ORDER BY dist DESC LIMIT 10"
G6 = spark.sql(query)
G6.createOrReplaceTempView("G6")
G6.count()

10

In [28]:
G6.show()

+-----+---------+
|   id|     dist|
+-----+---------+
|52787|  0.21875|
|72676|  0.21875|
|72796|  0.21875|
|66574|0.2109375|
|39873|0.2109375|
|22240|0.2109375|
|44695| 0.203125|
|22491| 0.203125|
|43661| 0.203125|
|69968| 0.203125|
+-----+---------+

