In [None]:
import os
from pyspark.sql import SparkSession

In [None]:
# Build (or reuse) the SparkSession that is the entry point to the PySpark application
spark = (
    SparkSession.builder
    .master("local")
    .appName("Project3")
    .getOrCreate()
)

In [None]:
# Directory that holds the CSV files to load
path = "DATASETH_PATH"
path_of_the_directory = str(path)

# Maps each file's base name (the text before its first ".") to the DataFrame read from it
dataset = {}

for filename in os.listdir(path_of_the_directory):
    file_path = os.path.join(path_of_the_directory, filename)

    # Only regular files are loaded; sub-directories are ignored
    if not os.path.isfile(file_path):
        continue

    # Read the file with a header row, "|" as field delimiter and inferred column types
    df = spark.read.options(header=True, delimiter="|", inferSchema=True).csv(file_path)

    # The dictionary key is the file name cut at its first dot (file name without extension)
    table_name = filename.split(".")[0]
    dataset[table_name] = df
        

In [None]:
# Dump the schema and a two-row preview of every loaded DataFrame
for key in dataset:
    df = dataset[key]
    print(f"NAME OF THE KEY: {key}")
    df.printSchema()
    df.show(2)
    print("/------------------------------------------------------------/\n")

<h2>Queries</h2>

In [None]:
# import functions
from pyspark.sql.functions import *

<h3>Query 1</h3>
<p>Compute the average age of each publication's authors, with the following restrictions:</p>
<ul>
<li>Exclude authors born in December (12) from the computation.</li>
<li>Exclude publications whose authors (those not born in December) have a combined age outside the range [275, 500].</li>
</ul>


In [None]:
# Query 1: per-publication average and total age of the authors not born in December.
# An author's age is computed as the current year minus the year of birth.
# `sum`, `avg`, `year`, ... are the pyspark.sql.functions versions brought in by the star import.
(
    dataset["write_relationship"]
    .join(dataset["publications"],
          dataset["write_relationship"].pub_id == dataset["publications"].id, "inner")
    .drop("pub_id", "author_order", "isbn", "year", "pages", "publisher", "doc_type")
    .join(dataset["authors"],
          dataset["write_relationship"].author_name == dataset["authors"].author_name, "inner")
    # Authors born in December are left out of both the average and the sum
    .filter(dataset["authors"].month_of_birth != 12)
    .withColumn("age", year(current_date()) - dataset["authors"].year_of_birth)
    .groupBy("id", "title")
    .agg(avg("age").alias("average_age"), sum("age").alias("sum_of_age"))
    # Keep publications whose summed age lies in [275, 500]. The range is closed,
    # so both bounds are included (between() is inclusive on both ends).
    .filter(col("sum_of_age").between(275, 500))
    .sort(col("sum_of_age").asc(), col("average_age").asc())
    .show(5, truncate=False)
)

<h3>Query 2</h3>
<p>Count the number of publications whose title contains the word "machine".</p>

In [None]:
# Query 2: number of publications whose title contains "machine".
(
    dataset["publications"]
    # Lower-case the title first so that "Machine", "MACHINE", ... are matched as well
    .filter(lower(col("title")).contains("machine"))
    # groupBy() without keys aggregates over the whole DataFrame
    .groupBy()
    .agg(count("id").alias("Number Of Publications"))
    .show(truncate=False)
)

<h3>Query 3</h3>
<p>Authors ordered by number of written publications.</p>

In [None]:
# Query 3: authors ranked by how many publications they wrote.
(
    dataset["authors"]
    # Left join keeps the authors that have no row in write_relationship;
    # count("pub_id") skips nulls, so they end up with 0 publications
    .join(dataset["write_relationship"], on="author_name", how="left")
    .groupBy("author_name")
    .agg(count("pub_id").alias("Number Of Publications"))
    .orderBy(col("Number Of Publications").desc())
    .show(5)
)

<h3>Query 4</h3>
<p>Top-10 books ordered by number of keywords.</p>

In [None]:
# Query 4: the 10 books with the highest number of keywords.
# The DataFrame is bound to the name and shown in a separate step: show() returns None,
# so chaining it into the assignment would leave the variable set to None.
df_group_count = (
    dataset["publications"]
    .filter(dataset["publications"].doc_type == "book")
    .join(dataset["keywords"], dataset["publications"].id == dataset["keywords"].pubID, "inner")
    .groupBy("id", "title")
    .agg(count("keyword").alias("Number of keywords"))
    .sort(col("Number of keywords").desc())
    .limit(10)
)
df_group_count.show()

<h3>Query 5</h3>
<p>The number of publications written by authors with a Polimi email grouped by year, starting from 2010.</p>

In [None]:
# Names of the authors whose mail matches the pattern "polimi", gathered into a Python list.
# The aggregation yields a single row whose only field is the set of names.
polimi_authors = (
    dataset["authors"]
    .filter(col("mail").rlike("polimi"))
    .select(collect_set("author_name"))
    .first()[0]
)

# Query 5: per-year number of distinct titles written by those authors, from 2010 onwards
(
    dataset["publications"]
    .join(dataset["write_relationship"],
          dataset["publications"].id == dataset["write_relationship"].pub_id)
    .filter(col("author_name").isin(polimi_authors))
    .filter(col("year") >= "2010")
    .groupBy("year")
    .agg(countDistinct("title").alias("Number of publications"))
    .orderBy(col("Number of publications").desc())
    .show()
)

<h3>Query 6</h3>
<p>Find the top 10 countries ordered by the average number of pages written in the 1990s.<br>
Countries with fewer than 10 written publications are excluded.</p>

In [None]:
# Query 6: top 10 countries by average number of pages of publications from 1990-1999.
# Chain: publications -> write_relationship -> authors -> work_relationship -> institutions.
(
    dataset["publications"]
    .join(dataset["write_relationship"],
          dataset["publications"].id == dataset["write_relationship"].pub_id, "inner")
    .drop("author_order", "isbn", "publisher", "doc_type")
    .join(dataset["authors"],
          dataset["write_relationship"].author_name == dataset["authors"].author_name, "inner")
    .drop("orcid", "month_of_birth", "year_of_birth", "mail")
    .join(dataset["work_relationship"],
          dataset["authors"].author_name == dataset["work_relationship"].author_name, "inner")
    .join(dataset["institutions"],
          dataset["work_relationship"].university == dataset["institutions"].institution, "inner")
    .drop("world_rank", "institution", "national_rank")
    # Keep only the publications dated 1990 (included) to 2000 (excluded)
    .filter((dataset["publications"].year >= "1990") & (dataset["publications"].year < "2000"))
    .groupBy("country")
    .agg(
        avg("pages").alias("average number of pages"),
        countDistinct("id").alias("different publications"),
    )
    # Countries with fewer than 10 distinct publications are discarded
    .filter(col("different publications") >= 10)
    .orderBy(col("average number of pages").desc())
    .limit(10)
    .show()
)

<h3>Query 7</h3>
<p>Find 3 authors who wrote a book when they were 50 years old.</p>

In [None]:
# Query 7: three (author, book) rows where the publication year minus the
# author's year of birth equals 50.
(
    dataset["publications"]
    .join(dataset["write_relationship"],
          dataset["publications"].id == dataset["write_relationship"].pub_id, "left")
    .drop("title", "id", "pages", "author_order", "isbn", "publisher")
    .join(dataset["authors"], ["author_name"])
    .drop("orcid", "month_of_birth", "mail", "pub_id")
    .withColumn("years difference", col("year") - col("year_of_birth"))
    .filter((col("years difference") == 50) & (col("doc_type") == "book"))
    .limit(3)
    .show()
)

<h3>Query 8</h3>
<p>PoliMi authors who have written at least 10 publications.</p>

In [None]:
# Query 8: authors with a mail ending in "polimi.it" who wrote at least 10 publications.
# The DataFrame is bound to the name and shown in a separate step: show() returns None,
# so chaining it into the assignment would leave the variable set to None.
authors_df = (
    dataset["authors"]
    # Rename so the join condition below can tell the two author-name columns apart
    .withColumnRenamed('author_name', 'name')
    .filter(col("mail").like("%polimi.it"))
    .join(dataset['write_relationship'],
          dataset['write_relationship'].author_name == col('name'), 'inner')
    .groupBy('name')
    .agg(count("pub_id").alias("num_publications_written"))
    .filter(col("num_publications_written") >= 10)
    .sort(col("num_publications_written").desc())
)
authors_df.show(10, truncate=False)

<h3>Query 9</h3>
<p>Find the top 10 articles edited by the youngest group of editors.</p>

In [None]:
# Query 9: articles ranked by the average age of their editors, youngest first.
# Rename the editor name column so the join condition below is unambiguous.
editors_df = dataset["editor_authors"].withColumnRenamed('editor_name', 'name')

# The DataFrame is bound to the name and shown in a separate step: show() returns None,
# so chaining it into the assignment would leave the variable set to None.
df_group_count = (
    dataset["publications"]
    .filter(dataset["publications"].doc_type == "article")
    # Left joins keep the articles that have no editor; their doi is null
    .join(dataset["editor_authors_relationship"],
          dataset["editor_authors_relationship"].doi == dataset["publications"].id, "left")
    .join(editors_df, editors_df.name == dataset["editor_authors_relationship"].editor_name, "left")
    .groupBy("doi")
    .agg(
        count("doi").alias("num_editors"),
        # Age is the current year minus the year of birth, averaged and rounded to 1 decimal
        round(avg(year(current_date()) - col("year_of_birth")), 1).alias("avg_age"),
    )
    # Groups without a computable average age are pushed to the end
    .sort(col("avg_age").asc_nulls_last())
)
df_group_count.show(10, truncate=True)

<h3>Query 10</h3>
<p>Compound keywords (e.g. Agent-Based) associated to more than 2 publications.</p>

In [None]:
# Query 10: keywords containing a hyphen that occur in more than 2 rows of the keywords table.
(
    dataset["keywords"]
    .filter(col("keyword").like("%-%"))
    .groupBy("keyword")
    .agg(count("keyword").alias("num of publications"))
    .filter(col("num of publications") > 2)
    .orderBy("num of publications")
    .show(truncate=False)
)

<h3>Query 11</h3>
<p>Find authors who have written 3 books and who are also editors of 3 publications.</p>

In [None]:
# Step 1: names of the authors with exactly 3 (author, book) rows, gathered into a Python list
list_authors = (
    dataset["write_relationship"]
    .join(dataset["publications"],
          dataset["write_relationship"].pub_id == dataset["publications"].id, "inner")
    .filter(col("doc_type") == "book")
    .groupBy("author_name")
    .count()
    .filter(col("count") == 3)
    .select(collect_set("author_name"))
    .collect()[0][0]
)

# Step 2: editors with exactly 3 rows in editor_authors_relationship whose name is in that list
(
    dataset["editor_authors_relationship"]
    .groupBy("editor_name")
    .count()
    .filter(col("count") == 3)
    .filter(col("editor_name").isin(list_authors))
    .select(col("editor_name").alias("author/editor"))
    .show()
)

<h3>Query 12</h3>
<p>Authors under 40 of USA institutions who have written at least 5 publications.</p>

In [None]:
# Query 12: authors younger than 40 working at a USA institution with at least 5 publications.
# Age is the current year minus the year of birth.
(
    dataset["authors"]
    .withColumn("age", year(current_date()) - dataset["authors"].year_of_birth)
    .filter(col("age") < 40)
    .join(dataset["work_relationship"],
          dataset["authors"].author_name == dataset["work_relationship"].author_name, "inner")
    .join(dataset["institutions"],
          dataset["work_relationship"].university == dataset["institutions"].institution, "inner")
    .filter(dataset["institutions"].country == "USA")
    .join(dataset["write_relationship"],
          dataset["work_relationship"].author_name == dataset["write_relationship"].author_name, "inner")
    # One group per (author, university, country, age); its size is the number of written rows
    .groupBy(dataset["write_relationship"].author_name, "university", "country", "age")
    .count()
    .withColumnRenamed("count", "num of publications")
    .filter(col("num of publications") >= 5)
    .orderBy(col("num of publications").desc())
    .show(truncate=False)
)

<h2>Insert/update/delete</h2>

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

<h3>Insert a new author</h3>

In [None]:
# Schema of the row to insert; its field names are the authors column names used by the queries
schema = StructType([
    StructField("author_name", StringType(), True),
    StructField("orcid", StringType(), True),
    StructField("month_of_birth", IntegerType(), True),
    StructField("year_of_birth", IntegerType(), True),
    StructField("mail", StringType(), True),
])

print("Before Add")
dataset["authors"].filter(col("orcid") == "3414-5303-4227-4420").show()

df_data = [("Name", "3414-5303-4227-4420", 3, 1999, "assignment@mail.polimi.it")]
newRow = spark.createDataFrame(data=df_data, schema=schema)

# unionByName aligns the columns by name. A plain union() is positional and would silently
# put values in the wrong columns if the CSV column order differed from the schema above.
# NOTE: re-running this cell appends the same row again.
dataset["authors"] = dataset["authors"].unionByName(newRow)

print("After Add")
dataset["authors"].filter(col("orcid") == "3414-5303-4227-4420").show()

<h3>Update the year of birth of an author</h3>

In [None]:
# Row before the update
print("Before the update\n")
dataset["authors"].filter(dataset["authors"].author_name == "Gilles Guette").show()

# Row after the update. The result is only displayed; dataset["authors"] is not reassigned.
print("\nAfter the update\n")
(
    dataset["authors"]
    .withColumn(
        "year_of_birth",
        # Integer literal: a string value here would turn the whole column into strings
        when(dataset["authors"].author_name == "Gilles Guette", 1999)
        .otherwise(col("year_of_birth")),
    )
    .filter(dataset["authors"].author_name == "Gilles Guette")
    .show()
)

<h3>Insert a new publication</h3>

In [None]:
# Schema of the publication row to insert
schema = StructType([
    StructField("id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("pages", IntegerType(), True),
    StructField("isbn", StringType(), True),
    StructField("doc_type", StringType(), True),
])

df_data = [("1111-2222-3333-4444", "Publication title", 2022, 300, "abcdef", "book")]
new_pub = spark.createDataFrame(data=df_data, schema=schema)

print("Before insert")
dataset['publications'].filter(col("title") == "Publication title").show(truncate=False)

# Insert the new publication.
# unionByName aligns the columns by name instead of by position, and allowMissingColumns=True
# (Spark >= 3.1) fills with null any column present on only one side. A positional union()
# fails when the column counts differ and misplaces values when the column order differs.
# NOTE: re-running this cell appends the same row again.
dataset['publications'] = dataset['publications'].unionByName(new_pub, allowMissingColumns=True)

print("After insert")
dataset['publications'].filter(col("title") == "Publication title").show(truncate=False)

<h3>Update PoliMi emails from "@mail.polimi.it" to "@polimi.it"</h3>

In [None]:
print("Before update")
dataset["authors"].filter(col("mail").like("%polimi.it")).show(2)

# Rows to rewrite: mails ending in "@mail.polimi.it"
is_long_polimi_mail = col("mail").like("%@mail.polimi.it")
# New address: the text before the first "@" followed by "@polimi.it"
short_polimi_mail = concat(split(col("mail"), '@')[0], lit('@polimi.it'))

# Every other mail is left untouched
dataset["authors"] = dataset["authors"].withColumn(
    'mail',
    when(is_long_polimi_mail, short_polimi_mail).otherwise(col('mail')),
)

print("After update")
dataset["authors"].filter(col("mail").like("%polimi.it")).show(2)

<h3>Remove a publication</h3>

In [None]:
# Show the publication that is about to be removed
print("Before the remove\n")
dataset["publications"].filter(dataset["publications"].id == "https://d-nb.info/960448683").show()

# Keep every row whose id differs from the one to remove
publications_before = dataset["publications"]
dataset["publications"] = publications_before.filter(
    publications_before.id != "https://d-nb.info/960448683"
)

# The same lookup now runs against the reassigned DataFrame
print("\nAfter the remove\n")
dataset["publications"].filter(dataset["publications"].id == "https://d-nb.info/960448683").show()