In [1]:
import re
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
from pyspark.sql.functions import least
from pyspark.sql.functions import greatest
from pyspark.sql import SQLContext

In [2]:
# load author data into dataframe
author_file = "/FileStore/tables/graph/AMiner_Author-81e2b.txt"

def format_data(item=None, replace_term=None, split=False, split_term=";"):
    """Remove a field tag from one raw record line and optionally split it.

    Args:
        item: one raw line of an AMiner record (e.g. ``"#nJohn Doe"``).
        replace_term: tag to strip from the line (e.g. ``"#n"``). ``None`` or
            ``""`` means "no tag": the text is only whitespace-stripped.
        split: when True, split the cleaned text on ``split_term``.
        split_term: separator used when ``split`` is True.

    Returns:
        The cleaned string; or, when ``split`` is True, the list of parts
        (an empty list when the cleaned text is empty).
    """
    # str.replace(None, ...) raises TypeError, which made the default
    # `replace_term=None` unusable; a missing tag simply means nothing to remove.
    tmp = item.replace(replace_term, "") if replace_term else item
    tmp = tmp.strip()
    if split:
        # "".split(";") would give [""], so an empty field becomes an empty list
        return tmp.split(split_term) if tmp != "" else list()
    return tmp
  
def _parse_author(record):
    """Turn one ``(key, text)`` record from the Hadoop reader into an author dict.

    The text is split into lines that are read positionally. Line 0 carries the
    numeric id (the ``"#index "`` record delimiter has already been consumed), and
    lines 1-8 carry the tagged fields in a fixed order.
    """
    fields = record[1].split("\n")
    return {"id": int(format_data(fields[0], "")),
            "name": format_data(fields[1], "#n"),
            "affiliations": format_data(fields[2], "#a", True),
            "published papers": int(format_data(fields[3], "#pc")),
            "citations": int(format_data(fields[4], "#cn")),
            "h-index": int(format_data(fields[5], "#hi")),
            "p-index": float(format_data(fields[6], "#pi")),
            "up-index": float(format_data(fields[7], "#upi")),
            "keyterms": format_data(fields[8], "#t", True)}


# Read the author file one "#index " record at a time, skip empty records,
# and parse each remaining record into exactly one author dict.
author_rdd = (spark.sparkContext
              .newAPIHadoopFile(path=author_file,
                                inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                keyClass="org.apache.hadoop.io.LongWritable",
                                valueClass="org.apache.hadoop.io.Text",
                                conf={"textinputformat.record.delimiter": "#index "})
              .filter(lambda record: record[1] != "")
              .map(_parse_author))

author = author_rdd.toDF()
print(author.count())
print(author.distinct().count())
author.show(5)

In [3]:
# load paper data into dataframe
paper_file = "/FileStore/tables/graph/AMiner_Paper-478c8.txt"

def format_data(item=None, replace_term=None, split=False, split_term=";"):
    """Remove a field tag from one raw record line and optionally split it.

    Re-declared here so this cell can run on its own.

    Args:
        item: one raw line of an AMiner record (e.g. ``"#*Some title"``).
        replace_term: tag to strip from the line (e.g. ``"#*"``). ``None`` or
            ``""`` means "no tag": the text is only whitespace-stripped.
        split: when True, split the cleaned text on ``split_term``.
        split_term: separator used when ``split`` is True.

    Returns:
        The cleaned string; or, when ``split`` is True, the list of parts
        (an empty list when the cleaned text is empty).
    """
    # str.replace(None, ...) raises TypeError; a missing tag means nothing to remove.
    tmp = item.replace(replace_term, "") if replace_term else item
    tmp = tmp.strip()
    if split:
        return tmp.split(split_term) if tmp != "" else list()
    return tmp

def format_year(replace_term=None, item=None):
    """Parse the year field of a paper record.

    Unlike ``format_data``, the tag comes first and the raw line second;
    callers pass both positionally, e.g. ``format_year("#t", line)``.

    Returns:
        The year as an int, or None when the field is empty or not numeric.
    """
    try:
        return int(format_data(item, replace_term))
    except ValueError:
        # e.g. an empty "#t" line: int("") raises ValueError
        return None

def _parse_paper(record):
    """Turn one ``(key, text)`` record from the Hadoop reader into a paper dict.

    Lines are read positionally: id, title (#*), authors (#@),
    affiliations (#o), year (#t) and publication venue (#c).
    """
    lines = record[1].split("\n")
    return {"id": int(format_data(lines[0], "")),
            "title": format_data(lines[1], "#*"),
            "authors": format_data(lines[2], "#@", True),
            "affiliations": format_data(lines[3], "#o", True),
            "year": format_year("#t", lines[4]),
            "publication venue": format_data(lines[5], "#c")}


# One "#index " record per element; empty records are dropped, and each
# remaining record is parsed into exactly one paper dict.
paper_rdd = (spark.sparkContext
             .newAPIHadoopFile(path=paper_file,
                               inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                               keyClass="org.apache.hadoop.io.LongWritable",
                               valueClass="org.apache.hadoop.io.Text",
                               conf={"textinputformat.record.delimiter": "#index "})
             .filter(lambda record: record[1] != "")
             .map(_parse_paper))

paper = sqlContext.createDataFrame(paper_rdd)
print(paper.count())
print(paper.distinct().count())
paper.show(5)

In [4]:
# load author2paper data into dataframe
author2papersc = sc.textFile("/FileStore/tables/graph/AMiner_Author2Paper-df443.txt")
_a2p_columns = ['index', 'authorid', 'paperid', 'position']
author2paper = sqlContext.createDataFrame(
    author2papersc.map(lambda line: tuple(line.split("\t"))),
    _a2p_columns)
# Every field arrives as a string; cast them all to int in one projection,
# keeping the column names and order.
author2paper = author2paper.select(
    [author2paper[name].cast(IntegerType()).alias(name) for name in _a2p_columns])
print(author2paper.count())
print(author2paper.distinct().count())
author2paper.show(5)

In [5]:
# load coauthor data into dataframe
coauthorsc = sc.textFile("/FileStore/tables/graph/AMiner_Coauthor-2abe4.txt")
_coauthor_columns = ['author1', 'author2', 'collaborationcount']
# Keep only lines starting with '#', remove every '#', then split on tabs.
_coauthor_rows = (coauthorsc
                  .filter(lambda line: line.startswith('#'))
                  .map(lambda line: tuple(line.replace('#', '').split("\t"))))
coauthor = sqlContext.createDataFrame(_coauthor_rows, _coauthor_columns)
# Cast all string columns to int, preserving names and order.
coauthor = coauthor.select(
    [coauthor[name].cast(IntegerType()).alias(name) for name in _coauthor_columns])
print(coauthor.count())
print(coauthor.distinct().count())
coauthor.show(5)

In [6]:
# get publish year for author2paper dataframe
# Build the (id, year) projection of the paper table once and use it both in the
# join condition and to drop the duplicate key column afterwards.
paper_id_year = paper.select('id', 'year')
authorpaper = (author2paper
               .join(paper_id_year, author2paper.paperid == paper_id_year.id, 'inner')
               .drop(paper_id_year.id))
print(authorpaper.count())
print(authorpaper.distinct().count())
authorpaper.show(5)

In [7]:
# get first year of collaboration between two authors
# Step 1: attach every paper (and its year) written by author1.
authorpaper1 = (authorpaper
                .select('authorid', 'paperid', 'year')
                .withColumnRenamed('authorid', 'authorid1')
                .withColumnRenamed('paperid', 'paperid1'))
coauthor = (coauthor
            .join(authorpaper1, coauthor.author1 == authorpaper1.authorid1, 'inner')
            .drop(authorpaper1.authorid1))
print(coauthor.count())
print(coauthor.distinct().count())
coauthor.show(5)

# Step 2: keep only those papers that author2 also wrote.
authorpaper2 = (authorpaper
                .select('authorid', 'paperid')
                .withColumnRenamed('authorid', 'authorid2')
                .withColumnRenamed('paperid', 'paperid2'))
coauthor = (coauthor
            .join(authorpaper2,
                  (coauthor.author2 == authorpaper2.authorid2) & (coauthor.paperid1 == authorpaper2.paperid2),
                  'inner')
            .drop(authorpaper2.authorid2))
print(coauthor.count())
print(coauthor.distinct().count())
coauthor.show(5)

# Step 3: per author pair keep the rows from the earliest shared year, then
# reduce to one row per (author1, author2, year).
w = Window.partitionBy('author1', 'author2')
coauthor_final = (coauthor
                  .withColumn('1styear', f.min('year').over(w))
                  .where(f.col('year') == f.col('1styear'))
                  .drop('1styear')
                  .select('author1', 'author2', 'year')
                  .distinct())
print(coauthor_final.count())
print(coauthor_final.distinct().count())
coauthor_final.show(5)

In [8]:
# Recompute the earliest collaboration year per (author1, author2) pair from the
# joined `coauthor` frame; this repeats the last step of the previous cell.
w = Window.partitionBy('author1', 'author2')
_first_year = f.min(f.col('year')).over(w)
coauthor_final = (coauthor
                  .withColumn('1styear', _first_year)
                  .where(f.col('1styear') == f.col('year'))
                  .drop('1styear'))

coauthor_final = coauthor_final.select('author1', 'author2', 'year').distinct()
print(coauthor_final.count())
print(coauthor_final.distinct().count())
coauthor_final.show(5)

In [9]:
# Number of author pairs by the year of their first collaboration.
_pairs_per_year = coauthor_final.groupBy("year").agg(f.count(f.lit(1)).alias("count"))
display(_pairs_per_year.orderBy("year"))

In [10]:
# Persist the first-collaboration-year table so later cells can re-read it by name.
df_writer_coauthor_final = coauthor_final.write
df_writer_coauthor_final.saveAsTable("coauthor_final", mode="overwrite")

In [11]:
coauthor_final = spark.sql("select * from coauthor_final")
# Split pairs by first-collaboration year: before 2010 -> early, 2010 and later -> late.
# Rows with a null year satisfy neither predicate and are dropped from both.
_before_2010 = col('year') < 2010
coauthor_early = coauthor_final.where(_before_2010)
coauthor_late = coauthor_final.where(~_before_2010)
print(coauthor_early.count(), coauthor_late.count())

df_writer_coauthor_late = coauthor_late.write
df_writer_coauthor_early = coauthor_early.write

df_writer_coauthor_late.saveAsTable("late", mode="overwrite")
df_writer_coauthor_early.saveAsTable("early", mode="overwrite")

In [12]:
coauthor_early = spark.sql("select * from early")
coauthor_late = spark.sql("select * from late")

# Every author appearing on either side of a pair, per period.
author_list_early = coauthor_early[['author1']].union(coauthor_early[['author2']]).distinct().withColumnRenamed('author1', 'id')
author_list_late = coauthor_late[['author1']].union(coauthor_late[['author2']]).distinct().withColumnRenamed('author1', 'id')

# Undirected edge lists: orient each pair as (smaller id, larger id) and dedupe.
# If the source lists a pair in both orders, the two rows are identical after
# orientation. Without distinct() the edges table would carry duplicate edges, and a
# duplicated late pair would duplicate its positive row when it is left-joined onto
# the candidate pairs.
coauthor_early_final = (coauthor_early
                        .select(least('author1', 'author2').alias('src'),
                                greatest('author1', 'author2').alias('dst'))
                        .distinct())
coauthor_late_final = (coauthor_late
                       .select(least('author1', 'author2').alias('src'),
                               greatest('author1', 'author2').alias('dst'))
                       .distinct())

# Authors active in both periods (used to build candidate pairs); joining on the
# column name keeps a single 'id' column.
author_list_final = author_list_early.join(author_list_late, 'id', 'inner')

df_writer_edges = DataFrameWriter(coauthor_early_final)
df_writer_vertices = DataFrameWriter(author_list_early)

df_writer_edges.saveAsTable("edges", mode="overwrite")
df_writer_vertices.saveAsTable("vertices", mode="overwrite")

df_writer_coauthor_late_final = DataFrameWriter(coauthor_late_final)
df_writer_author_list_final = DataFrameWriter(author_list_final)

df_writer_coauthor_late_final.saveAsTable("late_final", mode="overwrite")
df_writer_author_list_final.saveAsTable("list_final", mode="overwrite")

In [13]:
# Number of authors active in both periods (displayed as the cell output).
spark.table('list_final').count()

In [14]:
coauthor_late_final = spark.sql("select * from late_final")
author_list_final = spark.sql("select * from list_final")
edges = spark.sql("select * from edges")
# Positive labels: pairs that collaborate in the late period. Deduplicate first so a
# pair stored more than once cannot duplicate candidate rows in the left join below.
coauthor_late_final = (coauthor_late_final
                       .select('src', 'dst')
                       .distinct()
                       .withColumn('target', lit(1)))
left = author_list_final.withColumnRenamed('id', 'author1')
right = author_list_final.withColumnRenamed('id', 'author2')
# Candidate pairs: every pair of authors active in both periods, ordered author1 < author2
# (the same orientation as the src < dst edge tables).
data = left.join(right, left.author1 < right.author2, 'cross')
# Remove pairs that were already connected in the early graph.
data = data.join(edges, (data.author1 == edges.src) & (data.author2 == edges.dst), 'left_anti')
# Label: 1 if the pair collaborates in the late period, otherwise 0.
data = data.join(coauthor_late_final, (data.author1 == coauthor_late_final.src) & (data.author2 == coauthor_late_final.dst), 'left').drop(coauthor_late_final.src).drop(coauthor_late_final.dst)
data = data.fillna({'target': 0})
df_writer_data = DataFrameWriter(data)
df_writer_data.saveAsTable("data", mode="overwrite")

In [15]:
data = spark.sql("select * from data")
# Class balance of the labelled candidate pairs.
label_counts = data.groupBy("target").agg(f.count(f.lit(1)).alias("count"))
label_counts.show()

In [16]:
# Sample train and test data. The training set is balanced by undersampling the
# negatives. The test set should ideally keep the original class ratio, but it is
# undersampled the same way to keep the computation tractable.
NEG_SAMPLE_FRACTION = 0.000017


def _undersample(split, neg_seed, train_flag):
    """Keep every positive row of `split`, a seeded sample of its negatives, and tag them.

    Args:
        split: DataFrame with a 0/1 ``target`` column.
        neg_seed: seed for sampling the negative rows.
        train_flag: value written to the ``train`` column (1 = train, 0 = test).
    """
    positives = split.filter(f.col('target') == 1)
    negatives = split.filter(f.col('target') == 0).sample(False, NEG_SAMPLE_FRACTION, seed=neg_seed)
    return positives.union(negatives).withColumn('train', lit(train_flag))


trainsplit, testsplit = data.randomSplit([0.8, 0.2], seed=123)
train = _undersample(trainsplit, 1, 1)
test = _undersample(testsplit, 2, 0)

train_data = train.union(test)
df_writer_train = DataFrameWriter(train_data)
df_writer_train.saveAsTable("train_data", mode="overwrite")

In [17]:
%scala
// Calculate Common Neighbors, Total Neighbors, Preferential Attachment, Jaccard Similarity, Adamic Adar and Resource Allocation
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.numericRDDToDoubleRDDFunctions
import ml.sparkling.graph.operators.OperatorsDSL._
import ml.sparkling.graph.operators.measures.edge.AdamicAdar
import ml.sparkling.graph.operators.measures.edge.CommonNeighbours
import org.apache.spark.graphx.Graph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import java.lang.Long;

In [18]:
%scala
// Edge table rows as string arrays (src, dst); cached because three RDDs below read it.
val PreprocessedRDD = spark.sql("select * from edges").rdd.map(r => r.toSeq.map(v => v.toString).toArray).cache()
// Every id seen as an edge endpoint becomes a vertex with attribute 1.
val Node1Vertex = PreprocessedRDD.map(cols => (cols(0).toLong, 1)).distinct()
val Node2Vertex = PreprocessedRDD.map(cols => (cols(1).toLong, 1)).distinct()
val completeVertex = Node1Vertex.union(Node2Vertex).distinct()
val EdgesRDD = PreprocessedRDD.map(cols => Edge(cols(0).toLong, cols(1).toLong, 1)).distinct()

// Graph.persist() with no argument and Graph.cache() both use MEMORY_ONLY,
// so one cache() call stores the graph exactly as persist().cache() did.
val graph = Graph(completeVertex, EdgesRDD).cache()
// Neighbour ids of every vertex, ignoring edge direction; collected to the
// driver and broadcast for the per-pair feature computation.
val neigh = graph.collectNeighborIds(EdgeDirection.Either)
val broadcastVar = sc.broadcast(neigh.collect())

In [19]:
%scala
// Rows of train_data as string arrays; columns 0 and 1 are read as the author ids downstream.
val PreprocessedRDDLabelRaw = spark.sql("select * from train_data").rdd.map(r => r.toSeq.map(v => v.toString).toArray).cache()
// NOTE(review): not referenced elsewhere in this notebook, yet it collects the whole
// training sample to the driver — consider removing.
val labelDataToRdd = sc.broadcast(PreprocessedRDDLabelRaw.collect())
// (vertex id, degree) pairs of the early graph, held on the driver; collect() already returns an Array.
val Degree = graph.degrees.collect()

In [20]:
%scala
import sqlContext.implicits._
import scala.math.sqrt
import scala.math.min
import scala.math.max
// For every candidate pair (author1, author2), compute neighbourhood-based
// link-prediction features against the early co-authorship graph.
//
// Every ratio is computed in floating point. The previous version divided Int by Int
// and only then called .toDouble, so Leicht-Holme-Newman, Sorensen, hub-promoted and
// hub-depressed were truncated to 0 (or 1) for essentially every pair.
val r_rdd = PreprocessedRDDLabelRaw.mapPartitions(rows => {
  // neighbour-id lookup, built once per partition from the broadcast array
  val nvalues = broadcastVar.value.toMap
  // vertex id -> degree, built once per partition so each row does a hash lookup per
  // common neighbour instead of scanning the whole Degree array
  val degreeOf = Degree.toMap

  // x / y in floating point; 0.0 instead of NaN/Infinity when the denominator is 0
  def ratio(x: Double, y: Double): Double = if (y == 0) 0.0 else x / y

  rows.map(row => {
    val n1 = row(0).toLong
    val n2 = row(1).toLong
    val n1_neigh = nvalues(n1)
    val n2_neigh = nvalues(n2)
    val common = n1_neigh.intersect(n2_neigh)
    val total = n1_neigh.union(n2_neigh).distinct
    val k1 = n1_neigh.length
    val k2 = n2_neigh.length
    val c = common.length.toDouble
    // degrees of the common neighbours that appear in the degree table
    val commonDegrees = common.collect { case id if degreeOf.contains(id) => degreeOf(id) }

    // Common Neighbors
    val common_neighbors = common.length
    // Total Neighbors
    val total_neighbors = total.length
    // Preferential Attachment
    val preferential_attachment = k1 * k2
    // Jaccard Similarity: |common| / |union|
    val jaccard = ratio(c, total.length)
    // Adamic Adar: sum of 1 / log(degree) over common neighbours
    val adamic_adar = commonDegrees.map(d => 1 / math.log(d)).sum
    // Resource Allocation: sum of 1 / degree over common neighbours
    val resource_allocation = commonDegrees.map(d => 1 / d.toDouble).sum
    // Leicht-Holme-Newman: |common| / (k1 * k2)
    val leicht_holme_nerman = ratio(c, k1.toDouble * k2)
    // Sorensen Index: 2 |common| / (k1 + k2)
    val sorensen_index = ratio(2 * c, k1.toDouble + k2)
    // Salton Cosine Similarity: |common| / sqrt(k1 * k2)
    val salton_cosine_similarity = ratio(c, sqrt(k1.toDouble * k2))
    // Hub Promoted: |common| / min(k1, k2)
    val hub_promoted = ratio(c, min(k1, k2))
    // Hub Depressed: |common| / max(k1, k2)
    val hub_depressed = ratio(c, max(k1, k2))
    (n1, n2, common_neighbors, total_neighbors, preferential_attachment, jaccard, adamic_adar, resource_allocation, leicht_holme_nerman, sorensen_index, salton_cosine_similarity, hub_promoted, hub_depressed)
  })
}).cache()

In [21]:
%scala
// Column names for the feature tuples in r_rdd, in tuple order.
val colNames = Seq("author1", "author2", "common_neighbors", "total_neighbors", "preferential_attachment", "jaccard", "adamic_adar", "resource_allocation", "leicht_holme_nerman", "sorensen_index", "salton_cosine_similarity", "hub_promoted", "hub_depressed")
val neighbor = r_rdd.toDF(colNames: _*)
// Store the features as a managed parquet table, replacing any previous run.
neighbor.write.mode("overwrite").format("parquet").saveAsTable("neighbor")

In [22]:
# Community Detection
from graphframes import *

graph_edges = spark.sql('select * from edges').select("src", "dst")
graph_vertices = spark.sql('select * from vertices')

g = GraphFrame(graph_vertices, graph_edges)

# GraphFrames' connectedComponents needs a Spark checkpoint directory.
sc.setCheckpointDir('/FileStore/tables/graph')
tc = g.triangleCount()
lp = g.labelPropagation(maxIter=2)
cc = g.connectedComponents()

# cc, tc and lp are all derived from graph_vertices. Joining on Column objects such as
# cc.id == tc.id can then be resolved against the shared lineage (ambiguous or
# trivially-true self-join), so join on the column *name* instead. A name-based join
# also keeps a single 'id' column, which replaces the explicit drop() calls.
community = cc.join(tc, 'id')
community_final = (community.join(lp, 'id')
                   .withColumnRenamed('count', 'triangle_count')
                   .withColumnRenamed('label', 'label_propagation_cluster_no')
                   .withColumnRenamed('component', 'connected_component_cluster_no'))
df_writer_community = DataFrameWriter(community_final)
df_writer_community.saveAsTable("community", mode="overwrite")

In [23]:
# Export the edge and vertex tables as single-file JSON for the networkx cell below.
for _query, _path in (('select * from edges', "dbfs:/FileStore/tables/graph/edges"),
                      ('select * from vertices', "dbfs:/FileStore/tables/graph/vertices")):
    (spark.sql(_query)
     .coalesce(1)
     .write.format('json')
     .mode("overwrite")
     .save(_path))

In [24]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx
import community 
import json
from networkx.algorithms import community
from networkx.algorithms.community import label_propagation_communities
from community import community_louvain
from random import sample 
import csv
import itertools

import glob
import os

# Seed numpy's global RNG. This line used to call `numpy.random.seed`, but numpy is
# only imported as `np`, so it raised NameError and the rest of the cell never ran.
# NOTE(review): presumably meant to make the Louvain partition in the next cell
# repeatable; confirm python-louvain draws from numpy's global RNG in this version.
np.random.seed(seed=123)

# Load edges/vertices into networkx (Louvain modularity runs in the next cell).
# Spark part-file names embed a per-write task id, so each export produces a different
# name. Locate the single part file instead of hard-coding it.


def _single_part_file(directory):
    """Return the path of the only ``part-*.json`` file in `directory`.

    Raises:
        FileNotFoundError: if there is not exactly one such file. The export cell
            writes with coalesce(1), so exactly one is expected.
    """
    matches = sorted(glob.glob(os.path.join(directory, 'part-*.json')))
    if len(matches) != 1:
        raise FileNotFoundError(
            'expected exactly one part-*.json file in %s, found %d' % (directory, len(matches)))
    return matches[0]


def _read_json_lines(path):
    """Parse a JSON-lines file into a list of dicts, closing the file afterwards."""
    with open(path, 'r') as handle:
        return [json.loads(line) for line in handle if line.strip()]


table_graph_edges = _single_part_file('/dbfs/FileStore/tables/graph/edges')
table_graph_vertices = _single_part_file('/dbfs/FileStore/tables/graph/vertices')

edge_list = _read_json_lines(table_graph_edges)
edges_list = [(dic['src'], dic['dst']) for dic in edge_list]

vertice_list = [dic['id'] for dic in _read_json_lines(table_graph_vertices)]

g = nx.Graph()
g.add_edges_from(edges_list)
g.add_nodes_from(vertice_list)

In [25]:
# Louvain community detection on the networkx graph built in the previous cell;
# `partition` maps each node id to its community number.
partition = community_louvain.best_partition(graph=g)

In [26]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameWriter
# One row per node: (node id, Louvain community number), saved as a Spark table.
louvain_pdf = pd.DataFrame(list(partition.items()), columns=['id', 'lv_cluster_no'])
louvain = sqlContext.createDataFrame(louvain_pdf)
df_writer_louvain = louvain.write
df_writer_louvain.saveAsTable("community_louvain", mode='overwrite')

In [27]:
# Merge the Spark community measures with the Louvain clusters and export them as one CSV.
# Use dedicated *_df names. Rebinding `community` / `community_louvain` here shadowed the
# community-detection modules imported earlier, so re-running the Louvain cell after
# this one would fail.
community_df = spark.sql('select * from community')
community_louvain_df = spark.sql('select * from community_louvain')
community_merged = community_df.join(community_louvain_df, community_df.id == community_louvain_df.id).drop(community_louvain_df.id)
community_merged.coalesce(1).write.format('csv').mode("overwrite").save("dbfs:/FileStore/tables/graph/community", header = 'true')

In [28]:
# Export the neighbourhood features and the train/test sample as single CSV files with headers.
for _table_query, _csv_path in (('select * from neighbor', "dbfs:/FileStore/tables/graph/neighbor"),
                                ('select * from train_data', "dbfs:/FileStore/tables/graph/train")):
    spark.sql(_table_query).coalesce(1).write.format('csv').mode("overwrite").save(_csv_path, header='true')