In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 58 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 72.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=ca69b4be9e656a12f5a39c35664ec88c728a0c973ca87c54607d958752eefc32
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


# Import

In [2]:
from pyspark.sql import SparkSession
import pandas as pd
import json
import string
from numpy import nan
import random

spark = SparkSession.builder \
      .master("local") \
      .appName("Queries") \
      .getOrCreate()

In [3]:
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive/My Drive/SMBUD project/Third Delivery/Structures

Mounted at /gdrive
/gdrive/.shortcut-targets-by-id/1q0fAfMYWojuW0WzsoMALMBcukJ3n_QVK/SMBUD project/Third Delivery/Structures


In [4]:
with open("dataset.json") as json_file:
  data = json.load(json_file)

authors = pd.json_normalize(data, record_path=['authors'])
authors = authors.drop_duplicates('_id', keep='first')
venues = list()
pubs = list()
for el in data:
  venues.append(el['venue'])
  pub = {
      '_id': el['_id'],
      'title': el['title'],
      'year': el['year'],
      'n_citation': el['n_citation'] if 'n_citation' in el.keys() else 0,
      'page_start': el['page_start'] if 'page_start' in el.keys() else "",
      'page_end': el['page_end'] if 'page_end' in el.keys() else "",
      'lang': el['lang'] if 'lang' in el.keys() else "",
      'volume': el['volume'] if 'volume' in el.keys() else "",
      'issue': el['issue'] if 'issue' in el.keys() else "",
      'isbn': el['isbn'] if 'isbn' in el.keys() else "",
      'doi': el['doi'] if 'doi' in el.keys() else "",
      'pdf': el['pdf'],
      'abstract': el['abstract'] if 'abstract' in el.keys() else "",
      'publisher': el['publisher'],
  }
  pubs.append(pub)
publications = pd.DataFrame(pubs)
venue = pd.DataFrame(venues)
venue['name_d'] = venue['name_d'].fillna('')
venue['raw'] = venue['raw'].fillna('')
keywords = pd.json_normalize(data, record_path=['keywords'])
keywords = keywords.drop_duplicates(0, keep='first')
fos = pd.json_normalize(data, record_path=['fos'])
fos = fos.drop_duplicates(0, keep='first')
urls = pd.json_normalize(data, record_path=['url'])
urls = urls.drop_duplicates(0, keep='first')

In [5]:
letters = string.ascii_lowercase + string.digits
tmp_map_id = dict()

for i in range(len(venue.index)):
  if venue.iloc[i]['_id'] is nan:
    if venue.iloc[i]['name_d'] not in tmp_map_id.keys():
      venue.at[i, '_id'] = '53a72' + ''.join(random.choice(letters) for i in range(19))
    else:
      venue.at[i, '_id'] = tmp_map_id[venue.iloc[i]['name_d']]
  else:
    tmp_map_id[venue.iloc[i]['name_d']] = venue.iloc[i]['_id']

publications['venue_id'] = venue['_id']
venue = venue.drop_duplicates('_id', keep='last')

In [6]:
keywords['key_index'] = keywords.index
keywords = keywords.rename(columns={keywords.columns[0]: 'key_name'})
fos['fos_index'] = fos.index
fos = fos.rename(columns={fos.columns[0]: 'fos_name'})
urls['url_index'] = urls.index
urls = urls.rename(columns={urls.columns[0]: 'url_name'})

In [7]:
tmp_df = pd.DataFrame([], columns=['_id', 'keywords', 'fos', 'url', 'authors', 'references'])
for el in data:
  key_idx = [keywords[keywords.key_name == keyword]['key_index'].tolist()[0] for keyword in el['keywords']]
  url_idx = [urls[urls.url_name == url]['url_index'].tolist()[0] for url in el['url']]
  fos_idx = [fos[fos.fos_name == fos_]['fos_index'].tolist()[0] for fos_ in el['fos']]
  authors_orcid = [author['_id'] for author in el['authors']]
  refs = el['references'] if 'references' in el.keys() else []
  tmp_df.loc[len(tmp_df.index)] = [el['_id'], key_idx, fos_idx, url_idx, authors_orcid, refs]
publications = publications.merge(tmp_df)

In [8]:
authors = authors[['_id', 'firstname', 'lastname']]
venue = venue[['_id', 'name_d', 'type', 'raw', 'publisher']]

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, LongType

authors_schema = StructType([ \
    StructField("_id", StringType(), False), \
    StructField("firstname", StringType(), False), \
    StructField("lastname", StringType(), False) \
])

authors_spark = spark.createDataFrame(data=authors, schema=authors_schema) 
authors_spark.printSchema()
authors_spark.show(truncate=False)

root
 |-- _id: string (nullable = false)
 |-- firstname: string (nullable = false)
 |-- lastname: string (nullable = false)

+------------------------+------------+----------------+
|_id                     |firstname   |lastname        |
+------------------------+------------+----------------+
|53f46797dabfaeb22f542630|Jairo       |Rocha           |
|54328883dabfaeb4c6a8a699|Theo        |Pavlidis        |
|53f43b03dabfaedce555bf2a|Min         |Pan             |
|53f45ee9dabfaee43ecda842|Chris       |C. N. Chu       |
|53f42e8cdabfaee1c0a4274e|Hai         |Zhou            |
|548a2e3ddabfae9b40134fbc|Harry       |M. Sneed        |
|53f46a22dabfaee0d9c3d5e5|Shuguo      |Yang            |
|53f43b64dabfaefedbaf97e4|Ilias       |Michalarias     |
|53f43354dabfaedd74d80e7b|Arkadiy     |Omelchenko      |
|53f443b6dabfaeecd69a25b7|Hans-Joachim|Lenz            |
|53f437b0dabfaedce553b065|Mario       |Zuehlke         |
|53f7ba9cdabfae9060ae1f26|Hartmut     |König           |
|53f43640dabfaedf435

In [10]:
venues_schema = StructType([ \
    StructField("_id", StringType(), False), \
    StructField("name_d", StringType(), False), \
    StructField("type", StringType(), False), \
    StructField("raw", StringType(), True), \
    StructField("publisher", StringType(), True) \
])

venue_spark = spark.createDataFrame(data=venue, schema=venues_schema) 
venue_spark.printSchema()
venue_spark.show(truncate=False)

root
 |-- _id: string (nullable = false)
 |-- name_d: string (nullable = false)
 |-- type: string (nullable = false)
 |-- raw: string (nullable = true)
 |-- publisher: string (nullable = true)

+------------------------+-------------------------------------------------------------------------------------------------------+----------+-------------------------------------------------------------------------------------------------------+-------------+
|_id                     |name_d                                                                                                 |type      |raw                                                                                                    |publisher    |
+------------------------+-------------------------------------------------------------------------------------------------------+----------+-------------------------------------------------------------------------------------------------------+-------------+
|53a72a4920f7420be8bfa51b|

In [11]:
fos_schema = StructType([ \
    StructField("fos_name", StringType(), False), \
    StructField("fos_index", LongType(), False) \
    
])

fos_spark = spark.createDataFrame(data=fos, schema=fos_schema) 
fos_spark.printSchema()
fos_spark.show(truncate=False)

keywords_schema = StructType([ \
    StructField("key_name", StringType(), False), \
    StructField("key_index", LongType(), False) \
])

keywords_spark = spark.createDataFrame(data=keywords, schema=keywords_schema) 
keywords_spark.printSchema()
keywords_spark.show(truncate=False)

urls_schema = StructType([ \
    StructField("url_name", StringType(), False), \
    StructField("url_index", LongType(), False) \
])

urls_spark = spark.createDataFrame(data=urls, schema=urls_schema) 
urls_spark.printSchema()
urls_spark.show(truncate=False)

root
 |-- fos_name: string (nullable = false)
 |-- fos_index: long (nullable = false)

+----------------------------------+---------+
|fos_name                          |fos_index|
+----------------------------------+---------+
|feature (computer vision)         |0        |
|handwriting recognition           |1        |
|feature extraction                |2        |
|artificial intelligence           |3        |
|feature (machine learning)        |4        |
|optical character recognition     |5        |
|computer science                  |6        |
|intelligent word recognition      |7        |
|document processing               |8        |
|intelligent character recognition |9        |
|pattern recognition               |10       |
|static timing analysis            |11       |
|statistics                        |12       |
|sequential logic                  |13       |
|statistical static timing analysis|14       |
|algorithm                         |15       |
|clock skew         

In [12]:
publications_schema = StructType([ \
    StructField("_id", StringType(), False), \
    StructField("title", StringType(), False), \
    StructField("year", IntegerType(), False), \
    StructField("n_citation", IntegerType(), True), \
    StructField("page_start", StringType(), True), \
    StructField("page_end", StringType(), True), \
    StructField("lang", StringType(), True), \
    StructField("volume", StringType(), True), \
    StructField("issue", StringType(), True), \
    StructField("isbn", StringType(), True), \
    StructField("doi", StringType(), False), \
    StructField("pdf", StringType(), True), \
    StructField("abstract", StringType(), True), \
    StructField("publisher", StringType(), True), \
    StructField("venue_id", StringType(), True), \
    StructField("keywords", ArrayType(LongType()), True), \
    StructField("fos", ArrayType(LongType()), True), \
    StructField("url", ArrayType(LongType()), True), \
    StructField("authors", ArrayType(StringType()), True), \
    StructField("references", ArrayType(StringType()), True) \
])

publications_spark = spark.createDataFrame(data=publications, schema=publications_schema) 
publications_spark.printSchema()
publications_spark.show(truncate=False)

root
 |-- _id: string (nullable = false)
 |-- title: string (nullable = false)
 |-- year: integer (nullable = false)
 |-- n_citation: integer (nullable = true)
 |-- page_start: string (nullable = true)
 |-- page_end: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- doi: string (nullable = false)
 |-- pdf: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- venue_id: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- fos: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- url: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)

# Data creation/update commands

In [13]:
from pyspark.sql.functions import col, when, lit
from pyspark.sql.functions import array_union, array, array_remove, array_contains

In [14]:
# Command 1
# Update the publications which published in 1993

publications_spark = publications_spark.withColumn('year',
    when(col('year') == 1993, 1994)
    .otherwise(col('year'))
)

publications_spark.show(truncate = False)

+------------------------+-----------------------------------------------------------------------------------------------+----+----------+----------+---------+----+------+-----+-------------+--------------------------------+------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
# Command 2
# Add as reference a publication with _id = 53e9978ab7602d9701f4b2cc to a publication with _id = 53e99784b7602d9701f3e151

publications_spark = publications_spark.withColumn("references",
      when(col('_id') == '53e99784b7602d9701f3e151', array_union("references",array(lit('53e9978ab7602d9701f4b2cc'))))
      .otherwise(col("references"))
)

publications_spark.select(col('_id'), col('references')).show(truncate = False)

+------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [16]:
# Command 3
# Delete a 'http://dx.doi.org/10.1109/ICDAR.1993.395663' url from all publications 

# previous dataframes
publications_spark.select(col('_id'), col('url')).limit(5).show(truncate = False)
urls_spark.limit(5).show(truncate = False)

url_id = urls_spark.filter(col("url_name")=="http://dx.doi.org/10.1109/ICDAR.1993.395663") \
                .select("url_index").collect()[0][0]

df1 = publications_spark.withColumn("url",
        when(array_contains(publications_spark.url, url_id), array_remove(publications_spark.url,url_id))
        .otherwise(col("url"))
)
                                  
df2 = urls_spark.filter(col("url_index") != url_id)

df1.select(col('_id'), col('url')).limit(5).show(truncate = False)
df2.limit(5).show(truncate = False)

+------------------------+----------+
|_id                     |url       |
+------------------------+----------+
|53e99784b7602d9701f3e151|[0]       |
|53e99784b7602d9701f3e15d|[1, 2]    |
|53e99784b7602d9701f3f411|[3, 4, 5] |
|53e99784b7602d9701f3f5fe|[6, 7, 8] |
|53e99784b7602d9701f3f95d|[9, 10, 8]|
+------------------------+----------+

+-----------------------------------------------------------------------+---------+
|url_name                                                               |url_index|
+-----------------------------------------------------------------------+---------+
|http://dx.doi.org/10.1109/ICDAR.1993.395663                            |0        |
|http://ieeexplore.ieee.org/xpl/abstractAuthors.jsp?tp=&arnumber=1465124|1        |
|http://dx.doi.org/10.1109/ISCAS.2005.1465124                           |2        |
|http://ieeexplore.ieee.org/xpl/abstractAuthors.jsp?tp=&arnumber=1044548|3        |
|http://dx.doi.org/10.1109/CMPSAC.2002.1044548                       

In [17]:
# Command 4
# Update the type of venues from conference to journal

df = venue_spark.withColumn("type",
                            when(col('type') ==  'conference' , 'journal')
                            .otherwise(col("type")))
                                  
df.show(truncate = False)

+------------------------+-------------------------------------------------------------------------------------------------------+-------+-------------------------------------------------------------------------------------------------------+-------------+
|_id                     |name_d                                                                                                 |type   |raw                                                                                                    |publisher    |
+------------------------+-------------------------------------------------------------------------------------------------------+-------+-------------------------------------------------------------------------------------------------------+-------------+
|53a72a4920f7420be8bfa51b|International Conference on Document Analysis and Recognition                                          |journal|ICDAR-1                                                                                    

In [18]:
# Command 5
# Insert Venues data in a dataFrame
venues_schema = StructType([ \
    StructField("_id", StringType(), False), \
    StructField("name_d", StringType(), False), \
    StructField("type", StringType(), False), \
    StructField("raw", StringType(), True), \
    StructField("publisher", StringType(), True) \
])

# Show Venues data
venue_spark = spark.createDataFrame(data=venue, schema=venues_schema) 
venue_spark.show()

# Add a new row to the existing rows and shows data
new_venue = spark.createDataFrame([("1234","new_venue","journal","top_v","team MC LAF")], venues_schema)
appended_venue = new_venue.union(venue_spark)
appended_venue.show()


+--------------------+--------------------+----------+--------------------+-------------+
|                 _id|              name_d|      type|                 raw|    publisher|
+--------------------+--------------------+----------+--------------------+-------------+
|53a72a4920f7420be...|International Con...|conference|             ICDAR-1|      Unknown|
|53a72e2020f7420be...|International Sym...|conference|           ISCAS (3)|      Unknown|
|53a72e9920f7420be...|Computer Software...|conference|             COMPSAC|      Unknown|
|572de199d39c4f499...|Frontiers of Comp...|conference|Frontiers of Comp...|      Unknown|
|53a72v1n0pwh6wt02...|Data & Knowledge ...|   journal|Data & Knowledge ...|North-Holland|
|53a72a3020f7420be...|            SMARTNET|   journal|            SMARTNET|      Unknown|
|53a72i7zedxkq591q...|Telematics and In...|   journal|Telematics and In...|     Pergamon|
|53a727b720f7420be...|          PACIIA (2)|   journal|          PACIIA (2)|      Unknown|
|53a72umwb

# Queries

In [19]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import count, max, min, sum, avg, countDistinct
from pyspark.sql.functions import first, collect_set, collect_list, arrays_zip

In [20]:
# Query 1
# retrieve title of 10 English publications which have at least one keyword containing "method" inside
# WHERE, JOIN, LIKE, LIMIT

publications_spark.filter(col('lang') == 'en') \
        .select(col('title'), explode(col('keywords'))) \
        .withColumnRenamed('col', 'keywords') \
        .join(keywords_spark, col('keywords') == keywords_spark.key_index,'inner') \
        .filter(col("key_name").like("%method%")) \
        .select(col('title'), col('key_name')) \
        .limit(10) \
        .show(truncate = False)

+------------------------------------------------------------------------------------------+------------------------------+
|title                                                                                     |key_name                      |
+------------------------------------------------------------------------------------------+------------------------------+
|Quality-aware collaborative question answering: methods and evaluation                    |design method                 |
|Filtering for uncertain 2-D discrete systems with state delays                            |design method                 |
|Timing yield estimation using statistical static timing analysis                          |design method                 |
|Scalable timers for soft state protocols                                                  |receiver estimation methods   |
|New approach to mixed H2/H&infin; filtering for polytopic discrete-time systems           |developed filter design method|
|Analysi

In [21]:
# Query 2
# return the 5 years with the highest number of publications in the field of "computer science" in descending order
# JOIN, WHERE, GROUP BY, AS, SORT, LIMIT

retrieved_df = publications_spark.select(col('_id'), col('title'), col('year'), explode(col('fos'))) \
                    .withColumnRenamed('col', 'fields of study') \
                    .join(fos_spark, col('fields of study') == fos_spark.fos_index, 'inner') \
                    .select(col('_id'), col('title'), col('year'), col('fos_name'))

retrieved_df.filter(col('fos_name') == 'computer science') \
            .groupBy(col('year')) \
            .agg(count('_id').alias('number of publications')) \
            .sort(col('number of publications').desc()) \
            .limit(5) \
            .show(truncate = False)

+----+----------------------+
|year|number of publications|
+----+----------------------+
|2004|51                    |
|2006|44                    |
|2002|44                    |
|1998|43                    |
|2001|42                    |
+----+----------------------+



In [22]:
# Query 3
# Retrieve the 3 oldest publication published in a venue published by a certain publisher 'North-Holland'
# WHERE, JOIN, LIMIT, 2-STEP-QUERY, AS

venue_df = venue_spark.filter(col('publisher') == 'North-Holland') \
                  .withColumnRenamed('publisher', 'venue_publisher') \
                  .withColumnRenamed('_id', 'id_venue')

retrieved_df = publications_spark.join(venue_df, publications_spark.venue_id == venue_df.id_venue, 'inner') \
                  .sort(col('year').asc()).limit(3) \
                  .select(col("_id"), col("title"), col("year"), col("id_venue"), col("venue_publisher"), col("name_d"))
                    
retrieved_df.show(truncate=False)

+------------------------+--------------------------------------------------------------+----+------------------------+---------------+----------------------------------------+
|_id                     |title                                                         |year|id_venue                |venue_publisher|name_d                                  |
+------------------------+--------------------------------------------------------------+----+------------------------+---------------+----------------------------------------+
|53e9af99b7602d97039e14d1|Fuzzy proximities structures and fuzzy grills                 |1996|539078f320f770854f5a88f2|North-Holland  |Fuzzy Sets and Systems                  |
|53e9a15bb7602d9702a4f6fc|Constraint satisfaction problems: Algorithms and applications |1999|548259f6582fc50b5e138119|North-Holland  |European Journal of Operational Research|
|53e9aacab7602d9703446c42|Analysis of third-party warehousing contracts with commitments|2001|548259f6582fc50b5e138

In [23]:
# Query 4
# Retrieve the authors with more than 2 publications after the year 2000
# WHERE, JOIN, GROUP BY, HAVING, AS

df = publications_spark.filter(col('year') > 2000) \
        .select(col('_id'), explode(col('authors'))) \
        .withColumnRenamed('col', '_orcid') \
        .groupBy(col('_orcid')) \
        .agg(count('_id').alias('num publications')) \
        .filter(col('num publications') > 2) \
        .join(authors_spark, col('_orcid') == authors_spark._id) \
        .select(col('_orcid'), col('num publications'), col('firstname'), col('lastname'))

df.show(truncate=False)

+------------------------+----------------+---------+--------------+
|_orcid                  |num publications|firstname|lastname      |
+------------------------+----------------+---------+--------------+
|53f47b80dabfae8a6845cd1d|3               |Jian     |Pei           |
|56127a6545ce1e5962c3c93a|4               |Dana     |Nau           |
|548effffdabfaef989f0972d|3               |Fabio    |Marton        |
|540837d5dabfae8faa6332a2|5               |Markus   |Gross         |
|53f434b3dabfaeb22f461d7b|3               |Aseem    |Agarwal       |
|53f43b64dabfaefedbaf97e4|5               |Ilias    |Michalarias   |
|5433ca63dabfaebba5827ae0|6               |Rolf     |Niedermeier   |
|53f810d1dabfae8faa4d7ef8|5               |Jean-Luc |Marichal      |
|53f3a90edabfae4b34aeb7d2|3               |Samson   |Lasaulce      |
|53f63533dabfaeeec51b3a95|3               |Shaosheng|Zhou          |
|5405aefadabfae92b41f3a30|7               |Huijun   |Gao           |
|542d255edabfae478c1a0b61|3       

In [24]:
# Query 5
# For each keyword, retrive the author(s) with most publications that contain that keyword

max_keyword = publications_spark.select(col('_id'),col('keywords'), explode(col('authors'))).withColumnRenamed('col', 'author') \
                        .select(col('_id'),col('author'), explode(col('keywords'))).withColumnRenamed('col', 'keyword') \
                        .groupby(col('author'), col('keyword')).agg(count('_id').alias('number')).groupby('keyword').max('number')

df = publications_spark.select(col('_id'),col('keywords'), explode(col('authors'))).withColumnRenamed('col', 'authors') \
                        .select(col('_id'),col('authors'), explode(col('keywords'))).withColumnRenamed('col', 'keywords') \
                        .groupby(col('authors'), col('keywords')).agg(count('_id').alias('number')) \
                        .join(max_keyword, col('keywords') == max_keyword.keyword) \
                        .filter(col('number') == col('max(number)')) \
                        .select(col('authors'), col('keyword'), col('number')) \
                        .join(authors_spark, col('authors') == authors_spark._id) \
                        .select(col('authors'), col('keyword'), col('number'), col('firstname'), col('lastname')) \
                        .join(keywords_spark, col('keyword') == keywords_spark.key_index) \
                        .select(col('authors'), col('keyword'), col('number'), col('firstname'), col('lastname'), col('key_name')) \
                        .sort(col('keyword').asc()) \

df.show(truncate=False)   
max_keyword.show(truncate=False)                    

+------------------------+-------+------+-----------+---------------+-----------------------------+
|authors                 |keyword|number|firstname  |lastname       |key_name                     |
+------------------------+-------+------+-----------+---------------+-----------------------------+
|53f46797dabfaeb22f542630|0      |1     |Jairo      |Rocha          |handwriting recognition      |
|54328883dabfaeb4c6a8a699|0      |1     |Theo       |Pavlidis       |handwriting recognition      |
|53f431f8dabfaee0d9b372ae|1      |2     |Maria-Elena|Nilsback       |shape                        |
|53f46ca8dabfaec09f2584aa|1      |2     |Andrew     |Zisserman      |shape                        |
|54328883dabfaeb4c6a8a699|1      |2     |Theo       |Pavlidis       |shape                        |
|53f43abddabfaeecd6988ae9|1      |2     |David      |W. Jacobs      |shape                        |
|53f4345fdabfaedf4356c5e7|2      |2     |Laurent    |Itti           |feature extraction           |


In [25]:
# Query 6
# Considering the keywords that contain all the words 'np' and 'hard' or all the words 'np' and 'complete' or the word 'algorithm', 
# and retrieve the authors that have contributed to publications having at least one of those keyword and
# that were published in at least 3 different years. For each of those authors, return all the publications with those keywords.
# WHERE, GROUP BY, HAVING, 2 JOINs, LIKE, AS

df = publications_spark.select(col('_id'), col('title'), col('year'), col('keywords'), explode(col('authors'))) \
        .withColumnRenamed('col', '_orcid') \
        .withColumnRenamed('_id', 'pub_id') \
        .select(col('pub_id'), col('title'), col('year'), explode(col('keywords')), col('_orcid')) \
        .withColumnRenamed('col', 'keyword_id') \
        .join(keywords_spark, col('keyword_id') == keywords_spark.key_index, 'inner') \
        .filter(col("key_name").rlike("np.*hard") | col("key_name").rlike("np.*complete") | col("key_name").rlike(".* algorithm.*")) \
        .groupBy(['pub_id', '_orcid']) \
        .agg(collect_set('key_name').alias('keywords'), first("title").alias("title"), first('year').alias('year')) \
        .groupBy('_orcid') \
        .agg(collect_set('title').alias('titles_set'), collect_list('keywords').alias('keywords_set'), countDistinct("year").alias('years')) \
        .filter(col('years') > 2) \
        .join(authors_spark, col('_orcid') == authors_spark._id, 'inner') \
        .withColumn("tmp", arrays_zip("titles_set", "keywords_set")) \
        .withColumn("tmp", explode("tmp")) \
        .select(col('firstname'), col('lastname'), col("tmp.titles_set").alias("title"), col("tmp.keywords_set").alias("keywords"))

df.show(truncate = False)

+---------+--------------+-----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
|firstname|lastname      |title                                                                                          |keywords                                                                                                                    |
+---------+--------------+-----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
|Ron      |Alford        |A hierarchical goal-based formalism and algorithm for single-agent planning                    |[translation algorithm]                                                                                                     |
|Ron    

In [26]:
# Query 7
# retrieve, for each year, the number of publications that their citations are greater than 5, and in their titles exist the word "solution"
# WHERE, GROUP BY, HAVING, AS

df = publications_spark.select(col('_id'),col('n_citation'), col('year'), col('title')) \
      .withColumnRenamed('n_citation', 'citation') \
      .filter(col('citation') > 5) \
      .filter(col("title").like("%solution%")) \
      .groupBy(col('year'))\
      .agg(count('_id').alias('num publications')) 

df.show(truncate = False)

+----+----------------+
|year|num publications|
+----+----------------+
|2003|1               |
|2006|1               |
|1997|2               |
|1994|2               |
|2004|1               |
|1985|1               |
|1995|1               |
|2001|1               |
|1992|1               |
|2005|4               |
|1986|1               |
+----+----------------+



In [27]:
# Query 8
# Retrieve the title of the publications which have less than 4 authors.
# and sort the number of authors in a descending order, the title in an ascending order

from pyspark.sql.functions import explode, col

exploded_df = publications_spark.select(col("title"), explode(publications_spark.authors))

exploded_df.groupBy("title") \
    .agg(count("col").alias("number of authors")) \
    .filter(col("number of authors") < 4) \
    .sort(col('number of authors').desc(), col('title').asc()) \
    .show(truncate = False)


+-----------------------------------------------------------------------------------------------------+-----------------+
|title                                                                                                |number of authors|
+-----------------------------------------------------------------------------------------------------+-----------------+
|"GrabCut": interactive foreground extraction using iterated graph cuts                               |3                |
|/spl Hscr//sub /spl infin// filtering for continuous-time linear systems with delay                  |3                |
|3-D Gradient Coil Design—Initial Theoretical Framework                                               |3                |
|A                                                                                                    |3                |
|A Case Study of Case-Based CBR                                                                       |3                |
|A Fast Neighbor Discove

In [28]:
# Query 9
# Retrieve the nb_citations and the titles of the 11 publications
# with the least number of pages published in a journal 


retrieved_df = venue_spark.select(col('type'),col('_id')) \
               .withColumnRenamed('_id', '_id_journals') \
               .filter(col("type") == "journal") \
               .join(publications_spark, col('_id_journals') == publications_spark.venue_id)\
               .select(col('page_start'), col('page_end'), col('n_citation'),col('title'), col('type'))

retrieved_df = retrieved_df.withColumn('nb_pages', (col('page_end') - col('page_start') + 1)) \
               .sort(col('nb_pages').asc()) \
               .filter(col('nb_pages').isNotNull()) \
               .limit(11) \
               .select(col('type'), col('title'), col('nb_pages'), col('n_citation')) \
               .show(truncate=False)


+-------+------------------------------------------------------------------------------------------------------+--------+----------+
|type   |title                                                                                                 |nb_pages|n_citation|
+-------+------------------------------------------------------------------------------------------------------+--------+----------+
|journal|Fuzzy logic, neural networks and soft computing                                                       |1.0     |73        |
|journal|GCP                                                                                                   |1.0     |1         |
|journal|A Semi-Fragile Watermark Scheme For Image Authentication                                              |1.0     |71        |
|journal|The Complex Nature of e-Government Projects: A Case Study of Bhoomi, an Initiative in Karnataka, India|2.0     |6         |
|journal|Vulnerability of water quality in intensively developing urb

In [29]:
# Query 10
# Retrieve all the publications that have been published in the first or second edition of a volume of the journal
# (issue = 1 or issue == 2) and that have a reference to another publication published by North-Holland
# WHERE, IN, Nested Query

# Get the venues published by the North-Holland agency and collect the ids in a list
north_holland_ids = venue_spark.filter(col('publisher') == 'North-Holland') \
                      .withColumnRenamed('_id', 'venue') \
                      .join(publications_spark, col('venue') == publications_spark.venue_id, 'inner') \
                      .select(collect_list("_id"), collect_list('title')).collect()[0][0]

# Get the publications that have issue 1 or 2 and that have a vene_id that is in the
# previously retrieved list
df = publications_spark.select(col('_id'), col('title'), col('issue'), col('volume'), col('references')) \
            .filter(((col('issue') ==  '1') | (col('issue') == '2') | col('issue').rlike('[12]-.*')) & col('volume').isNotNull()) \
            .select(col('_id'), col('title'), col('issue'), col('volume'), explode(col('references'))) \
            .withColumnRenamed('col', 'reference') \
            .filter(col('reference').isin(north_holland_ids)) \
            .sort(col('title').desc())

df.show(truncate=False)


+------------------------+---------------------------------------------------------------------------------------------------------+-----+------+------------------------+
|_id                     |title                                                                                                    |issue|volume|reference               |
+------------------------+---------------------------------------------------------------------------------------------------------+-----+------+------------------------+
|53e9984fb7602d9702084b0d|The SAT2002 competition                                                                                  |1-4  |43    |53e9b9adb7602d970459fe66|
|53e9978ab7602d9701f4a085|N                                                                                                        |2    |156   |53e9aacab7602d9703446c42|
|53e9978db7602d9701f4f82e|H                                                                                                        |2    |44    |