In [None]:
# Dowloading pyspark
#!pip install pyspark

PREPARE THE ENVIRONMENT, UPLOAD DATA, PREPROCESS DATA AND CREATE THE TABLES: Author, Paper, Affiliation, Book, Journal and Conference

In [1]:
# With sparkSession we create a connection to our database
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, TimestampType
from pyspark.sql.functions import count, col, xxhash64, collect_list, explode, monotonically_increasing_id

# Create an entry point to the PySpark Application
spark = SparkSession.builder \
      .master("local") \
      .appName("Bibliography") \
      .getOrCreate()

INPUT_FILE = "bibliography.json"
OPTIONS = {'multiline': 'true', 'allowNumericLeadingZero': 'true','timestampFormat': "yyyy-MM-dd'T'HH:mm:ss[.ZZZ'Z']"}

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/08 09:07:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#AUTHOR TABLE
schemaAut = StructType(
            [StructField('authors', ArrayType(StructType([
                StructField('_id', StringType(), nullable = False),
                StructField('name', StringType(), True),
                StructField('email', StringType(), True),
                StructField('bio', StringType(), True),
                ])), True)
            ])

dfAut = spark.read.format('json').options(**OPTIONS).schema(schemaAut).json(INPUT_FILE)
dfAut = dfAut.select(explode(dfAut.authors))
dfAut = dfAut.withColumnRenamed("col", "authors")
dfAut = dfAut.filter(col("authors._id") != "null").select("authors._id","authors.name","authors.email", "authors.bio")
dfAut = dfAut.withColumnRenamed("_id", "authorID")
dfAut = dfAut.dropDuplicates(["authorID"])
dfAut.printSchema()
dfAut.show()

root
 |-- authorID: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- bio: string (nullable = true)



                                                                                

+--------------------+------------------+--------------------+--------------------+
|            authorID|              name|               email|                 bio|
+--------------------+------------------+--------------------+--------------------+
|53f3186fdabfae9a8...|   A. M. A. Hariri|a..m..a..hariride...|My name is A. M. ...|
|53f3186fdabfae9a8...|    Matthew Prowse|matthew.prowsefb@...|My name is Matthe...|
|53f31870dabfae9a8...|       Sui-ping Qi|sui-ping.qi19@gma...|My name is Sui-pi...|
|53f31871dabfae9a8...|     Renato Fabbri|renato.fabbrib7@g...|My name is Renato...|
|53f31873dabfae9a8...|   Joachim Schimpf|joachim.schimpf8a...|My name is Joachi...|
|53f31874dabfae9a8...|    E. Di Bernardo|e..di.bernardo10@...|My name is E. Di ...|
|53f31875dabfae9a8...|    Steven F. Roth|steven.f..roth46@...|My name is Steven...|
|53f31878dabfae9a8...|      Nima Zahadat|nima.zahadat3d@gm...|My name is Nima Z...|
|53f3187ddabfae9a8...|         Ke Fa Cen|ke.fa.cen23@gmail...|My name is Ke 

In [3]:
# PAPER TABLE WITHOUT PUBLICATION_ID
schemaPaper = StructType(
            [StructField('_id', StringType(), True),
             StructField('title', StringType(),True),
             StructField('keywords', ArrayType(StringType()), True),
             StructField('fos', ArrayType(StringType()), True),
             StructField('references', ArrayType(StringType()), True),
             StructField('page_start', IntegerType(), True),
             StructField('page_end', IntegerType(), True),
             StructField('lang', StringType(),True),
             StructField('doi', StringType(),True),
             StructField('url', ArrayType(StringType()),True),
             StructField('abstract', StringType(),True),
             StructField('publication_type', StringType(),True),
             StructField('date', TimestampType(), True)
            ])

dfPaper = spark.read.format('json').options(**OPTIONS).schema(schemaPaper).json(INPUT_FILE)
dfPaper = dfPaper.withColumnRenamed("_id", "paperID")
dfPaper.printSchema()
dfPaper.show()

root
 |-- paperID: string (nullable = true)
 |-- title: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- page_start: integer (nullable = true)
 |-- page_end: integer (nullable = true)
 |-- lang: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- url: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- abstract: string (nullable = true)
 |-- publication_type: string (nullable = true)
 |-- date: timestamp (nullable = true)



[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------+----+--------------------+--------------------+--------------------+----------------+-------------------+
|             paperID|               title|            keywords|                 fos|          references|page_start|page_end|lang|                 doi|                 url|            abstract|publication_type|               date|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------+----+--------------------+--------------------+--------------------+----------------+-------------------+
|53e99784b7602d970...|Using XML to Inte...|[internet, hyperm...|[xml base, world ...|[53e9adbdb7602d97...|       167|     172|  en|10.1109/CMPSAC.20...|[http://dx.doi.or...|The eXtensible Ma...|            Book|1974-09-13 06:34:29|
|53e99784b7602d970...|               FCLOS|[molap, subsumpti...|[informa

                                                                                

In [4]:
# AFFILIATION TABLE
schemaAffiliation = StructType(
            [StructField('_id', StringType(), True),
             StructField('authors', ArrayType(StructType([
                    StructField('_id', StringType(), True),
                    StructField('org', StringType(), True)
             ])), True),
            ])

dfAff = spark.read.format('json').options(**OPTIONS).schema(schemaAffiliation).json(INPUT_FILE)
dfAff = dfAff.withColumnRenamed("_id", "paperID")
dfAff = dfAff.select("paperID", explode(dfAff.authors))
dfAff = dfAff.withColumnRenamed("col", "authors")
dfAff = dfAff.filter(col("authors._id") != "null").filter(col("paperID") != "null").select("paperID", "authors._id","authors.org")
dfAff = dfAff.withColumnRenamed("_id", "authorID")
dfAff = dfAff.dropDuplicates(["authorID", "paperID"])
dfAff = dfAff.withColumnRenamed("org", "organization")
dfAff.printSchema()
dfAff.show()

root
 |-- paperID: string (nullable = true)
 |-- authorID: string (nullable = true)
 |-- organization: string (nullable = true)



[Stage 4:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+
|             paperID|            authorID|        organization|
+--------------------+--------------------+--------------------+
|53e998c7b7602d970...|53f3186fdabfae9a8...|Department of Sta...|
|53e99827b7602d970...|53f3186fdabfae9a8...|Laboratory for Fo...|
|53e99924b7602d970...|53f31870dabfae9a8...|Henan Academy of ...|
|53e998dbb7602d970...|53f31871dabfae9a8...|Instituto de Físi...|
|53e998f6b7602d970...|53f31873dabfae9a8...|                null|
|53e998bfb7602d970...|53f31874dabfae9a8...|                null|
|53e9984bb7602d970...|53f31875dabfae9a8...|                null|
|53e998e8b7602d970...|53f31878dabfae9a8...|George Mason Univ...|
|53e99905b7602d970...|53f3187ddabfae9a8...|State Key Laborat...|
|53e998e9b7602d970...|53f31881dabfae9a8...|                null|
|53e9984fb7602d970...|53f31881dabfae9a8...|Tecnologico de Mo...|
|53e9980eb7602d970...|53f31883dabfae9a8...|University of Was...|
|53e997e9b7602d970...|53f

                                                                                

In [144]:
# BOOK TABLE
# Preprocessing of the books for cleaning and merging the books
book_schema_preprocessing = StructType(
    [StructField('_id', StringType(), True),
     StructField('isbn', StringType(), True),
     StructField('publisher', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('publication_type', StringType(),True)])

# Reading the json file
dfbooks_to_filter = spark.read.format('json').options(**OPTIONS).schema(book_schema_preprocessing).json(INPUT_FILE)

# Filtering and adjusting the dataframe
dfbooks_to_filter = dfbooks_to_filter.filter(col('publication_type') == 'Book').filter(col('isbn') != 'null').filter(col('venue') != 'null')
dfbooks_to_filter = dfbooks_to_filter.groupBy('isbn', 'venue').agg(collect_list('publisher').alias('publishersArray'), collect_list('_id').alias('_id'), count(col('publisher'))) # count can be removed (I was interested in evaluating if the group by was meaningful)
dfbooks_to_insert = dfbooks_to_filter.withColumn('publisher', dfbooks_to_filter['publishersArray'][0]).select('venue', 'isbn', 'publisher', '_id')

# Adding the new column which is the id
df_books = dfbooks_to_insert.withColumn('publication id', xxhash64('isbn', 'venue'))

# Adding the foreign key to the papers
exploded_books = df_books.select(explode('_id'), 'publication id')
# exploded_books.show(truncate = False)

df_papers_in_books = exploded_books.join(dfPaper, exploded_books.col == dfPaper.paperID)
df_papers_in_books = df_papers_in_books.drop('col')

df_books = df_books.drop(df_books._id)

# Visualizing the data
# print('Papers')
# df_papers_in_books.show(truncate = False)
print('Schema of the books')
df_books.printSchema()
print('Books')
df_books.show(truncate=False)

Schema of the books
root
 |-- venue: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publication id: long (nullable = false)

Books


[Stage 1025:>                                                       (0 + 1) / 1]

+------------------------------------------------------------------------------------+-------------+--------------------------------------------------+--------------------+
|venue                                                                               |isbn         |publisher                                         |publication id      |
+------------------------------------------------------------------------------------+-------------+--------------------------------------------------+--------------------+
|ACM SIGSOFT Software Engineering Notes                                              |-159593-125-2|AGH University of Science and Technology          |-4146681027596907540|
|Theor. Comput. Sci.                                                                 |0-0304-3975  |Elsevier                                          |-8641604748836216424|
|ACL (Companion)                                                                     |0-111-456789 |Intellect Ltd.                     

                                                                                

In [145]:
#from google.colab import drive
#drive.mount('/content/drive')

In [146]:
# CONFERENCE TABLE
# Preprocessing of the books for cleaning and merging the books

schemaConf = StructType(
    [StructField('_id', StringType(), True),
     StructField('location', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('publication_type', StringType(),True)])
# Reading the json file
df_conferences_to_filter = spark.read.format('json').options(**OPTIONS).schema(schemaConf).json(INPUT_FILE)

# Filtering and adjusting the dataframe
df_conferences_to_filter = df_conferences_to_filter.filter(col('publication_type') == 'Conference').filter(col('venue') != 'null')
df_conferences_to_filter = df_conferences_to_filter.groupBy('venue').agg(collect_list('location').alias('locations_array'), collect_list('_id').alias('_id'), count(col('location'))) # count can be removed (I was interested in evaluating if the group by was meaningful)
df_conferences_to_insert = df_conferences_to_filter.withColumn('location', df_conferences_to_filter['locations_array'][0]).select('venue', 'location', '_id')

# Adding the new column which is the id
df_conferences = df_conferences_to_insert.withColumn('publication id', xxhash64('venue'))

# Adding the foreign key to the papers
exploded_conferences = df_conferences.select(explode('_id'), 'publication id')
#exploded_conferences.show(truncate = False)

df_papers_in_conferences = exploded_conferences.join(dfPaper, exploded_conferences.col == dfPaper.paperID)
df_papers_in_conferences = df_papers_in_conferences.drop('col')

df_conferences = df_conferences.drop(df_conferences._id)

# Visualizing the data
#print('Papers')
#df_papers_in_conferences.show(truncate = False)
print('Schema of the conferences')
df_conferences.printSchema()
print('Conferences')
df_conferences.show(truncate=False)

Schema of the conferences
root
 |-- venue: string (nullable = true)
 |-- location: string (nullable = true)
 |-- publication id: long (nullable = false)

Conferences


[Stage 1028:>                                                       (0 + 1) / 1]

+----------------------------------------------------------------------------------------+--------------------------+--------------------+
|venue                                                                                   |location                  |publication id      |
+----------------------------------------------------------------------------------------+--------------------------+--------------------+
|"EDUCON                                                                                 |Moscow, Russia            |950373860555954453  |
|2012 50TH ANNUAL ALLERTON CONFERENCE ON COMMUNICATION, CONTROL, AND COMPUTING (ALLERTON)|Dublin, Ireland           |-4245717996156385657|
|2985415099                                                                              |Mexico City, Mexico       |-762166203857654039 |
|2985532720                                                                              |Milan, Italy              |-7124098142925130015|
|2987493177                

                                                                                

In [147]:
# Merging the 3 dataframe which one contains the papers published in a specific media
df_papers = df_papers_in_books.union(df_papers_in_journals).union(df_papers_in_conferences)

# Visualizing the data
print('Papers schema')
df_papers.printSchema()
print('Papers data')
df_papers.show()

Papers schema
root
 |-- publication id: long (nullable = false)
 |-- paperID: string (nullable = true)
 |-- title: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- page_start: integer (nullable = true)
 |-- page_end: integer (nullable = true)
 |-- lang: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- url: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- abstract: string (nullable = true)
 |-- publication_type: string (nullable = true)
 |-- date: timestamp (nullable = true)

Papers data


                                                                                

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------+----+--------------------+--------------------+--------------------+----------------+-------------------+
|      publication id|             paperID|               title|            keywords|                 fos|          references|page_start|page_end|lang|                 doi|                 url|            abstract|publication_type|               date|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------+----+--------------------+--------------------+--------------------+----------------+-------------------+
|-4336127124407324726|53e997d1b7602d970...|A fault diagnosis...|[stuck at defects...|[computer testing...|[53e9bca6b7602d97...|       494|     494|  en|10.1109/EDTC.1997...|[http://dx.doi.or...|In this paper we ...|            Book|1950-09-0

In [148]:
# For checking the result
print('Papers published in books')
df_papers.filter(col('publication_type') == 'Book').select('paperID', 'title', 'publication_type', 'publication id').show()
print('Papers published in journals')
df_papers.filter(col('publication_type') == 'Journal').select('paperID', 'title', 'publication_type', 'publication id').show()
print('Papers published in conferences')
df_papers.filter(col('publication_type') == 'Conference').select('paperID', 'title', 'publication_type', 'publication id').show()


Papers published in books


                                                                                

+--------------------+--------------------+----------------+--------------------+
|             paperID|               title|publication_type|      publication id|
+--------------------+--------------------+----------------+--------------------+
|53e997d1b7602d970...|A fault diagnosis...|            Book|-4336127124407324726|
|53e997e4b7602d970...|Problem Decomposi...|            Book| 4674035537543784623|
|53e997e8b7602d970...|X-tract: Structur...|            Book| 1144530160964015551|
|53e997e8b7602d970...|Cognitive agent p...|            Book|-6465152039831803131|
|53e997e9b7602d970...|Constraint based ...|            Book| 5112790730981969721|
|53e997ecb7602d970...|Automatic input r...|            Book| 1369111316993402587|
|53e997ecb7602d970...|Acceptability-ori...|            Book| 1430384888819899512|
|53e997ecb7602d970...|Anomalous Neighbo...|            Book|  710748220667054414|
|53e997f1b7602d970...|A Digital Watermark.|            Book| 8148044123726554362|
|53e997f4b7602d9

                                                                                

+--------------------+--------------------+----------------+--------------------+
|             paperID|               title|publication_type|      publication id|
+--------------------+--------------------+----------------+--------------------+
|53e99915b7602d970...|A note on robust ...|         Journal|-8668689828003778653|
|53e99984b7602d970...|A new approach fo...|         Journal|-6055461944651662439|
|53e998b0b7602d970...|Two-machine flow ...|         Journal| 4482587362010925183|
|53e9994cb7602d970...|Stochastic semide...|         Journal|-5369333412851545957|
|53e9990db7602d970...|Models and algori...|         Journal| 4907811008184705540|
|53e99952b7602d970...|Maximizing the mi...|         Journal|-7478992719207099405|
|53e99800b7602d970...|Integer extended ...|         Journal|-5578571051264951046|
|53e99858b7602d970...|Attraction probab...|         Journal| 3303577774132183121|
|53e9998bb7602d970...|Strategy vs risk ...|         Journal|-3839102222582550984|
|53e99915b7602d9

                                                                                

+--------------------+--------------------+----------------+--------------------+
|             paperID|               title|publication_type|      publication id|
+--------------------+--------------------+----------------+--------------------+
|53e99854b7602d970...|  The EOLES project.|      Conference|  950373860555954453|
|53e9989bb7602d970...|Life is engineeri...|      Conference|  950373860555954453|
|53e998bfb7602d970...|Gaining and maint...|      Conference|  950373860555954453|
|53e998c0b7602d970...|From manuals towa...|      Conference|  950373860555954453|
|53e998e9b7602d970...|Learning with com...|      Conference|  950373860555954453|
|53e99976b7602d970...|Cloud E-learning ...|      Conference|  950373860555954453|
|53e9997eb7602d970...|Motivating progra...|      Conference|  950373860555954453|
|53e99991b7602d970...|Monitoring studen...|      Conference|  950373860555954453|
|53e99998b7602d970...|OLAREX project: O...|      Conference|  950373860555954453|
|53e99859b7602d9

COMMANDS

In [None]:
# Command 4: delete a group of rows

# Use the function year to extract the year from the timestamp
from pyspark.sql.functions import year

# Drop rows with conditions – where clause
# From 37626 to 37175 -> delete all the rows that represent papers published before 1950, because obsolete
df_papers = df_papers.where(year('date') > '1950')
df_papers.select('title', 'publication_type', 'date').orderBy('date').show()

In [None]:
# Command 5: create a new column with the length of the paper (number of total pages)

df_papers_total_pages = df_papers \
    .filter((col('page_start') >= 0) & (col('page_end') >= 0) & (col('page_start') <= col('page_end'))) \
    .withColumn('total_pages', col('page_end') - col('page_start'))

df_papers_total_pages \
    .select(col('title'), col('page_start'), col('page_end'), col('total_pages')) \
    .show(5, truncate=False)

QUERIES

In [None]:
#Query 1: WHERE, JOIN
# from pyspark.sql.functions import collect_list, size
# 
# venue, volume, issue = ('IEEE Internet Computing', '5', '6') 
# 
# df_papers_q1 = df_journals\
#                 .filter((col('venue') == venue) &
#                         (col('volume') == volume) &
#                         (col('issue') == issue))\
#                 .join(df_papers.select(['paperID', 'publication id', 'publication_type', 'title']),
#                       (df_journals['publication id'] == df_papers['publication id']) &
#                         (df_papers['publication_type'] == 'Journal'),
#                       'inner')\
#                 .groupBy(['venue', 'volume', 'issue'])\
#                 .agg(collect_list('title').alias('papers'))\
#                 .withColumn('papers', size(col('papers')))
# 
# df_papers_q1.select(['papers']).show(truncate=False)

In [130]:
# JOURNAL TABLE
# Preprocessing of the journals for cleaning and merging the journals

journal_schema_preprocessing = StructType(
    [StructField('_id', StringType(), True),
     StructField('issn', StringType(), True),
     StructField('publisher', StringType(), True),
     StructField('venue', StringType(), True),
     StructField('volume', IntegerType(), True),
     StructField('issue', IntegerType(), True),
     StructField('publication_type', StringType(),True)])

# Reading the json file
df_journals_to_filter = spark.read.format('json').options(**OPTIONS).schema(journal_schema_preprocessing).json(INPUT_FILE)

# Filtering and adjusting the dataframe
df_journals_to_filter = df_journals_to_filter\
                            .filter(col('publication_type') == 'Journal')\
                            .filter(col('issn') != 'null')\
                            .filter(col('venue') != 'null')\
                            .filter(col('issue') >= 0)\
                            .filter(col('volume') >= 0)

df_journals_to_filter = df_journals_to_filter\
                            .groupBy('venue', 'volume', 'issue', 'issn')\
                            .agg(collect_list('publisher').alias('publishersArray'),
                                 collect_list('_id').alias('_id'),
                                 count(col('publisher'))) # count can be removed (I was interested in evaluating if the group by was meaningful)
df_journals_to_insert = df_journals_to_filter\
                            .withColumn('publisher', df_journals_to_filter['publishersArray'][0])\
                            .select('venue', 'volume', 'issue', 'publisher', 'issn', '_id')

# Adding the new column which contains the publication identifier
df_journals = df_journals_to_insert.withColumn("publication id", xxhash64('venue', 'volume', 'issue', 'issn'))
df_journals2 = df_journals_to_insert.withColumn("publication id", monotonically_increasing_id())

df_journals4 = df_journals_to_insert.withColumns({'hash': xxhash64('venue', 'volume', 'issue', 'issn'),
                                                  'publication id': monotonically_increasing_id()})

# df_journals.printSchema()
# Adding the foreign key to the papers
exploded_journals = df_journals.select(explode('_id'), 'publication id')
exploded_journals2 = df_journals2.select(explode('_id').alias('pID'), 'publication id')
exploded_journals4 = df_journals4.select(explode('_id').alias('pID'), 'publication id', 'hash')
#exploded_journals.show(truncate = False)

df_papers_in_journals = exploded_journals.join(dfPaper, exploded_journals.col == dfPaper.paperID, "inner")
df_papers_in_journals = df_papers_in_journals.drop('col')

df_papers_in_journals2 = exploded_journals2\
                            .join(dfPaper, exploded_journals2.pID == dfPaper.paperID, "inner")
df_papers_in_journals2 = df_papers_in_journals2.drop('pID')

df_papers_in_journals4 = exploded_journals4\
                            .join(dfPaper, exploded_journals4.pID == dfPaper.paperID, "inner")
df_papers_in_journals4 = df_papers_in_journals4.drop('pID')

df_journals3 = df_journals.withColumn('hash', col('publication id')).drop('publication id').drop('_id')\
                            .join(df_journals2.drop('publisher'),
                                  ['venue', 'volume', 'issue', 'issn'],
                                 'inner')

# df_journals3.printSchema()
exploded_journals3 = df_journals3.select(explode('_id').alias('pID'), 'publication id', 'hash')

df_papers_in_journals3 = exploded_journals3\
                            .join(dfPaper, exploded_journals3.pID == dfPaper.paperID, "inner")

df_papers_in_journals3.drop('pID')
all_papers = df_papers_in_journals3\
                .select(['paperID', 'hash', 'publication id', 'publication_type', 'title'])\
                .join(df_papers_in_journals2.select(col('paperID').alias('p2'), col('publication id').alias('pub_join')),
                      df_papers_in_journals3.paperID == col('p2'))\
                .join(df_papers_in_journals.select(col('paperID').alias('p1'), col('publication id').alias('hash_join')),
                      df_papers_in_journals3.paperID == col('p1'))

stats = all_papers.withColumn('pubID?', col('publication id') == col('pub_join'))\
                  .withColumn('hashID?', col('hash') == col('pub_join'))

stats.printSchema()


df_journals = df_journals.drop(df_journals._id)
df_journals2 = df_journals2.drop('_id')
df_journals4 = df_journals4.drop('_id')
# Visualizing the data
# print('Papers')
# df_papers_in_journals.show(truncate = False)
# print('Schema of the journals')
# df_papers_in_journals.printSchema()
# df_journals.printSchema()
# print('Journals')
# df_journals.show(truncate=False)

root
 |-- paperID: string (nullable = true)
 |-- hash: long (nullable = false)
 |-- publication id: long (nullable = false)
 |-- publication_type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- p2: string (nullable = true)
 |-- pub_join: long (nullable = false)
 |-- p1: string (nullable = true)
 |-- hash_join: long (nullable = false)
 |-- pubID?: boolean (nullable = false)
 |-- hashID?: boolean (nullable = false)



In [70]:
tmp = stats.agg(count(col('pubID?') == True))
tmp.show()
tmp = stats.agg(count(col('hashID?') == True))
tmp.show()

                                                                                

+----------------------+
|count((pubID? = true))|
+----------------------+
|                 13708|
+----------------------+



                                                                                

+-----------------------+
|count((hashID? = true))|
+-----------------------+
|                  13708|
+-----------------------+



In [126]:
print(df_journals.count())
print(df_journals.drop_duplicates(['publication id']).count())

print('2')
print(df_journals2.count())
print(df_journals2.drop_duplicates(['publication id']).count())

print('3')
print(df_journals3.count())
print(df_journals3.drop_duplicates(['publication id', 'hash']).count())

print('4 mon_inc')
print(df_journals4.count())
print(df_journals4.drop_duplicates(['publication id']).count())

print('4 hash')
print(df_journals4.count())
print(df_journals4.drop_duplicates(['hash']).count())

print('Papers')
print(df_papers_in_journals.count())
print(df_papers_in_journals2.count())

                                                                                

11270


                                                                                

11270
2


                                                                                

11270


                                                                                

11270
3


                                                                                

11270


                                                                                

11270
4 mon_inc


                                                                                

11270


                                                                                

11270
4 hash


                                                                                

11270


                                                                                

11270
Papers


                                                                                

13708


[Stage 898:>                                                        (0 + 1) / 1]

13708


                                                                                

In [140]:
keys = df_papers_in_journals4.select('publication id', 'hash').dropDuplicates()
print(keys.count())
jkeys = df_journals4.select('publication id', 'hash').dropDuplicates()
print(jkeys.count())

                                                                                

11270


[Stage 1003:>                                                       (0 + 1) / 1]

11270


                                                                                

In [142]:
papers = df_papers_in_journals4
journals = df_journals4

journals.printSchema()
df_journals.printSchema()
papers.printSchema()

root
 |-- venue: string (nullable = true)
 |-- volume: integer (nullable = true)
 |-- issue: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- issn: string (nullable = true)
 |-- hash: long (nullable = false)
 |-- publication id: long (nullable = false)

root
 |-- venue: string (nullable = true)
 |-- volume: integer (nullable = true)
 |-- issue: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- issn: string (nullable = true)
 |-- publication id: long (nullable = false)

root
 |-- publication id: long (nullable = false)
 |-- hash: long (nullable = false)
 |-- paperID: string (nullable = true)
 |-- title: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- page_start: integer (nullable = true)
 |-- page_end: inte

In [141]:
from pyspark.sql.functions import collect_list, size


venue, volume, issue = ('IEEE Internet Computing', '5', '6') 

papers = df_papers_in_journals4
journals = df_journals4
df_papers_q1 = journals\
                .join(papers,
                      ['hash'],
                      'inner')\
                .groupBy('venue', 'volume', 'issue')\
                .agg(collect_list('title').alias('papers'))\
                .withColumn('papers', size(col('papers')))\
                .sort(col('papers').desc())

df_papers_q1.select(col('venue').alias('name'), 'volume', 'issue', 'papers').show()

                                                                                

+--------------------+------+-----+------+
|                name|volume|issue|papers|
+--------------------+------+-----+------+
|  BMC Bioinformatics|     7|    1|    18|
|  BMC Bioinformatics|     9|    1|    16|
|  BMC Bioinformatics|    11|    1|    13|
|  BMC Bioinformatics|     6|    1|    12|
|  BMC Bioinformatics|     8|    1|    11|
|   ACM Trans. Graph.|    27|    3|    10|
|  BMC Bioinformatics|    10|    1|     9|
|  BMC Bioinformatics|    14|    1|     9|
| BMC systems biology|     6|    1|     9|
|  Expert Syst. Appl.|    39|    1|     8|
|   ACM Trans. Graph.|    23|    3|     8|
|   ACM Trans. Graph.|    24|    3|     8|
|  BMC Bioinformatics|    12|    1|     8|
|   ACM Trans. Graph.|    28|    5|     7|
| BMC systems biology|     4|    1|     7|
| BMC systems biology|     2|    1|     7|
|  Expert Syst. Appl.|    39|    3|     7|
|Procedia Computer...|     1|    1|     7|
|  Expert Syst. Appl.|    39|    5|     7|
|             SIGCOMM|    43|    4|     7|
+----------

In [None]:
# Query 4: GROUP BY, JOIN, AS
from pyspark.sql.functions import collect_set, concat, size

df_journals_venue_rename = df_journals.withColumnRenamed('venue', 'venueJournals')
df_books_venue_rename = df_books.withColumnRenamed('venue', 'venueBooks')
df = df_books_venue_rename\
    .join(df_journals_venue_rename,
          df_books_venue_rename.publisher == df_journals_venue_rename.publisher,
          "inner")\
    .drop(df_journals.publisher)\
    .select('venueBooks', 'venueJournals', 'publisher')\
    .dropDuplicates(['venueBooks', 'venueJournals', 'publisher'])\
    .groupBy('publisher')\
    .agg(collect_set('venueBooks').alias('books'),
         collect_set('venueJournals').alias('journals'))\
    .withColumn("total_publications_per_publisher",
                concat(col("books"), col("journals")))\
    .filter(size(col("total_publications_per_publisher")) > '500')\
    .select('publisher', "total_publications_per_publisher")\
    .show(truncate = 50)

In [None]:
# Query 5: WHERE, GROUP BY
# Retrieve some statistics about papers
from pyspark.sql.functions import sum, min, max, avg, format_number, variance

df_papers_total_pages.filter(year(col('date')) >= 2015) \
    .groupBy(year(col('date')).alias('year')) \
    .agg(count('paperID').alias('total_papers'),
         sum('total_pages').alias('total_pages'),
         min('total_pages').alias('min_pages'),
         max('total_pages').alias('max_pages'),
         format_number(avg('total_pages'), 2).alias('avg_pages'),
         format_number(variance('total_pages'), 2).alias('var_pages')) \
    .sort(col('year').desc()) \
    .show()

In [None]:
# Query 6: GROUP BY, HAVING, AS
df_papers_q6 = df_papers\
                    .select(['paperID', 'title', explode(col('references')).alias('reference')])\
                    .groupBy('reference')\
                    .agg(count('*').alias('references_count'))\
                    .filter(col('references_count') > 30)\
                    .join(df_papers.select(['paperID', 'title', 'doi', 'url']),
                          col('reference') == df_papers.paperID)\
                    .sort(col('references_count').desc())\

df_papers_q6.select(['title', 'references_count']).show(truncate=False)

In [None]:
# Query 9: WHERE, GROUP BY, HAVING, 1 JOIN




In [None]:
# Query 10: WHERE, GROUP BY, HAVING, 2 JOINs
# Retrieve authors who worked for at least 3 different organizations and have published at least 3 papers with at least 5 fos and 5 references each

from pyspark.sql.functions import approx_count_distinct

df_papers \
    .filter((size(col('fos')) >= 5) & (size(col('references')) >= 5)) \
    .join(dfAff, df_papers.paperID == dfAff.paperID, "inner") \
    .drop(dfAff.paperID) \
    .join(dfAut, dfAff.authorID == dfAut.authorID, "inner") \
    .drop(dfAff.authorID) \
    .groupBy("authorID") \
    .agg(count("paperID").alias("papers_count"),                                                                                       approx_count_distinct("organization").alias("organizations_count"),
         collect_set("name").alias("name")) \
    .filter((size("name") == 1) & (col("papers_count") >= 3)  & (col("organizations_count") >= 3)) \
    .orderBy(col("papers_count").desc(), col("organizations_count").desc()) \
    .select(explode(col("name")).alias("name"), col("papers_count"), col("organizations_count")) \
    .show(5)

In [154]:
df_papers_q6 = df_papers\
                    .select('paperID',
                            'title',
                            explode(col('references')).alias('reference'))\
                    .groupBy('reference')\
                    .agg(count('paperID').alias('references_count'))\
                    .filter(col('references_count') > 30)\
                    .join(df_papers,
                          col('reference') == df_papers.paperID)\
                    .sort(col('references_count').desc())\

df_papers_q6.select(['title', 'references_count']).show(truncate=False)

                                                                                

+-----------------------------------------------------------------+----------------+
|title                                                            |references_count|
+-----------------------------------------------------------------+----------------+
|Distinctive Image Features from Scale-Invariant Keypoints        |196             |
|A simple transmit diversity technique for wireless communications|62              |
|Light field rendering                                            |56              |
|Network information flow                                         |56              |
|Symbolic Model Checking                                          |45              |
|Mining Sequential Patterns                                       |44              |
|Geodesic Active Contour.                                         |40              |
|Differential Power Analysis                                      |38              |
+----------------------------------------------------------------