In [2]:
import pyspark
from pyspark.sql import SparkSession

# Obtain the SparkSession for this notebook (getOrCreate reuses an active one if present).
spark = SparkSession.builder.getOrCreate()
# Handle on the underlying SparkContext; needed further down for sc.broadcast(...).
sc = spark.sparkContext

In [3]:
# Read the proverbs CSV (first line is the header), drop to the RDD API and
# keep only the (ID, text) pair of every row.
data = (
    spark.read.csv("Chinese_proverbs.csv", header=True)
    .rdd
    .map(lambda row: (row["ID"], row["text"]))
)
data.take(5)

                                                                                

[('0', " If you don't do stupid things you won't end up in tragedy."),
 ('1', ' Blessings come in disguise.'),
 ('2', " If small holes aren't fixed, then big holes will bring hardship."),
 ('3', ' Water flows in only to flow out.'),
 ('4',
  " It's better to walk thousands of miles than to read thousands of books.")]

In [4]:
# histogram of frequencies of terms
#return tuples (frequency, number of terms that have that frequency)
from string import punctuation

def splitInTerms(x):
    """Tokenise one (doc_id, text) record into (term, 1) pairs.

    Every character in ``string.punctuation`` is replaced by a space, the text
    is split on whitespace and each token is lower-cased. The document id is
    unpacked but not used: this version feeds a corpus-wide term count.

    Returns a list with one ``(term, 1)`` tuple per token occurrence.
    """
    _, raw_text = x
    # One-to-one table mapping each punctuation character to a blank.
    punct_to_blank = str.maketrans(punctuation, " " * len(punctuation))
    tokens = raw_text.translate(punct_to_blank).split()
    return [(token.lower(), 1) for token in tokens]

# Histogram of term frequencies: first count every term over the whole corpus,
# then count how many terms share each total.
# Collected tuples are (frequency, number of terms with that frequency).
(
    data.flatMap(lambda record: splitInTerms(record))
    .reduceByKey(lambda acc, inc: acc + inc)       # term -> corpus-wide count
    .map(lambda term_count: (term_count[1], 1))    # re-key by the count itself
    .reduceByKey(lambda acc, inc: acc + inc)       # count -> number of terms
    .collect()
)

                                                                                

[(5, 10),
 (16, 2),
 (7, 4),
 (19, 1),
 (1, 368),
 (2, 72),
 (14, 1),
 (4, 16),
 (3, 26),
 (26, 1),
 (8, 2),
 (21, 1),
 (6, 6),
 (36, 1),
 (68, 1),
 (31, 1),
 (9, 4),
 (17, 1)]

#### TF-iDF

In [5]:
# For each term in the document I create a new tuple (term, doc_id, +1)
# and then distribute the count of occurrences of a term in a specific document

def splitInTerms(x):
    """Tokenise one (doc_id, text) record for the per-document TF computation.

    Punctuation is replaced by spaces, the text is split on whitespace and
    tokens are lower-cased. Each token occurrence yields
    ``((term, doc_id, n_tokens), 1)`` where ``n_tokens`` is the number of
    tokens in the document, carried in the key so the count can later be
    normalised into a term frequency.

    NOTE: this re-uses the name of the earlier ``splitInTerms`` defined in the
    histogram cell and replaces it in the kernel once this cell has run.
    """
    doc_id, raw_text = x
    punct_to_blank = str.maketrans(punctuation, " " * len(punctuation))
    tokens = raw_text.translate(punct_to_blank).split()
    n_tokens = len(tokens)
    return [((token.lower(), doc_id, n_tokens), 1) for token in tokens]

# Term frequency per document. Keys produced by splitInTerms are
# (term, doc_id, doc_length); summing the 1s gives the occurrences of the term
# in that document, which is then divided by the document length.
data_TF = (
    data.flatMap(lambda record: splitInTerms(record))
    .reduceByKey(lambda acc, inc: acc + inc)
    .map(
        lambda keyed: (
            keyed[0][0],
            {"doc": keyed[0][1], "TF": keyed[1] / keyed[0][2]},
        )
    )
)

data_TF.take(10)

[('if', {'doc': '0', 'TF': 0.07142857142857142}),
 ('you', {'doc': '0', 'TF': 0.14285714285714285}),
 ('don', {'doc': '0', 'TF': 0.07142857142857142}),
 ('t', {'doc': '0', 'TF': 0.14285714285714285}),
 ('do', {'doc': '0', 'TF': 0.07142857142857142}),
 ('stupid', {'doc': '0', 'TF': 0.07142857142857142}),
 ('things', {'doc': '0', 'TF': 0.07142857142857142}),
 ('won', {'doc': '0', 'TF': 0.07142857142857142}),
 ('end', {'doc': '0', 'TF': 0.07142857142857142}),
 ('up', {'doc': '0', 'TF': 0.07142857142857142})]

In [6]:
from numpy import log
# Total number of documents in the corpus.
numelem = data.count()

# data_TF holds one record per (term, document), so counting records per term
# gives the number of documents containing it; iDF = log(N / document count).
data_iDF = (
    data_TF.map(lambda term_entry: (term_entry[0], 1))
    .reduceByKey(lambda acc, inc: acc + inc)
    .map(lambda term_docs: (term_docs[0], log(numelem / term_docs[1])))
)
data_iDF.take(10)

[('if', 3.2347491740244907),
 ('you', 2.5416019934645457),
 ('don', 3.0524276172305362),
 ('t', 2.136136885356381),
 ('do', 4.844187086458591),
 ('stupid', 4.844187086458591),
 ('things', 4.151039905898646),
 ('won', 4.844187086458591),
 ('end', 4.151039905898646),
 ('up', 4.844187086458591)]

In [7]:
#put together TF and iDF

# Join on the term: each value becomes ({"doc", "TF"}, iDF). Re-key by document
# and store the product; entries whose score is exactly 0 are dropped.
data_TF_iDF = (
    data_TF.join(data_iDF)
    .map(
        lambda joined: (
            joined[1][0]["doc"],
            {"term": joined[0], "TF-iDF": (joined[1][1] * joined[1][0]["TF"])},
        )
    )
    .filter(lambda scored: scored[1]["TF-iDF"] != 0)
)

data_TF_iDF.take(10)

[('0', {'term': 'don', 'TF-iDF': 0.21803054408789543}),
 ('25', {'term': 'don', 'TF-iDF': 0.38155345215381703}),
 ('35', {'term': 'don', 'TF-iDF': 0.3391586241367262}),
 ('54', {'term': 'don', 'TF-iDF': 0.3591091314388866}),
 ('57', {'term': 'don', 'TF-iDF': 0.30524276172305365}),
 ('112', {'term': 'don', 'TF-iDF': 0.6104855234461073}),
 ('0', {'term': 'do', 'TF-iDF': 0.34601336331847077}),
 ('0', {'term': 'stupid', 'TF-iDF': 0.34601336331847077}),
 ('0', {'term': 'things', 'TF-iDF': 0.29650285042133184}),
 ('59', {'term': 'things', 'TF-iDF': 0.5188799882373307})]

#### cosine similarity
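La formula della cosine similarity è composta da tre termini, uno al numeratore e due al denominatore: $\cos(d_i, d_j) = \dfrac{\sum_t w_{t,i}\, w_{t,j}}{\sqrt{\sum_t w_{t,i}^2}\,\sqrt{\sum_t w_{t,j}^2}}$, dove $w_{t,d}$ è il punteggio TF-iDF del termine $t$ nel documento $d$. Ciascuno dei due termini al denominatore si riferisce a un unico documento. Ho scelto di calcolare prima, per ogni documento, il suo termine al denominatore (nel codice viene memorizzato direttamente il reciproco, $1/\sqrt{\sum_t w_{t,d}^2}$) e poi di raccogliere i risultati in un dizionario {doc: den} distribuito ai worker tramite broadcast.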


In [8]:
# calculate denominator for each document

# Per document: sum of squared TF-iDF scores, raised to -1/2, i.e. the
# reciprocal of the vector norm (so it can be multiplied in later).
df_denom = (
    data_TF_iDF.map(lambda scored: (scored[0], scored[1]["TF-iDF"] ** 2))
    .reduceByKey(lambda acc, inc: acc + inc)
    .map(lambda doc_total: (doc_total[0], doc_total[1] ** (-1 / 2)))
)

# Collect the {doc: reciprocal norm} map on the driver and broadcast it.
denoMap = sc.broadcast(df_denom.collectAsMap())

Occorre poi calcolare il termine al numeratore per ogni coppia di documenti: si moltiplicano i punteggi TF-iDF di ogni termine in comune ai due documenti e si sommano i prodotti.

In [9]:
# calculate numerator for each pair of documents
# Re-key the scores by term so that documents sharing a term meet in a self-join.
df_num = data_TF_iDF.map(
    lambda scored: (scored[1]["term"], {"doc": scored[0], "TF-iDF": scored[1]["TF-iDF"]})
)
# Doc ids are strings here, so `<` compares lexicographically; it is only used
# to keep each unordered pair of distinct documents exactly once.
df_num = (
    df_num.join(df_num)
    .filter(lambda joined: joined[1][0]["doc"] < joined[1][1]["doc"])
    .map(
        lambda joined: (
            (joined[1][0]["doc"], joined[1][1]["doc"]),
            joined[1][0]["TF-iDF"] * joined[1][1]["TF-iDF"],
        )
    )
    .reduceByKey(lambda acc, inc: acc + inc)  # sum the products over shared terms
)

df_num.take(10)

[(('0', '10'), 0.039758601277546955),
 (('1', '10'), 0.13915510447141435),
 (('3', '32'), 0.13252867092515652),
 (('3', '42'), 0.19879300638773478),
 (('3', '43'), 0.08835244728343766),
 (('3', '67'), 0.100826890659524),
 (('10', '14'), 0.1346981234449684),
 (('32', '42'), 0.2319251741190239),
 (('32', '43'), 0.10307785516401062),
 (('32', '67'), 0.07136159203662275)]

In [10]:
# Cosine similarity per document pair: dot product times the two broadcast
# reciprocal norms looked up by document id.
df_cosine = df_num.map(
    lambda pair_dot: (
        pair_dot[0],
        pair_dot[1]
        * (denoMap.value.get(pair_dot[0][0]) * denoMap.value.get(pair_dot[0][1])),
    )
)

In [11]:
#for each document return the five most similar to it
def get_best_n(l, r, n=5):
    """Merge two candidate lists and keep the ``n`` highest-scoring entries.

    Parameters
    ----------
    l, r : list of (doc_id, score)
        Partial "best so far" lists for the same document.
    n : int, default 5
        Maximum number of entries to keep.

    Returns
    -------
    list of (doc_id, score)
        At most ``n`` entries, ordered by descending score; equal scores are
        ordered by ascending ``doc_id``.

    This function is used as a ``reduceByKey`` combiner, which must be
    commutative and associative. Sorting on the score alone resolves ties by
    the order of ``l + r``, so swapping the arguments could keep a different
    entry at the cutoff. Breaking ties on the id makes the ordering total and
    the result independent of merge order.
    """
    merged = l + r
    # Negated score gives descending order while the id tie-break stays ascending.
    merged.sort(key=lambda t: (-t[1], t[0]))
    return merged[:n]

# Emit every scored pair under both of its documents, each with a one-element
# candidate list, then fold the lists per document with get_best_n.
(
    df_cosine.flatMap(
        lambda scored_pair: [
            (scored_pair[0][0], [(scored_pair[0][1], scored_pair[1])]),
            (scored_pair[0][1], [(scored_pair[0][0], scored_pair[1])]),
        ]
    )
    .reduceByKey(lambda l, r: get_best_n(l, r))
    .take(10)
)

[('10',
  [('36', 0.1802748783626977),
   ('14', 0.10346474824158061),
   ('42', 0.10222955623926848),
   ('16', 0.09137078125171756),
   ('100', 0.07708970671626461)]),
 ('26',
  [('69', 0.21184053001829448),
   ('54', 0.17316665285465507),
   ('49', 0.16804707356284873),
   ('14', 0.1443547248544675),
   ('3', 0.13655043369226058)]),
 ('4',
  [('27', 0.2271708145975577),
   ('8', 0.12776085947663285),
   ('26', 0.12298829180489189),
   ('22', 0.12047882762059949),
   ('66', 0.1109930978462955)]),
 ('102',
  [('49', 0.15615371503142822),
   ('74', 0.10728452585597198),
   ('118', 0.09847890104766069),
   ('113', 0.07971450269718441),
   ('122', 0.07966025181512619)]),
 ('113',
  [('54', 0.16416265047281103),
   ('17', 0.14837381152994536),
   ('95', 0.12435533752771984),
   ('97', 0.12394984397667404),
   ('118', 0.11567813316391383)]),
 ('20',
  [('58', 0.32799135387918693),
   ('42', 0.2384297372667188),
   ('55', 0.21536551227772616),
   ('97', 0.15520335171027866),
   ('80', 0.142