<a href="https://colab.research.google.com/github/dymsson/spark-notebooks/blob/main/DBLP_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# installing Spark in Google Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# older Spark releases are kept on archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
import findspark
findspark.init()

In [2]:
# creating Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

In [3]:
# providing access to Google drive from Colab
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


# **DBLP dataset**

## **A. Spark RDDs**

Analyse the file `author-large.txt` from the DBLP dataset. We want to find the top 10 pairs of authors who published the largest number of papers together (possibly with other collaborators). For example, if authors $a$, $b$ and $c$ published a paper with title $t$, then this contributes one joint publication to each of the author pairs ($a$,$b$), ($b$,$c$) and ($a$,$c$). Use the first column of the input data for the author names and the third column for the publication title.

This task is to be solved using RDD operations in PySpark. It is very important that the code is clear and well documented.
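As a minimal plain-Python sketch of the pair-counting rule described above (not the Spark solution itself), the snippet below counts joint publications on toy data. The data and the names `papers` and `pair_counts` are made up for illustration.

```python
from collections import Counter
from itertools import combinations

# Toy data (made up): publication title -> list of author names
papers = {
    "t": ["a", "b", "c"],
    "u": ["a", "b"],
}

pair_counts = Counter()
for title, authors in papers.items():
    # sorted(set(...)) yields each unordered pair exactly once,
    # with the alphabetically smaller name first
    for pair in combinations(sorted(set(authors)), 2):
        pair_counts[pair] += 1

print(pair_counts.most_common(1))
```

Paper `t` contributes one count to each of its three pairs, and paper `u` adds a second count to the pair `("a", "b")`.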

In [4]:
# read a file from the local file system into RDD
myRDD = spark.sparkContext.textFile("/content/drive/MyDrive/Colab Notebooks/author-large.txt")
# with tab as a delimiter
myRDD = myRDD.map(lambda x: x.split("\t"))
# number of rows in rdd
myRDD.count()

2225370

In [5]:
# transform RDD to the form (publication title , author name). x[2] - publication title (key), x[0] - author name. 
myRDD = myRDD.map(lambda x: (x[2], x[0]))
myRDD.take(5)

[('Object SQL - A Language for the Design and Implementation of Object Databases.',
  'Jurgen Annevelink'),
 ('Object SQL - A Language for the Design and Implementation of Object Databases.',
  'Rafiul Ahad'),
 ('Object SQL - A Language for the Design and Implementation of Object Databases.',
  'Amelia Carlson'),
 ('Object SQL - A Language for the Design and Implementation of Object Databases.',
  'Daniel H. Fishman'),
 ('Object SQL - A Language for the Design and Implementation of Object Databases.',
  'Michael L. Heytens')]

Add all possible pairs of co-authors to the RDD by joining it with itself on the publication title.

In [6]:
# similar to sql query:
# SELECT r1.title, r1.author_name, r2.author_name FROM myRDD r1, myRDD r2 WHERE r1.title = r2.title
myRDD = myRDD.join(myRDD)
myRDD.take(5)

[('Cooperative Transactions for Multiuser Environments.',
  ('Gail E. Kaiser', 'Gail E. Kaiser')),
 ('Physical Object Management.', ('Alfons Kemper', 'Alfons Kemper')),
 ('Physical Object Management.', ('Alfons Kemper', 'Guido Moerkotte')),
 ('Physical Object Management.', ('Guido Moerkotte', 'Alfons Kemper')),
 ('Physical Object Management.', ('Guido Moerkotte', 'Guido Moerkotte'))]

The RDD now contains mirrored tuples, (author, co-author) and (co-author, author), as well as tuples that pair an author with themselves, (author, author). Remove them.
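The effect of such a clean-up can be sketched in plain Python, assuming a strict "less than" comparison on the two names is used as the filter. The author names and the variables `joined` and `unique_pairs` are hypothetical.

```python
from itertools import product

# Toy self-join result for one title with two authors (names are made up)
authors = ["Bob", "Alice"]
joined = list(product(authors, authors))
# joined holds all four ordered combinations,
# including ("Bob", "Bob") and ("Alice", "Alice")

# Keep only tuples whose first name sorts strictly before the second:
# this drops self-pairs and keeps one of each mirrored pair
unique_pairs = [(a, b) for a, b in joined if a < b]
print(unique_pairs)
```

Python compares strings by code point, so "alphabetical order" here is case-sensitive; this is enough for deduplication as long as the same author is always spelled the same way.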

In [7]:
# to do this, keep only those rows where the author precedes the co-author in alphabetical order
myRDD = myRDD.filter(lambda x: x[1][0] < x[1][1])
myRDD.take(5)

[('Physical Object Management.', ('Alfons Kemper', 'Guido Moerkotte')),
 ('Pogo: A Declarative Representation System for Graphics.',
  ('Mark A. Tarlton', 'P. Nong Tarlton')),
 ('The Commercial INGRES Epilogue.',
  ('Lawrence A. Rowe', 'Michael Stonebraker')),
 ('Temporal Databases: A Prelude to Parametric Data.',
  ('Shashi K. Gadia', 'Sunil S. Nair')),
 ('Stream Processing: Temporal Query Processing and Optimization.',
  ('Richard R. Muntz', 'T. Y. Cliff Leung'))]

Transform the RDD to the form ((author name, co-author name), 1), where x[1], the tuple (author name, co-author name), becomes the new key.

The value **1** stands for one joint publication. It is needed for the subsequent count of joint publications per pair.
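A rough plain-Python analogue of summing these 1s per key is shown below. It only illustrates the idea of folding values that share a key; the records and the helper `add_record` are made up and are not part of the Spark API.

```python
from functools import reduce

# Toy (key, 1) records; the author pairs are made up
records = [(("A", "B"), 1), (("A", "C"), 1), (("A", "B"), 1)]

def add_record(totals, record):
    """Fold one (key, value) record into the running per-key totals."""
    key, value = record
    totals[key] = totals.get(key, 0) + value
    return totals

totals = reduce(add_record, records, {})
print(totals)
```

Each record adds its value to the total of its key, so a pair that appears twice ends up with a count of 2.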

In [8]:
myRDD = myRDD.map(lambda x: (x[1], 1))
myRDD.take(5)

[(('Alfons Kemper', 'Guido Moerkotte'), 1),
 (('Mark A. Tarlton', 'P. Nong Tarlton'), 1),
 (('Lawrence A. Rowe', 'Michael Stonebraker'), 1),
 (('Shashi K. Gadia', 'Sunil S. Nair'), 1),
 (('Richard R. Muntz', 'T. Y. Cliff Leung'), 1)]

In [9]:
# count the number of joint papers for each pair of authors
myRDD = myRDD.reduceByKey(lambda x, y: x + y)

# sort by the number of joint papers in descending order
myRDD = myRDD.sortBy(lambda x: x[1], ascending=False)

In [10]:
# list of the top 10 pairs of authors who published the largest number of papers together
myRDD.take(10)

[(('Irith Pomeranz', 'Sudhakar M. Reddy'), 249),
 (('Amr El Abbadi', 'Divyakant Agrawal'), 161),
 (('Makoto Takizawa', 'Tomoya Enokido'), 141),
 (('Didier Dubois', 'Henri Prade'), 122),
 (('Elizabeth Chang', 'Tharam S. Dillon'), 118),
 (('Hyun-Sung Kim', 'Kee-Young Yoo'), 111),
 (('Mary Jane Irwin', 'Narayanan Vijaykrishnan'), 107),
 (('Mahmut T. Kandemir', 'Mary Jane Irwin'), 100),
 (('Chun Chen', 'Jiajun Bu'), 99),
 (('Giuseppe De Giacomo', 'Maurizio Lenzerini'), 99)]
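The last two pairs are tied at 99 joint papers, so their relative order is not determined by the count alone. A minimal plain-Python sketch of a deterministic top-k selection, assuming ties should be broken alphabetically by pair, is given below; the toy counts are made up.

```python
import heapq

# Toy counts (made up), with a tie at the value 5
counts = [(("C", "D"), 5), (("A", "B"), 5), (("E", "F"), 9)]

# Rank by count descending, then by pair ascending so ties are deterministic
top2 = heapq.nsmallest(2, counts, key=lambda kv: (-kv[1], kv[0]))
print(top2)
```

The same kind of composite key could, as an assumption about how one might adapt the cell above, be passed to `RDD.takeOrdered`, which also avoids sorting the whole RDD when only the first few elements are needed.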

## **B. Spark DataFrame API**

Do the same analysis as above, but this time use the Spark DataFrame API.

In [53]:
from pyspark.sql.functions import monotonically_increasing_id 

# read a file from the local file system into a dataframe with columns (Author, Subject, Title, Year)
df = spark.read.option("delimiter", "\t").csv("/content/drive/MyDrive/Colab Notebooks/author-large.txt").toDF("Author", "Subject", "Title", "Year")

# sort the table alphabetically by author name
df = df.sort(df.Author)

# add index column
df = df.withColumn("id", monotonically_increasing_id())

# show the dataframe
df.show()

+-------------------+--------------------+--------------------+----+---+
|             Author|             Subject|               Title|Year| id|
+-------------------+--------------------+--------------------+----+---+
|             . Akin|Formal Design Met...|Formal representa...|1994|  0|
|             . Akin|Formal Design Met...|Discussion: Resea...|1994|  1|
|          . Aydemir|                GMDS|Vorgehensweise be...|1994|  2|
|          . Aydemir|                GMDS|Prognosefaktoren ...|1994|  3|
|. Belmonte Fernndez|International Con...|Triangle Strip Mu...|2006|  4|
|      . Chamberland|                HPCS|Performance Measu...|2008|  5|
|  . Erkan Mumcuoglu|                 IDA|Discriminative Re...|2005|  6|
|  . Erkan Mumcuoglu|               TAINN|Protein Solvent A...|2005|  7|
|             . Glat|International Con...|Parallel Implicit...|2003|  8|
|            . K. Co|                PPSC|Three Dimensional...|1997|  9|
|            . Kovcs|Optimization Tech...|On Large 

In [54]:
from pyspark.sql.functions import col

# self-join the dataframe on Title to pair each author with every co-author;
# b1.id < b2.id keeps each pair once (ids follow the alphabetical sort by author)
# SELECT b1.Author as First, b2.Author as Second, b2.Title FROM books b1, books b2 WHERE b1.Title = b2.Title AND b1.Id < b2.Id
df = (
    df.alias("b1")
    .join(df.alias("b2"), col("b1.Title") == col("b2.Title"), "inner")
    .where(col("b1.id") < col("b2.id"))
    .select(col("b1.Author").alias("First"), col("b2.Author").alias("Second"), col("b2.Title"))
)

# show the dataframe
df.show()

+--------------------+--------------------+--------------------+
|               First|              Second|               Title|
+--------------------+--------------------+--------------------+
| Chutiporn Anutariya|  Rachanee Ungrangsi| combiSQORE: An O...|
| Chutiporn Anutariya|      Vilas Wuwongse| combiSQORE: An O...|
|  Rachanee Ungrangsi|      Vilas Wuwongse| combiSQORE: An O...|
|       Adrian Stoica|Carlos Salazar-La...|"Genetically Engi...|
|       Adrian Stoica|     Gerhard Klimeck|"Genetically Engi...|
|       Adrian Stoica|      Thomas A. Cwik|"Genetically Engi...|
|Carlos Salazar-La...|     Gerhard Klimeck|"Genetically Engi...|
|Carlos Salazar-La...|      Thomas A. Cwik|"Genetically Engi...|
|     Gerhard Klimeck|      Thomas A. Cwik|"Genetically Engi...|
|     Susan F. Stager|       Tad Pinkerton|"Heads Up" for in...|
|     Susan F. Stager|Virginia E. Rezmi...|"Heads Up" for in...|
|       Tad Pinkerton|Virginia E. Rezmi...|"Heads Up" for in...|
| Barbara Koch-Priewe|Wil

In [55]:
from pyspark.sql.functions import desc

# grouping by (author, co-author), counting the number of papers together, sorting in descending order
# SELECT First, Second, COUNT(*) as Papers_together FROM authors GROUP BY First, Second ORDER BY Papers_together DESC
df = df.groupBy(df.First, df.Second).count().sort(desc("count"))

# show the dataframe
df.show(10)

+-------------------+--------------------+-----+
|              First|              Second|count|
+-------------------+--------------------+-----+
|     Irith Pomeranz|   Sudhakar M. Reddy|  249|
|      Amr El Abbadi|   Divyakant Agrawal|  161|
|    Makoto Takizawa|      Tomoya Enokido|  141|
|      Didier Dubois|         Henri Prade|  122|
|    Elizabeth Chang|    Tharam S. Dillon|  118|
|      Hyun-Sung Kim|       Kee-Young Yoo|  111|
|    Mary Jane Irwin|Narayanan Vijaykr...|  107|
| Mahmut T. Kandemir|     Mary Jane Irwin|  100|
|Giuseppe De Giacomo|  Maurizio Lenzerini|   99|
|          Chun Chen|           Jiajun Bu|   99|
+-------------------+--------------------+-----+
only showing top 10 rows
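The index-based deduplication used in this section can be emulated in plain Python to see why sorting by author before numbering the rows matters. This is only a sketch on made-up rows; `enumerate` stands in for the `id` column and is not how Spark assigns ids internally.

```python
# Toy rows (made up): (author, title)
rows = [("Carol", "t1"), ("Alice", "t1"), ("Bob", "t2"), ("Alice", "t2")]

# Sort by author, then attach an increasing index (stand-in for the id column)
indexed = [(i, author, title) for i, (author, title) in enumerate(sorted(rows))]

# Self-join on title, keeping only rows where the first index is smaller
pairs = [
    (a1, a2, t1)
    for i1, a1, t1 in indexed
    for i2, a2, t2 in indexed
    if t1 == t2 and i1 < i2
]
print(pairs)
```

Because the index follows the alphabetical order of the authors, the first name in every pair sorts before the second. One caveat: if the same author were listed twice for one title, the index condition would keep an (author, author) pair, whereas comparing the names directly, as in part A, would drop it.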



## **C. Spark SQL**

Do the same analysis as above, but this time use Spark SQL.

[Spark SQL documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html).

In [56]:
from pyspark.sql.functions import monotonically_increasing_id

# read a file from the local file system into a dataframe with columns (Author, Subject, Title, Year)
dfSQL = spark.read.option("delimiter", "\t").csv('/content/drive/MyDrive/Colab Notebooks/author-large.txt').toDF("Author", "Subject", "Title", "Year")

# register the dataframe as a SQL temporary view
dfSQL.createOrReplaceTempView("books")

# sort the table alphabetically by author name
dfSQL = spark.sql("SELECT * FROM books Order by Author")

# add index column
dfSQL = dfSQL.withColumn("id", monotonically_increasing_id())

# register the dataframe as a SQL temporary view
dfSQL.createOrReplaceTempView("books")

# show the dataframe
dfSQL.show()

+-------------------+--------------------+--------------------+----+---+
|             Author|             Subject|               Title|Year| id|
+-------------------+--------------------+--------------------+----+---+
|             . Akin|Formal Design Met...|Formal representa...|1994|  0|
|             . Akin|Formal Design Met...|Discussion: Resea...|1994|  1|
|          . Aydemir|                GMDS|Vorgehensweise be...|1994|  2|
|          . Aydemir|                GMDS|Prognosefaktoren ...|1994|  3|
|. Belmonte Fernndez|International Con...|Triangle Strip Mu...|2006|  4|
|      . Chamberland|                HPCS|Performance Measu...|2008|  5|
|  . Erkan Mumcuoglu|                 IDA|Discriminative Re...|2005|  6|
|  . Erkan Mumcuoglu|               TAINN|Protein Solvent A...|2005|  7|
|             . Glat|International Con...|Parallel Implicit...|2003|  8|
|            . K. Co|                PPSC|Three Dimensional...|1997|  9|
|            . Kovcs|Optimization Tech...|On Large 

In [57]:
# self-join temporary view to add co-author column to the dataframe
dfSQL = spark.sql("SELECT b1.Author as First, b2.Author as Second, b2.Title FROM books b1, books b2 WHERE b1.Title = b2.Title AND b1.Id < b2.Id")

# register the dataframe as a SQL temporary view
dfSQL.createOrReplaceTempView("authors")

# show the dataframe
dfSQL.show()

+--------------------+--------------------+--------------------+
|               First|              Second|               Title|
+--------------------+--------------------+--------------------+
| Chutiporn Anutariya|  Rachanee Ungrangsi| combiSQORE: An O...|
| Chutiporn Anutariya|      Vilas Wuwongse| combiSQORE: An O...|
|  Rachanee Ungrangsi|      Vilas Wuwongse| combiSQORE: An O...|
|       Adrian Stoica|Carlos Salazar-La...|"Genetically Engi...|
|       Adrian Stoica|     Gerhard Klimeck|"Genetically Engi...|
|       Adrian Stoica|      Thomas A. Cwik|"Genetically Engi...|
|Carlos Salazar-La...|     Gerhard Klimeck|"Genetically Engi...|
|Carlos Salazar-La...|      Thomas A. Cwik|"Genetically Engi...|
|     Gerhard Klimeck|      Thomas A. Cwik|"Genetically Engi...|
|     Susan F. Stager|       Tad Pinkerton|"Heads Up" for in...|
|     Susan F. Stager|Virginia E. Rezmi...|"Heads Up" for in...|
|       Tad Pinkerton|Virginia E. Rezmi...|"Heads Up" for in...|
| Barbara Koch-Priewe|Wil

In [58]:
# grouping by (author, co-author), counting the number of papers together, sorting in descending order
dfSQL = spark.sql("SELECT First, Second, count(*) as Papers_together FROM authors Group by First, Second Order by Papers_together DESC")

# show the answer
dfSQL.show(10)

+-------------------+--------------------+---------------+
|              First|              Second|Papers_together|
+-------------------+--------------------+---------------+
|     Irith Pomeranz|   Sudhakar M. Reddy|            249|
|      Amr El Abbadi|   Divyakant Agrawal|            161|
|    Makoto Takizawa|      Tomoya Enokido|            141|
|      Didier Dubois|         Henri Prade|            122|
|    Elizabeth Chang|    Tharam S. Dillon|            118|
|      Hyun-Sung Kim|       Kee-Young Yoo|            111|
|    Mary Jane Irwin|Narayanan Vijaykr...|            107|
| Mahmut T. Kandemir|     Mary Jane Irwin|            100|
|Giuseppe De Giacomo|  Maurizio Lenzerini|             99|
|          Chun Chen|           Jiajun Bu|             99|
+-------------------+--------------------+---------------+
only showing top 10 rows
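The join-and-count logic can also be tried without a Spark session. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, on a made-up `books` table. It is a variant, not the query from the cells above: it compares author names directly instead of using an `id` column, merges the join and the aggregation into one statement, and uses the hypothetical aliases `first_author` and `second_author`.

```python
import sqlite3

# In-memory database with a toy table (rows are made up)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE books (Author TEXT, Title TEXT)")
conn.executemany(
    "INSERT INTO books VALUES (?, ?)",
    [("a", "t"), ("b", "t"), ("c", "t"), ("a", "u"), ("b", "u")],
)

# Self-join on Title; b1.Author < b2.Author keeps each unordered pair once
query = """
SELECT b1.Author AS first_author,
       b2.Author AS second_author,
       COUNT(*)  AS papers_together
FROM books b1
JOIN books b2
  ON b1.Title = b2.Title AND b1.Author < b2.Author
GROUP BY b1.Author, b2.Author
ORDER BY papers_together DESC, first_author, second_author
"""
result = conn.execute(query).fetchall()
print(result)
conn.close()
```

The extra sort keys after `papers_together DESC` make the order of tied pairs deterministic.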

