In [58]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, sum, avg, count, max, min, lower, substring, desc, from_unixtime, unix_timestamp, when
from pyspark.sql.types import *
import os
import time
import pandas

In [3]:
class Timer():
    def __init__(self):
        self.t_0 = time.time()
    def start(self):
        self.t_0 = time.time()
    def stop(self):
        end = time.time()
        print(f"took {end - self.t_0}s")
timer = Timer()
timer.start()
timer.stop()

took 0.0s


In [4]:
spark = SparkSession.\
        builder.\
        appName("project")\
        .getOrCreate()
spark

In [5]:
dir_path = os.path.abspath("") + os.sep + "data" + os.sep
load = lambda filename: spark.read.csv(dir_path + filename, header=True, inferSchema=True)
publications = load("publications.csv")
authors = load("authors.csv")

pub2auth = load("publications2authors.csv")
citations = load("citations.csv")
keywords = load("keywords.csv")

for x in (publications, authors, pub2auth, citations, keywords):
    print(type(x))
    x.printSchema()
    
authors.show()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- journal: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- booktitle: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- ee: string (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- name: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- affiliations: string (nullable = true)
 |-- bio: string (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- title: string (nullable = true)
 |-- auth_name: string (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- citing: string (nullable = true)
 |-- cited: string (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
r

# C.U.D. operations

In [7]:
#crud 1
#add an author
def add_an_author(df_to_update,\
                  name,\
                  birth_year, email, affiliations, bio):
    val = [(name, birth_year, email, affiliations, bio)]
    cols = df_to_update.schema
    print(cols)
    new_row = spark.createDataFrame(val, cols)
    return new_row.union(df_to_update)
add_an_author(authors, "Mauro", 1950, "Mauro@mail.it", "Università della Calabria", "Hey there!").show()


StructType([StructField('name', StringType(), True), StructField('birth_year', IntegerType(), True), StructField('email', StringType(), True), StructField('affiliations', StringType(), True), StructField('bio', StringType(), True)])
+--------------------+----------+--------------------+--------------------+--------------------+
|                name|birth_year|               email|        affiliations|                 bio|
+--------------------+----------+--------------------+--------------------+--------------------+
|               Mauro|      1950|       Mauro@mail.it|Università della ...|          Hey there!|
|         Paul Kocher|      1961|paul_kocher@mail.com|          ETH Zurich| Pellentesque cur...|
|       Daniel Genkin|      1992|daniel_genkin@mai...|The University of...| Proin vel lobort...|
|        Daniel Gruss|      1983|daniel_gruss@mail...|The University of...| Aenean nisl mass...|
|    Werner Haas 0004|      1968|werner_haas_0004@...|          ETH Zurich| Pellentesque

In [None]:
#crud 2
#remove all traces of a certain author
def delete_author(authors, pub2auth, auth_name):
    authors = authors.filter(col("name") != auth_name)
    pub2auth = pub2auth.filter(col("auth_name") != auth_name)
    return authors, pub2auth

timer.start()
new_auth, newpub2auth = delete_author(authors, pub2auth, "Paul Kocher")
timer.stop()

authors.filter(col("name").contains("Paul")).show()
new_auth.filter(col("name").contains("Paul")).show()

In [None]:
#crud 3
#set 2022 as defualt year in lieu of null
#set january as default year in lieu of null
#replace month string with number of month

#keeps previous month column and adds "month_number" column with int
#replcaces month string with 3 letters equivalent
timer.start()
publications_month_number_col = publications.fillna({"month": "January", "year": 2022})\
.withColumn("month", substring("month",1, 3))\
.withColumn("month_number",from_unixtime(unix_timestamp(col("month"),'MMM'),'MM').cast("int"))
timer.stop()
publications_month_number_col.select("title", "month", "year", "month_number").show()
publications_month_number_col.printSchema()

#alternative
#replaces month column values with a number and casts the values to int (it changes the schema for the month column)
timer.start()
publications_replace_month_col_and_cast = publications.fillna({"month": "January", "year": 2022})\
.withColumn("month", substring("month",1, 3))\
.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MMM'),'MM').cast("int"))
timer.stop()
publications_replace_month_col_and_cast.select("title", "month", "year").show()
publications_replace_month_col_and_cast.printSchema()

In [27]:
#query 4 remove all books with a certain tag
def remove_problematic_keywords(publications, keywords, problematic_tags):
    problematic_publications = keywords.filter(col("keyword").isin(problematic_tags))
    return publications\
    .join(
        problematic_publications, publications.title == problematic_publications.title, "leftanti"
    ),\
    keywords.filter(~col("keyword").isin(problematic_tags))


problematic_tags = ["vestibulum", "Sed"]
timer.start()
new_publications, new_keywords = remove_problematic_keywords(publications, keywords, problematic_tags)
timer.stop()

print("old pub keywords vs new pub keywords")
keywords_of_old_publications = publications\
.select("title")\
.join(keywords, publications.title == keywords.title)\
.select("keyword").distinct().sort("keyword")\
.show()

keywords_of_new_publications = new_publications\
.select("title")\
.join(keywords, publications.title == keywords.title)\
.select("keyword").distinct().sort("keyword")\
.show()

print("old keywords vs new keywords")
keywords.select(col("keyword")).distinct().sort("keyword").show()
 
new_keywords.select(col("keyword")).distinct().sort("keyword").show()


took 0.04598402976989746s
old pub keywords vs new pub keywords
+----------+
|   keyword|
+----------+
|       Sed|
|      diam|
|       non|
|   posuere|
|    sapien|
| tincidunt|
|tincidunt,|
|     velit|
|vestibulum|
|     vitae|
+----------+

+----------+
|   keyword|
+----------+
|      diam|
|       non|
|   posuere|
|    sapien|
| tincidunt|
|tincidunt,|
|     velit|
|     vitae|
+----------+

old keywords vs new keywords
+----------+
|   keyword|
+----------+
|       Sed|
|      diam|
|       non|
|   posuere|
|    sapien|
| tincidunt|
|tincidunt,|
|     velit|
|vestibulum|
|     vitae|
+----------+

+----------+
|   keyword|
+----------+
|      diam|
|       non|
|   posuere|
|    sapien|
| tincidunt|
|tincidunt,|
|     velit|
|     vitae|
+----------+



In [None]:
#crud 5
#assign a language to a certain publication
def change_language(publications, title, new_lang):
    return  publications\
            .withColumn("language",
                       when(
                           col("title") == title, new_lang
                       ).otherwise(col("language")))
timer.start()
new_publications = change_language(publications, "Meltdown", "Russian")
timer.stop()
new_publications.select("title", "language").show(truncate=False)

# Queries

In [23]:
#query 1
#info su tutti libri scritti da un certo autore
timer.start()
q = pub2auth\
.filter(pub2auth.auth_name == "Paul Kocher")\
.join(publications, pub2auth.title == publications.title , "leftouter" )\

timer.stop()

q.toPandas().head()

took 0.011957883834838867s


Unnamed: 0,title,auth_name,title.1,type,publisher,journal,month,year,language,booktitle,volume,pages,ee
0,Spectre Attacks: Exploiting Speculative Execut...,Paul Kocher,Spectre Attacks: Exploiting Speculative Execut...,article,,meltdownattack.com,,2021,,,,,https://spectreattack.com/spectre.pdf
1,Meltdown,Paul Kocher,Meltdown,article,,meltdownattack.com,,2020,,,,,https://meltdownattack.com/meltdown.pdf


In [42]:
#query 2
# 4 libri scritti dopo il 1990 che hanno la parola "computer"
timer.start()

q1 = publications\
.filter(publications.year > 1990)\
.filter(col("title").contains("computer") | col("title").contains("Computer"))\
.limit(4)\

timer.stop()

display(q1.toPandas().head())
#second alternative
timer.start()
q2 = publications\
.filter(publications.year > 1990)\
.filter(col("title").rlike("computer|Computer"))\
.limit(4)\

timer.stop()

q2.toPandas().head(20)

took 0.010971546173095703s


Unnamed: 0,title,type,publisher,journal,month,year,language,booktitle,volume,pages,ee
0,Computer Science Curricula 2013,book,ACM Press and IEEE Computer Society Press,,,2013,,,,,https://www.wikidata.org/entity/Q107021707
1,Das Projekt EXCEPT: Expert-System for Computer...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2016,spanish,,114.0,,
2,Zur Systematik morphologischer Paradigmen: Die...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2012,afrikaans,,217.0,,
3,Erster EXCEPT-Workshop: Computerunterst�tzte U...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2018,,,125.0,,


took 0.00800013542175293s


Unnamed: 0,title,type,publisher,journal,month,year,language,booktitle,volume,pages,ee
0,Computer Science Curricula 2013,book,ACM Press and IEEE Computer Society Press,,,2013,,,,,https://www.wikidata.org/entity/Q107021707
1,Das Projekt EXCEPT: Expert-System for Computer...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2016,spanish,,114.0,,
2,Zur Systematik morphologischer Paradigmen: Die...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2012,afrikaans,,217.0,,
3,Erster EXCEPT-Workshop: Computerunterst�tzte U...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2018,,,125.0,,


In [25]:
#query 3 where, in, nested query
#titoli degli articoli, scritti né in inglese né in spagonolo, scritti però da un autore che parla almeno uno tra inglese e spagnolo
#nested query with an "in" statement
langs = ["spanish", "english"]
timer.start()

foreign_speakers = pub2auth\
        .join(
            publications\
            .select(col("title"), col("language"))\
            .filter(col("language").isin(langs)), pub2auth.title == publications.title, "leftsemi"
        )\
        .select(col("auth_name"))

articles_written_by_foreign_speakers = pub2auth\
.join(foreign_speakers, pub2auth.auth_name == foreign_speakers.auth_name, "leftsemi")\

q = publications\
.select(col("title"), col("language"))\
.filter(~(col("language").isin(langs)) | col("language").isNull())\
.select(col("title"))\
.join(articles_written_by_foreign_speakers, publications.title == articles_written_by_foreign_speakers.title, "leftsemi")\


timer.stop()

q.toPandas().head(20)

took 0.0649106502532959s


Unnamed: 0,title
0,Algebraical Optimization of FTA-Expressions
1,An Algebraic Characterization of STUF
2,A Combined Symbolic-Empirical Apprach for the ...
3,Zur Systemarchitektur von LILOG
4,Mengenorientierte Auswertung von Anfragen in d...
5,Definite Resolution over Constraint Languages
6,A Logical Operational Semantics of Full Prolog
7,Ein Fact Manager zur persistenten Speicherung ...
8,How could a good system of practical NLP look ...
9,Attribute Inheritance Implemented on Top of a ...


In [26]:
#query 4
# per ogni autore, data ultima pubblicazione e totale articoli scritti
timer.start()

q = pub2auth\
.join(publications.select(col("title"), col("year")),  publications.title == pub2auth.title)\
.groupBy("auth_name")\
.agg(
    max("year").alias("last wrote in"),
    count(publications.title).alias("wrote") #as
)\
.sort(col("wrote").desc())\

q.toPandas().head(20)



Unnamed: 0,auth_name,last wrote in,wrote
0,Christoph Meinel,2022,54
1,Gerrit Bleumer,2022,35
2,Alex Biryukov,2020,35
3,Bart Preneel,2022,28
4,Friedrich L. Bauer,2021,27
5,Carlisle Adams,2022,26
6,Burt Kaliski,2022,23
7,Peter Landrock,2022,21
8,Dieter Baum,2021,21
9,Christoph Beierle,2022,20


In [27]:
#query 5
#data del primo articolo scritto in inglese per ogni publisher
timer.start()

publications\
.select(col("language"), col("year"), col("publisher"))\
.filter(col("language") == "english")\
.groupBy(col("publisher"))\
.min("year")\
.na.fill("unknown")\
.toPandas().head(20)

Unnamed: 0,publisher,min(year)
0,unknown,1975
1,"IBM Germany Science Center, Institute for Know...",1987
2,IBM Deutschland GmbH,1997


In [29]:
#query 6
#authors with more than 10 publications (top 10)

pub2auth\
.groupBy(pub2auth.auth_name)\
.agg(
count("title").alias("pubXauthor")
)\
.filter(col("pubXauthor") > 10)\
.sort(col("pubXauthor").desc())\
.limit(10)\
.toPandas().head(20)

Unnamed: 0,auth_name,pubXauthor
0,Christoph Meinel,54
1,Alex Biryukov,35
2,Gerrit Bleumer,35
3,Bart Preneel,28
4,Carlisle Adams,26
5,Friedrich L. Bauer,25
6,Burt Kaliski,23
7,Peter Landrock,21
8,Dieter Baum,21
9,Christoph Beierle,20


In [30]:
#query 7
#years between 2000 and 2015 with more than 50 publications

publications\
.filter((publications.year > 2000)  & (publications.year < 2015))\
.groupBy(publications.year)\
.agg(
count("title").alias("pubXyear")
)\
.filter(col("pubXyear") >50)\
.sort(col("pubXyear").desc())\
.toPandas().head(20)

Unnamed: 0,year,pubXyear
0,2011,458
1,2014,71
2,2012,61
3,2013,60
4,2010,51


In [31]:
#query 8
#average year of birth of the youngest(and oldest) author per type of publication
#the considered articles have been written by a team of authors all born after 1960

#articles where at least one author was born after 1960
timer.start()
#find articles where one of the authors was born in 1960 or before
written_by_old =  pub2auth\
                        .join(
                            authors.\
                            select(col("name"),col("birth_year"))\
                            .filter(col("birth_year") <= 1960), col("auth_name") == col("name")
                        ).select("title")
#remove publications where at least one author was born before 1960
#of these articles get their authors' names
#of these authors get their birth year
#of each publication find the youngest and the oldest
#for each type get the average year of birth for the youngest and the oldest for their publications
q = publications\
.select(col("title"), col("type"))\
.join(written_by_old, publications.title == written_by_old.title, "leftanti")\
.join(pub2auth, publications.title == pub2auth.title)\
.join(authors.select(col("name"), col("birth_year")), col("auth_name") == col("name"))\
.groupBy(col("type"), publications.title).agg(
    min(col("birth_year")).alias("oldest"),
    max(col("birth_year")).alias("youngest")
)\
.groupBy(col("type")).agg(
    avg(col("youngest")).alias("avg_youngest"),
    avg(col("oldest")).alias("avg_oldest")
)
timer.stop()
q.toPandas().head(20)



took 0.08800506591796875s


Unnamed: 0,type,avg_youngest,avg_oldest
0,inproceedings,1969.5,1969.5
1,mastersthesis,1974.8,1974.8
2,article,1984.980337,1978.516854
3,incollection,1980.837912,1978.108516
4,book,1967.0,1967.0


In [32]:
#query 11(?)
#all publications of 2010 containing the most used keyword
topKeyword = (keywords\
.groupBy(keywords.keyword)\
.agg(
    count(col("title")).alias("KeyCounter")
)\
.sort(col("KeyCounter").desc())\
.limit(1)\
.join(keywords, keywords.keyword == keywords.keyword, "inner").drop(keywords.keyword)
)


topKeyword\
.join(publications.select(col("title"), col("year")), topKeyword.title == publications.title, "inner").drop(topKeyword.title)\
.filter(col("year") == 2010)\
.toPandas().head(20)


Unnamed: 0,KeyCounter,keyword,title,year
0,451,vestibulum,Muffin: A Distributed Database Machine,2010
1,451,vestibulum,On Structuring Domain-Specific Knowledge,2010
2,451,vestibulum,Do We Really Need Common Variable Orders for S...,2010
3,451,vestibulum,A Decomposition Algorithm for Optimization ove...,2010
4,451,vestibulum,Parallel versus Sequential Task-Processing: A ...,2010
5,451,vestibulum,An Efficient Method for Aerodynamic Shape Opti...,2010
6,451,vestibulum,An Even Faster Solver for General Systems of E...,2010
7,451,vestibulum,On MAPA/G/K/K Stations,2010
8,451,vestibulum,On the GI/G/k Queue with Lebesgue-Dominated In...,2010
9,451,vestibulum,Right-to-Left Exponentiation.,2010


In [33]:
# query 9
# Authors with at least two publications of type "article" published in 2022  

pub2auth\
.join(publications.select(col("title"), col("year"), col("type")),  publications.title == pub2auth.title)\
.filter(col("type") == "article")\
.filter(col("year") == 2022)\
.groupBy("auth_name")\
.agg(
    count(publications.title).alias("numXauthor")
)\
.filter(col("numXauthor") > 2)\
.toPandas().head(20)



Unnamed: 0,auth_name,numXauthor
0,Helmut Seidl,3
1,Udo Pletat,3
2,Klaus Jansen,4
3,Thomas Ludwig 0001,5
4,Christoph Beierle,3


In [45]:
# query 10
# Articles with at least 5 authors and the vitae keyword

pub2auth\
.join(publications.select(col("title"), col("year"), col("type")),  publications.title == pub2auth.title)\
.join(keywords.select(col("title"), col("keyword")), keywords.title == pub2auth.title)\
.filter(col("type") == "article")\
.filter(col("keyword") == "vitae")\
.groupBy(publications.title)\
.agg(
    count("auth_name").alias("numAuthorsXpub")
)\
.filter(col("numAuthorsXpub") >= 5)\
.toPandas().head()


Unnamed: 0,title,numAuthorsXpub
0,Spectre Attacks: Exploiting Speculative Execut...,10
1,Nonapproximability Results for Partially Obser...,6
2,Scheduling with Incompatible Jobs,6
3,LILOG-DB: Database Support for Knowledge-Based...,5


In [35]:
# new query 3 WHERE, IN, nested query
# titles publications written in 1980 at their latest and with at least one of the keywords in the critical_keywords list

#nested query with in
critical_keywords = ["velit", "non"]
pubs_with_keywords =  keywords.filter(col("keyword").isin(critical_keywords))

#where part with nesting
q = publications\
.select(col("title"), col("year"))\
.filter(col("year") <= 1980)\
.join(pubs_with_keywords, pubs_with_keywords.title == publications.title)\
.select(publications.title)


q.toPandas().head(10)


Unnamed: 0,title
0,Two Program Comprehension Tools for Automatic ...
1,The Infinite Server Queue with Markov Additive...
2,Performance Analysis of SDN Specific Error Pro...
3,Ein Newtonverfahren zur zeitoptimalen Vibratio...
4,BMAP/G/1-Queues: Properties of the Fundamental...
5,Biometric Authentication.
6,Biometric Authentication.
7,Threshold Signature.
8,Binary Euclidean Algorithm.
9,Factorization Circuits.


In [36]:
#new query 8
# WHERE, Nested Query (i.e., 2-step Queries), GROUP BY
#publications released by a publisher with at least 10 publications

#where filter the not null and count >= 10
#group_by : num of  publications
#nete: this whole query
prolific_publisher = publications\
.select(col("title"), col("publisher"))\
.filter(col("publisher").isNotNull())\
.groupBy(col("publisher"))\
.agg(
    count(col("title")).alias("count")
)\
.filter(col("count") >= 10)\

d = publications\
.join(prolific_publisher, publications.publisher == prolific_publisher.publisher, "leftsemi")\

d.toPandas().head(10)






Unnamed: 0,title,type,publisher,journal,month,year,language,booktitle,volume,pages,ee
0,Die Repr�sentation r�umlichen Wissens und die ...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2019,spanish,,191,,
1,Algebraical Optimization of FTA-Expressions,article,IBM Deutschland GmbH,LILOG-Report,,2020,,,59,,
2,Wissensrepr�sentation und Maschinelles Lernen,article,IBM Deutschland GmbH,LILOG-Report,,2010,,,15,,
3,An Algebraic Characterization of STUF,article,IBM Deutschland GmbH,LILOG-Report,,2021,,,40,,
4,A Combined Symbolic-Empirical Apprach for the ...,article,"IBM Germany Science Center, Institute for Know...",IWBS Report,,2012,,,225,,
5,Zur Systemarchitektur von LILOG,article,IBM Deutschland GmbH,LILOG-Memo,,1994,,,2,,
6,Mengenorientierte Auswertung von Anfragen in d...,article,IBM Deutschland GmbH,LILOG-Report,,1991,,,61,,
7,Definite Resolution over Constraint Languages,article,IBM Deutschland GmbH,LILOG-Report,,2020,,,53,,
8,Dokumentation der Syntax der LILOG-Grammatik,article,IBM Deutschland GmbH,LILOG-Memo,,1999,english,,9,,
9,Cognitive Linguistics: The Processing of Spati...,article,IBM Deutschland GmbH,LILOG-Report,,2015,spanish,,45,,


In [38]:
#query 14 top 3 publications that cite the greates number of publications
publications\
.join(citations, publications.title==citations.citing, "left")\
.groupby(col("title")).agg(
    count("cited").alias("num of citations")
).sort(col("num of citations").desc())\
.limit(3)\
.toPandas().head(10)

Unnamed: 0,title,num of citations
0,Vigen�re Encryption.,38
1,Fast Generation of Random Permutations via Net...,15
2,Macrodata Disclosure Protection.,15
