# **arXiv Metadata Analytics with PySpark DataFrames: JSON Case Study**

### Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark

### Author: Amin Karami (PhD, FHEA)
#### email: amin.karami@ymail.com

In [None]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

In [1]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [2]:
# import SparkSession
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

spark

23/03/19 20:08:32 WARN Utils: Your hostname, Adrian-Laptop.local resolves to a loopback address: 127.0.0.1; using 192.168.100.19 instead (on interface en0)
23/03/19 20:08:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/19 20:08:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/19 20:08:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/19 20:08:34 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Read and Load Data to Spark

df = spark.read.json('data.json')
df.printSchema()



root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



                                                                                

In [4]:
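# Check the number of partitions and cache the DataFrame.
# Note: a cell only echoes its last expression, so the partition count is not
# displayed here; wrap the call in print() to see it.
df.rdd.getNumPartitions()
df.persist()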

DataFrame[abstract: string, authors: string, authors_parsed: array<array<string>>, categories: string, comments: string, doi: string, id: string, journal-ref: string, license: string, report-no: string, submitter: string, title: string, update_date: string, versions: array<struct<created:string,version:string>>]

## Question 1: Create a new Schema

In [5]:
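from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Define a schema that keeps only the columns used in the questions below.
# 'versions' is declared as an array of strings, so each version entry is
# read back as its raw JSON text rather than as a struct.
Schema = StructType([
    StructField('authors', StringType(), True),
    StructField('categories', StringType(), True),
    StructField('license', StringType(), True),
    StructField('comments', StringType(), True),
    StructField('abstract', StringType(), True),
    StructField('versions', ArrayType(StringType()), True),
])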

print(Schema)

StructType([StructField('authors', StringType(), True), StructField('categories', StringType(), True), StructField('license', StringType(), True), StructField('comments', StringType(), True), StructField('abstract', StringType(), True), StructField('versions', ArrayType(StringType(), True), True)])


## Question 2: Binding Data to a Schema

In [6]:
df = spark.read.json("data.json", schema=Schema)

df.printSchema()
df.show()

root
 |-- authors: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- license: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|             authors|       categories|             license|            comments|            abstract|            versions|
+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|           hep-ph|                null|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
|Ileana Streinu an...|    math.CO cs.CG|http://arxiv.org/...|To appear in Grap...|  We describe a n...|[{"version":"v1",...|
|         Hongjun Pan|   physics.gen-ph|                null| 23 pages, 3 figures|

## Question 3: Missing values for "comments" and "license" attributes

In [7]:
# Drop rows whose 'comments' value is missing
df = df.dropna(subset=["comments"])

# Replace missing 'license' values with "unknown"
df = df.fillna(value="unknown", subset=["license"])

df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             authors|          categories|             license|            comments|            abstract|            versions|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|              hep-ph|             unknown|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
|Ileana Streinu an...|       math.CO cs.CG|http://arxiv.org/...|To appear in Grap...|  We describe a n...|[{"version":"v1",...|
|         Hongjun Pan|      physics.gen-ph|             unknown| 23 pages, 3 figures|  The evolution o...|[{"version":"v1",...|
|        David Callan|             math.CO|             unknown|            11 pages|  We show that a ...|[{"version":"v1",...|
|Y. H. Pong and C....|   cond-mat.mes-hall|             unknown|6 pages, 4 figure...|  We study the tw..

In [8]:

df.createOrReplaceTempView("Archive")


## Question 4: Get the author names who published a paper in a 'math' category

In [9]:
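# Authors of papers whose 'categories' string starts with 'math'.
# LIKE 'math%' is a prefix match, so only the first listed category is tested.

sql_query = """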
    SELECT 
        authors
    FROM
        Archive
    WHERE
        categories LIKE 'math%'
"""

spark.sql(sql_query).show()

print(spark.sql(sql_query).count())




+--------------------+
|             authors|
+--------------------+
|Ileana Streinu an...|
|        David Callan|
|  Sergei Ovchinnikov|
|Clifton Cunningha...|
|        Koichi Fujii|
|         Norio Konno|
|Simon J.A. Malham...|
|Robert P. C. de M...|
|  P\'eter E. Frenkel|
|          Mihai Popa|
|   Debashish Goswami|
|      Mikkel {\O}bro|
|Nabil L. Youssef,...|
|         Boris Rubin|
|         A. I. Molev|
| Branko J. Malesevic|
|   John W. Robertson|
|     Yu.N. Kosovtsov|
|        Osamu Fujino|
|Stephen C. Power ...|
+--------------------+
only showing top 20 rows





304590
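
Because `LIKE 'math%'` is a prefix match on the whole `categories` string, a paper listed as `cs.CG math.CO` would not be counted. The plain-Python sketch below illustrates the difference; the helper names `starts_with_math` and `has_math_category` and the sample strings are illustrative assumptions, not part of the notebook.

```python
def starts_with_math(categories: str) -> bool:
    """Mimic SQL `categories LIKE 'math%'`: prefix match on the whole string."""
    return categories.startswith("math")


def has_math_category(categories: str) -> bool:
    """Test every space-separated category, not only the first one."""
    return any(cat.startswith("math") for cat in categories.split())


# Illustrative category strings in the space-separated arXiv format
samples = ["math.CO cs.CG", "cs.CG math.CO", "hep-ph", "physics.gen-ph"]

prefix_hits = [c for c in samples if starts_with_math(c)]
any_hits = [c for c in samples if has_math_category(c)]

print(prefix_hits)
print(any_hits)
```

If secondary categories should count as well, one possible Spark SQL condition is `categories LIKE 'math%' OR categories LIKE '% math%'`. Both variants also match `math-ph`, which may or may not be desired.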


                                                                                

## Question 5: Get licenses of papers with a parenthesized term of 5 or more letters in the "abstract"

In [None]:
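# REGEXP takes a regular expression, so the LIKE-style '%' wildcards are dropped.
# Spark SQL unescapes string literals, hence the doubled backslashes in the raw string.
sql_query = r""" SELECT DISTINCT license FROM Archive
                WHERE abstract REGEXP '\\(([A-Za-z][^_ /\\\\<>]{5,})\\)'
            """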
spark.sql(sql_query).show()
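
The regular expression can be tried outside Spark before running it on the full dataset. The sketch below uses Python's `re` module, which reads this particular pattern the same way as the Java regex engine behind Spark. Only the first sample sentence comes from the dataset; the other three are made up for illustration.

```python
import re

# Same pattern as in the query, as a Python raw string (no SQL escaping needed here):
# "(" + one letter + at least five characters that are not _ space / \ < > + ")"
PATTERN = re.compile(r"\(([A-Za-z][^_ /\\<>]{5,})\)")

samples = [
    "energy of the Large Hadron Collider (LHC).",  # 3 characters inside: no match
    "measured by the (ATLAS) detector",            # 5 characters inside: no match
    "using the (Tevatron) data",                   # 8 characters inside: match
    "a two word (Large Hadron) remark",            # contains a space: no match
]

matches = []
for text in samples:
    found = PATTERN.search(text)
    matches.append(found.group(1) if found else None)

print(matches)
```

Note that the pattern requires one leading letter plus at least five further characters, so the shortest term it accepts has six characters. Using `{4,}` instead would accept five-character terms.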



## Question 6: Extract the statistic of the number of pages for unknown licenses

In [None]:
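import re
from pyspark.sql.types import IntegerType

def get_page(line):
    # Return the first "<n> pages" count found in a comment, or 0 if there is none
    search = re.findall(r'\d+ pages', line or '')
    if search:
        return int(search[0].split(" ")[0])
    else:
        return 0


# Register the UDF with an integer return type (the default return type is string)
spark.udf.register("PageNumbers", get_page, IntegerType())

sql_query1 = """
    SELECT
        AVG(PageNumbers(comments)) AS avg,
        SUM(PageNumbers(comments)) AS sum,
        STD(PageNumbers(comments)) AS std
    FROM
        Archive
    WHERE
        license = 'unknown'
"""

spark.sql(sql_query1).show()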
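
Returning 0 for comments without a page count pulls the average down. The plain-Python sketch below shows the effect on a small sample. The first three comments appear in the data shown earlier; the fourth is a made-up comment without a page count. `statistics.stdev` is the sample standard deviation, the same quantity Spark's `STD` computes.

```python
import re
import statistics

PAGES_RE = re.compile(r"(\d+) pages")


def get_page(line):
    """Return the first '<n> pages' count in a comment, or 0 when none is found."""
    match = PAGES_RE.search(line or "")
    return int(match.group(1)) if match else 0


comments = [
    "37 pages, 15 figures; published version",
    "23 pages, 3 figures",
    "11 pages",
    "Accepted for publication",  # hypothetical comment without a page count
]

pages = [get_page(c) for c in comments]
total = sum(pages)
mean_all = statistics.mean(pages)    # zeros included
std_all = statistics.stdev(pages)    # sample standard deviation
mean_known = statistics.mean([p for p in pages if p > 0])  # zeros excluded

print(pages, total, mean_all, round(std_all, 2), round(mean_known, 2))
```

If the zero rows should not influence the statistics, one option is to have the UDF return `None` instead of 0, since SQL aggregates such as `AVG` skip nulls.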

In [11]:
df.head(2)

[Row(authors="C. Bal\\'azs, E. L. Berger, P. M. Nadolsky, C.-P. Yuan", categories='hep-ph', license='unknown', comments='37 pages, 15 figures; published version', abstract='  A fully differential calculation in perturbative quantum chromodynamics is\npresented for the production of massive photon pairs at hadron colliders. All\nnext-to-leading order perturbative contributions from quark-antiquark,\ngluon-(anti)quark, and gluon-gluon subprocesses are included, as well as\nall-orders resummation of initial-state gluon radiation valid at\nnext-to-next-to-leading logarithmic accuracy. The region of phase space is\nspecified in which the calculation is most reliable. Good agreement is\ndemonstrated with data from the Fermilab Tevatron, and predictions are made for\nmore detailed tests with CDF and DO data. Predictions are shown for\ndistributions of diphoton pairs produced at the energy of the Large Hadron\nCollider (LHC). Distributions of the diphoton pairs from the decay of a Higgs\nboson

In [33]:

import json

def get_Day_from_version(item):
    # Accept one JSON string or a list of them (as stored in the 'versions' column)
    first = item if isinstance(item, str) else item[0]
    created = json.loads(first)["created"]  # e.g. 'Mon, 2 Apr 2007 19:18:42 GMT'
    return created.split(",")[0]            # day of the week, e.g. 'Mon'

get_Day_from_version('{"version":"v1","created":"Mon, 2 Apr 2007 19:18:42 GMT"}')

'Mon'
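
The `created` values follow the RFC 2822 date format, so the standard library can parse them into real datetimes instead of relying on string positions. A minimal sketch, assuming every entry is a JSON object with a `created` key; the helper name `created_weekday` is hypothetical.

```python
import json
from email.utils import parsedate_to_datetime

DAY_NAMES = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")


def created_weekday(version_json: str) -> str:
    """Parse one 'versions' entry and return the weekday of its 'created' date."""
    created = json.loads(version_json)["created"]
    # parsedate_to_datetime understands dates such as 'Mon, 2 Apr 2007 19:18:42 GMT'
    return DAY_NAMES[parsedate_to_datetime(created).weekday()]


versions = [
    '{"version":"v1","created":"Mon, 2 Apr 2007 19:18:42 GMT"}',
    '{"version":"v2","created":"Tue, 24 Jul 2007 20:10:27 GMT"}',
]
days = [created_weekday(v) for v in versions]
print(days)
```

Computing the weekday from the parsed date also guards against entries whose textual day prefix is missing or malformed.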

In [13]:
def get_page(line):
    # Return the first "<n> pages" count found in a comment, or 0 if there is none
    search = re.findall(r'\d+ pages', line)
    if search:
        return int(search[0].split(" ")[0])
    else:
        return 0

In [22]:
# Spark DataFrames have no pandas-style .iloc; use limit() or take() to peek at rows
df.limit(1).show()

In [23]:
df2 = df.limit(20)

In [30]:
# Note: .rdd.map() returns an RDD (not a DataFrame) holding each row's 'versions' list
df3 = df2.rdd.map(lambda x: x['versions'])

In [31]:
df3.take(10)

[['{"version":"v1","created":"Mon, 2 Apr 2007 19:18:42 GMT"}',
  '{"version":"v2","created":"Tue, 24 Jul 2007 20:10:27 GMT"}'],
 ['{"version":"v1","created":"Sat, 31 Mar 2007 02:26:18 GMT"}',
  '{"version":"v2","created":"Sat, 13 Dec 2008 17:26:00 GMT"}'],
 ['{"version":"v1","created":"Sun, 1 Apr 2007 20:46:54 GMT"}',
  '{"version":"v2","created":"Sat, 8 Dec 2007 23:47:24 GMT"}',
  '{"version":"v3","created":"Sun, 13 Jan 2008 00:36:28 GMT"}'],
 ['{"version":"v1","created":"Sat, 31 Mar 2007 03:16:14 GMT"}'],
 ['{"version":"v1","created":"Sat, 31 Mar 2007 04:24:59 GMT"}'],
 ['{"version":"v1","created":"Sat, 31 Mar 2007 04:27:22 GMT"}',
  '{"version":"v2","created":"Wed, 22 Aug 2007 22:42:11 GMT"}'],
 ['{"version":"v1","created":"Sat, 31 Mar 2007 04:47:20 GMT"}',
  '{"version":"v2","created":"Thu, 10 Apr 2008 08:42:28 GMT"}',
  '{"version":"v3","created":"Tue, 1 Jul 2008 18:54:28 GMT"}'],
 ['{"version":"v1","created":"Sat, 31 Mar 2007 05:10:16 GMT"}'],
 ['{"version":"v1","created":"Sat, 3

In [None]:
# Build (day, (pages, 1)) pairs; a missing comment yields a page count of 0
df.rdd.map(lambda x: (
    get_Day_from_version(x['versions']),
    (get_page(x['comments'] if x['comments'] is not None else "None"), 1),
)).take(10)
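
Pairs of the form `(day, (pages, 1))` are the usual input for a per-key average: summing both components per key gives a total and a count. The notebook does not show the next step, so the following plain-Python sketch is an assumption about where this is heading; the sample pairs and the helper `add_pairs` are illustrative.

```python
# Illustrative (day, (pages, 1)) pairs, shaped like the output of the map above
pairs = [("Mon", (37, 1)), ("Sat", (23, 1)), ("Sun", (11, 1)), ("Sat", (9, 1))]


def add_pairs(a, b):
    """Combine two (page_total, row_count) tuples component-wise."""
    return (a[0] + b[0], a[1] + b[1])


# Emulate a reduce-by-key: fold all values that share a day into one tuple
totals = {}
for day, value in pairs:
    totals[day] = add_pairs(totals[day], value) if day in totals else value

# Divide the page total by the row count to get the average per day
averages = {day: pages / count for day, (pages, count) in totals.items()}
print(averages)
```

On the RDD itself, the same aggregation could be written as `.reduceByKey(add_pairs).mapValues(lambda t: t[0] / t[1])`.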
