In [None]:
# Install PySpark (uncomment if it is not already available in the environment)
#!pip install pyspark

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

## Prepare the environment, load and preprocess the data, and create the tables: Author, Paper, Affiliation, Book, Journal and Conference

In [None]:
# SparkSession is the entry point to Spark's DataFrame/SQL API; every table below is read through it
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, TimestampType
from pyspark.sql.functions import count, col, xxhash64, collect_list, explode

# Entry point of the PySpark application: a SparkSession running in "local" mode
# (a single worker thread, per the Spark master URL conventions) named "Bibliography".
spark = (
    SparkSession.builder
    .master("local")
    .appName("Bibliography")
    .getOrCreate()
)

# Path of the input file (the commented-out line is the Google Drive location used on Colab).
#INPUT_FILE = "/content/drive/MyDrive/bibliography.json"
INPUT_FILE = "bibliography.json"

# JSON reader options shared by every table read below:
#  - multiline: the file is a (multi-line) JSON document rather than JSON Lines
#  - allowNumericLeadingZero: accept numbers written with leading zeros
#  - timestampFormat: pattern used to parse TimestampType fields such as `date`
# NOTE(review): in Spark datetime patterns `Z` is a zone offset and `S` a fraction of second;
# the optional `[.ZZZ'Z']` section may have been meant as `[.SSS'Z']` — confirm against the data.
OPTIONS = dict(
    multiline='true',
    allowNumericLeadingZero='true',
    timestampFormat="yyyy-MM-dd'T'HH:mm:ss[.ZZZ'Z']",
)

In [None]:
# AUTHOR TABLE
# One row per distinct author id, obtained by exploding the `authors` array of every paper.
schemaAut = StructType([
    StructField('authors', ArrayType(StructType([
        StructField('_id', StringType(), nullable=False),
        StructField('name', StringType(), True),
        StructField('email', StringType(), True),
        StructField('bio', StringType(), True),
    ])), True),
])

author_entries = (
    spark.read.format('json').options(**OPTIONS).schema(schemaAut).json(INPUT_FILE)
    .select(explode('authors').alias('authors'))
)

dfAut = (
    author_entries
    # `!= "null"` drops literal "null" ids and also real NULLs (a NULL comparison is not kept by filter)
    .filter(col("authors._id") != "null")
    .select(col("authors._id").alias("authorID"), "authors.name", "authors.email", "authors.bio")
    # the same author appears once per paper; keep a single (arbitrary) row per id
    .dropDuplicates(["authorID"])
)
dfAut.printSchema()
dfAut.show()

In [None]:
# PAPER TABLE (the publication id is attached later, separately for books, journals and conferences)
# Every field is nullable; `date` is parsed with the timestampFormat from OPTIONS.
paper_fields = [
    ('_id', StringType()),
    ('title', StringType()),
    ('keywords', ArrayType(StringType())),
    ('fos', ArrayType(StringType())),
    ('references', ArrayType(StringType())),
    ('page_start', IntegerType()),
    ('page_end', IntegerType()),
    ('lang', StringType()),
    ('doi', StringType()),
    ('url', ArrayType(StringType())),
    ('abstract', StringType()),
    ('publication_type', StringType()),
    ('date', TimestampType()),
]
schemaPaper = StructType([StructField(field_name, field_type, True) for field_name, field_type in paper_fields])

dfPaper = (
    spark.read.format('json').options(**OPTIONS).schema(schemaPaper).json(INPUT_FILE)
    .withColumnRenamed("_id", "paperID")
)
dfPaper.printSchema()
dfPaper.show()

In [None]:
# AFFILIATION TABLE
# One row per (paper, author) pair with the organization the author declared on that paper.
schemaAffiliation = StructType([
    StructField('_id', StringType(), True),
    StructField('authors', ArrayType(StructType([
        StructField('_id', StringType(), True),
        StructField('org', StringType(), True),
    ])), True),
])

raw_affiliations = spark.read.format('json').options(**OPTIONS).schema(schemaAffiliation).json(INPUT_FILE)

dfAff = (
    raw_affiliations
    .select(col("_id").alias("paperID"), explode("authors").alias("authors"))
    # `!= "null"` removes literal "null" strings as well as real NULL ids
    .filter(col("authors._id") != "null")
    .filter(col("paperID") != "null")
    .select("paperID",
            col("authors._id").alias("authorID"),
            col("authors.org").alias("organization"))
    .dropDuplicates(["authorID", "paperID"])
)
dfAff.printSchema()
dfAff.show()

In [None]:
# JOURNAL TABLE
# Preprocessing of the journals: clean the records, merge them into one row per journal issue
# (venue, volume, issue, issn) and attach the issue's publication id to its papers.
from pyspark.sql.functions import first

journal_schema_preprocessing = StructType(
    [StructField('_id', StringType(), True),
     StructField('issn', StringType(), True),
     StructField('publisher', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('volume', IntegerType(), True),
     StructField('issue', IntegerType(), True),
     StructField('publication_type', StringType(),True)])

# Reading the json file
df_journals_raw = spark.read.format('json').options(**OPTIONS).schema(journal_schema_preprocessing).json(INPUT_FILE)

# Keep only well-formed journal records: `!= 'null'` also drops real NULLs, and the
# `>= 0` checks drop rows whose volume/issue is missing.
df_journals_valid = df_journals_raw \
    .filter(col('publication_type') == 'Journal') \
    .filter(col('issn') != 'null') \
    .filter(col('venue') != 'null') \
    .filter(col('issue') >= 0) \
    .filter(col('volume') >= 0)

# One row per journal issue. first(..., ignorenulls=True) picks an arbitrary non-null publisher
# and yields NULL when the group has none (`collect_list(...)[0]` would raise an out-of-bounds
# error on an empty list when spark.sql.ansi.enabled is true, the default since Spark 4).
df_journals_grouped = df_journals_valid \
    .groupBy('venue', 'volume', 'issue', 'issn') \
    .agg(first('publisher', ignorenulls=True).alias('publisher'),
         collect_list('_id').alias('_id')) \
    .select('venue', 'volume', 'issue', 'publisher', 'issn', '_id')

# Adding the new column which contains the publication identifier
df_journals = df_journals_grouped.withColumn("publication id", xxhash64('venue', 'volume', 'issue', 'issn'))

# Adding the foreign key to the papers: one row per (paper id, publication id) joined to the paper data
exploded_journals = df_journals.select(explode('_id').alias('paper_ref'), 'publication id')
df_papers_in_journals = exploded_journals \
    .join(dfPaper, exploded_journals['paper_ref'] == dfPaper.paperID, "inner") \
    .drop('paper_ref')

df_journals = df_journals.drop('_id')

# Visualizing the data
print('Schema of the journals')
df_journals.printSchema()
print('Journals')
df_journals.show(truncate=False)

In [None]:
# BOOK TABLE
# Preprocessing of the books: clean the records, merge them into one row per (isbn, venue)
# and attach the book's publication id to its papers.
from pyspark.sql.functions import first

book_schema_preprocessing = StructType(
    [StructField('_id', StringType(), True),
     StructField('isbn', StringType(), True),
     StructField('publisher', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('publication_type', StringType(),True)])

# Reading the json file
dfbooks_raw = spark.read.format('json').options(**OPTIONS).schema(book_schema_preprocessing).json(INPUT_FILE)

# Keep only book records with an isbn and a venue (`!= 'null'` also drops real NULLs)
dfbooks_valid = dfbooks_raw \
    .filter(col('publication_type') == 'Book') \
    .filter(col('isbn') != 'null') \
    .filter(col('venue') != 'null')

# One row per book. first(..., ignorenulls=True) picks an arbitrary non-null publisher and
# yields NULL when the group has none (`collect_list(...)[0]` would raise on an empty list
# when spark.sql.ansi.enabled is true).
dfbooks_grouped = dfbooks_valid \
    .groupBy('isbn', 'venue') \
    .agg(first('publisher', ignorenulls=True).alias('publisher'),
         collect_list('_id').alias('_id')) \
    .select('venue', 'isbn', 'publisher', '_id')

# Adding the new column which is the id
df_books = dfbooks_grouped.withColumn('publication id', xxhash64('isbn', 'venue'))

# Adding the foreign key to the papers: one row per (paper id, publication id) joined to the paper data
exploded_books = df_books.select(explode('_id').alias('paper_ref'), 'publication id')
df_papers_in_books = exploded_books \
    .join(dfPaper, exploded_books['paper_ref'] == dfPaper.paperID) \
    .drop('paper_ref')

df_books = df_books.drop('_id')

# Visualizing the data
print('Schema of the books')
df_books.printSchema()
print('Books')
df_books.show(truncate=False)

In [None]:
# CONFERENCE TABLE
# Preprocessing of the conferences: clean the records, merge them into one row per venue
# and attach the conference's publication id to its papers.
from pyspark.sql.functions import first

schemaConf = StructType(
    [StructField('_id', StringType(), True),
     StructField('location', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('publication_type', StringType(),True)])

# Reading the json file
df_conferences_raw = spark.read.format('json').options(**OPTIONS).schema(schemaConf).json(INPUT_FILE)

# Keep only conference records with a venue (`!= 'null'` also drops real NULLs)
df_conferences_valid = df_conferences_raw \
    .filter(col('publication_type') == 'Conference') \
    .filter(col('venue') != 'null')

# One row per venue. first(..., ignorenulls=True) picks an arbitrary non-null location and
# yields NULL when the group has none (`collect_list(...)[0]` would raise on an empty list
# when spark.sql.ansi.enabled is true).
df_conferences_grouped = df_conferences_valid \
    .groupBy('venue') \
    .agg(first('location', ignorenulls=True).alias('location'),
         collect_list('_id').alias('_id')) \
    .select('venue', 'location', '_id')

# Adding the new column which is the id
df_conferences = df_conferences_grouped.withColumn('publication id', xxhash64('venue'))

# Adding the foreign key to the papers: one row per (paper id, publication id) joined to the paper data
exploded_conferences = df_conferences.select(explode('_id').alias('paper_ref'), 'publication id')
df_papers_in_conferences = exploded_conferences \
    .join(dfPaper, exploded_conferences['paper_ref'] == dfPaper.paperID) \
    .drop('paper_ref')

df_conferences = df_conferences.drop('_id')

# Visualizing the data
print('Schema of the conferences')
df_conferences.printSchema()
print('Conferences')
df_conferences.show(truncate=False)

In [None]:
# Merge the three DataFrames of papers published in books, journals and conferences into a
# single papers table. unionByName matches columns by name instead of by position, so a change
# of column order in one of the inputs cannot silently misalign the data.
df_papers = df_papers_in_books \
    .unionByName(df_papers_in_journals) \
    .unionByName(df_papers_in_conferences)

# Visualizing the data
print('Papers schema')
df_papers.printSchema()
print('Papers data')
df_papers.show()

In [None]:
# Sanity check: show a sample of the merged papers for each publication type.
type_checks = [
    ('Papers published in books', 'Book'),
    ('Papers published in journals', 'Journal'),
    ('Papers published in conferences', 'Conference'),
]
for heading, pub_type in type_checks:
    print(heading)
    df_papers \
        .filter(col('publication_type') == pub_type) \
        .select('paperID', 'title', 'publication_type', 'publication id') \
        .show()


COMMANDS

In [None]:
# Command 2:

In [None]:
# Command 4: delete a group of rows

# Use the function year to extract the year from the timestamp
from pyspark.sql.functions import year

# Papers published before this year are considered obsolete and removed.
MIN_YEAR = 1950

# Drop rows with conditions – where clause.
# `>=` keeps papers from MIN_YEAR itself (only papers *before* 1950 are deleted).
# Rows with a NULL `date` are dropped too, because the comparison evaluates to NULL.
# (Row counts recorded with the earlier `> 1950` filter: 37626 -> 37175.)
df_papers = df_papers.where(year('date') >= MIN_YEAR)
df_papers.select('title', 'publication_type', 'date').orderBy('date').show()

In [None]:
# Command 5: create a new column with the length of the paper (number of total pages)
# Page ranges are inclusive: a paper on pages 5-5 has 1 page, one on pages 5-10 has 6 pages.
# Rows with missing, negative or reversed page ranges are excluded.

df_papers_total_pages = df_papers \
    .filter((col('page_start') >= 0) & (col('page_end') >= 0) & (col('page_start') <= col('page_end'))) \
    .withColumn('total_pages', col('page_end') - col('page_start') + 1)

df_papers_total_pages \
    .select(col('title'), col('page_start'), col('page_end'), col('total_pages')) \
    .show(5, truncate=False)

QUERIES

In [None]:
# Query 1: WHERE, JOIN
# List the papers published in one specific journal issue, identified by (venue, volume, issue).
# volume and issue are IntegerType columns, so they are compared against int literals.

venue, volume, issue = ('BMC Bioinformatics', 14, 1)

selected_issue = df_journals \
    .filter((col('venue') == venue) &
            (col('volume') == volume) &
            (col('issue') == issue))

# Joining on the column name keeps a single `publication id` column in the result.
df_papers_q1 = selected_issue \
    .join(df_papers.filter(col('publication_type') == 'Journal'),
          on='publication id',
          how='inner')

df_papers_q1.select(['paperID', 'title']).show(truncate=60)

In [None]:
from pyspark.sql.functions import add_months, current_timestamp, lower
# Query 2: WHERE, LIMIT, LIKE
# Find the papers written in English in the last twenty years that have the word "artificial"
# (case-insensitive) inside at least one keyword. These papers must have a non-null DOI.
# The results are ordered by ascending date and only 20 rows are shown.

YEARS_BACK = 20
# NOTE(review): assumes `lang` stores ISO codes such as 'en' — confirm against the data.
LANGUAGE = 'en'

df_papers \
    .filter(col('date') >= add_months(current_timestamp(), -12 * YEARS_BACK)) \
    .filter(col('lang') == LANGUAGE) \
    .filter(col('doi').isNotNull()) \
    .select('title', 'date', explode('keywords').alias('keyword')) \
    .filter(lower(col('keyword')).like('%artificial%')) \
    .distinct() \
    .sort(col('date').asc()) \
    .limit(20) \
    .show(truncate=False)

In [None]:
# Query 4: GROUP BY, JOIN, AS
# Organizations with more than 10 distinct papers published in conferences,
# together with the set of those paper ids.
from pyspark.sql.functions import collect_set, size

# Joining on the column name keeps a single `paperID` column (no manual drop needed).
dfAff \
    .join(df_papers, on='paperID', how='inner') \
    .select('paperID', 'organization', 'publication_type') \
    .filter((col('organization').isNotNull()) & (col('organization') != "") & (col('publication_type') == "Conference")) \
    .groupBy('organization') \
    .agg(collect_set('paperID').alias('papers')) \
    .filter(size(col('papers')) > 10) \
    .show(truncate=50)

In [None]:
# Query 5: WHERE, GROUP BY
# Retrieve some per-year statistics about the length of papers published from 2015 onwards.
# Spark functions are accessed through the `F` namespace so that Python's builtins
# sum/min/max are not shadowed for the rest of the notebook.
from pyspark.sql import functions as F

publication_year = F.year(col('date'))

df_papers_total_pages \
    .filter(publication_year >= 2015) \
    .groupBy(publication_year.alias('year')) \
    .agg(F.count('paperID').alias('total_papers'),
         F.sum('total_pages').alias('total_pages'),
         F.min('total_pages').alias('min_pages'),
         F.max('total_pages').alias('max_pages'),
         F.format_number(F.avg('total_pages'), 2).alias('avg_pages'),
         F.format_number(F.variance('total_pages'), 2).alias('var_pages')) \
    .sort(col('year').desc()) \
    .show()

In [None]:
# Query 6: GROUP BY, HAVING, AS
# Papers referenced by more than 30 papers of the dataset, most referenced first.

# Count, for every referenced paper id, how many papers cite it (HAVING: count > 30).
citation_counts = (
    df_papers
    .select('paperID', explode(col('references')).alias('reference'))
    .groupBy('reference')
    .agg(count('paperID').alias('references_count'))
    .filter(col('references_count') > 30)
)

# Attach the cited paper's data; references to papers not in df_papers are dropped by the inner join.
df_papers_q6 = (
    citation_counts
    .join(df_papers, col('reference') == df_papers.paperID)
    .sort(col('references_count').desc())
)

df_papers_q6.select(['title', 'references_count']).show(truncate=False)

In [None]:
# Query 7: WHERE, GROUP BY, HAVING, AS


In [None]:
# Query 9: WHERE, GROUP BY, HAVING, 1 JOIN
# Publishers that publish both books and journals (journals restricted to volume > 10) and whose
# distinct book venues plus distinct journal venues number more than MIN_PUBLICATIONS.

from pyspark.sql.functions import collect_set, concat, size

MIN_JOURNAL_VOLUME = 10
MIN_PUBLICATIONS = 500

# Aggregate each side per publisher *before* joining: joining the raw rows on `publisher`
# materializes every (journal, book) pair of a publisher, which grows quadratically.
journals_per_publisher = df_journals \
    .filter(col('volume') > MIN_JOURNAL_VOLUME) \
    .groupBy('publisher') \
    .agg(collect_set('venue').alias('journals'))

books_per_publisher = df_books \
    .groupBy('publisher') \
    .agg(collect_set('venue').alias('books'))

# Inner join keeps only publishers present on both sides (NULL publishers never match).
books_per_publisher \
    .join(journals_per_publisher, on='publisher', how='inner') \
    .withColumn("total_publications_per_publisher",
                concat(col("books"), col("journals"))) \
    .filter(size(col("total_publications_per_publisher")) > MIN_PUBLICATIONS) \
    .select('publisher', "total_publications_per_publisher") \
    .show(truncate=50)

In [None]:
# Query 10: WHERE, GROUP BY, HAVING, 2 JOINs
# Intended query (translated from the original Italian note): take the journals with at least
# 5 fos ("for" in the note, presumably a typo) and 5 references whose authors have written at
# least 3 [papers — presumably] and have worked in polytechnic-type affiliations; also add
# ORDER BY and LIMIT.
# TODO: work in progress — only the fos/references filter and the affiliation join exist so far.

enough_fos = size(col('fos')) >= 5
enough_references = size(col('references')) >= 5

df_papers.filter(enough_fos & enough_references) \
    .join(dfAff, df_papers.paperID == dfAff.paperID, "inner") \
    .show()
# Draft continuation kept for reference:
#.withColumn('size', size(col('references'))) \
#.sort(col('size').desc()).select(col('title'), col('size')).show()



In [None]:
# Query 9: WHERE, GROUP BY, HAVING, 1 JOIN




In [None]:
# Query 10: WHERE, GROUP BY, HAVING, 2 JOINs
# NOTE(review): this cell repeats the earlier "Query 10" cell verbatim; consider keeping only one.
# Intended query (translated from the original Italian note): take the journals with at least
# 5 fos and 5 references whose authors have written at least 3 [papers — presumably] and have
# worked in polytechnic-type affiliations; also add ORDER BY and LIMIT.

papers_with_many_fos_and_refs = df_papers.where(
    (size(col('fos')) >= 5) & (size(col('references')) >= 5))
papers_with_many_fos_and_refs.join(dfAff, df_papers.paperID == dfAff.paperID, "inner").show()
# Draft continuation kept for reference:
#.withColumn('size', size(col('references'))) \
#.sort(col('size').desc()).select(col('title'), col('size')).show()

