In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType, LongType
from pyspark.sql.functions import col                       # Filtering using the col() function
from pyspark.sql.functions import array_contains            # Filtering on array columns
from pyspark.sql.functions import explode                   # Explode Arrays in Individual Rows
from pyspark.sql.functions import sum, avg, count, max      # Multiple Aggregations
from pyspark.sql.functions import first, last	
from pyspark.sql.functions import array_union               # Union of 2 array without duplicates
from pyspark.sql.functions import lit, array
from pyspark.sql.functions import when


# Entry point of the PySpark application: a single session running in local mode
# (an existing session is reused by getOrCreate()).
spark = (
    SparkSession.builder
    .master("local")
    .appName("MyFirstSparkApplication")
    .getOrCreate()
)

In [None]:
# IMPORT
# Every dataset is read with the "multiline" option enabled, so a single JSON
# record may span several lines of its file.
def _read_multiline_json(path):
    """Return a DataFrame read from the JSON file at ``path`` with multiline=true."""
    return spark.read.option("multiline", "true").json(path)


df_book = _read_multiline_json("Datasets (json)/book-db.json")
df_article = _read_multiline_json("Datasets (json)/article-db.json")
df_incollection = _read_multiline_json("Datasets (json)/incollection-db.json")
df_www = _read_multiline_json("Datasets (json)/www-db.json")

### 1. Most cited AUTHORS in a given field
The following query finds the authors who wrote about a given field (e.g. "data mining"),
together with their website (if any), ordered by the total number of citations they
received for those publications. Only the five most cited authors are reported.

In [None]:
# Query parameters: the field (keyword) to search for and how many authors to report.
FIELD = "data mining"
TOP_N = 5

# One row per (article, author) for every article tagged with FIELD.
exploded_df_article = df_article \
    .filter(array_contains(col("keyword"), FIELD)) \
    .select(df_article.key,
            explode(df_article.author).alias("author"),
            df_article.keyword,
            df_article.citations)

# One row per (www entry, author), carrying the url of that entry.
exploded_df_www = df_www \
    .select(df_www.key, explode(df_www.author).alias("author"), df_www.url)

# Aggregate citations per author and keep the TOP_N authors by total citations.
# "Number of Paper" counts every exploded row (one per article of the author):
# count("citations") would skip articles whose citations value is null.
# Note: sum/avg/count/max here are the pyspark.sql.functions versions imported
# at the top of the notebook (they shadow the Python builtins).
df_top_author = exploded_df_article.groupBy("author").agg(
        sum("citations").alias("Sum of Citations"),
        avg("citations").alias("Average Citations"),
        count(lit(1)).alias("Number of Paper"),
        max("citations").alias("Max Citations")) \
    .sort(col("Sum of Citations").desc()) \
    .limit(TOP_N)

# Author -> website lookup, reusing the exploded www entries.
df_website = exploded_df_www.select(
    col("author").alias("author_w"),
    col("url").alias("website"))

# The left join keeps authors without a website (website = null).
# A join does not preserve the row order of its inputs, so sort again afterwards.
# NOTE(review): an author with several www entries appears once per entry.
result = df_top_author \
    .join(df_website, df_top_author.author == df_website.author_w, "left") \
    .drop("author_w") \
    .sort(col("Sum of Citations").desc())

result.printSchema()
result.show()

### 2. Add a KEYWORD to publications
Here we show a specific example: the keyword "machine learning" is added to the books
of the "Intelligent Systems Reference Library" series, from volume 85 to volume 100
(inclusive).

In [None]:
# Target of the update: books of SERIES_TITLE with a volume in
# [MIN_VOLUME, MAX_VOLUME] receive NEW_KEYWORD.
SERIES_TITLE = "Intelligent Systems Reference Library"
MIN_VOLUME = 85
MAX_VOLUME = 100
NEW_KEYWORD = "machine learning"


def _is_target_book(df):
    """Boolean Column: series title equals SERIES_TITLE and volume is within [MIN_VOLUME, MAX_VOLUME]."""
    return ((df.series.title == SERIES_TITLE)
            & (df.volume >= MIN_VOLUME)
            & (df.volume <= MAX_VOLUME))


def _show_target_books(df):
    """Display key, keyword, volume and series title of the target books, sorted by volume."""
    df.filter(_is_target_book(df)) \
        .select("key", "keyword", "volume", df.series.title) \
        .sort("volume") \
        .show(truncate=False)


# Show before update
_show_target_books(df_book)

# Only target rows are modified; every other row keeps its keyword column as-is.
# (Unioning non-target rows with array(lit(None)) would append a NULL element
# to their keyword lists.)
# array_union returns null when one of its inputs is null, so a target book
# with no keyword array gets a fresh one-element array instead.
new_keyword = array(lit(NEW_KEYWORD))
df_book_update = df_book.withColumn(
    "keyword",
    when(_is_target_book(df_book),
         when(col("keyword").isNull(), new_keyword)
         .otherwise(array_union(col("keyword"), new_keyword)))
    .otherwise(col("keyword")))

# Show after update
_show_target_books(df_book_update)

### 3. Add a new ARTICLE
The following cell adds a new article to volume 51 of the journal "Commun. ACM" (the remaining field values are placeholders).

In [None]:
# Key of the article inserted by this cell.
NEW_ARTICLE_KEY = "NewArticleKey"

# Define the schema of the element to insert
schema = StructType([
    # author field: list of (orcid, name) structs
    StructField("author", ArrayType(StructType([
        StructField("orcid", StringType(), True),
        StructField("name", StringType(), True),
    ])), True),
    # simple strings
    StructField("key", StringType(), True),
    StructField("title", StringType(), True),
    StructField("publisher", StringType(), True),
    StructField("journal", StringType(), True),
    # arrays of strings
    StructField("keyword", ArrayType(StringType()), True),
    StructField("ee", ArrayType(StringType()), True),
    StructField("cite", ArrayType(StringType()), True),
    StructField("note", ArrayType(StringType()), True),
    # simple integers
    StructField("year", LongType(), True),
    StructField("citations", LongType(), True),
    StructField("pages", LongType(), True),
    StructField("volume", LongType(), True),
])

# Data of the new article, in the same field order as `schema`.
# journal/volume match the stated target (volume 51 of "Commun. ACM");
# the other values are placeholders.
data = [(
    [("Author Orcid 1", "Author Name 1"), ("Author Orcid 2", "Author Name 2")],  # author
    NEW_ARTICLE_KEY,    # key
    "Article Title",    # title
    "Publisher Name",   # publisher
    "Commun. ACM",      # journal
    ["kw1", "kw2"],     # keyword
    None,               # ee = null
    None,               # cite = null
    None,               # note = null
    2022,               # year
    1,                  # citations
    10,                 # pages
    51,                 # volume
)]

newArticle = spark.createDataFrame(data=data, schema=schema)

# Drop any previous copy of NEW_ARTICLE_KEY before the union so that re-running
# this cell does not insert duplicate articles. eqNullSafe (rather than !=) keeps
# rows whose key is null, which a plain inequality would silently filter out.
df_article = df_article \
    .filter(~col("key").eqNullSafe(NEW_ARTICLE_KEY)) \
    .unionByName(newArticle, allowMissingColumns=True)

# check
df_article.filter(df_article.key == NEW_ARTICLE_KEY).show(truncate=False)