We assume that a cassandra database has been already created and filled up with papers. If not please check fetch_papers.py

In [1]:
# Configuratins related to Cassandra connector, graphframes & Cluster
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0,graphframes:graphframes:0.8.2-spark2.4-s_2.11 --conf spark.cassandra.connection.host=127.0.0.1 --repositories https://repos.spark-packages.org/ pyspark-shell'

In [2]:
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.functions import *
from gensim.parsing.preprocessing import strip_punctuation
from nltk import word_tokenize
from nltk.corpus import stopwords
import numpy as np
import itertools

## LOAD THE DATA FROM CASSANDRA IN A RDD

for tf-idf we only will use 2.5k to overcome computational issues on a laptop


In [3]:
# Creating PySpark Context
sc = SparkContext("local", "arXiv")

https://repos.spark-packages.org/ added as a remote repository with the name: repo-1
Ivy Default Cache set to: /home/mbc96/.ivy2/cache
The jars for the packages stored in: /home/mbc96/.ivy2/jars
:: loading settings :: url = jar:file:/mnt/c/Users/markb/Documents/0kth/data_intensive/spark-2.4.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.datastax.spark#spark-cassandra-connector_2.11 added as a dependency
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e5b68fa2-9761-4efe-8dc5-8a2eb9973433;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.11;2.4.0 in central
	found com.twitter#jsr166e;1.1.0 in central
	found io.netty#netty-all;4.0.33.Final in central
	found commons-beanutils#commons-beanutils;1.9.3 in central
	found commons-collections#commons-collections;3.2.2 in central
	found joda-time#joda-time;2.3 in central
	found org.joda#joda-convert;1.2 in central
	found

In [4]:
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()

In [5]:
rdd = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="papers", keyspace="papers_space").load().limit(2500).rdd

In [6]:
number_docs = rdd.count()

                                                                                

In [7]:
stop_words = stopwords.words('english')
def process(pid, title, abstract):
    text = title + ' ' + abstract
    doc = word_tokenize(strip_punctuation(text).lower())
    doc = [w for w in doc if w not in stop_words]
    return [((pid, w), 1) for w in doc]

In [8]:
map1 = rdd.flatMap(lambda x: process(x.pid, x.title, x.abstract))
tf = map1.reduceByKey(lambda x,y: x + y) # number of times that a word appear in a given document

## IDF

In [9]:
words = tf.map(lambda x: (x[0][1], 1)) 
frequency = words.reduceByKey(lambda x,y: x + y) # num of docs that contains the word
# let's remove the number of words smaller than 2
frequency = frequency.filter(lambda x: x[1] >= 2)
freq = frequency.map(lambda x: x[1])


In [10]:
# total number of uniques words
freq.count()

                                                                                

9545

In [10]:
idf = frequency.map(lambda x: (x[0], np.log(number_docs/x[1])))

## tf-idf vector 

In [11]:
re_tf = tf.map(lambda x: (x[0][1], (x[0][0], x[1]))) # we need only the word as a key to easily join the tf with idf

tfidf_word = re_tf.join(idf)
tfidf = tfidf_word.map(lambda x: (x[1][0][0], (x[0], x[1][0][1]*x[1][1])))

l2norm = tfidf.map(lambda x: (x[0], x[1][1]))
l2norm = l2norm.reduceByKey(lambda x, y: np.sqrt(x**2+y**2))

# the tf-idf value for each word normalized
tfidf_norm = tfidf.join(l2norm)
tfidf_norm = tfidf_norm.map(lambda x: (x[0], (x[1][0][0], x[1][0][1]/x[1][1])))

In [12]:
# the tf-idf value for each word normalized
tfidf_norm = tfidf.join(l2norm)
tfidf_norm = tfidf_norm.map(lambda x: (x[0], (x[1][0][0], x[1][0][1]/x[1][1])))

In [13]:
%%time
#this is the normalized tfidf for all the documents
tfidf_norm.take(5)



CPU times: user 35.8 ms, sys: 20.9 ms, total: 56.8 ms
Wall time: 20.3 s


                                                                                

[('2107.05727', ('image', 0.04399861543747763)),
 ('2107.05727', ('step', 0.06269436493223962)),
 ('2107.05727', ('defined', 0.07329196078327006)),
 ('2107.05727', ('resulting', 0.06714407108767272)),
 ('2107.05727', ('based', 0.01712151704858696))]

# Calculate the matrix for all the scores for each paper

This calcul is tedious, it requires to multiply the matrix X = D x W, X * X^T, being W the unique words and D the document set. Due to this is executed in a single laptop, as a demo purposes to try spark, it has been reduced the dimensionality of documents. In a real world scneario spark would be able to make use of multiple resources to scale the task.

In [15]:
# get the transpose of tfidf, that is, using word as a key.
word_key = tfidf_norm.map(lambda x: (x[1][0], (x[0], x[1][1])))
word_key.take(2)

[Stage 19:>                                                         (0 + 1) / 1]                                                                                

[('image', ('2107.05727', 0.04399861543747763)),
 ('step', ('2107.05727', 0.06269436493223962))]

We implement the matrix multiplication in spark, making use of MapReduce. Due to X * X^T is a symmetric matrix, we only need to make half of the calculations

In [16]:
def product(z):
    my_sorted_list = sorted(z[1], key=lambda x: x[1])
    combinations = itertools.combinations(z[1], 2)
    return [((x[0],y[0]), x[1]*y[1]) for x,y in combinations]
    

first_stage = word_key.groupByKey()
multi = first_stage.flatMap(product)
final = multi.reduceByKey(lambda x,y: x+y)

final RDD contains the similarity score for each paper pair in the corpus, having used n=2500 papers it contains a     (n * n-1)/2 pairs, which is over 3 millions pairs. Due to most of this papers are not relevant given a paper, and we are only interested in the most relevant ones, we do not need to store the full matrix. Instead we perform filtering to delete all the paper pairs which are below 0.1 similarity (assuming low similarity being no related). This filering reduces the amount of paper pairs to 28k

In [39]:
final.map(lambda x: x[1]).mean()

0.017161328128935685

In [36]:
%%time
final_relevant = final.filter(lambda x: x[1] >= 0.1)
final_relevant.count()



CPU times: user 32.7 ms, sys: 13.4 ms, total: 46 ms
Wall time: 28.9 s


                                                                                

27976

In [21]:
%%time
df_result = final_relevant.map(lambda x: (x[0][0], x[0][1], float(x[1]))).toDF(['doc1', 'doc2', 'score'])
df_result = df_result.orderBy(desc('score'))

                                                                                

CPU times: user 75.4 ms, sys: 10.2 ms, total: 85.6 ms
Wall time: 9.32 s


In [37]:
%%time
df_result.registerTempTable("df_result")
spark.sql("SELECT max(score), min(score), avg(score) FROM df_result").show()



+------------------+-------------------+------------------+
|        max(score)|         min(score)|        avg(score)|
+------------------+-------------------+------------------+
|0.8553202975106373|0.10000189920035375|0.1490131027699097|
+------------------+-------------------+------------------+

CPU times: user 37.2 ms, sys: 33.1 ms, total: 70.3 ms
Wall time: 2.7 s




Finally, we store the result in a cassandra table and a spark dataframe

In [22]:
%%time
df_result.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="similarity_table", keyspace="papers_space").save()



CPU times: user 199 ms, sys: 112 ms, total: 310 ms
Wall time: 1min 27s




In [35]:
%%time
df_result.write.mode("append").save('data_similarity')

                                                                                

CPU times: user 300 ms, sys: 195 ms, total: 495 ms
Wall time: 1min 29s


In [23]:
%%time
df_result.show(5)

                                                                                

+----------+----------+------------------+
|      doc1|      doc2|             score|
+----------+----------+------------------+
|2106.09770|2109.03663|0.8553202975106373|
|2103.02354|2101.01292|0.6431122622671059|
|1908.06957|2110.01899|0.6427000822557831|
|2106.04300|2004.01935| 0.638596119162532|
|2109.13797|2105.05609|0.6080651106648617|
+----------+----------+------------------+
only showing top 5 rows

CPU times: user 13.1 ms, sys: 29.9 ms, total: 43 ms
Wall time: 1min 12s


In [33]:
%%time
doc = '2106.09770'
df_result.filter((f'doc1 == {doc} or doc2=={doc}')).show(5)

+----------+----------+-------------------+
|      doc1|      doc2|              score|
+----------+----------+-------------------+
|2106.09770|2109.03663| 0.8553202975106373|
|2106.09770|2110.07947| 0.4145166502739236|
|2106.09770|2107.07925|0.41123931371500433|
|2106.09770|2110.04073| 0.3951389001824246|
|2103.00534|2106.09770| 0.3650950311347457|
+----------+----------+-------------------+
only showing top 5 rows

CPU times: user 439 µs, sys: 11.2 ms, total: 11.7 ms
Wall time: 189 ms


Document comparision one by one. We also provide the code to calculate the tf-idf one paper at the time. (this code is unfeasible to use in the complete collection)

In [26]:
# given a document d
d = '2106.09770'

rdd_d = word_key.filter(lambda x: x[1][0] == d)
rest = word_key.filter(lambda x: x[1][0] != d)

In [29]:
%%time

#having only a small RDD (only one paper), using broadcastinc is faster than a normal join

def joining(k,v):
    value = smallLookup.value.get(k)
    value = (0, 0) if value is None else value
    return (k, (v, value))

smallLookup = sc.broadcast(rdd_d.collectAsMap())

our_join = rest.map(lambda x: joining(x[0], x[1]))
our_join = our_join.filter(lambda x: x[1][1] is not None)

#our_join = rest.join(rdd_d)

value_d = our_join.map(lambda x: (x[1][0][0], x[1][0][1]*x[1][1][1]))

value_d.reduceByKey(lambda x, y: x+y).takeOrdered(5, lambda x: -x[1])



CPU times: user 64.1 ms, sys: 35.3 ms, total: 99.4 ms
Wall time: 6.04 s


                                                                                

[('2109.03663', 0.8553202975106372),
 ('2110.07947', 0.4145166502739237),
 ('2107.07925', 0.4112393137150044),
 ('2110.04073', 0.3951389001824246),
 ('2103.00534', 0.3650950311347456)]

# Co-authorship analysis using graph

We use the library for spark DataFrames, supported by Databricks, which is inspired in GraphX and, diferenly of GraphX, provides a Python API

In [None]:
from graphframes import *
from graphframes.lib import AggregateMessages as AM

For graph analysis we use the complete amount of papers

In [None]:
df = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="papers", keyspace="papers_space").load().rdd

Due to the inability of defining undirected weighted graph, as it is a co-authorship graph, we replicate the edges as much as need it and are repeated in both direction. That gives the necessity of dividing by two the number of edges to get some statistics

In [None]:
rdd_edges = rdd.flatMap(lambda a: [x for x in itertools.product(a.authors,a.authors) if x[0] != x[1]])
rdd_vertices = rdd.flatMap(lambda x: x.authors).distinct().map(lambda x: (x, x))

edges = sqlContext.createDataFrame(rdd_edges, ['src', 'dst'])
vertices = sqlContext.createDataFrame(rdd_vertices, ['id', 'name'])

In [None]:
graph = GraphFrame(vertices, edges)
vertices.cache()
edges.cache()

## Basic statistics to get insides

Number of unics authors

In [27]:
graph.vertices.distinct().count()

                                                                                

106382

Average of co-authors per author. That is, when an author write a paper it collaborates with a certain number of authors

In [19]:
avg_coauhtors = (graph.edges.count()/2) / graph.vertices.count()
avg_coauhtors

                                                                                

5.03733714350172

inDegree represent a collaboration between authors. Note that in inDegree it doesn't show in the stats the vertices with no edges

In [13]:
%%time
graph.inDegrees.registerTempTable("indegrees")
spark.sql("SELECT max(inDegree), avg(inDegree) FROM indegrees").show()



+-------------+-------------+------------------+
|max(inDegree)|min(inDegree)|     avg(inDegree)|
+-------------+-------------+------------------+
|          732|            1|10.221683897302865|
+-------------+-------------+------------------+

CPU times: user 90.5 ms, sys: 49.5 ms, total: 140 ms
Wall time: 16.2 s


                                                                                

Number of authors with any collaboration

In [33]:
graph.vertices.count() - agg_new.agg({"id":"count"}).collect()[0]['count(id)']

1530

Avarage number of the unique co-authors that each author has

In [62]:
sum_coauthors = agg_new.agg({"unique_coauthors":"sum"}).collect()[0]['sum(unique_coauthors)']
unique_coauthors = (sum_coauthors/2) / graph.vertices.count()
unique_coauthors

4.45911902389502

Average of the times an author collaborates with them co-authors

In [63]:
avg_coauhtors / unique_coauthors

1.1296709319729326

Pagerank calculations of the co-authors graph. PageRank shows us the most relavant auhtors based on its collaborations

In [None]:
# uncomment to read pagerank from cassandra instead
# pagerank_df = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="all_pagerank_table", keyspace="papers_space").load()

In [72]:
%%time 
pagerank_df = graph.pageRank(resetProbability=0.15, maxIter=10)
pagerank_df.vertices.select("id", "pagerank").show()



+--------------------+------------------+
|                  id|          pagerank|
+--------------------+------------------+
|         Alois Knoll| 5.286543865009098|
|      Amaury Habrard|1.9593054937387528|
|        Weiqing Wang|1.4275080607067454|
|          Boyuan Liu|1.3161656619492115|
|         Rasha Faqeh|0.9879530695268659|
|    Michael Kaminski|0.8614367662782962|
|  Alfred O. Hero III|1.0298469794062282|
|      Yofti Milkessa|0.6539744868031192|
|      Mattias Marder|0.8444698579560116|
|        Sadik Bessou|1.0123761080685005|
|        Camile Sothe|0.9565246366559708|
|   Sanjeev J. Koppal|1.0123761080685005|
|        Shubham Negi|0.5548129013488963|
|    Artem Vysogorets|1.0123761080685005|
|      Benjamin Ampel|1.0123761080685005|
|        Nitish Kumar|0.6900420997385767|
|Vasileios Mavroeidis|1.0123761080685005|
|       Austen Z. Fan|1.0123761080685005|
|     Shuvalaxmi Dass|0.6580036970356548|
|       Masato Kimura|1.0123761080685005|
+--------------------+------------

                                                                                

In [73]:
pagerank_df.registerTempTable("pagerank_table")
spark.sql("SELECT max(pagerank), min(pagerank), avg(pagerank) FROM pagerank_table").show()



+-------------+-------------+------------------+
|max(pagerank)|min(pagerank)|     avg(pagerank)|
+-------------+-------------+------------------+
|    37.952694|   0.15185642|0.9999999932396952|
+-------------+-------------+------------------+



                                                                                

In [74]:
pagerank_df.orderBy(desc('pagerank')).show(10)



+---------+---------+
|       id| pagerank|
+---------+---------+
| Yang Liu|37.952694|
| Wei Wang| 30.71963|
| Xin Wang| 22.62668|
| Hao Wang|20.407763|
| Jun Wang|20.171339|
|    Bo Li|20.043365|
| Jie Zhou|19.897903|
|Wei Zhang| 18.64152|
|  Yang Li|17.837828|
|Lei Zhang|17.259756|
+---------+---------+
only showing top 10 rows



                                                                                

In [None]:
# saving pageranks results
pagerank_df.vertices.select("id", "pagerank").write.format("org.apache.spark.sql.cassandra").mode('append').options(table="all_pagerank_table", keyspace="papers_space").save()

## Community detection (Label propagation)

Community detection algorithms aim to discover and organize the subgrups of vertices in the graph that are more frequently connectet between them. GraphFrames provides the label propagation algorithm to identify those communities


In [82]:
%%time
communities = graph.labelPropagation(maxIter=5)
communities.select("id", "label").show()



+--------------------+-------------+
|                  id|        label|
+--------------------+-------------+
|         Alois Knoll|1649267442202|
|      Amaury Habrard| 584115552585|
|        Weiqing Wang| 901943132343|
|          Boyuan Liu|1176821039283|
|         Rasha Faqeh|1649267441741|
|    Michael Kaminski|1116691497170|
|  Alfred O. Hero III|1176821039374|
|      Yofti Milkessa|1228360646932|
|      Mattias Marder|1563368096108|
|        Sadik Bessou|1546188226720|
|        Camile Sothe| 257698037846|
|   Sanjeev J. Koppal|1194000908358|
|        Shubham Negi| 188978561025|
|    Artem Vysogorets| 188978561260|
|      Benjamin Ampel|  68719476798|
|        Nitish Kumar| 858993459313|
|Vasileios Mavroeidis|  68719477201|
|       Austen Z. Fan| 137438953705|
|     Shuvalaxmi Dass| 884763263280|
|       Masato Kimura| 627065225436|
+--------------------+-------------+
only showing top 20 rows

CPU times: user 5.51 s, sys: 3.1 s, total: 8.61 s
Wall time: 29min 20s


                                                                                

In [None]:
#save to cassandra
communities.select("id", "label").write.format("org.apache.spark.sql.cassandra").mode('append').options(table="community_table", keyspace="papers_space").save()

In [None]:
# if we want to read from cassandra instead of re calculating
communities = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="community_table", keyspace="papers_space").load()

There is a total of different 2756 communities. For example, Yang Liu, the author with highest PageRank, pertains in the 99 group wich has 159 members

In [86]:
communities.select('label').distinct().count()

                                                                                

27656

In [32]:
communities.filter(("id=='Yang Liu'")).show()

+--------+-----+
|      id|label|
+--------+-----+
|Yang Liu|   99|
+--------+-----+



In [33]:
communities.filter(("label==99"))communities.select("id", "label").write.format("org.apache.spark.sql.cassandra").mode('append').options(table="community_table", keyspace="papers_space").save().count()

                                                                                

159

In [9]:
communities.filter(("label==99")).show()

+--------------------+-----+
|                  id|label|
+--------------------+-----+
|   Melanie Bancilhon|   99|
|           Yuming Ba|   99|
|      Jana de Wiljes|   99|
|          Bert Zwart|   99|
|     Rui Pedro Paiva|   99|
|  S. M. Kamrul Hasan|   99|
|    Stefano Sabatini|   99|
|         Kenji Sagae|   99|
|   Isabel Beckenbach|   99|
|Adriano L. I. Oli...|   99|
| Luis Muñoz-González|   99|
|       Julia Pritzen|   99|
|        Annie Polish|   99|
|   Venera Arnaoudova|   99|
|         Weikai Miao|   99|
|     Du Nguyen Duong|   99|
|      Bruno M. Rocha|   99|
|          Dan Pelleg|   99|
|       Kemele Endris|   99|
|  Andrew Lappalainen|   99|
+--------------------+-----+
only showing top 20 rows

