<a href="https://colab.research.google.com/github/bharath637/python/blob/main/pyspark_arami2_(JSON_DF).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Arxiv metadata Analytics with PySpark DF: JSON case study**

### Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark

### Author: Amin Karami (PhD, FHEA)
#### email: amin.karami@ymail.com

In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m17.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=aa21a7a226069fb594c6e1c22d9dee06973872369571ba166901699c466a9d47
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [2]:
# Start (or reuse) a SparkSession whose master is "local[*]"
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()
spark

In [6]:
# Load the multi-line JSON sample into a DataFrame and print the inferred schema
json_reader = spark.read.format("json")
json_arx = json_reader.load("/content/samplearx.json", multiLine=True)
json_arx.printSchema()


root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



In [9]:
# Alias the loaded DataFrame as `df`, then show how many partitions its RDD has
df = json_arx
underlying_rdd = df.rdd
underlying_rdd.getNumPartitions()

1

## Question 1: Create a new Schema

In [15]:
# Hand-written schema: a six-column subset of the inferred one
from pyspark.sql.types import *

# Nullable string columns, listed in the order they appear in the schema
_string_columns = ['authors', 'categories', 'license', 'comments', 'abstract']

# "versions" is declared as an array of strings, whereas the inferred schema
# printed earlier shows an array of structs.
Schema = StructType(
    [StructField(column, StringType(), True) for column in _string_columns]
    + [StructField('versions', ArrayType(StringType()), True)]
)


## Question 2: Binding Data to a Schema

In [22]:
# Read the same file again, this time bound to the hand-written Schema
schema_bound_reader = spark.read.schema(Schema)
df1 = schema_bound_reader.json("/content/samplearx.json", multiLine=True)
df1.printSchema()


root
 |-- authors: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- license: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [24]:
# Preview a single row of the schema-bound DataFrame
df1.show(1)

+--------------------+----------+-------+--------------------+--------------------+--------------------+
|             authors|categories|license|            comments|            abstract|            versions|
+--------------------+----------+-------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|    hep-ph|   null|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
+--------------------+----------+-------+--------------------+--------------------+--------------------+



## Question 3: Missing values for "comments" and "license" attributes

In [29]:
# Drop the rows that have no "comments" value. dropna() returns a new
# DataFrame and leaves df1 untouched, so the result must be assigned;
# calling it without keeping the result filters nothing.
df1_with_comments = df1.dropna(subset=["comments"])

# Replace a missing "license" with the placeholder string "unknown"
df2 = df1_with_comments.fillna(value="unknown", subset=["license"])
df2.show()

+--------------------+----------+-------+--------------------+--------------------+--------------------+
|             authors|categories|license|            comments|            abstract|            versions|
+--------------------+----------+-------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|    hep-ph|unknown|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
+--------------------+----------+-------+--------------------+--------------------+--------------------+



## Question 4: Get the author names who published a paper in a 'math' category

In [35]:
# Make df2 queryable from Spark SQL under the view name "Archive"
df2.createOrReplaceTempView("Archive")

# Rows whose "categories" string begins with "math"
sql_q = "select * from Archive where categories LIKE 'math%' "
dd = spark.sql(sql_q)
dd.show()


+-------+----------+-------+--------+--------+--------+
|authors|categories|license|comments|abstract|versions|
+-------+----------+-------+--------+--------+--------+
+-------+----------+-------+--------+--------+--------+



## Question 5: Get licenses with 5 or more letters in the "abstract"

In [68]:
from pyspark.sql.functions import *

# Fetch the versions and comments columns of the "Archive" view to the driver.
# The collected rows are not assigned or displayed.
sql_query1 = (
    " SELECT versions,comments FROM Archive\n"
    "            "
)
spark.sql(sql_query1).collect()

import re
def get_Page(line):
    """Return the page count mentioned in a free-text comment.

    Looks for the first "<digits> pages" occurrence in ``line``.

    Args:
        line: Comment text, or None when the record has no comment.

    Returns:
        The digits of the first match as a str (e.g. '37'), or the int 0
        when ``line`` is None or contains no such pattern.
    """
    # A null comment would make the regex call raise TypeError; treat it
    # the same as "no page count found".
    if line is None:
        return 0
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    match = re.search(r'(\d+) pages', line)
    return match.group(1) if match else 0
#get_Page('37 pages, 15 figures')
# Expose get_Page to Spark SQL queries under the name "get_page_num"
spark.udf.register("get_page_num",get_Page)


<function __main__.get_Page(line)>

In [110]:
# Add a text rendering of the "versions" array so string functions can slice it
df3 = df2.withColumn("versions_str", df2.versions.cast(StringType()))

# Step-by-step slicing of versions_str; on the sample row the final
# 3-character substring is the weekday abbreviation (e.g. "Mon").
up_to_third_colon = substring_index(df3.versions_str, ":", 3)
last_two_comma_parts = substring_index(up_to_third_colon, ",", -2)
day_token = substring(last_two_comma_parts, 12, 3)

df4 = df3.select(df3.comments, day_token.alias('extract_day'))
df4.show(truncate=False)
#df.select(substring_index(df.s, '.', 2).alias('s')).collect()

+---------------------------------------+-----------+
|comments                               |extract_day|
+---------------------------------------+-----------+
|37 pages, 15 figures; published version|Mon        |
+---------------------------------------+-----------+



In [123]:
%%time
# Reload the file with the explicit Schema and add a text rendering of "versions"
df1 = spark.read.json("/content/samplearx.json", schema=Schema, multiLine=True)
df2 = df1.withColumn("versions_str", df1.versions.cast(StringType()))

# Slice the 3-character 'extract_day' value out of the versions text
day_source = substring_index(substring_index(df2.versions_str, ":", 3), ",", -2)
df3 = df2.select(df2.comments, substring(day_source, 12, 3).alias('extract_day'))

# Publish the two-column result as the "Archive" view (replacing any earlier view of that name)
df3.createOrReplaceTempView("Archive")
#get Page Function
import re
def get_Page(line):
    """Return the page count mentioned in a free-text comment.

    Looks for the first "<digits> pages" occurrence in ``line``.

    Args:
        line: Comment text, or None when the record has no comment.

    Returns:
        The digits of the first match as a str (e.g. '37'), or the int 0
        when ``line`` is None or contains no such pattern.
    """
    # This cell does not drop rows with null comments, so the UDF can
    # receive None; treat it the same as "no page count found" instead of
    # letting the regex call raise TypeError.
    if line is None:
        return 0
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    match = re.search(r'(\d+) pages', line)
    return match.group(1) if match else 0
#get_Page('37 pages, 15 figures')
# Expose get_Page to Spark SQL queries under the name "get_page_num"
spark.udf.register("get_page_num",get_Page)
# Average of get_page_num(comments) per extract_day, skipping rows where the
# UDF yields 0 (no page count found in the comment)
sql_q = (
    "select avg(get_page_num(comments)) as average_num_of_page_per_day,extract_day from Archive \n"
    "            where get_page_num(comments) <> 0 group by extract_day  \n"
    "        "
)
dd = spark.sql(sql_q)
dd.show(truncate=False)


+---------------------------+-----------+
|average_num_of_page_per_day|extract_day|
+---------------------------+-----------+
|67.5                       |Mon        |
+---------------------------+-----------+

CPU times: user 28 ms, sys: 4.52 ms, total: 32.5 ms
Wall time: 757 ms


In [118]:
# Sanity check: a string without an "<digits> pages" pattern yields 0
get_Page('None')

0

## Question 6: Extract the statistics of the number of pages for unknown licenses