# SciPi Spark Implementation - Part 1

## Batch Processing

#### Joseph Azzopardi & Andrew Cachia

In [1]:
import time
# Wall-clock start; the last cell subtracts this from its own timestamp to
# print the elapsed run time.
start = time.time()

In [2]:
# Re-assigns `start`, so the elapsed time printed by the last cell is measured
# from the execution of this cell rather than the previous one.
start = time.time()

In [3]:
import pyspark
import pyspark.sql.functions as fn

from pyspark import SparkConf
from pyspark import SparkContext

from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import *

import os
from io import StringIO

In [4]:
# Spark settings consumed by the SparkConf cell below:
# master URL, application name, executor memory and driver memory.
sparkMasterURL = "spark://ubuntu:7077"
appName = 'SciPi'
exeMemory = "2g"
driMemory = "1g"

In [5]:
#findspark.init()

# Build a SparkConf from the settings defined in the previous cell.
# NOTE(review): the cell that starts Spark creates its SparkContext without
# passing `conf`, so these values do not appear to be applied - confirm.
conf = SparkConf()

conf.setMaster(sparkMasterURL)
conf.setAppName(appName)
conf.set("spark.executor.memory", exeMemory)
# Last expression of the cell: `set` returns the SparkConf, which the notebook echoes.
conf.set("spark.driver.memory", driMemory)


<pyspark.conf.SparkConf at 0x7efc241a5470>

In [None]:
# Start the Spark context and session.
#
# getOrCreate() reuses a SparkContext that is already running in this kernel,
# so re-executing the cell no longer fails because a context already exists;
# on a fresh kernel it creates a default context exactly as before.
# NOTE(review): the `conf` object built in the previous cell is still not
# applied (the line that passes it remains commented out) - confirm intent.

#sc = pyspark.SparkContext(conf=conf)
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)

# Author-Author Relationships

In [None]:
#function to load the dataset from file

def _1_load_data(full_path_to_file):
    """Print the input location and open it through the SparkContext.

    Parameters:
        full_path_to_file: path or URI handed unchanged to `sc.textFile`.

    Returns:
        The value returned by `sc.textFile` for that location. `sc` is the
        notebook-level SparkContext.
    """
    print ("Input file location:\t", full_path_to_file)
    raw_lines = sc.textFile(full_path_to_file)
    return raw_lines

def _2_authorAuthor_rel(distData, full_output_path, cSize=1):
    """Derive author-to-author collaboration counts and write them as CSV.

    Parameters:
        distData: RDD of tab-delimited text lines; fields 0 and 1 are parsed
            as integers and used as paper id and author id.
        full_output_path: destination passed to `DataFrame.write.csv`.
        cSize: a pair is kept only when its collaboration count is strictly
            greater than this value (default 1).

    Returns:
        None. Any exception is caught, reported on stdout and not re-raised.

    Output rows are (authorid_1, authorid_2, collaborations). The self-join
    produces both (a, b) and (b, a) for every pair, and both are written.
    """
    try:

        # Split columns (tab delimited), keep paper id and author id, and convert to DataFrame
        distData  = distData.map(lambda a: a.split("\t")).map(lambda x: (int(x[0]), int(x[1])) ).toDF(["paperid", "authorid"])

        # Self-join on the paper id. Since the paperid is repeated for every author
        # of a paper, an inner join is sufficient to create the author permutations.
        df1 = distData.selectExpr("paperid as paperid_1", "authorid as authorid_1").alias("df1")
        df2 = distData.selectExpr("paperid as paperid_2", "authorid as authorid_2").alias("df2")

        distData = df1.join(df2, col("df1.paperid_1") == col("df2.paperid_2"), 'inner')

        # The self-join also pairs every author with themselves; drop those rows.
        distData = distData.select("authorid_1", "authorid_2").filter(col("authorid_1") != col("authorid_2"))

        # Build a single "id1:id2" key per row and attach a constant 1 to sum over.
        distData = distData.withColumn("author_rel", concat(col("authorid_1"), lit(":"), col("authorid_2"))).select("author_rel").withColumn("count", lit(1))

        # Total the collaborations per pair key, then split the key back into its two ids.
        distData = distData.groupby(['author_rel']).agg(sum('count')).withColumn("split_rel", split(distData.author_rel, ':'))
        distData = distData.select(col("sum(count)").alias("collaborations"), col("split_rel").alias("author_rel"))

        # Remove less interesting collaborations, which also reduces the output size.
        distData = distData.filter(distData.collaborations > cSize)

        # Output to CSV - authorid_1, authorid_2, collaborations (frequency)
        distData.select( distData.author_rel[0], distData.author_rel[1], distData.collaborations).write.csv(full_output_path)

        print ("\n<Author-Author Relationships> successfully processed.\nFile output at:\t", full_output_path )

        # Clear DataFrame. This previously referenced `pubData`, which does not
        # exist in this function, so every successful run ended in a NameError
        # and printed the ERROR banner below.
        distData.unpersist()

    except Exception as e:

        print ("An ERROR has occured while processing <Author-Author Relationships>.")
        print (e)

In [None]:
# Azure Storage locations for the input and output of the Author-Author job.
# The commented-out assignments are the local-filesystem alternatives.

AzStorage_input_path  = "wasbs://mag-2019-03-22@ics5114mag.blob.core.windows.net/mag/"
AzStorage_input_filename = "PaperAuthorAffiliations.txt"
#AzStorage_input_path  = "/home/data/input/"
#AzStorage_input_filename = "PaperAuthorAffiliations_sample_10.txt"
AzStorage_input_location = AzStorage_input_path + AzStorage_input_filename


AzStorage_output_path   = "wasbs://parsed-csv-files@ics5114mag.blob.core.windows.net/results/"
#AzStorage_output_path   = "/home/data/results/"
AzStorage_output_folder = "author-author-rel"
AzStorage_output_location = AzStorage_output_path + AzStorage_output_folder

# Open the input through the SparkContext.
distData = _1_load_data(AzStorage_input_location)

# Create Author-Author Relationships.
# Threshold: author pairs with this many collaborations or fewer are discarded.
collaborationIgnore = 1
_2_authorAuthor_rel(distData, AzStorage_output_location, collaborationIgnore)


# Author-Paper-Rel-Institutions

In [None]:
# DATASET --> { 0:PaperId, 1:AuthorId, 2:AffiliationId, 3:AuthorSequenceNumber, 
#               4:OriginalAuthor, 5:OriginalAffiliation }

def _2_paperAuthorRel(distData, full_output_path):
    
    try:
        # Split columns (tab delimited), filter required columns
        distData = distData.map(lambda x: x.split("\t")).map(lambda x: (int(x[0]), int(x[1]), int(x[3]), str(x[4])) )

        # Convert to DataFrame
        distData = distData.toDF(["paperId", "authorId", "relationType", "institution"])

        # Mark Authors "A", and CoAuthors "Co_A". 
        # Keep labels short in order to reduce data required to store and process data.
        distData = distData.withColumn('mRelationType', when(distData.relationType == 1, "A").otherwise("Co_A"))
        
        # Output to CSV - PaperID, authorId, and relationship type.
        distData.select(distData.paperId, distData.authorId, distData.mRelationType).write.csv(full_output_path)
        
        print ("<Paper-Author Relationships> successfully processed.\nFile output at:\t", full_output_path )
        
    except:
        
        print ("An ERROR has occured while processing <Paper-Author Relationships>.")
    
    
    return (distData)


def _3_institutionsNodes(distData, full_output_path):
    
    try:
        # Keep only required fields
        distData = distData.select( distData.authorId, distData.institution)
        
        # Remove entries that have an empty institution Id
        distData = distData.filter( distData.institution != "")
        
        # Create new dataframe from institution column and drop duplicate names
        insData = distData.select(distData.institution).dropDuplicates(["institution"])
        
        # Assign an id to each institution
        insData = insData.withColumn("institutionId", monotonically_increasing_id())
    
        # Output to CSV - institutionId, and institution name.
        insData.select(insData.institutionId, insData.institution).write.csv(full_output_path)
        
        print ("<Institution Nodes> successfully processed.\nFile output at:\t", full_output_path )
    
    except:
        print ("An ERROR has occured while processing <Institutions Nodes> ")
    
    
    return (distData, insData)
    
    

def _4_institutionsAuthorRel(distData, insData, full_output_path):
    
    try:
        #Change Dataframe Headers and assign alias to dataframe
        df1 = distData.selectExpr("authorid as authorId", "institution as institution_1").alias("df1")
        df2 = insData.selectExpr("institution as institution_2", "institutionId as institutionId").alias("df2")

        # Left Join dataframes so that we include the institutionId with the dataset
        distData = df1.join(df2, col("df1.institution_1") == col("df2.institution_2"), 'left')
        
        # Drop duplicate names
        distData = distData.drop("institution_1").drop("institution_2")
        distData = distData.dropDuplicates(["authorId", "institutionId"])
        
        # Output to CSV - institutionId, and authorId.
        distData.select(distData.authorId, distData.institutionId).write.csv(full_output_path)
        
        print ("<Author-Institutions Relationships> successfully processed.\nFile output at:\t", full_output_path )
        
    except:
        
        print ("An ERROR has occured while processing <Author-Institutions Relationships>.")

In [None]:
# Input/output locations for the Paper-Author and Institution jobs.
# This cell points at the local sample data; the commented-out assignments
# are the Azure Storage alternatives.
#AzStorage_input_path  = "wasbs://mag-2019-03-22@ics5114mag.blob.core.windows.net/mag/"
#AzStorage_input_filename = "PaperAuthorAffiliations.txt"
AzStorage_input_path  = "/home/data/input/"
AzStorage_input_filename = "PaperAuthorAffiliations_sample_10.txt"
AzStorage_input_location = AzStorage_input_path + AzStorage_input_filename

#AzStorage_output_path    = "wasbs://parsed-csv-files@ics5114mag.blob.core.windows.net/results/"
AzStorage_output_path   = "/home/data/results/"

# One output folder per result set written by the following cells.
AzStorage_output_folder_papAuthRel = "papers-author-rel"
AzStorage_output_location_papAuthRel = AzStorage_output_path + AzStorage_output_folder_papAuthRel

AzStorage_output_folder_institutions = "institutions"
AzStorage_output_location_institutions = AzStorage_output_path + AzStorage_output_folder_institutions

AzStorage_output_folder_authInstRel = "author-institution-rel"
AzStorage_output_location_authInstRel = AzStorage_output_path + AzStorage_output_folder_authInstRel

# Open the input through the SparkContext.
distData = _1_load_data(AzStorage_input_location)

In [None]:
# Extract Paper-Author Relationships.
# `distData` is rebound to the function's return value, so re-running this
# cell on its own no longer passes the freshly loaded input; re-run the load
# cell first.
distData = _2_paperAuthorRel(distData, AzStorage_output_location_papAuthRel)

In [None]:
# Extract Institution Nodes.
# Returns the reduced author/institution frame and the institution node frame;
# both are passed to the Author-Institution cell below.
distData, insData = _3_institutionsNodes(distData, AzStorage_output_location_institutions)

In [None]:
# Extract Author-Institution Relationships from the two frames returned by
# the Institution Nodes cell.
_4_institutionsAuthorRel(distData, insData, AzStorage_output_location_authInstRel)

# Author Nodes

In [None]:
# DATASET --> { 0:AuthorId, 1:Rank, 2:NormalizedName, 3:Display Name, 4:LastKnownAffiliationId, 5:PaperCount
#               6:CitationCount, 7:CreateDate }

def _2_author_nodes(distData, full_output_path):

    try:
        # Split columns (tab delimited), filter required columns
        distData = distData.map(lambda x: x.split("\t")).map(lambda x: (int(x[0]), str(x[3]), int(x[5]), int(x[6]) ))
        
        # Convert to DataFrame
        distData = distData.toDF(["authorId", "name", "paperCount", "citationCount"])
        
        # Output to CSV - authorId, name, paper count, and citation count.
        distData.select(distData.authorId, distData.name, distData.paperCount, distData.citationCount).write.csv(full_output_path)
        
        print ("<Author Nodes> successfully processed.\nFile output at:\t", full_output_path )
        
    except:
        
        print ("An error has occured while processing <Author Nodes>.")

In [None]:
# Azure Storage locations for the Author Nodes job.
# The commented-out assignments are the local sample-data alternatives.
AzStorage_input_path  = "wasbs://mag-2019-03-22@ics5114mag.blob.core.windows.net/mag/"
AzStorage_input_filename = "Authors.txt"
#AzStorage_input_path  = "/home/data/input/"
#AzStorage_input_filename = "Authors_sample_10.txt"
AzStorage_input_location = AzStorage_input_path + AzStorage_input_filename


AzStorage_output_path   = "wasbs://parsed-csv-files@ics5114mag.blob.core.windows.net/results/"
#AzStorage_output_path   = "/home/data/results/"
AzStorage_output_folder = "authors"
AzStorage_output_location = AzStorage_output_path + AzStorage_output_folder

In [None]:
# Open the Authors input through the SparkContext.
distData = _1_load_data(AzStorage_input_location)

In [None]:
# Extract Author Nodes and write them to the output location.
_2_author_nodes(distData, AzStorage_output_location)

# Journal-ConferenceNodes

In [None]:
## Journal
# DATASET --> { 0:JournalId, 1:Rank, 2:NormalizedName, 3:DisplayName, 4:Issn, 5:Publisher
#               6:Webpage, 7:PaperCount, 8:CitationCount, 9:CreatedDate, 
#
#
## Conference Instance
# DATASET --> { 0:ConferenceInstanceId, 1:NormalizedName, 2:DisplayName, 3:ConferenceSeriesId, 4:Location
#               5:OfficialURL, 6:StartDate, 7:EndDate, 8:AbstractRegistrationDate, 9:SubmissionDeadlineDate, 
#               10:NotificationDueDate, 11:FinalVersionDueDate, 12:PaperCount, 13:CitationCount, 14:Latitude,
#               15:Longitude, 16:CreatedDate }

In [None]:
def _2_journal_nodes (journalData, full_output_path_journal):

    try:
    
        # Split columns (tab delimited), filter required columns
        journalData = journalData.map(lambda x: x.split("\t")).map(lambda x: ( x[0], x[2] )) 

        # Convert to DataFrame
        journalData = journalData.toDF(["journalId", "name" ])
        
        # Output to CSV - authorId, name, paper count, and citation count.
        journalData.select(journalData.journalId, journalData.name).write.csv(full_output_path_journal)
        
        print ("<Journal Nodes> successfully processed.\nFile output at:\t", full_output_path_journal )
        
        # Clear DataFrame
        journalData.unpersist()

    except:
        
        print ("An ERROR has occured while processing <Journal Nodes>.")
        
def _3_conf_nodes (confData, full_output_path_conf):

    try:
    
        # Split columns (tab delimited), filter required columns
        confData = confData.map(lambda x: x.split("\t")).map(lambda x: ( x[0], x[1], x[4] )) 

        # Convert to DataFrame
        confData = confData.toDF(["conferenceId", "name", "year" ])
        
        # Output to CSV - authorId, name, paper count, and citation count.
        confData.select(confData.conferenceId, confData.name, confData.year).write.csv(full_output_path_conf)
        
        print ("<Conference Instance Nodes> successfully processed.\nFile output at:\t", full_output_path_conf )
        
        # Clear DataFrame
        confData.unpersist()

    except:
        
        print ("An ERROR has occured while processing <Conference Instance Nodes>.")

In [None]:
# Azure Storage locations for the Journal and ConferenceInstance jobs.
# The commented-out assignments are the local sample-data alternatives.
AzStorage_input_path  = "wasbs://mag-2019-03-22@ics5114mag.blob.core.windows.net/mag/"
AzStorage_input_filename_journals = "Journals.txt"
#AzStorage_input_path  = "/home/data/input/"
#AzStorage_input_filename_journals  = "Journals_sample_10.txt"
AzStorage_input_location_journals = AzStorage_input_path + AzStorage_input_filename_journals

# The Azure input path is paired with the full file, as for Journals above;
# the "_sample_10" file belongs with the local input path.
AzStorage_input_filename_conf = "ConferenceInstances.txt"
#AzStorage_input_filename_conf = "ConferenceInstances_sample_10.txt"
AzStorage_input_location_conf = AzStorage_input_path + AzStorage_input_filename_conf

AzStorage_output_path   = "wasbs://parsed-csv-files@ics5114mag.blob.core.windows.net/results/"
#AzStorage_output_path   = "/home/data/results/"

AzStorage_output_folder_conf   = "conferenceinstance"
AzStorage_output_location_conf = AzStorage_output_path + AzStorage_output_folder_conf

AzStorage_output_folder_journal = "journals"
AzStorage_output_location_journal = AzStorage_output_path + AzStorage_output_folder_journal

In [None]:
# Load Journal Data: open the Journals input through the SparkContext.
journalData = _1_load_data(AzStorage_input_location_journals)

In [None]:
# Extract Journal Nodes and write them to the journal output location.
_2_journal_nodes (journalData, AzStorage_output_location_journal)

In [None]:
# Load ConferenceInstance Data: open the input through the SparkContext.
confData = _1_load_data(AzStorage_input_location_conf)

In [None]:
# Extract ConferenceInstance Nodes and write them to the conference output location.
_3_conf_nodes (confData, AzStorage_output_location_conf )

# PaperNodes

In [None]:
# DATASET --> { 0:PaperId, 1:Rank, 2:Doi, 3:DocType, 4:PaperTitle, 5:OriginalTitle
#               6:BookTitle, 7:Year, 8:Date, 9:Publisher, 10:JournalID, 11:ConferenceSeriesId,
#               12:ConferenceInstanceId, 13:Volume, 14:Issue, 15:FirstPage, 16:LastPage,
#               17:ReferenceCount, 18:CitationCount, 19:EstimatedCitation, 20:OriginalVenue,
#               21:CreatedDate }

In [None]:
def _2_paper_nodes(distData, full_output_path):

    try:
        
        # Split columns (tab delimited), filter required columns
        distData = distData.map(lambda x: x.split("\t")).map(lambda x: ( (x[0]), (x[3]), (x[5]), (x[7]) , (x[9]), (x[10]), (x[12]) )) 
        
        # Convert to DataFrame
        distData = distData.toDF(["paperId", "type", "title", "year", "publisher", "journalId", "conferenceInstanceId", ])
        
        # Add "unkown" type for records with empty type field.
        distData = distData.withColumn('type', when(distData.type == "", "Unknown").otherwise(col("type")))
        
        # Output to CSV - authorId, name, paper count, and citation count.
        distData.select(distData.paperId, distData.type, distData.title, distData.year).write.csv(full_output_path)
        
        print ("<Paper Nodes> successfully processed.\nFile output at:\t", full_output_path )
        
        # Remove unwanted Data
        return distData.select(distData.paperId, distData.publisher, distData.journalId, distData.conferenceInstanceId)       
        
    except:
        
        print ("An ERROR has occured while processing <Paper Nodes>.")
        
def _3_publishers(distData, full_output_path, full_output_path_rel):

    try:
        
        # Create new dataframe from publisher column and drop duplicate names
        pubData = distData.select(distData.paperId, distData.publisher).dropDuplicates(["publisher"])
        
        # Assign an id to each institution
        pubData = pubData.withColumn("publisherId", monotonically_increasing_id())
        
        # Output to CSV - Publisher nodes: publisherId, and publisher Name
        pubData.select(pubData.publisherId, pubData.publisher).write.csv(full_output_path)
        
        print ("<Publisher Nodes> successfully processed.\nFile output at:\t", full_output_path )
        
        # inner join and use publisher, to create paperid to publisher id relationship
        df1 = distData.selectExpr("paperId as paperId", "publisher as publisher1").alias("df1")
        df2 = pubData.selectExpr ("publisher as publisher2", "publisherId as publisherId").alias("df2")
        
        pubRelData = df1.join(df2, col("df1.publisher1") == col("df2.publisher2"), 'inner')
        
        # Output to CSV - Publisher Relationships: publisherId, and publisher Name
        pubRelData.select(pubRelData.paperId, pubRelData.publisherId).write.csv(full_output_path_rel)
        
        print ("\n<Publisher Relationships> successfully processed.\nFile output at:\t", full_output_path_rel )
        
        # Clear DataFrame
        pubData.unpersist()
               
    except:
        
        print ("An ERROR has occured while processing <Publisher Nodes>.")
        
def _4_conf_journal(distData, full_output_path_conference, full_output_path_journal):

    try:
        
        # Remove unwanted Columns
        distData = distData.select(distData.paperId, distData.journalId, distData.conferenceInstanceId)
        
        # Output to CSV - ConferenceInstanceId to PaperId Relationship -  Drop entries with no ConferenceInstanceId 
        distData.select(distData.paperId, distData.conferenceInstanceId).filter(distData.conferenceInstanceId != "").write.csv(full_output_path_conference)
        
        print ("<ConferenceInstance to Paper Relationships> successfully processed.\nFile output at:\t", full_output_path_conference )
               
        # Output to CSV - JournalId to PaperId Relationship - Drop entries with no journalId 
        distData.select(distData.paperId, distData.journalId).filter(distData.journalId != "").write.csv(full_output_path_journal)
        
        print ("\n<Journal to Paper Relationships> successfully processed.\nFile output at:\t", full_output_path_journal )
        
        # Clear DataFrame
        distData.unpersist()
        
    except:
        
        print ("An ERROR has occured while processing <ConferenceInstance or Journal to Paper Relationships>.")

In [None]:
# Azure Storage locations for the input and output of the Paper jobs.
# The commented-out assignments are the local sample-data alternatives.

AzStorage_input_path  = "wasbs://mag-2019-03-22@ics5114mag.blob.core.windows.net/mag/"
AzStorage_input_filename = "Papers.txt"
#AzStorage_input_path  = "/home/data/input/"
#AzStorage_input_filename = "Papers_sample_10.txt"
AzStorage_input_location = AzStorage_input_path + AzStorage_input_filename


AzStorage_output_path    = "wasbs://parsed-csv-files@ics5114mag.blob.core.windows.net/results/"
#AzStorage_output_path   = "/home/data/results/"

# One output folder per result set written by the following cells.
AzStorage_output_folder_papers = "papers"
AzStorage_output_location_papers = AzStorage_output_path + AzStorage_output_folder_papers

AzStorage_output_folder_publishers = "publishers"
AzStorage_output_location_publishers = AzStorage_output_path + AzStorage_output_folder_publishers

AzStorage_output_folder_publishersRel = "paper-publisher-rel"
AzStorage_output_location_publishersRel = AzStorage_output_path + AzStorage_output_folder_publishersRel

AzStorage_output_folder_confRel   = "paper-confinstance-rel"
AzStorage_output_location_confRel = AzStorage_output_path + AzStorage_output_folder_confRel

AzStorage_output_folder_journalRel = "paper-journal-rel"
AzStorage_output_location_journalRel = AzStorage_output_path + AzStorage_output_folder_journalRel

In [None]:
# Load Data: open the Papers input through the SparkContext.
distData = _1_load_data(AzStorage_input_location)

In [None]:
# Extract Paper Nodes.
# `distData` is rebound to the function's return value, so re-running this
# cell on its own no longer passes the freshly loaded input; re-run the load
# cell first.
distData = _2_paper_nodes(distData, AzStorage_output_location_papers)

In [None]:
# Extract Publisher Nodes, and Publisher-to-Paper Relationships, from the
# frame returned by the Paper Nodes cell.
_3_publishers(distData, AzStorage_output_location_publishers, AzStorage_output_location_publishersRel)

In [None]:
# Extract ConferenceInstance-to-Paper Relationships, and Journal-to-Paper
# Relationships, from the frame returned by the Paper Nodes cell.
_4_conf_journal(distData, AzStorage_output_location_confRel, AzStorage_output_location_journalRel)

# Keywords Scripts

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import udf, explode, lower, log, trim, regexp_replace
from pyspark.sql.types import *
import json
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
import pyspark.sql.functions as func

In [None]:
# Load the inverted-index text file through the SparkContext
file = sc.textFile("wasbs://mag-2019-03-22@ics5114mag.blob.core.windows.net/nlp/PaperAbstractsInvertedIndex.txt")
#file = sc.textFile("/home/data/input/PaperAbstractsInvertedIndex_sample_10.txt")
#fileRDD = file.flatMap(lambda k: k.split("\\r\\n")).map(lambda k: k.split("\\t"))

# Split each tab-delimited line into its columns
fileRDD = file.map(lambda k: k.split("\t"))

# Load into a DataFrame with two named columns: Id and JsonData
fileToDf = fileRDD.toDF(["Id","JsonData"])


# Schema used to parse the JsonData column: an integer IndexLength and an
# InvertedIndex map from a string key to an array of integers
schema = StructType(
    [
        StructField('IndexLength', IntegerType(), True),
        StructField('InvertedIndex',  MapType(StringType(), ArrayType(IntegerType())), True)
    ]
)

# Parse the JSON and expand its fields into columns next to Id
DF = fileToDf.withColumn("JsonData", from_json("JsonData", schema)).select(col('Id'), col('JsonData.*'))

In [None]:
# Total number of papers in the dataset: one row of fileToDf per input line.
# This triggers a full Spark count; the value feeds the IDF calculation below.
totalNumDocuments = fileToDf.count()
totalNumDocuments

In [None]:
# Explode the inverted index into one row per (paper, key), replace each
# array by its length, remove non-alphabetical characters, drop keys that end
# up empty, and lowercase what is left.
newDF = (
    DF.select("Id", "IndexLength", explode("InvertedIndex").alias("Keyword", "Frequency"))
    .withColumn("Frequency", size('Frequency'))
    .withColumn("Keyword", trim(regexp_replace(col('Keyword'),'[^A-Za-z ]+', '')))
    .where(col('Keyword') != "")
    .withColumn("Keyword", lower(col('Keyword')))
    # Distinct raw keys such as "The" and "the," collapse to the same keyword
    # after cleaning. Merge them so that each (paper, keyword) pair is a single
    # row with the summed frequency; otherwise the per-keyword row count taken
    # in the IDF cell over-counts papers and a paper can list the same keyword
    # more than once.
    .groupBy("Id", "IndexLength", "Keyword")
    .agg(func.sum("Frequency").alias("Frequency"))
)

# Calculate Term Frequency
newDF = newDF.withColumn("TF", newDF.Frequency/newDF.IndexLength)

In [None]:
# Count the newDF rows that carry each keyword; this count is used as the
# keyword's document frequency.
idf_DF = newDF.groupby('Keyword').count().select(col("Keyword").alias("Keyword"), col("count").alias("Count"))#.sort(col("Count").desc())

# Calculate Inverse Document Frequency: log(total papers / keyword count)
idf_DF = idf_DF.withColumn("IDF", log(totalNumDocuments/idf_DF.Count))

In [None]:
# Join TF and IDF results by keyword (left outer join, keeping every TF row)
left_join = newDF.alias('a').join(idf_DF.alias('b'), newDF.Keyword == idf_DF.Keyword, how='left_outer')
# TFIDF = TF * IDF rounded to 4 decimals; keep the paper Id, the keyword from the TF side, and the score
left_join = left_join.withColumn("TFIDF", func.round(left_join.TF*left_join.IDF, 4)).select(col("Id"), col("a.Keyword"), col("TFIDF"))

In [None]:
# Create a window per Id ordered by descending TFIDF score
window = Window.partitionBy(left_join['Id']).orderBy(left_join['TFIDF'].desc())

# Keep rows ranked 5 or better within each Id. rank() gives tied scores the
# same rank, so an Id can yield more than five rows when scores tie.
top_5 = left_join.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 5).drop('rank')

# Write the result as CSV; the commented-out line is the local alternative.
top_5.write.csv("wasbs://parsed-csv-files@ics5114mag.blob.core.windows.net/results/keywords")
#top_5.write.csv("/home/data/results/keywords")

In [None]:
# Print the elapsed wall-clock seconds since `start` was last assigned.
end = time.time()
print(end - start)