In [1]:
!pip3 install pyspark
!pip3 install graphframes

Collecting pyspark
Collecting py4j==0.10.7 (from pyspark)
  Using cached https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4
Collecting graphframes
  Using cached https://files.pythonhosted.org/packages/0b/27/c7c7e1ced2fe9a905f865dd91faaec2ac8a8e313f511678c8ec92a41a153/graphframes-0.6-py2.py3-none-any.whl
Collecting nose (from graphframes)
  Using cached https://files.pythonhosted.org/packages/15/d8/dd071918c040f50fa1cf80da16423af51ff8ce4a0f2399b7bf8de45ac3d9/nose-1.3.7-py3-none-any.whl
Collecting numpy (from graphframes)
  Using cached https://files.pythonhosted.org/packages/d2/ab/43e678759326f728de861edbef34b8e2ad1b1490505f20e0d1f0716c3bf4/numpy-1.17.4-cp36-cp36m-manylinux1_x86_64.whl
Installing collected packages: nose, numpy, graphframes
Successfully installed graphframes-0.6 nose-1.3.7 numpy-1.17.4


In [2]:
import pyspark as ps
import os
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 pyspark-shell"
)

### Usefull functions

In [3]:
def init_spark(app_name, master_config):
    """
    :params app_name: Name of the app
    :params master_config: eg. local[4]
    :returns SparkContext, SQLContext, SparkSession:
    """
    conf = (ps.SparkConf().setAppName(app_name).setMaster(master_config))

    sc = ps.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    sql_ctx = SQLContext(sc)
    spark = SparkSession(sc)

    return (sc, sql_ctx, spark)

In [4]:
@udf(returnType=BooleanType())
def check_map(dist_map):
    return bool(dist_map)

In [5]:
def get_id_by_title(title, vertices):
    return vertices.filter(vertices['title']==title).head()[0]

### Queries

In [6]:
def first_query(g, year, art_id):
    sp = g.shortestPaths(landmarks=[art_id])
    results = sp.filter(sp['year']==year)
    results = results.withColumn("isLinked", check_map("distances"))
    results = results.filter(results['isLinked'])
    results = results.drop(*['distances','isLinked'])
    return results

In [7]:
def second_query(g, art_id):
    sp = g.shortestPaths(landmarks=[art_id])
    sp = sp.withColumn("isLinked", check_map("distances"))
    popular_by_year = sp.filter(sp['isLinked']).groupBy("year").count()
    return popular_by_year.orderBy(popular_by_year['count'].desc())

In [8]:
def third_query(g, rp=0.15, mi=10):
    query_result = g.pageRank(resetProbability=rp, maxIter=mi).vertices
    return query_result.orderBy(query_result['pagerank'].desc())

In [9]:
def lab_prop(g, mi=10):
    communities = g.labelPropagation(maxIter=10)
    com = communities.groupBy('label').count()
    return com.orderBy(com['count'].desc()), communities

### Create spark env

In [10]:
sc, sql_ctx, spark = init_spark("App_name", "local[*]")

## Work with small dataset
### Reading

In [11]:
path_to_vertices = "/home/kostin_001/citation-data/small/v.txt"
path_to_edges = "/home/kostin_001/citation-data/small/e.txt"

In [12]:
vertexScheme = StructType([
  StructField("id", StringType(), False),
  StructField("title", StringType(), False),
  StructField("year", IntegerType(), False),
  StructField("publication_venue", StringType(), False),
  StructField("authors", StringType(), False)]
)

In [13]:
edgeScheme = StructType([
  StructField("src", StringType(), False),
  StructField("dst", StringType(), False),
  StructField("count", IntegerType(), False)]
)

In [14]:
vertices = spark.read.option("delimiter", "\t").option("schema", vertexScheme).option("header", False).csv(path_to_vertices).toDF("id", "title","year", "publication_venue", "authors")

In [15]:
edges = spark.read.option("delimiter", "\t").option("schema", edgeScheme).option("header", False).csv(path_to_edges).toDF("src", "dst","count")
edges = edges.drop('count')

## Creating GraphFrame

In [16]:
from graphframes import *
g = GraphFrame(vertices,edges)

## Queries

Return all the papers that were written in 2001 and can be traced back (through citations, direct or indirect) to the paper ARIES: A Transaction Recovery Method Supporting Fine-Granularity Locking and Partial Rollbacks Using Write-Ahead Logging.

In [17]:
title = 'ARIES: A Transaction Recovery Method Supporting Fine-Granularity Locking and Partial Rollbacks Using Write-Ahead Logging'
art_id = get_id_by_title(title, vertices)
year = '2001'

In [18]:
first_query(g, year, art_id).show()

+----+--------------------+----+-----------------+--------------------+
|  id|               title|year|publication_venue|             authors|
+----+--------------------+----+-----------------+--------------------+
|2276|Career-Enhancing ...|2001|    SIGMOD Record|Alexandros Labrin...|
|1640|Scalable Distribu...|2001|          RIDE-DM|Torsten Grabs,Kle...|
|1269|Approximate Query...|2001|          VLDB J.|Kaushik Chakrabar...|
|2268|Efficiently Publi...|2001|          VLDB J.|Jayavel Shanmugas...|
|2323|Flexible Data Cub...|2001|             ICDT|Mirek Riedewald,D...|
+----+--------------------+----+-----------------+--------------------+



On which year, there is the most papers that trace back to the paper mentioned above?

In [19]:
sq_ans = second_query(g, art_id)
sq_ans.show()

+----+-----+
|year|count|
+----+-----+
|1999|   74|
|1998|   68|
|1996|   55|
|1997|   53|
|1993|   50|
|1995|   49|
|2000|   48|
|1994|   41|
|1992|   29|
|1991|    8|
|1990|    7|
|2001|    5|
+----+-----+



In [20]:
sq_ans.head()['year']

'1999'

Return the most influential papers in the citation graph.

In [21]:
third_query(g).show(1)

+----+--------------------+----+-----------------+--------------------+-----------------+
|  id|               title|year|publication_venue|             authors|         pagerank|
+----+--------------------+----+-----------------+--------------------+-----------------+
|1575|A Study of Three ...|1990|             VLDB|David J. DeWitt,P...|38.03636673275022|
+----+--------------------+----+-----------------+--------------------+-----------------+
only showing top 1 row



Discover the five largest communities

In [22]:
com, communities = lab_prop(g)
com.show(5)

+-------------+-----+
|        label|count|
+-------------+-----+
| 584115552258|  137|
| 197568495616|  103|
|1571958030339|   91|
|  60129542155|   81|
| 283467841543|   78|
+-------------+-----+
only showing top 5 rows



In [23]:
top5 = com.orderBy(com["count"].desc()).collect()[:5]
for i in range(5):
    communities[communities['label']==top5[i]["label"]].show()

+----+--------------------+----+--------------------+--------------------+------------+
|  id|               title|year|   publication_venue|             authors|       label|
+----+--------------------+----+--------------------+--------------------+------------+
| 529|Itemset Materiali...|1998|               ADBIS|Marek Wojciechows...|584115552258|
| 659|Knowledge Discove...|1996|                ICDE|Kimmo Hatonen,Mik...|584115552258|
| 299|Mining Quantitati...|1996|   SIGMOD Conference|Ramakrishnan Srik...|584115552258|
|1883|PUBLIC: A Decisio...|2000|Data Min. Knowl. ...|Rajeev Rastogi,Ky...|584115552258|
|2208|Processing Time-C...|1993|ACM Trans. Databa...|Wen-Chi Hou,Gulte...|584115552258|
| 571|Efficient Attribu...|1998|IEEE Trans. Knowl...|Colin L. Carter,H...|584115552258|
|1794|Efficiently Suppo...|1997|   SIGMOD Conference|Flip Korn,H. V. J...|584115552258|
| 262|Mining Sequential...|1996|                EDBT|Ramakrishnan Srik...|584115552258|
| 710|Automatic Subspac...|1998|

+----+--------------------+----+--------------------+--------------------+------------+
|  id|               title|year|   publication_venue|             authors|       label|
+----+--------------------+----+--------------------+--------------------+------------+
|1249|On Indexing Line ...|1990|                VLDB|      H. V. Jagadish|283467841543|
|1514|Approximate Analy...|1990|          SIGMETRICS|    Theodore Johnson|283467841543|
|1943|The Design and Im...|1998|IEEE Trans. Knowl...|Ming-Ling Lo,Chin...|283467841543|
|1191|The hB-Tree: A Mu...|1990|ACM Trans. Databa...|David B. Lomet,Be...|283467841543|
|1797|Approximation-Bas...|1998|      GeoInformatica|Hans-Peter Kriege...|283467841543|
|1430|Key Range Locking...|1993|                VLDB|      David B. Lomet|283467841543|
|1787|Dynamically Optim...|2000|                EDBT|Christian Bohm,Ha...|283467841543|
|2457|A Scientific Mult...|1996|               SSDBM|Taekyong Lee,Tolg...|283467841543|
|1342|The hBP-tree: A M...|1995|

## Working with large dataset

In [24]:
path_to_vertices_title = "/home/kostin_001/citation-data/large/paper_title.tsv"
path_to_vertices_year = "/home/kostin_001/citation-data/large/paper_year.tsv"
path_to_edges = "/home/kostin_001/citation-data/large/ref.tsv"

In [25]:
vertices_large = spark.read.option("delimiter", "\t").option("header", True).csv(path_to_vertices_title).toDF("id", "title")
vertices_l_title = spark.read.option("delimiter", "\t").option("header", True).csv(path_to_vertices_year).toDF("id", "year")

In [26]:
edges_large = spark.read.option("delimiter", "\t").option("header", True).csv(path_to_edges).toDF("src", "dst")

### Preprocess vertices
#### Join and remove empty vertices (with empty title or empty year)

In [27]:
vertices_large = vertices_large.filter(vertices_large['title']!='\\N')

In [28]:
vertices_large = vertices_large.join(vertices_l_title, "id", how='inner')

## Creating GraphFrame

In [29]:
g_large = GraphFrame(vertices_large,edges_large)

## Queries

Return all the papers that were written in 2001 and can be traced back (through citations, direct or indirect) to the paper Machine Learning.

In [30]:
title='Machine Learning'
art_id = get_id_by_title(title, vertices_large)
year = '2001'

In [31]:
first_query(g_large, year, art_id).show()

+---------+--------------------+----+
|       id|               title|year|
+---------+--------------------+----+
|118752797|Regularized Winno...|2001|
| 44682603|A Preliminary Inv...|2001|
| 21500490|Latent Semantic K...|2001|
|124450490|Automated Negotia...|2001|
| 25469601|Computerized Eval...|2001|
|   199774|     Image analogies|2001|
| 48238347|One More Revoluti...|2001|
| 47789932|Learning of Ontol...|2001|
|116179987|Evolving Neural N...|2001|
| 32061053|Computational mod...|2001|
|135271887|A bayesian approa...|2001|
|152219899|A SURVEY OF RECOM...|2001|
| 58493275|Text Categorizati...|2001|
| 32082883|A Rough Set-Aided...|2001|
| 58963568|Using unlabeled d...|2001|
| 73242454|On Clustering Val...|2001|
|106231930|An empirical stud...|2001|
|145282097|Ensembles of Clas...|2001|
| 68991837|Bayesian Network ...|2001|
| 98310893|Application of da...|2001|
+---------+--------------------+----+
only showing top 20 rows



On which year, there is the most papers that trace back to the paper mentioned above?

In [32]:
sq_ans = second_query(g_large, art_id)
sq_ans.show()

+----+-----+
|year|count|
+----+-----+
|2011|   63|
|2010|   62|
|2012|   60|
|2013|   57|
|2009|   49|
|2007|   45|
|2006|   44|
|2008|   43|
|2003|   42|
|2005|   41|
|2015|   39|
|2014|   36|
|2004|   35|
|2002|   31|
|2001|   24|
|2016|   22|
|2000|   22|
|1998|   10|
|1999|    6|
|1996|    5|
+----+-----+
only showing top 20 rows



In [33]:
sq_ans.head()['year']

'2011'

Return the most influential papers in the citation graph.

In [34]:
third_query(g_large).show(1)

+---------+--------------------+----+----------------+
|       id|               title|year|        pagerank|
+---------+--------------------+----+----------------+
|109151907|Maximum likelihoo...|1976|48.0570681047734|
+---------+--------------------+----+----------------+
only showing top 1 row



Discover the five largest communities

In [35]:
com, communities = lab_prop(g_large)
com.show(5)

+-------------+-----+
|        label|count|
+-------------+-----+
| 730144440403|  525|
|1022202216587|  486|
| 901943132216|  332|
| 738734374950|  252|
|1408749273125|  211|
+-------------+-----+
only showing top 5 rows



In [36]:
top5 = com.orderBy(com["count"].desc()).collect()[:5]
for i in range(5):
    communities[communities['label']==top5[i]["label"]].show()

+---------+--------------------+----+------------+
|       id|               title|year|       label|
+---------+--------------------+----+------------+
|  4121452|NETTOOL: A Hybrid...|1995|730144440403|
| 69981149|Modeling for Opti...|2002|730144440403|
| 65706799|Divergence Measur...|2005|730144440403|
| 69230391|Some methods for ...|1967|730144440403|
|135363541|Estimation with Q...|1961|730144440403|
| 50427228|Introducing dendr...|2005|730144440403|
| 41547217|Off-Line Learning...|2007|730144440403|
|114826224|Classifier Combin...|2000|730144440403|
|145868677|    Visualizing Data|1993|730144440403|
| 23331361|RightSPOT: A Nove...|2003|730144440403|
|  6375526|Refining Initial ...|1998|730144440403|
|149533720|A Hybrid Approach...|2004|730144440403|
|114610098|SMOTE: synthetic ...|2002|730144440403|
| 61245463|“Everything Perso...|2003|730144440403|
|139219710|On the Use of Ske...|2000|730144440403|
| 59731700|Adaptive Learning...|2009|730144440403|
|111877296|A quantitative me...

In [37]:
spark.stop()