## Datastream Processing Lab 1: B.2 Application on the SHealth dataset

Arpa MUKHERJEE

In [1]:
import findspark
# Raw string: '\s' is not a recognised escape sequence, so the non-raw literal
# triggers an invalid-escape warning on current Python versions. The resulting
# path value (backslash followed by "spark") is unchanged.
findspark.init(r'C:\spark')
import pyspark
from pyspark import SparkContext

In [2]:
# Set Spark configurations and obtain the SparkContext used by the rest of the notebook.
conf = (
    pyspark.SparkConf()
    .setAppName("App")
    .setMaster('local[*]')
    .set('spark.executor.memory', '8G')
    .set('spark.driver.memory', '45G')
    .set('spark.driver.maxResultSize', '10G')
    # NOTE(review): this is a YARN-specific key while the master is local[*];
    # presumably it has no effect here -- confirm before relying on it.
    .set('spark.yarn.executor.memoryOverhead', '2048MB')
    .set('spark.default.parallelism', '100')
)

# getOrCreate() returns the already-running context when this cell is executed
# again, instead of raising "Cannot run multiple SparkContexts at once".
# When a context already exists it is returned as-is and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Load the input CSV files as RDDs whose elements are lists of string fields.


def _split_fields(line):
    """Split one text line on commas (plain split: quoted fields are not handled)."""
    return line.split(",")


# Paper/author link rows
# (presumably paper id, author id -- verify against the file)
papauths_rdd = sc.textFile("Documents/paperauths3.csv").map(_split_fields)

# Author rows
# (presumably author id, author name -- verify against the file)
auths_rdd = sc.textFile("Documents/authors.csv").map(_split_fields)

In [4]:
# # Check length of dataset for minsupport calculation
# import pandas as pd
# import numpy as np

# df = pd.read_csv("Documents/papers.csv")
# tf = np.unique(df.id)
# minpapers = len(tf)*0.0001 
# minpapers # result: 315 -- so find the set of authors that have written at least this many papers

In [5]:
# Map author names to paper ids

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Retained only so that code referring to `sqlContext` keeps working; the
# DataFrames below are created through `spark` so that the temp views and the
# query that reads them go through one and the same session object.
sqlContext = SQLContext(sc)

spark = SparkSession \
    .builder \
    .appName("temp sql") \
    .getOrCreate()

# Infer the schema (columns are auto-named _1, _2, ...) and register each
# DataFrame as a temporary view that the SQL below can reference.
schemaAuths = spark.createDataFrame(auths_rdd)
schemaAuths.createOrReplaceTempView("auths")

schemaPaperAuths = spark.createDataFrame(papauths_rdd)
schemaPaperAuths.createOrReplaceTempView("papauths")

# SQL to map names of authors to paperid.
# LEFT JOIN keeps every papauths row; rows whose papauths._2 has no match in
# auths._1 get a NULL in the name column.
auth_names = spark.sql("SELECT papauths._1, auths._2 from papauths left join auths on papauths._2 = auths._1")

auth_names.show()

+-------+--------------------+
|     _1|                  _2|
+-------+--------------------+
|2847493|            Sheng Yu|
|2847494|    Clelia de Felice|
|2847486|         Vesa Halava|
|2847488|         Vesa Halava|
|2847491|         Vesa Halava|
|2847486|          Tero Harju|
|2847488|          Tero Harju|
|2847491|          Tero Harju|
|2847494|     Paola Bonizzoni|
|2847489|   Jean-Pierre Borel|
|2847491|     Jeffrey Shallit|
|2847494|       Rosalba Zizza|
|2847494|     Giancarlo Mauri|
|2847491|     Narad Rampersad|
|2847493|         Kai Salomaa|
|2847490|Alessandra Cherubini|
|2847486|         Michel Rigo|
|2847490|         Michel Rigo|
|2847492|         Michel Rigo|
|2847486|          Tomi Kärki|
+-------+--------------------+
only showing top 20 rows



In [6]:
# Convert dataframe to rdd
# auth_names is the DataFrame produced by the SQL join (columns _1 and _2);
# the following cells call RDD methods such as groupByKey() on this value.
papauthrdd = auth_names.rdd

In [7]:
# Build one transaction per paper: the de-duplicated list of authors on that paper.
# The values are taken from column _2 of auth_names, i.e. author names rather than ids.

# (paper id, [author names]) -- after this grouping every paper id occurs exactly once.
papauths_grp = papauthrdd.groupByKey().mapValues(list)

# [author names] per paper. Because the keys are already unique, taking the
# values directly yields the same elements as grouping by key a second time and
# flattening, without paying for another shuffle. Names may still repeat within
# a paper at this point.
auth_pairs = papauths_grp.values()

# [[author names]] per paper -- not used below; kept so that the name stays
# defined with the same element shape as before.
auths_temp = auth_pairs.map(lambda names: [names])

# Unique authors per paper; cached because it is reused by the following cells.
auth_list = auth_pairs.map(lambda names: list(set(names))).cache()

In [8]:
# Conversions for processing under FPGrowth
# collect() brings every element of auth_list back to the driver as a Python list;
# parallelize() then distributes that list again as a new RDD.
# NOTE(review): auth_list is already an RDD, so this round trip through the driver
# may be unnecessary -- confirm that FPGrowth.train(auth_list, ...) works directly
# before removing it.
alist = auth_list.collect()
ardd = sc.parallelize(alist)

In [9]:
# Mine frequent sets of co-authors with MLlib's FP-growth implementation.
from pyspark.mllib.fpm import FPGrowth

# Settings used for this run. Disabled earlier experiments in this cell used
# minSupport=0.000005 and minSupport=0.00001, both with numPartitions=10.
min_support = 0.1
num_partitions = 1

model = FPGrowth.train(ardd, minSupport=min_support, numPartitions=num_partitions)

# Writing model.freqItemsets() out with saveAsTextFile("hw5resultsx") is left disabled.

In [10]:
# Fetch the frequent itemsets to the driver, order them, and print one per line.
result = model.freqItemsets().collect()
result.sort()
for itemset in result:
    print(itemset)

FreqItemset(items=['Alessandra Cherubini'], freq=1)
FreqItemset(items=['Alessandra Cherubini', 'Michel Rigo'], freq=1)
FreqItemset(items=['Alessandra Cherubini', 'Tomi Kärki'], freq=1)
FreqItemset(items=['Alessandra Cherubini', 'Tomi Kärki', 'Michel Rigo'], freq=1)
FreqItemset(items=['Clelia de Felice'], freq=1)
FreqItemset(items=['Clelia de Felice', 'Giancarlo Mauri'], freq=1)
FreqItemset(items=['Clelia de Felice', 'Giancarlo Mauri', 'Paola Bonizzoni'], freq=1)
FreqItemset(items=['Clelia de Felice', 'Giancarlo Mauri', 'Rosalba Zizza'], freq=1)
FreqItemset(items=['Clelia de Felice', 'Giancarlo Mauri', 'Rosalba Zizza', 'Paola Bonizzoni'], freq=1)
FreqItemset(items=['Clelia de Felice', 'Paola Bonizzoni'], freq=1)
FreqItemset(items=['Clelia de Felice', 'Rosalba Zizza'], freq=1)
FreqItemset(items=['Clelia de Felice', 'Rosalba Zizza', 'Paola Bonizzoni'], freq=1)
FreqItemset(items=['Giancarlo Mauri'], freq=1)
FreqItemset(items=['Giancarlo Mauri', 'Paola Bonizzoni'], freq=1)
FreqItemset(items