In [None]:
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    array_contains,
    array_join,
    array_remove,
    col,
    collect_list,
    explode,
    expr,
    lit,
    size,
    split,
    to_date,
    when,
    year,
)
from pyspark.sql.types import *

## 1. Start a SparkSession

In [None]:
# Entry point to the PySpark application.
# `master` takes the URL of a remote Spark instance, or 'local'.
spark = (
    SparkSession.builder
    .master("local")
    .appName("MyFirstSparkApplication")
    .getOrCreate()
)

In [None]:
def HeaderToSchema(header):
    """Derive a Spark schema from a header whose column names look like ``name:type``.

    Each entry of ``header.columns`` is split on ":"; the first piece is the
    column name and the second piece is the declared type.

    Type mapping:
        ``int`` and ``ID``  -> IntegerType
        ``date``            -> TimestampType
        anything else       -> StringType (including ``string`` and ``string[]``)

    Columns declared as ``string[]`` are still read as strings; their names are
    returned separately so the caller can convert them to arrays afterwards.

    Args:
        header: object exposing a ``columns`` sequence of strings.

    Returns:
        A tuple ``(schema, arrayCols)``: the StructType (all fields nullable)
        and the list of column names declared as ``string[]``.
    """
    # Declared types that do not map to a plain string column.
    nonStringTypes = {
        "int": IntegerType,
        "ID": IntegerType,
        "date": TimestampType,
    }

    structTypeArg = []  # fields passed to StructType
    arrayCols = []      # names of the columns declared as arrays

    # The loop variable is deliberately not called `col`, to avoid shadowing
    # the pyspark `col` function imported at the top of the notebook.
    for headerCol in header.columns:
        parts = headerCol.split(":")
        colStr = parts[0]
        # A column without a ":type" suffix is treated as a string column
        # instead of failing with IndexError.
        typeStr = parts[1] if len(parts) > 1 else ""

        if typeStr == "string[]":
            arrayCols.append(colStr)

        sparkType = nonStringTypes.get(typeStr, StringType)
        structTypeArg.append(StructField(colStr, sparkType(), True))

    return StructType(structTypeArg), arrayCols

def convertStrToArray(df, cols):
    """Return a new df in which each column named in ``cols`` is split on '|'.

    Every listed column is replaced by the array obtained from splitting its
    string value on the '|' character. With no columns listed, ``df`` is
    returned as is.
    """
    converted = df
    for name in cols:
        asArray = split(col(name), "\\|").alias(name)
        converted = converted.withColumn(name, asArray)
    return converted

## 2. Load CSV data

In [None]:
# 1. Articles
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_article_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# Read the data with that schema, then convert the listed columns to arrays.
articles = convertStrToArray(
    spark.read.csv("../neo4j/output_article.csv", schema=schema, sep=";", header=False),
    cols,
)

In [None]:
# 2. Inproceedings
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_inproceedings_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# NOTE(review): unlike the other loads, the data comes from
# "dblp_small/inproceedings_small.csv" rather than "../neo4j/output_inproceedings.csv"
# -- confirm this sample is intended and matches the header above.
inproceedings = convertStrToArray(
    spark.read.csv("dblp_small/inproceedings_small.csv", schema=schema, sep=";", header=False),
    cols,
)


In [None]:
# 3. Proceedings
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_proceedings_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# Read the data with that schema, then convert the listed columns to arrays.
proceedings = convertStrToArray(
    spark.read.csv("../neo4j/output_proceedings.csv", schema=schema, sep=";", header=False),
    cols,
)

In [None]:
# 4. Books
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_book_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# Read the data with that schema, then convert the listed columns to arrays.
books = convertStrToArray(
    spark.read.csv("../neo4j/output_book.csv", schema=schema, sep=";", header=False),
    cols,
)

In [None]:
# 5. Incollection
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_incollection_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# Read the data with that schema, then convert the listed columns to arrays.
incollection = convertStrToArray(
    spark.read.csv("../neo4j/output_incollection.csv", schema=schema, sep=";", header=False),
    cols,
)

In [None]:
# 6. www
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_www_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# Read the data with that schema, then convert the listed columns to arrays.
www = convertStrToArray(
    spark.read.csv("../neo4j/output_www.csv", schema=schema, sep=";", header=False),
    cols,
)

In [None]:
# 7. phdthesis
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_phdthesis_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# Read the data with that schema, then convert the listed columns to arrays.
phdthesis = convertStrToArray(
    spark.read.csv("../neo4j/output_phdthesis.csv", schema=schema, sep=";", header=False),
    cols,
)

In [None]:
# 8. mastersthesis
# The header file supplies the column declarations used to build the schema.
header = spark.read.csv("../neo4j/output_mastersthesis_header.csv", sep=";", header=True)
schema, cols = HeaderToSchema(header)

# Read the data with that schema, then convert the listed columns to arrays.
mastersthesis = convertStrToArray(
    spark.read.csv("../neo4j/output_mastersthesis.csv", schema=schema, sep=";", header=False),
    cols,
)

## 3. Update Commands

#### Deletion of rows containing null value


In [None]:
# Display the theses that have a value in "ee"; `mastersthesis` itself is not modified.
(
    mastersthesis
    .dropna(subset=["ee"])
    .select("mastersthesis", "author", "ee", "school", "title")
    .show()
)

#### Deleting proceedings without editors

In [None]:
# Count the editors of each proceedings and keep only those that have at least one.
# The previous version computed the count but never removed anything.
# Bound to a new name so that `proceedings` stays intact for the later queries.
proceedings_with_editors = (
    proceedings
    .withColumn("number_of_editors", size(col("editor")))
    # "> 0" also rejects rows with a missing editor array, for which size()
    # yields -1 or null depending on the Spark configuration
    .filter(col("number_of_editors") > 0)
)

proceedings_with_editors.select(col("editor"), col("number_of_editors")).show()

#### Remove a specific author from the author array of the articles dataframe

In [None]:
# array_remove returns the array unchanged when the name is absent and null when
# the array is null, so no array_contains/otherwise guard is needed.
articles = articles.withColumn(
    "author",
    array_remove(col("author"), "Marina L. Gavrilova"),
)

#### Remove an article

In [None]:
# Keep every row whose "article" column differs from 10.
articles = articles.where(col("article") != 10)


#### Create a dataframe containing the authors of articles

In [None]:
# One row per (author, article key). The result is bound to a name so it can
# be reused; previously the new dataframe was built and immediately discarded.
article_authors = articles.select(
    explode(col("author")).alias("singleAuthor"),
    col("key"),
)
article_authors

## 4. Queries

#### Books and articles written by the same author

In [None]:
# Authors who have written both books and articles: pair each book with the
# articles that have an equal "author-orcid" value and show the first five pairs.
books_aliased = books.alias("books")
articles_aliased = articles.alias("articles")
same_orcid = col("books.author-orcid") == col("articles.author-orcid")

(
    books_aliased
    .join(articles_aliased, same_orcid, "inner")
    .select("books.author", "books.title", "articles.title")
    .show(n=5, truncate=False, vertical=True)
)

#### Most recent articles containing "Analysis"

In [None]:
# The five most recently modified articles whose title contains "Analysis",
# restricted to modification dates from 2020 onwards.
mdate_as_date = to_date(col("mdate"), "dd/MM/yyyy")

(
    articles
    .filter(col("title").like("%Analysis%") & (mdate_as_date >= lit("2020-01-01")))
    # Without an explicit ordering, limit(5) would return an arbitrary five
    # matches rather than the most recent ones.
    .orderBy(mdate_as_date.desc())
    .select("article", "title", "mdate")
    .limit(5)
    .show(truncate=False, vertical=True)
)

#### Authors of incollection and articles

In [None]:
# Predicate shared by both queries: modification date within 2020.
# The upper bound is exclusive; the former "<= 2021-01-01" also let
# 01/01/2021 through.
mdate_as_date = to_date(col("mdate"), "dd/MM/yyyy")
modified_in_2020 = (mdate_as_date >= lit("2020-01-01")) & (mdate_as_date < lit("2021-01-01"))

# ORCIDs attached to the incollections modified in 2020, one row per distinct ORCID
authors = (
    incollection
    .filter(modified_in_2020)
    .select(explode(col("author-orcid")).alias("orcid"))
    .distinct()
    .collect()
)

# Flatten the collected Rows into a plain Python list, skipping missing values
authors_orcid_list = [row["orcid"] for row in authors if row["orcid"] is not None]

# Display the incollection rows that have an ORCID (incollection itself is not modified)
incollection.dropna(subset=["author-orcid"]).select("author-orcid", "author").show()

# WHERE, IN, NESTED QUERY:
# articles modified in 2020 with an ORCID that appears in the list built above
(
    articles
    .filter(modified_in_2020)
    .select(col("author"), explode(col("author-orcid")).alias("orcid"), col("title"))
    .filter(col("orcid").isin(authors_orcid_list))
    .show()
)

#### Number of inproceedings related to each publisher

In [None]:
# Match each inproceedings to its proceedings through the shared booktitle,
# then count the matches per individual publisher.
same_booktitle = inproceedings.booktitle == proceedings.booktitle

publisher_per_paper = inproceedings.join(proceedings, same_booktitle, "inner").select(
    inproceedings.title,
    explode(proceedings.publisher).alias("singlePublisher"),
)

inproceedings_per_publisher = (
    publisher_per_paper
    .groupby("singlePublisher")
    .count()
    .withColumnRenamed("count", "numberOfInproceedings")
)

# Largest publishers first
inproceedings_per_publisher.orderBy(col("numberOfInproceedings").desc()).show(truncate=False)


#### Number of articles written by a specific author over the years

In [None]:
# Articles listing "Joaquim Filipe" among the authors, counted per year.
articles_by_author = articles.where(array_contains(col("author"), "Joaquim Filipe"))

articles_by_author.groupBy("year").count().orderBy("year").show()

#### Return number of inproceedings written by each author that has written more than one of them

In [None]:
# Number of inproceedings per author, keeping only authors with more than one.
(
    inproceedings
    .select(explode(col("author")).alias("singleAuthor"))
    .groupby("singleAuthor")
    .count()
    .withColumnRenamed("count", "writtenInproceedings")
    # Filter on the renamed column: "count" no longer exists after the rename,
    # so the former filter("count > 1") could not be resolved.
    .filter(col("writtenInproceedings") > 1)
    .sort(col("writtenInproceedings").desc())
    .show()
)


#### Return the number of articles whose title starts with a specific letter, written by certain authors

In [None]:
# Articles whose title starts with 'A' and that list one of the two authors,
# counted per value of the "author" column.
title_starts_with_a = col('title').startswith('A')
by_selected_authors = (
    array_contains(col('author'), 'Ivan Yotov')
    | array_contains(col('author'), 'Paola Bonizzoni')
)

(
    articles
    .where(title_starts_with_a)
    .where(by_selected_authors)
    .groupBy('author')
    .count()
    .withColumnRenamed('count', 'number_of_articles')
    .orderBy(col('number_of_articles').desc())
    .show(truncate=False)
)

In [None]:
# Title and author list of every article that lists one of the two authors.
by_selected_authors = (
    array_contains(col('author'), 'Ivan Yotov')
    | array_contains(col('author'), 'Paola Bonizzoni')
)

articles.where(by_selected_authors).select('title', 'author').show()

#### Article of a specific journal having the maximum number of authors

In [None]:
# One row per (article, author) for the journal "Sci. Eng. Ethics".
journal_articles = articles.where(col('journal') == "Sci. Eng. Ethics")
articles_exploded = journal_articles.select(
    col('article'),
    col('title'),
    explode(col('author')),
)

# Count the author rows per article and show the article with the most.
authors_per_article = articles_exploded.groupBy('article', 'title').count()
authors_per_article.orderBy(col('count').desc()).limit(1).show()

#### Proceedings edited by a specific person containing more than 50 inproceedings

In [None]:
# Proceedings that list "Joaquim Filipe" as editor and are cross-referenced by
# more than 50 inproceedings.
references_proceedings = array_contains(inproceedings.crossref, proceedings.key)
edited_by_person = array_contains(proceedings.editor, "Joaquim Filipe")

(
    proceedings
    .join(inproceedings, references_proceedings)
    .where(edited_by_person)
    .select(proceedings.key, proceedings.proceedings)
    .groupBy(proceedings.key)
    .count()
    .filter("count > 50")
    .show()
)

#### Return the list of inproceedings written by authors that have a website and have written more than one inproceedings but no articles

In [None]:
# One row per (author, key) for each publication type.
art = articles.select(explode(articles.author).alias("singleAuthor"), articles.key)

inp = inproceedings.select(explode(inproceedings.author).alias("singleAuthor"),
                           inproceedings.key)

wwwSingleAuthors = www.select(explode(www.author).alias("singleAuthor"),
                              www.key, www.mdate)

# Authors with a www entry whose mdate year is 2010 or later.
# The year is compared to an integer instead of the string "2010", and
# distinct() keeps an author with several www rows from multiplying their
# inproceedings in the join below (which inflated the "more than one" count).
recent_web_authors = (
    wwwSingleAuthors
    .filter(year(to_date(col("mdate"), "dd/MM/yyyy")) >= 2010)
    .select("singleAuthor")
    .distinct()
)

(
    recent_web_authors
    .join(inp, "singleAuthor", "inner")
    .groupby("singleAuthor")
    # Collect the keys straight into an array: joining them with ',' and
    # splitting again would break any key that itself contains a comma.
    .agg(collect_list("key").alias("key"))
    # Discard the authors that also appear as an article author
    .join(art, "singleAuthor", "leftanti")
    .filter(size("key") > 1)
    .show(n=20)
)