#### Extract (ETL)

In [0]:
stack = []
countOfObjects = 0
countFiles = 0
output =open("dblpv13_" + str(countFiles) + ".json", "w", encoding="utf8")
with open("dblpv13.json", "r", encoding="utf8") as input:
        while countFiles != 1000:
                line = input.readline()
                if not line:
                    break

                getNumberIntRegex = re.compile(r'NumberInt\(\d+\)')
                getIntRegex = re.compile(r'\d+')
                numberInt = getNumberIntRegex.search(line)

                if numberInt:
                    number = getIntRegex.search(numberInt.group(0))
                    line = re.sub(r"NumberInt\(\d+\)", number.group(0), line)

                checkOpenBracketRegex = re.compile(r'".*{.*"')
                checkCloseBracketRegex = re.compile(r'".*}.*"')
                try:

                    if  '{' in line and not checkOpenBracketRegex.search(line):
                        stack.append('{')

                    if '}' in line and not checkCloseBracketRegex.search(line):
                        stack.pop()
                        if not stack:
                            countOfObjects += 1
                            print('object:',countOfObjects,', file: ',countFiles)

                except:
                    print(line)
                    break

                if countOfObjects == 25000:
                    output.write("}]")
                    countFiles += 1
                    output = open("dblpv13_" + str(countFiles) + ".json", "w", encoding="utf8")
                    countOfObjects = 0
                    output.write("[")
                else:
                    output.write(line)

#### Reading first 10 files each file has 25000 objects as a dataframe

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

import requests
import difflib


# File location and type
file_location = "/FileStore/tables/dblpv13/"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read \
      .option("inferSchema", "true") \
      .option("multiline", "true") \
      .json(file_location)


#### Transform (ETL):
   1. drop publications with one word title.
   2. drop publications if ISSN has wrong value.
   3. drop publications doesn't have authors.
   4. drop publications have wrong year.

In [0]:
df = df.where(size(split(col('title'),' ')) != 1 ) \
       .where(size(col('authors')) != 0) \
       .where(col("issn").rlike("[0-9]+-[0-9]+"))\
       .where(col("year") != 0)

#### Transform (ETL): (cont.)
   5. add attribute doc_type.

In [0]:
df = df.withColumn("doc_type", \
         when(col('venue.raw').rlike(".*@.*"), lit("Conference and Workshop Papers")) \
         .when((col('volume') !="") | (col('issue') !=""), lit("Journal Articles")) \
         .otherwise(lit("conference Papers")))

#### explore number of citations
  1. display minimum.
  2. display maximum.
  3. display mean.

In [0]:
df.selectExpr('min(n_citation)','max(n_citation)','mean(n_citation)').show()

+---------------+---------------+-----------------+
|min(n_citation)|max(n_citation)| mean(n_citation)|
+---------------+---------------+-----------------+
|            0.0|        60891.0|47.87682848579853|
+---------------+---------------+-----------------+



#### delete n_citations outliers

In [0]:
Q1_n_citation, Q2_n_citation, Q3_n_citation = df.approxQuantile("n_citation", [0.25, 0.5, 0.75], 0)

IQR_n_citation = Q3_n_citation - Q1_n_citation
lower_n_citation = Q1_n_citation - 1.5 * IQR_n_citation
upper_n_citation = Q3_n_citation + 1.5 * IQR_n_citation

print("first Quantile: ",Q1_n_citation)
print("second Quantile: ",Q2_n_citation)
print("third Quantile: ",Q3_n_citation)
print("IQR: ",IQR_n_citation)
print("Lower bound: ",lower_n_citation)
print("Upper bound: ",upper_n_citation)

first Quantile:  2.0
second Quantile:  11.0
third Quantile:  36.0
IQR:  34.0
Lower bound:  -49.0
Upper bound:  87.0


In [0]:
df = df.where((col('n_citation') > lower_n_citation) & (col('n_citation') < upper_n_citation))

In [0]:
df.selectExpr('min(n_citation)','max(n_citation)','mean(n_citation)').show()

+---------------+---------------+------------------+
|min(n_citation)|max(n_citation)|  mean(n_citation)|
+---------------+---------------+------------------+
|              0|             86|16.201320460502306|
+---------------+---------------+------------------+



#### Transform (ETL): (cont.)
##### flatting authors to author and add the author rank

In [0]:
df_author = df.select(posexplode('authors').alias("author_rank", "author"),col('*'))

####  Transform (ETL): (cont.)
##### Computing H-index of an author, journal, organization
  1. calculate h_index for each author.
  2. calculate h_index for each organization.
  3. calculate h_index for each journal.

In [0]:
def calculate_H_index(df, window, column_name):
    df = df.withColumn("index", row_number().over(window)) \
           .withColumn('count_h_index', when(col('n_citation') > col('index'), col('index')).otherwise(lit(0))) \
           .withColumn(column_name, max(col("count_h_index")).over(window)) \
           .drop('count_h_index') \
           .drop('index')
    return df


In [0]:
windowHindex  = Window.partitionBy("author.name").orderBy(col("n_citation").desc())

df_author = calculate_H_index(df_author, windowHindex, 'h_index_author')

In [0]:
windowHindex  = Window.partitionBy("author.org").orderBy(col("n_citation").desc())

df_author = calculate_H_index(df_author, windowHindex, 'h_index_organization')

In [0]:
windowHindex  = Window.partitionBy("venue.raw").orderBy(col("n_citation").desc())

df_author = calculate_H_index(df_author, windowHindex, 'h_index_journal')

In [0]:
df_author.select(col('n_citation'),col('author.name'),col('h_index_author'),col('h_index_organization'),col('h_index_journal')).show(5)

+----------+-----------------+--------------+--------------------+---------------+
|n_citation|             name|h_index_author|h_index_organization|h_index_journal|
+----------+-----------------+--------------+--------------------+---------------+
|        74|     Damrong Guoy|             1|                   1|              4|
|        74|   Edgar A. Ramos|             1|                   1|              4|
|        74|   Anil N. Hirani|             1|                   1|              4|
|        74|   Evan VanderZee|             1|                   1|              4|
|        47|Sébastien Tixeuil|             1|                   2|              6|
+----------+-----------------+--------------+--------------------+---------------+
only showing top 5 rows



#### Transform (ETL): (cont.)
##### get sample from data to resolve problems on it.
we do that because the data is very large and apis blocking me.
  1. filter data by year equals 2006 and n_citation equals 50.
  2. get gender for each author by first name.
  3. resolving ambiguous author names.
  4. resolving ambiguous journal names.
  5. Refining venues.

In [0]:
df_author_sample = df_author.where((col('year') == '2006') & (col('n_citation') == 50))

In [0]:
@udf("string")
def getGender(name):
    gender = ''
    if name:
        fname = name.split()[0]
        
        response = requests.get("https://gender-api.com/get?name="+fname+"&key=YSD2kk8ZQr3eQFTnuDPzrfoQCTTUq89rgKph")
        if response:
            gender = response.json()['gender']
            
    return gender

In [0]:
df_author_sample = df_author_sample.withColumn("gender", getGender(col('author.name')))
df_author_sample.select(col('author.name'),col('gender')).show()

+-----------------+-------+
|             name| gender|
+-----------------+-------+
|Anatoli Degtyarev|   male|
|   Michael Fisher|   male|
|      Boris Konev|   male|
|   Ghalib A. Shah|   male|
|  Muslim Bozyiğit|unknown|
|    Özgür B. Akan|   male|
|  Buyurman Baykal|unknown|
| Michael McCarthy|   male|
|        Paul Duff|   male|
|   Henk L. Muller|   male|
|    Cliff Randell|   male|
+-----------------+-------+



In [0]:
@udf("string")
def verifyAuthorName(authorName):
    authorsName = []
    URL_author = "http://dblp.org/search/author/api?q=" + authorName.replace(" ", "+") + "&format=json"
    
    r = requests.get(url = URL_author)
    authors = r.json()
    authors = authors['result']['hits']
    totalhits = authors['@total']

    if int(totalhits) > 1:
        for person in authors['hit']:
            authorName = person['info']['author']
            
    return authorName

In [0]:
df_author_sample = df_author_sample.withColumn("resolving_author_name", verifyAuthorname(col('author.name')))
df_author_sample.select(col('author.name'),col('resolving_author_name')).show(5)

+-----------------+---------------------+
|             name|resolving_author_name|
+-----------------+---------------------+
|Anatoli Degtyarev|    Anatoli Degtyarev|
|   Michael Fisher|    Michael W. Fisher|
|      Boris Konev|          Boris Konev|
|   Ghalib A. Shah|       Ghalib A. Shah|
|  Muslim Bozyiğit|      Muslim Bozyigit|
+-----------------+---------------------+
only showing top 5 rows



In [0]:
@udf("string")
def verifyTitleName(title):

    URL_publ = "http://dblp.org/search/publ/api?q=" + title.replace(" ", "+") + "&format=json"
    r = requests.get(url = URL_author)
    
    publs = r.json()
    publs = publs['result']['hits']
    totalhits = publs['@total']

    if int(totalhits) > 1:
        for publ in publs['hit']:
            title = publ['info']['title']
            
    return title

In [0]:
df_author_sample = df_author_sample.withColumn("resolving_title_name", verifyTitleName(col('title')))
df_author_sample.select(col('title'),col('resolving_title_name')).show(5)

+--------------------+--------------------+
|               title|resolving_title_name|
+--------------------+--------------------+
|Monodic temporal ...|Monodic temporal ...|
|Monodic temporal ...|Monodic temporal ...|
|Monodic temporal ...|Monodic temporal ...|
|Real-Time coordin...|Real-Time coordin...|
|Real-Time coordin...|Real-Time coordin...|
+--------------------+--------------------+
only showing top 5 rows



In [0]:
@udf("string")
def verifyVenueName(raw):
    URL_venue = "http://dblp.org/search/venue/api?q=" + raw.replace(" ", "+") + "&format=json"
    r = requests.get(url = URL_venue)
    
    publs = r.json()
    publs = publs['result']['hits']
    totalhits = publs['@total']

    if int(totalhits) > 1:
        for publ in publs['hit']:
            raw = publ['info']['venue']
            
    return raw

In [0]:
df_author_sample = df_author_sample.withColumn("resolving_venue_raw", verifyVenueName(col('venue.raw')))
df_author_sample.select(col('venue.raw'),col('resolving_venue_raw')).show(5)

+--------------------+--------------------+
|                 raw| resolving_venue_raw|
+--------------------+--------------------+
|ACM Trans. Comput...|ACM Trans. Comput...|
|ACM Trans. Comput...|ACM Trans. Comput...|
|ACM Trans. Comput...|ACM Trans. Comput...|
|              NEW2AN|              NEW2AN|
|              NEW2AN|              NEW2AN|
+--------------------+--------------------+
only showing top 5 rows



#### Transform (ETL): (cont.)
##### normalize field of study.

In [0]:
scientific_disciplines = spark.read\
                        .format("csv")\
                        .option("header", "true")\
                        .option("inferSchema", "true")\
                        .load("/FileStore/tables/scientific_disciplines.csv")

In [0]:
df_author = df_author.withColumn("fos",explode('fos'))

In [0]:
df_author = df_author.join(scientific_disciplines,scientific_disciplines.fos_lock ==  df_author.fos,"left")

In [0]:
df_author = df_author.withColumn("fos", \
         when(col('fos_lock').isNotNull(), col('fos_lock')) \
         .otherwise(col('fos')))

#### Transform (ETL): (cont.)
##### flatting keywords.

In [0]:
df_author = df_author.withColumn("keywords",explode('keywords'))

#### Load (ETL)
##### Build schema
  1. authors table.
  1. venues table.
  1. orgs table.
  1. publications table.
  1. fos table.
  1. keywords table.
  1. languages table.
  1. dates table.
  1. fact table.

In [0]:
authors = df_author.select(
    col('author._id').alias('author_id'),
    col('author.name').alias('name'),
    col('h_index_author').alias('h_index'),
    col('author_rank').alias('rank')
)\
.dropDuplicates()\
.where(col('author_id').isNotNull())

In [0]:
venues = df_author.select(
    col('venue._id').alias('venue_id'),
    col('venue.type').alias('type'),
    col('venue.raw').alias('name'),
)\
.dropDuplicates()\
.where(col('venue_id').isNotNull())

In [0]:
orgs = df_author.select(
    col('author.orgid').alias('org_id'),
    col('author.org').alias('type'),
    col('h_index_organization').alias('h_index'),
)\
.dropDuplicates()\
.where(col('org_id').isNotNull())

In [0]:
publications = df_author.select(
    col('_id').alias('publication_id'),
    col('title').alias('title'),
    col('volume').alias('volume'),
    col('n_citation').alias('n_citation'),
).dropDuplicates()\
.where(col('publication_id').isNotNull())

In [0]:
fos = df_author.select(
    col('fos').alias('text'),
).dropDuplicates()\
.withColumn("fos_id",monotonically_increasing_id())

In [0]:
keywords = df_author.select(
    col('keywords').alias('text'),
).dropDuplicates()\
.withColumn("keyword_id",monotonically_increasing_id())

In [0]:
languages = df_author.select(
    col('lang').alias('text'),
).dropDuplicates()\
.withColumn("language_id",monotonically_increasing_id())

In [0]:
dates = df_author.select(
    col('year'),
).dropDuplicates()\
.withColumn("date_id",monotonically_increasing_id())

In [0]:
fact = df_author\
.join(dates,dates.year ==  df_author.year,"inner")\
.join(languages,languages.text ==  df_author.lang,"inner")\
.join(keywords,keywords.text ==  df_author.keywords,"inner")\
.join(fos,fos.text ==  df_author.fos,"inner")\
.select(
    col('date_id'),
    col('language_id'),
    col('keyword_id'),
    col('fos_id'),
    col('_id').alias('publication_id'),
    col('author._id').alias('author_id'),
    col('author.orgid').alias('org_id'),
    col('venue._id').alias('venue_id'),
)\
  .dropDuplicates()\
  .where(col('publication_id').isNotNull())\
  .where(col('author_id').isNotNull())\
  .where(col('org_id').isNotNull())\
  .where(col('venue_id').isNotNull())

In [0]:
fact.show(5)

+-------+-----------+-----------+------+--------------------+--------------------+--------------------+--------------------+
|date_id|language_id| keyword_id|fos_id|      publication_id|           author_id|              org_id|            venue_id|
+-------+-----------+-----------+------+--------------------+--------------------+--------------------+--------------------+
|      2|          0|       9252|     0|53e99959b7602d970...|53f36e0edabfae4b3...|5f71b3d61c455f439...|555036d27cea80f95...|
|      2|          0| 8589944212|     0|53e99959b7602d970...|53f36e0edabfae4b3...|5f71b3d61c455f439...|555036d27cea80f95...|
|      2|          0|17179877087|     0|53e99959b7602d970...|53f36e0edabfae4b3...|5f71b3d61c455f439...|555036d27cea80f95...|
|      2|          0|25769812622|     0|53e99959b7602d970...|53f36e0edabfae4b3...|5f71b3d61c455f439...|555036d27cea80f95...|
|      2|          0|25769811609|     0|53e99959b7602d970...|53f43bb4dabfaedf4...|5f71b5001c455f439...|555036d27cea80f95...|


#### Load (ETL).
##### Initial population of dimension tables as Delta files.

In [0]:
authors.write.format("delta").mode("append").save("/delta_publications/authors/")
venues.write.format("delta").mode("append").save("/delta_publications/venues/")
fos.write.format("delta").mode("append").save("/delta_publications/fos/")
keywords.write.format("delta").mode("append").save("/delta_publications/keywords/")
orgs.write.format("delta").mode("append").save("/delta_publications/orgs/")
publications.write.format("delta").mode("append").save("/delta_publications/publications/")
dates.write.format("delta").mode("append").partitionBy("year").save("/delta_publications/dates/")
languages.write.format("delta").mode("append").save("/delta_publications/languages/")
fact.write.format("delta").mode("append").save("/delta_publications/fact/")

#### Load (ETL).
##### Loading dimension tables.

In [0]:
authors = spark.read.format("delta").load("/delta_publications/authors/")
fos = spark.read.format("delta").load("/delta_publications/fos/")
venues = spark.read.format("delta").load("/delta_publications/venues/")
keywords = spark.read.format("delta").load("/delta_publications/keywords/")
orgs = spark.read.format("delta").load("/delta_publications/orgs/")
publications = spark.read.format("delta").load("/delta_publications/publications/")
languages = spark.read.format("delta").load("/delta_publications/languages/")
dates = spark.read.format("delta").load("/delta_publications/dates/")
fact = spark.read.format("delta").load("/delta_publications/fact/")

#### Queries
  1. get count of publications for each year.
  1. top-5 authors based on e.g. their H-index that publish in a Genetic programming.

In [0]:
fact.join(dates,fact.date_id ==  dates.date_id,"inner").groupBy("year").count().show(5)

+----+------+
|year| count|
+----+------+
|1958|    45|
|1983|  5142|
|1972|   144|
|2007|322738|
|1979|  1960|
+----+------+
only showing top 5 rows



In [0]:
fact.join(authors,"author_id","inner")\
     .join(fos,"fos_id","inner")\
     .where(fos.text == 'Genetic programming')\
     .orderBy(col("h_index").desc())\
     .groupBy("name", "author_id")\
     .agg(first("h_index").alias('h_index'),first("text").alias('fos'))\
     .orderBy(col("h_index").desc())\
     .limit(5)\
     .show()

+-------------------+--------------------+-------+-------------------+
|               name|           author_id|h_index|                fos|
+-------------------+--------------------+-------+-------------------+
|Francesca Gasparini|540955cfdabfae8fa...|     17|Genetic programming|
| Raimondo Schettini|5409688bdabfae450...|     17|Genetic programming|
|      Ajith Abraham|53f483a0dabfaeb1a...|      6|Genetic programming|
|           Jing Liu|542a2155dabfae61d...|      4|Genetic programming|
|      Riccardo Poli|5433d312dabfaeb4c...|      4|Genetic programming|
+-------------------+--------------------+-------+-------------------+



#### Operations
  1. we added 2 files each file has 25000 objects in dblpv13_incremental folder.
  1. apply the same transformation to new files (I used the same transformation cells above).
  1. create the same schema for new files (I used the same schema cells above).

In [0]:
# File location and type
file_location = "/FileStore/tables/dblpv13_incremental/"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read \
      .option("inferSchema", "true") \
      .option("multiline", "true") \
      .json(file_location)


#### Operations
  1. i used append mode to add new data to an Delta table.
  1. trigger incremental updates.

In [0]:
authors.write.format("delta").mode("append").save("/delta_publications/authors/")
venues.write.format("delta").mode("append").save("/delta_publications/venues/")
fos.write.format("delta").mode("append").save("/delta_publications/fos/")
keywords.write.format("delta").mode("append").save("/delta_publications/keywords/")
orgs.write.format("delta").mode("append").save("/delta_publications/orgs/")
publications.write.format("delta").mode("append").save("/delta_publications/publications/")
dates.write.format("delta").mode("append").partitionBy("year").save("/delta_publications/dates/")
languages.write.format("delta").mode("append").save("/delta_publications/languages/")
fact.write.format("delta").mode("append").save("/delta_publications/fact/")

In [0]:
authors = spark.read.format("delta").load("/delta_publications/authors/")
fos = spark.read.format("delta").load("/delta_publications/fos/")
venues = spark.read.format("delta").load("/delta_publications/venues/")
keywords = spark.read.format("delta").load("/delta_publications/keywords/")
orgs = spark.read.format("delta").load("/delta_publications/orgs/")
publications = spark.read.format("delta").load("/delta_publications/publications/")
languages = spark.read.format("delta").load("/delta_publications/languages/")
dates = spark.read.format("delta").load("/delta_publications/dates/")
fact = spark.read.format("delta").load("/delta_publications/fact/")

In [0]:
fact.join(dates,fact.date_id ==  dates.date_id,"inner").groupBy("year").count().show(5)

+----+-------+
|year|  count|
+----+-------+
|1958|     45|
|1983|  13676|
|1972|    816|
|2007|1025186|
|1979|   6508|
+----+-------+
only showing top 5 rows



In [0]:
display(spark.sql("DROP TABLE IF EXISTS authors"))

display(spark.sql("CREATE TABLE authors USING DELTA LOCATION '/delta_publications/authors/'"))

display(spark.sql("DESCRIBE HISTORY authors"))

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2022-04-26T17:37:00.000+0000,6304324629211577,ahsaeed@nu.edu.eg,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(914321848334856),0426-154314-7t1q0zqc,0.0,WriteSerializable,True,"Map(numFiles -> 2, numOutputRows -> 42488, numOutputBytes -> 1279569)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-04-26T16:25:18.000+0000,6304324629211577,ahsaeed@nu.edu.eg,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(914321848334856),0426-154314-7t1q0zqc,,WriteSerializable,True,"Map(numFiles -> 3, numOutputRows -> 73063, numOutputBytes -> 2229609)",,Databricks-Runtime/10.4.x-scala2.12
