In [None]:
# Downloading pyspark
#!pip install pyspark

PREPARE THE ENVIRONMENT, UPLOAD DATA, PREPROCESS DATA AND CREATE THE TABLES: Author, Paper, Affiliation, Book, Journal and Conference

In [None]:
# With sparkSession we create a connection to our database
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, count, explode, first, xxhash64
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType, TimestampType

# Entry point of the PySpark application: a local-master session named "Bibliography"
# (getOrCreate reuses an already running session if there is one)
spark = (
    SparkSession.builder
    .master("local")
    .appName("Bibliography")
    .getOrCreate()
)

# Parameters
# JSON dataset read by every table-building cell below
INPUT_FILE = "bibliography.json"
# Reader options forwarded to every spark.read call in this notebook
OPTIONS = dict(
    multiline='true',
    allowNumericLeadingZero='true',
    timestampFormat="yyyy-MM-dd'T'HH:mm:ss[.ZZZ'Z']",
)

In [None]:
# AUTHOR DATAFRAME
# Only the 'authors' array of each record is read; every element carries the
# author identifier plus the descriptive fields kept in the Author table.
schemaAut = StructType([
    StructField('authors', ArrayType(StructType([
        StructField('_id', StringType(), True),
        StructField('name', StringType(), True),
        StructField('email', StringType(), True),
        StructField('bio', StringType(), True),
    ])), True),
])

# Load the JSON file using the schema above
df_aut = spark.read.format('json').options(**OPTIONS).schema(schemaAut).json(INPUT_FILE)

df_aut = (
    df_aut
    # One row per element of the 'authors' array
    .select(explode(col("authors")).alias("authors"))
    # Drop authors whose identifier is missing: the comparison is NULL (hence false)
    # for real nulls and false for the literal string "null"
    .filter(col("authors._id") != "null")
    # Flatten the struct and expose the identifier as 'author_id'
    .select(col("authors._id").alias("author_id"),
            "authors.name",
            "authors.email",
            "authors.bio")
    # The same author appears once per paper: keep a single row per identifier
    .dropDuplicates(["author_id"])
)

# Visualizing data
df_aut.printSchema()
df_aut.show()

In [None]:
# PAPER DATAFRAME WITHOUT PUBLICATION_ID
# Fields of the Paper table, in column order; every field is nullable.
# The publication_id column is attached later, once the publications are built.
schemaPaper = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('_id', StringType()),
        ('title', StringType()),
        ('keywords', ArrayType(StringType())),
        ('fos', ArrayType(StringType())),
        ('references', ArrayType(StringType())),
        ('page_start', IntegerType()),
        ('page_end', IntegerType()),
        ('lang', StringType()),
        ('abstract', StringType()),
        ('publication_type', StringType()),
        ('date', TimestampType()),
        ('doi', StringType()),
        ('url', ArrayType(StringType())),
    ]
])

# Load the JSON file and expose the record identifier as 'paper_id'
df_paper = (
    spark.read.format('json')
    .options(**OPTIONS)
    .schema(schemaPaper)
    .json(INPUT_FILE)
    .withColumnRenamed("_id", "paper_id")
)

# Visualizing data
df_paper.printSchema()
df_paper.show()

In [None]:
# AFFILIATION DATAFRAME
# For each paper we read its identifier and, for every author, the author
# identifier together with the organization the author was affiliated with.
schemaAffiliation = StructType([
    StructField('_id', StringType(), True),
    StructField('authors', ArrayType(StructType([
        StructField('_id', StringType(), True),
        StructField('org', StringType(), True),
    ])), True),
])

# Load the JSON file using the schema above
df_aff = spark.read.format('json').options(**OPTIONS).schema(schemaAffiliation).json(INPUT_FILE)

df_aff = (
    df_aff
    # One row per (paper, author) pair; the record identifier becomes 'paper_id'
    .select(col("_id").alias("paper_id"), explode(col("authors")).alias("authors"))
    # Both halves of the key must be present (real nulls and the string "null" are rejected)
    .filter((col("authors._id") != "null") & (col("paper_id") != "null"))
    # Flatten the struct and give the columns their final names
    .select("paper_id",
            col("authors._id").alias("author_id"),
            col("authors.org").alias("organization"))
    # A (paper, author) pair identifies an affiliation: keep one row per pair
    .dropDuplicates(["author_id", "paper_id"])
)

# Visualizing data
df_aff.printSchema()
df_aff.show()

In [None]:
# JOURNAL DATAFRAME
# Cleans the journal records, merges duplicates into one row per journal issue and
# links every paper to the journal issue it was published in.

# Fields needed to identify (venue, volume, issue, issn) and describe (publisher) a journal
journal_schema_preprocessing = StructType(
    [StructField('_id', StringType(), True),
     StructField('issn', StringType(), True),
     StructField('publisher', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('volume', IntegerType(), True),
     StructField('issue', IntegerType(), True),
     StructField('publication_type', StringType(), True)])

# Importing data from the JSON file
df_journals_to_filter = spark.read.format('json').options(**OPTIONS).schema(journal_schema_preprocessing).json(
    INPUT_FILE)

# Keep only papers published in journals whose identifying attributes hold a meaningful value
df_journals_to_filter = (
    df_journals_to_filter
    .filter(col('publication_type') == 'Journal')
    .filter(col('issn') != 'null')
    .filter(col('venue') != 'null')
    .filter(col('issue') >= 0)
    .filter(col('volume') >= 0)
)

# One row per journal issue. The paper identifiers are collected in a list; for the
# publisher a single non-null value is kept (the publisher was added randomly, so any
# value is as good as another). first(..., ignorenulls=True) yields NULL when a group
# has no publisher at all, whereas indexing an empty collect_list() with [0] fails
# when ANSI mode is enabled, and it avoids materialising the whole list.
df_journals_to_insert = (
    df_journals_to_filter
    .groupBy('venue', 'volume', 'issue', 'issn')
    .agg(first('publisher', ignorenulls=True).alias('publisher'),
         collect_list('_id').alias('_id'))
    .select('venue', 'volume', 'issue', 'publisher', 'issn', '_id')
)
# Here we almost have the final Journal DataFrame. For a detailed explanation refer to the report

# publication_id is a hash of the attributes that univocally identify the journal issue
df_journals = df_journals_to_insert.withColumn("publication_id", xxhash64('venue', 'volume', 'issue', 'issn'))

# Adding the "foreign key" publication_id of the publication to the linked papers

# One row per paper published in a journal (explode names its output column 'col')
exploded_journals = df_journals.select(explode('_id'), 'publication_id')

# Attach to every paper the identifier of the journal issue it was published in
df_papers_in_journals = exploded_journals.join(df_paper, exploded_journals.col == df_paper.paper_id, "inner")
df_papers_in_journals = df_papers_in_journals.drop('col')
# The list of paper identifiers was only needed to build the Journal-Paper relationship
df_journals = df_journals.drop(df_journals._id)

# Visualizing data
print('Schema of the journals')
df_journals.printSchema()
print('Journals')
df_journals.show(truncate=False)

In [None]:
# BOOK DATAFRAME
# Cleans the book records, merges duplicates into one row per book and links every
# paper to the book it was published in.

# Fields needed to identify (isbn, venue) and describe (publisher) a book
book_schema_preprocessing = StructType(
    [StructField('_id', StringType(), True),
     StructField('isbn', StringType(), True),
     StructField('publisher', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('publication_type', StringType(), True)])

# Importing data from the JSON file
df_books_to_filter = spark.read.format('json').options(**OPTIONS).schema(book_schema_preprocessing).json(INPUT_FILE)

# Keep only papers published in books whose identifying attributes hold a meaningful value
df_books_to_filter = (
    df_books_to_filter
    .filter(col('publication_type') == 'Book')
    .filter(col('isbn') != 'null')
    .filter(col('venue') != 'null')
)

# One row per book. The paper identifiers are collected in a list; for the publisher a
# single non-null value is kept (the publisher was added randomly, so any value is as
# good as another). first(..., ignorenulls=True) yields NULL when a group has no
# publisher at all, whereas indexing an empty collect_list() with [0] fails when ANSI
# mode is enabled, and it avoids materialising the whole list.
df_books_to_insert = (
    df_books_to_filter
    .groupBy('isbn', 'venue')
    .agg(first('publisher', ignorenulls=True).alias('publisher'),
         collect_list('_id').alias('_id'))
    .select('venue', 'isbn', 'publisher', '_id')
)
# Here we almost have the final Book DataFrame. For a detailed explanation refer to chapter 13 of the report

# publication_id is a hash of the attributes that univocally identify the book
df_books = df_books_to_insert.withColumn('publication_id', xxhash64('isbn', 'venue'))

# Adding the "foreign key" publication_id of the publication to the linked papers

# One row per paper published in a book (explode names its output column 'col')
exploded_books = df_books.select(explode('_id'), 'publication_id')

# Attach to every paper the identifier of the book it was published in
df_papers_in_books = exploded_books.join(df_paper, exploded_books.col == df_paper.paper_id)
df_papers_in_books = df_papers_in_books.drop('col')
# The list of paper identifiers was only needed to build the Book-Paper relationship
df_books = df_books.drop(df_books._id)

# Visualizing the data
print('Schema of the books')
df_books.printSchema()
print('Books')
df_books.show(truncate=40)

In [None]:
# CONFERENCE DATAFRAME
# Cleans the conference records, merges duplicates into one row per conference and
# links every paper to the conference it was presented at.

# Fields needed to identify (venue) and describe (location) a conference
schemaConf = StructType(
    [StructField('_id', StringType(), True),
     StructField('location', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('publication_type', StringType(), True)])

# Importing data from the JSON file
df_conferences_to_filter = spark.read.format('json').options(**OPTIONS).schema(schemaConf).json(INPUT_FILE)

# Keep only papers presented at conferences whose identifying attribute holds a meaningful value
df_conferences_to_filter = (
    df_conferences_to_filter
    .filter(col('publication_type') == 'Conference')
    .filter(col('venue') != 'null')
)

# One row per conference. The paper identifiers are collected in a list; for the
# location a single non-null value is kept (the location was added randomly, so any
# value is as good as another). first(..., ignorenulls=True) yields NULL when a group
# has no location at all, whereas indexing an empty collect_list() with [0] fails when
# ANSI mode is enabled, and it avoids materialising the whole list.
df_conferences_to_insert = (
    df_conferences_to_filter
    .groupBy('venue')
    .agg(first('location', ignorenulls=True).alias('location'),
         collect_list('_id').alias('_id'))
    .select('venue', 'location', '_id')
)
# Here we almost have the final Conference DataFrame. For a detailed explanation refer to chapter 13 of the report

# publication_id is a hash of the attribute that univocally identifies the conference
df_conferences = df_conferences_to_insert.withColumn('publication_id', xxhash64('venue'))

# Adding the "foreign key" publication_id of the publication to the linked papers

# One row per paper presented at a conference (explode names its output column 'col')
exploded_conferences = df_conferences.select(explode('_id'), 'publication_id')

# Attach to every paper the identifier of the conference it was presented at
df_papers_in_conferences = exploded_conferences.join(df_paper, exploded_conferences.col == df_paper.paper_id)
df_papers_in_conferences = df_papers_in_conferences.drop('col')
# The list of paper identifiers was only needed to build the Conference-Paper relationship
df_conferences = df_conferences.drop(df_conferences._id)

# Visualizing the data
print('Schema of the conferences')
df_conferences.printSchema()
print('Conferences')
df_conferences.show(truncate=50)

In [None]:
# Stack the papers of the three publication media into the final Paper DataFrame.
# union() matches columns by position: the three inputs are all produced by the same
# "exploded publication JOIN df_paper, then drop 'col'" recipe, so their layouts line up.
df_papers = (
    df_papers_in_books
    .union(df_papers_in_journals)
    .union(df_papers_in_conferences)
)

# Visualizing the data
print('Papers schema')
df_papers.printSchema()

print('Papers data')
df_papers.show(truncate=False)

In [None]:
# Sanity check: for each publication medium, list some of the papers linked to it.
# Each entry is (heading to print, publication_type value, truncate flag for show()).
for section_title, media_type, truncate_cells in (
        ('Papers published in books', 'Book', True),
        ('Papers published in conferences', 'Conference', True),
        ('Papers published in journals', 'Journal', False),
):
    print(section_title)
    (
        df_papers
        .filter(col('publication_type') == media_type)
        .select('paper_id', 'title', 'publication_type', 'publication_id')
        .show(truncate=truncate_cells)
    )

COMMANDS

In [None]:
# Command 1: Add a new row to the Paper DataFrame
# The new paper (and the journal issue it belongs to) is read from a one-record JSON
# file; the journal is added to df_journals and the paper to df_papers, each only if
# not already present.
from pyspark.sql.functions import lit

new_paper_file = 'single_paper.json'

# Loading the paper with the Paper schema defined before
new_paper = spark.read.options(**OPTIONS).json(new_paper_file, schemaPaper) \
    .withColumnRenamed("_id", "paper_id")

# Journal attributes carried by the same record
journal_schema = StructType(
    [StructField('issn', StringType(), True),
     StructField('publisher', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('volume', IntegerType(), True),
     StructField('issue', IntegerType(), True)])

# Loading the journal and computing its publication_id with the same hash inputs
# (and the same input order) used when the Journal DataFrame was built
journal = spark.read.options(**OPTIONS).json(new_paper_file, journal_schema) \
    .withColumn("publication_id", xxhash64('venue', 'volume', 'issue', 'issn'))

# first() returns the first Row, or None when the DataFrame is empty
journal_row = journal.first()
paper_row = new_paper.first()
if journal_row is None or paper_row is None:
    raise ValueError(f"No record could be read from {new_paper_file!r}")

# Identifier of the journal, used as foreign key of the paper, and identifier of the paper
foreign_key = journal_row['publication_id']
paper_id = paper_row['paper_id']

# Adding the journal to the journals DataFrame if not already present.
# unionByName is required here: 'journal' lists its columns in a different order than
# df_journals (issn first instead of venue first), so a positional union would put the
# values under the wrong columns.
if df_journals.filter(col('publication_id') == foreign_key).limit(1).count() == 0:
    df_journals = df_journals.unionByName(journal)

# Put publication_id in front of all the other columns ('*'), as in df_papers
new_paper = new_paper.select(lit(foreign_key).alias('publication_id'), '*')

# Adding the paper only if it does not exist yet
if df_papers.filter(col('paper_id') == paper_id).limit(1).count() == 0:
    df_papers = df_papers.unionByName(new_paper)

# Checking that the paper exists in the DataFrame
df_papers.filter(col('paper_id') == paper_id).show(vertical=True)

In [None]:
# Command 2: update one single row of a dataframe (similar for multiple rows)
# The command modifies the DOI and URL of the paper identified by `target_paper_id`

from pyspark.sql.functions import lit, array

target_paper_id = '53e997e4b7602d9701fdb48a'

# DataFrames are immutable, so the update is expressed as:
#   (all rows except the target) UNION (the target row with the new values).
# withColumn on an existing column name replaces the column in place, so the updated
# row keeps the same layout as df_papers.
updated_row = (
    df_papers
    .filter(col('paper_id') == target_paper_id)
    .withColumn('doi', lit('10.1007/11944577_37'))
    .withColumn('url', array(lit('https://link.springer.com/chapter/10.1007/11944577_37')))
)

# unionByName matches columns by name, so the result does not depend on where
# 'doi' and 'url' sit in the schema
updated_df_papers = (
    df_papers
    .filter(col('paper_id') != target_paper_id)
    .unionByName(updated_row)
)

# The number of papers must be the same as before
print(f'The size of the entire initial database is {df_papers.count()}, '
      f'the size of the current database is {updated_df_papers.count()}')
updated_df_papers \
    .filter(col('paper_id') == target_paper_id) \
    .select('paper_id', 'title', 'doi', 'url') \
    .show(truncate=55)

In [None]:
# Command 3: remove an entire column

# drop() returns a new DataFrame without `lang`; `df_papers` itself is left untouched
df_papers_without_lang = df_papers.drop('lang')

# The schema shows that `lang` is gone compared with `df_papers`
df_papers_without_lang.printSchema()

# One sample row of the reduced DataFrame
df_papers_without_lang.show(n=1)

In [None]:
# Command 4: delete a group of rows
from pyspark.sql.functions import year

# Keep only the papers published in 1951 or later (rows with a null date are dropped
# too, since the comparison is NULL for them) and store the result back in df_papers.
# year() returns an integer, so it is compared with an integer literal.
df_papers = df_papers \
    .filter(year('date') > 1950)

# Show the papers, with some associated attributes, ordered by date
df_papers.select('title', 'publication_type', 'date') \
    .orderBy('date') \
    .show(truncate=False)

In [None]:
# Command 5: create a new column with the length of the paper (number of total pages)

# Keep only papers with consistent page numbers: both non-negative and start <= end.
# 'total_pages' is then the difference between 'page_end' and 'page_start'.
# NOTE(review): an inclusive page count would be page_end - page_start + 1; confirm
# which of the two is intended.
df_papers_total_pages = (
    df_papers
    .filter(col('page_start') >= 0)
    .filter(col('page_end') >= 0)
    .filter(col('page_start') <= col('page_end'))
    .withColumn('total_pages', col('page_end') - col('page_start'))
)

# Only some fields are displayed just for better reading
df_papers_total_pages.select('title', 'page_start', 'page_end', 'total_pages').show(5, truncate=False)

QUERIES

In [None]:
# Query 1: WHERE, JOIN
# Retrieve all papers published on a specific issue and volume of a Journal

# Values to perform the query with. 'volume' and 'issue' are integer columns in
# df_journals, so they are given as integers rather than strings.
venue, volume, issue = ('BMC Bioinformatics', 14, 1)

# First select the requested journal issue, then join it with the papers on
# publication_id; the extra join condition keeps only papers typed as 'Journal'.
df_papers_q1 = df_journals \
    .filter((col('venue') == venue) &
            (col('volume') == volume) &
            (col('issue') == issue)) \
    .join(df_papers,
          (df_journals['publication_id'] == df_papers['publication_id']) &
          (df_papers['publication_type'] == 'Journal'))

df_papers_q1 \
    .select(['paper_id', 'title']) \
    .show(truncate=60)

In [None]:
# Query 2: WHERE, LIMIT, LIKE
# Papers written in the last twenty years, with a non-null DOI, that have 'artificial'
# as a substring of one of their keywords.
# The results are ordered ascending by date and only 15 elements are printed.

from pyspark.sql.functions import current_timestamp, unix_timestamp

# Helper column containing the timestamp at which the query is evaluated
df = df_papers.withColumn('current time', current_timestamp())

# Age of a paper in years: seconds elapsed since 'date', with a year counted as 365 days
paper_age_years = (unix_timestamp('current time') - unix_timestamp('date')) / 3600 / 24 / 365

(
    df
    # Recent papers whose DOI is set
    .filter((paper_age_years < 20) & col('doi').isNotNull())
    # One row per (paper, keyword) pair
    .select('paper_id', 'title', 'date', explode('keywords').alias('keyword'))
    # Keywords containing 'artificial'
    .filter(col('keyword').like('%artificial%'))
    # Remove duplicated (paper, keyword) pairs
    .distinct()
    # Keep the attributes of interest, oldest first, at most 15 rows
    .select('title', 'date', 'keyword')
    .orderBy('date')
    .limit(15)
    .show(truncate=50)
)
# After the first select, we keep only the rows of the dataframe that have 'artificial' as a substring of the attribute 'keyword'. Each row now is the couple composed by one paper and one of its keywords
# Then the duplicates are eliminated.
# Finally, we select the attribute we are interested in, we sort the rows with respect to the publication date and the output is limited to 15 rows

In [None]:
# Query 3: WHERE, IN, Nested Query
# Show the papers presented at a conference that have `multiagent system` as keyword.

# Inner query: papers having the keyword `multiagent system`.
# `keywords` is an array, so it is exploded into one row per (paper, keyword) pair
# before filtering; `isin` with a single value behaves like an equality test.
# The helper column `keyword` is dropped once the filter has been applied.
nested_query = (
    df_papers
    .select('title',
            'publication_type',
            'paper_id',
            'publication_id',
            'date',
            explode('keywords').alias('keyword'))
    .filter(col('keyword').isin('multiagent system'))
    .drop('keyword')
)

# Outer query: join the conferences with the selected papers on `publication_id`
# (passing the column name makes the join use the homonymous column of both sides),
# keep only the papers typed as `Conference`, order them from the most recent one
# and show title and venue of the first five.
(
    df_conferences
    .join(nested_query, 'publication_id')
    .filter(col('publication_type') == 'Conference')
    .orderBy(col('date').desc())
    .select(col('title').alias('paper title'),
            col('venue').alias('conference venue'))
    .show(5, truncate=50)
)

In [None]:
# Query 4: GROUP BY, JOIN, AS
# Retrieve the most prolific organizations regarding the conferences

from pyspark.sql.functions import collect_set, size

# 1. Join affiliations and papers on 'paper_id' (joining by column name keeps a single
#    'paper_id' column, so there is no ambiguity in the following select).
# 2. Keep the rows with a non-null, non-empty 'organization' whose paper is a 'Conference' paper.
# 3. For each organization build the set of its paper identifiers.
# 4. Keep the organizations whose set contains more than 10 papers.
(
    df_aff
    .join(df_papers, 'paper_id')
    .select('paper_id', 'organization', 'publication_type')
    .where(col('organization').isNotNull())
    .where(col('organization') != "")
    .where(col('publication_type') == "Conference")
    .groupBy('organization')
    .agg(collect_set('paper_id').alias('papers'))
    .where(size(col('papers')) > 10)
    .show(truncate=50)
)

In [None]:
# Query 5: WHERE, GROUP BY
# Retrieve some statistics about papers
# In this query we use the DataFrame obtained by running the command 5.

# sum/min/max are reached through the `F` alias: importing them by name would replace
# the Python builtins of the same name for every cell executed afterwards.
from pyspark.sql import functions as F
from pyspark.sql.functions import avg, format_number, variance

# Statistics about papers published from the year 2015 on, grouped by publication year
# (extracted from the date with year()):
# - total number of papers, with count()
# - total number of pages, with sum()
# - minimum and maximum number of pages of a paper, with min() and max()
# - mean and variance of the pages per paper, with avg() and variance(),
#   rendered with two decimals by format_number()
# Years are shown in decreasing order.
df_papers_total_pages \
    .filter(F.year(col('date')) >= 2015) \
    .groupBy(F.year(col('date')).alias('year')) \
    .agg(count('paper_id').alias('total_papers'),
         F.sum('total_pages').alias('total_pages'),
         F.min('total_pages').alias('min_pages'),
         F.max('total_pages').alias('max_pages'),
         format_number(avg('total_pages'), 2).alias('avg_pages'),
         format_number(variance('total_pages'), 2).alias('var_pages')) \
    .sort(col('year').desc()) \
    .show()

In [None]:
# Query 6: GROUP BY, HAVING, AS
# Papers that are referenced the most and have at least 30 references

# 1. Explode the 'references' array: one row per (citing paper, referenced paper) pair.
# 2. Group by the referenced paper and count the citing papers.
# 3. Keep the referenced papers cited at least 30 times ('>=' so that a paper with
#    exactly 30 citations is included, as stated in the description above).
# 4. Join back with df_papers to retrieve the title of each referenced paper.
df_papers \
    .select('paper_id',
            explode(col('references')).alias('reference')) \
    .groupBy('reference') \
    .agg(count('paper_id').alias('references_count')) \
    .filter(col('references_count') >= 30) \
    .join(df_papers, col('reference') == df_papers.paper_id) \
    .sort(col('references_count').desc()) \
    .select(['title', 'references_count']) \
    .show(truncate=50)

In [None]:
# Query 7: WHERE, GROUP BY, HAVING, AS
# Associations between fields of study and keywords that occur most often in the papers
# written after the year 2000, together with how many times they appear together.

# Explanation of the query
# - Keep the papers with a non-null 'doi', a publication year greater than 2000, at
#   least one field of study and at least one keyword.
# - Explode 'keywords' and then 'fos', obtaining every fos-keyword couple that appears
#   together in some paper.
# - Group by the couple and count its occurrences.
# - Keep the couples that occur more than 100 times.
# - Order by the number of occurrences ('couple count') and print the first 15.
# show() returns None, so its result is deliberately not assigned to a variable.

from pyspark.sql.functions import year, col, size

df_papers \
    .filter(
    (col('doi').isNotNull()) &
    (year(col('date')) > 2000) &
    (size(col('fos')) > 0) &
    (size(col('keywords')) > 0)) \
    .select('fos', explode('keywords').alias('keyword')) \
    .select('keyword', explode('fos').alias('fos')) \
    .groupby('fos', 'keyword') \
    .count() \
    .withColumnRenamed('count', 'couple count') \
    .filter(col('couple count') > 100) \
    .sort(col('couple count').desc()) \
    .limit(15) \
    .show(truncate=False)

In [None]:
# Query 8: WHERE, Nested Query (i.e., 2-step Queries), GROUP BY
# Retrieve the organizations associated with an author name for each field of study

from pyspark.sql.functions import collect_set

author_name = 'Hao Wang'  # Name shared by all the authors we are looking for

# Step 1: identifiers of every author whose name equals `author_name`, as a Python list
sub_nested_query = [
    row['author_id']
    for row in df_aut.filter(col('name') == author_name).select('author_id').collect()
]

# Step 2: affiliations of those authors, discarding the rows whose `organization`
# is the string 'null' (rows with a real null are discarded by the comparison as well)
nested_query = df_aff.filter(
    col('author_id').isin(sub_nested_query) & (col('organization') != 'null')
)

# Step 3: join the papers with the selected affiliations on `paper_id`, explode `fos`
# into one row per field of study, then collect for every field of study the set of
# organizations. The result is ordered alphabetically by field of study and the first
# fourteen rows are shown.
(
    df_papers
    .join(nested_query, 'paper_id')
    .select('paper_id',
            'organization',
            explode('fos').alias('field_of_study'))
    .groupBy('field_of_study')
    .agg(collect_set('organization').alias('organization'))
    .orderBy('field_of_study')
    .show(14, truncate=80)
)

In [None]:
# Query 9: WHERE, GROUP BY, HAVING, 1 JOIN
# Retrieve the most prolific publishers

from pyspark.sql.functions import collect_set, concat, size

# - Keep the journal issues whose 'volume' number is greater than 10.
# - Join journals and books on 'publisher' and drop the journal-side 'publisher' column
#   so that a single one is left.
# - The two 'venue' columns are renamed ('venueJournals' before the join, 'venueBooks'
#   after it) to tell them apart.
# - Keep the distinct (venueBooks, venueJournals, publisher) triples, then for every
#   publisher collect the set of its book venues and the set of its journal venues.
# - Concatenate the two sets and keep the publishers having more than 500 venues in total
#   (size() returns an integer, so it is compared with an integer literal).
# - Show the result, truncating each cell at 50 characters.

df_journals \
    .withColumnRenamed('venue', 'venueJournals') \
    .filter(col('volume') > 10) \
    .join(df_books, df_books.publisher == df_journals.publisher, 'inner') \
    .drop(df_journals.publisher) \
    .withColumnRenamed('venue', 'venueBooks') \
    .select('venueBooks',
            'venueJournals',
            'publisher') \
    .distinct() \
    .groupBy('publisher') \
    .agg(collect_set('venueBooks').alias('books'),
         collect_set('venueJournals').alias('journals')) \
    .withColumn('total_publications_per_publisher', concat('books', 'journals')) \
    .filter(size('total_publications_per_publisher') > 500) \
    .select('publisher',
            'total_publications_per_publisher') \
    .show(truncate=50)

In [None]:
# Query 10: WHERE, GROUP BY, HAVING, 2 JOINs
# Retrieve authors who worked for at least 3 different organizations and have published
# at least 3 papers with at least 5 fos and 5 references each

from pyspark.sql.functions import approx_count_distinct

# - Keep only the papers with at least 5 fields of study and at least 5 references.
# - Join them with the affiliations on 'paper_id' and then with the authors on
#   'author_id'; joining by column name leaves a single copy of each key column.
# - Group by 'author_id' and compute, for each author, the number of papers, the
#   (approximate) number of distinct organizations and the set of names.
# - Keep the authors with exactly one name (consistency check), at least 3 papers and
#   at least 3 organizations.
# - Order decreasingly by number of papers, then by number of organizations.
# - Explode 'name' to turn the one-element array into a plain string.
(
    df_papers
    .filter((size(col('fos')) >= 5) & (size(col('references')) >= 5))
    .join(df_aff, 'paper_id')
    .join(df_aut, 'author_id')
    .groupBy('author_id')
    .agg(count('paper_id').alias('papers_count'),
         approx_count_distinct('organization').alias('organizations_count'),
         collect_set('name').alias('name'))
    .filter(size('name') == 1)
    .filter(col('papers_count') >= 3)
    .filter(col('organizations_count') >= 3)
    .orderBy(col('papers_count').desc(), col('organizations_count').desc())
    .select(explode('name').alias('name'), 'papers_count', 'organizations_count')
    .show(5)
)