In [2]:
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

findspark.init()

def demarrer_spark():
    local = "local[*]"
    appName = "dataframeJson"
    config = SparkConf().setAppName(appName).setMaster(local).\
    set("spark.executor.memory", "1G").\
    set("spark.driver.memory", "1G").\
    set("spark.sql.catalogImplementation", "in-memory")
    
    spark = SparkSession.builder.config(conf = config).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    
    spark.conf.set("spark.sql.shuffle.partitions", "4")
    return spark

spark = demarrer_spark()
print("running")

running


### Simplified TF*IDF

In [5]:
dataset = ['d1,one fish two fish', 'd2,red fish blue', 'd3,one red bird']
data = spark.sparkContext.parallelize(dataset)

lines = data.map(lambda x : x.split(",")).map(lambda x : (x[0], x[1].split(" ")))

lines.take(10)

[('d1', ['one', 'fish', 'two', 'fish']),
 ('d2', ['red', 'fish', 'blue']),
 ('d3', ['one', 'red', 'bird'])]

In [7]:
count = data.count()
count

3

In [8]:
def combine(list, word):
    return [(word, x) for x in list]

In [9]:
list = ['d3', 'one', 'red', 'bird']
word = 'd1'
combine(list, word)

[('d1', 'd3'), ('d1', 'one'), ('d1', 'red'), ('d1', 'bird')]

Starting from `lines` produce an RDD of all pairs

In [12]:
word_doc = lines.flatMapValues(lambda x : x)
word_doc.take(10)

[('d1', 'one'),
 ('d1', 'fish'),
 ('d1', 'two'),
 ('d1', 'fish'),
 ('d2', 'red'),
 ('d2', 'fish'),
 ('d2', 'blue'),
 ('d3', 'one'),
 ('d3', 'red'),
 ('d3', 'bird')]

For each pair (document, word), count its frequency

In [16]:
freq_word = word_doc.map(lambda x : (x, 1)).reduceByKey(lambda a, b : a+b)

freq_word.take(10)

[(('d1', 'one'), 1),
 (('d1', 'two'), 1),
 (('d2', 'fish'), 1),
 (('d3', 'one'), 1),
 (('d2', 'red'), 1),
 (('d2', 'blue'), 1),
 (('d3', 'red'), 1),
 (('d1', 'fish'), 2),
 (('d3', 'bird'), 1)]

For each word, produce a pair (d, freq)

In [17]:
freq_word_bis = freq_word.map(lambda x : (x[0][1], (x[0][0], x[1])))
freq_word_bis.take(10)

[('one', ('d1', 1)),
 ('two', ('d1', 1)),
 ('fish', ('d2', 1)),
 ('one', ('d3', 1)),
 ('red', ('d2', 1)),
 ('blue', ('d2', 1)),
 ('red', ('d3', 1)),
 ('fish', ('d1', 2)),
 ('bird', ('d3', 1))]

For each word, compute its document frequency, i.e # of documents in which it appears

In [25]:
doc_freq = freq_word_bis.map(lambda x : (x[0], 1)).reduceByKey(lambda a,b : a+b).map(lambda x : (x[0], 3.0/x[1]))
doc_freq.take(10)

[('two', 3.0),
 ('bird', 3.0),
 ('blue', 3.0),
 ('fish', 1.5),
 ('red', 1.5),
 ('one', 1.5)]

For each word, compute its simplified score

In [28]:
final = word_doc.map(lambda x : (x[1], x[0])).join(doc_freq).map(lambda x : (x[0], x[1][0], x[1][1]))
final.take(10)



[('two', 'd1', 3.0),
 ('bird', 'd3', 3.0),
 ('blue', 'd2', 3.0),
 ('fish', 'd1', 1.5),
 ('fish', 'd1', 1.5),
 ('fish', 'd2', 1.5),
 ('red', 'd2', 1.5),
 ('red', 'd3', 1.5),
 ('one', 'd1', 1.5),
 ('one', 'd3', 1.5)]

### Text Analytics

The goal is to compute the SPMI (simplified pointwise mutual information) metric for words in a corpus.

In [29]:
def removespecialchars(word):
    charstoremove = ['[',']','*','#','.','_',':','?','!',',',';','“','”','\n']
    for ch in word:
        if ch in charstoremove:
            word = word.replace(ch, '')
    return word

In [30]:
dataset = ''

In [33]:
data = spark.sparkContext.textFile("file:///home/maxime/smallwordcount.txt", use_unicode="False")
print(f"before filtering there are {data.count()} lines")
lines = data.filter(lambda x : x != '') \
            .map(lambda x : removespecialchars(x)) \
            .map(lambda x : x.split(" "))
print(f"after filtering there are {lines.count()} lines")

before filtering there are 14594 lines
after filtering there are 12153 lines


Retrieve the list of words

In [41]:
words = lines.flatMap(lambda x : x).filter(lambda w : len(w) > 0)
words.take(10)

['The',
 'Project',
 'Gutenberg',
 'EBook',
 'of',
 'Pride',
 'and',
 'Prejudice',
 'by',
 'Jane']

Compute the frequency of each word

In [42]:
wordcount = words.map(lambda x : (x, 1)).reduceByKey(lambda a, b : a+b)
wordcount.take(10)

[('The', 285),
 ('Project', 83),
 ('EBook', 3),
 ('of', 3693),
 ('Pride', 6),
 ('Jane', 263),
 ('Austen', 4),
 ('is', 859),
 ('use', 23),
 ('anyone', 29)]

Count the number of distinct words

In [43]:
nb_distinct_word = words.distinct().count()
nb_distinct_word

7761

Compute the proba of each word

In [44]:
word_proba = wordcount.map(lambda x : (x[0], x[1]/nb_distinct_word))
word_proba.take(10)

[('The', 0.03672207189795129),
 ('Project', 0.010694498131684061),
 ('EBook', 0.00038654812524159255),
 ('of', 0.47584074217240047),
 ('Pride', 0.0007730962504831851),
 ('Jane', 0.033887385646179616),
 ('Austen', 0.0005153975003221235),
 ('is', 0.110681613194176),
 ('use', 0.0029635356268522097),
 ('anyone', 0.003736631877335395)]

Collocation of words : build an RDD of bigrams. A bigram is a pair of word appearing in consecutive order

In [55]:
word_index = words.zipWithIndex().map(lambda x : (x[1], x[0]))
word_index.take(10)

[(0, 'The'),
 (1, 'Project'),
 (2, 'Gutenberg'),
 (3, 'EBook'),
 (4, 'of'),
 (5, 'Pride'),
 (6, 'and'),
 (7, 'Prejudice'),
 (8, 'by'),
 (9, 'Jane')]

In [56]:
word_index_bis = words.zipWithIndex().map(lambda x : (x[1]-1, x[0]))
word_index_bis.take(10)

[(-1, 'The'),
 (0, 'Project'),
 (1, 'Gutenberg'),
 (2, 'EBook'),
 (3, 'of'),
 (4, 'Pride'),
 (5, 'and'),
 (6, 'Prejudice'),
 (7, 'by'),
 (8, 'Jane')]

In [58]:
bigram = word_index.join(word_index_bis).map(lambda x : x[1])
bigram.take(10)



[('The', 'Project'),
 ('of', 'Pride'),
 ('by', 'Jane'),
 ('eBook', 'is'),
 ('use', 'of'),
 ('at', 'no'),
 ('with', 'almost'),
 ('whatsoever', 'You'),
 ('it', 'give'),
 ('or', 're-use')]

Count the frequency of each bigram

In [74]:
bigramcount = bigram.map(lambda x : (x, 1)).reduceByKey(lambda a, b : a+b)
bigramcount.take(10)



[(('use', 'of'), 8),
 (('at', 'no'), 6),
 (('the', 'terms'), 14),
 (('12', '2019'), 1),
 (('PREJUDICE', 'Produced'), 1),
 (('Volunteers', 'and'), 3),
 (('and', 'Prejudice'), 4),
 (('Chapter', '4'), 2),
 (('Chapter', '10'), 2),
 (('Chapter', '12'), 2)]

Attach to each bigram the probability of its words, i.e $$P(a)*P(b)$$

In [76]:
bigramproba = bigramcount.map(lambda x : x[0]).join(word_proba).map(lambda x : (x[1][0], (x[0], x[1][1]))) \
                         .join(word_proba).map(lambda x : ((x[1][0][0], x[0]), x[1][0][1]*x[1][1]))
bigramproba.take(10)

[(('of', 'Pride'), 0.0003678706936006188),
 (('Title', 'Pride'), 9.961296875185996e-08),
 (('vain', 'Pride'), 1.7930334375334792e-06),
 (('mine', 'Pride'), 1.4941945312778994e-06),
 (('cover', 'Pride'), 1.9922593750371993e-07),
 (('of', 'them'), 0.026609313503778098),
 (('like', 'them'), 0.0005476056935518915),
 (('seen', 'them'), 0.0005404003554788403),
 (('before', 'them'), 0.0016139957283634696),
 (('make', 'them'), 0.0012104967962726025)]

Count the number of distinct bigrams

In [72]:
bigram_distinct = bigram.distinct().count()
bigram_distinct

59099

Attach to each bigram its probability, i.e. $$P(ab)$$

In [75]:
bigramjointproba = bigramcount.map(lambda x : (x[0], x[1]/bigram_distinct))
bigramjointproba.take(10)

[(('use', 'of'), 0.00013536608064434254),
 (('at', 'no'), 0.0001015245604832569),
 (('the', 'terms'), 0.00023689064112759946),
 (('12', '2019'), 1.6920760080542818e-05),
 (('PREJUDICE', 'Produced'), 1.6920760080542818e-05),
 (('Volunteers', 'and'), 5.076228024162845e-05),
 (('and', 'Prejudice'), 6.768304032217127e-05),
 (('Chapter', '4'), 3.3841520161085635e-05),
 (('Chapter', '10'), 3.3841520161085635e-05),
 (('Chapter', '12'), 3.3841520161085635e-05)]

Compute the SPMI $$SPMI(a,b) = P(ab)/P(a)*P(b)$$ and sort in descending order on the score

In [79]:
SPMI = bigramjointproba.join(bigramproba).map(lambda x : (x[0], x[1][0]/x[1][1])).sortBy(lambda x : x[1], False)
SPMI.take(10)

                                                                                

[(('staff', 'Please'), 1019.1901893433051),
 (('he—poor', 'Eliza—to'), 1019.1901893433051),
 (('net', 'purses'), 1019.1901893433051),
 (('Italian', 'songs'), 1019.1901893433051),
 (('barbarously', 'misused'), 1019.1901893433051),
 (('weighty', 'accusation'), 1019.1901893433051),
 (('vicious', 'propensities—the'), 1019.1901893433051),
 (('untamed', 'unabashed'), 1019.1901893433051),
 (('City', 'UT'), 1019.1901893433051),
 (('Kenilworth', 'Birmingham'), 1019.1901893433051)]

### Matrix multiplication

Write a program to perform matric multiplication using the spark RDD operators. The input matrices are represented with their coordinates : each matrix is an RDD of tuples `(i,j,val)` where `i` is the line number, `j` is the column number and `val` the value. The input matrices are provided (`matM` and `matN`). THe output matrix is also represented with its coordinates :

In [81]:
M = [(1,1,1),(1,2,2),(1,3,3),(2,1,2),(2,2,5),(2,3,7)]
N = [(1,1,2),(1,2,4),(1,3,8),(2,1,1),(2,2,5),(2,3,10),(3,1,3),(3,2,6),(3,3,9)]

matM = spark.sparkContext.parallelize(M)
matN = spark.sparkContext.parallelize(N)

In [83]:
matMElem = matM.map(lambda x : (x[1], (x[0], x[2])))
matNElem = matN.map(lambda x : (x[0], (x[1], x[2])))
pairwiseProd = matMElem.join(matNElem)
product = pairwiseProd.map(lambda x : ((x[1][0][0], x[1][1][0]), x[1][0][1]*x[1][1][1])) \
                      .reduceByKey(lambda a, b: a+b)
product.collect()

                                                                                

[((1, 1), 13),
 ((2, 3), 129),
 ((1, 2), 32),
 ((2, 2), 75),
 ((1, 3), 55),
 ((2, 1), 30)]