In [1]:
!pip3 install -q findspark

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("JSON Analysis DF").master("local[*]").getOrCreate()

In [4]:
spark

In [16]:
Df1 = spark.read.json("/home/bigdata/Desktop/01 Bigdata RDD/archive/arxiv-metadata-oai-snapshot.json")

In [18]:
Df1.rdd.cache()

MapPartitionsRDD[37] at javaToPython at NativeMethodAccessorImpl.java:0

In [20]:
Df1.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



In [21]:
Df1.rdd.getNumPartitions()

25

In [22]:
Df1.rdd.repartition(100)

MapPartitionsRDD[42] at coalesce at NativeMethodAccessorImpl.java:0

In [28]:
Df1.rdd.getNumPartitions()

25

In [5]:
# Create a new Schema (Main feature of DF whereas in RDD We need to consider the Whole data)
# Also declaring Schema will drastically increase data import speed
# Define schema
from pyspark.sql.types import *

Schema = StructType([
    
    StructField('authors', StringType(), True),
    StructField('categories', StringType() , True),
    StructField('license', StringType(), True),
    StructField('comments', StringType() , True),
    StructField('abstract', StringType(), True),
    StructField('versions', ArrayType(StringType()), True)
    
])

print(Schema)

StructType(List(StructField(authors,StringType,true),StructField(categories,StringType,true),StructField(license,StringType,true),StructField(comments,StringType,true),StructField(abstract,StringType,true),StructField(versions,ArrayType(StringType,true),true)))


In [6]:
# Binding Data to a Schema

In [7]:
Df = spark.read.json("../01 Bigdata RDD/archive/arxiv-metadata-oai-snapshot.json", schema=Schema)

In [8]:
Df.show()

+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|             authors|       categories|             license|            comments|            abstract|            versions|
+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|           hep-ph|                null|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
|Ileana Streinu an...|    math.CO cs.CG|http://arxiv.org/...|To appear in Grap...|  We describe a n...|[{"version":"v1",...|
|         Hongjun Pan|   physics.gen-ph|                null| 23 pages, 3 figures|  The evolution o...|[{"version":"v1",...|
|        David Callan|          math.CO|                null|            11 pages|  We show that a ...|[{"version":"v1",...|
|Wael Abu-Shammala...|  math.CA math.FA|                null|                null|  In this paper w...|[{"version":"v1",...|


In [9]:
# Missing values for "comments" and "license" attributes
Df = Df.na.drop(subset="comments")

In [10]:
Df = Df.na.fill(subset="license",value="unknown")

In [11]:
Df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             authors|          categories|             license|            comments|            abstract|            versions|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|              hep-ph|             unknown|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
|Ileana Streinu an...|       math.CO cs.CG|http://arxiv.org/...|To appear in Grap...|  We describe a n...|[{"version":"v1",...|
|         Hongjun Pan|      physics.gen-ph|             unknown| 23 pages, 3 figures|  The evolution o...|[{"version":"v1",...|
|        David Callan|             math.CO|             unknown|            11 pages|  We show that a ...|[{"version":"v1",...|
|Y. H. Pong and C....|   cond-mat.mes-hall|             unknown|6 pages, 4 figure...|  We study the tw..

In [12]:
# Get the author names who published a paper in a "math" category
Df.createOrReplaceTempView("arxiv")

sql_query = """
        SELECT authors,categories FROM arxiv
        WHERE categories like "math%"
"""

spark.sql(sql_query).show()



+--------------------+--------------------+
|             authors|          categories|
+--------------------+--------------------+
|Ileana Streinu an...|       math.CO cs.CG|
|        David Callan|             math.CO|
|  Sergei Ovchinnikov|             math.CO|
|Clifton Cunningha...|     math.NT math.AG|
|        Koichi Fujii|     math.CA math.AT|
|         Norio Konno|     math.PR math.AG|
|Simon J.A. Malham...|             math.NA|
|Robert P. C. de M...|             math.RA|
|  P\'eter E. Frenkel|     math.CA math.PR|
|          Mihai Popa|     math.OA math.FA|
|   Debashish Goswami|math.QA math-ph m...|
|      Mikkel {\O}bro|             math.CO|
|Nabil L. Youssef,...|       math.DG gr-qc|
|         Boris Rubin|             math.FA|
|         A. I. Molev|     math.AG math.CO|
| Branko J. Malesevic|     math.NT math.CV|
|   John W. Robertson|             math.DS|
|     Yu.N. Kosovtsov|     math-ph math.MP|
|        Osamu Fujino|             math.AG|
|Stephen C. Power ...|          

In [13]:
spark.sql(sql_query).count()

304590

In [14]:
# Get Licsences with 5 or more letters in the "abstract"

sql_query = """
                SELECT distinct(license) , abstract FROM arxiv
                WHERE LENGTH(abstract) >= 5;
"""

spark.sql(sql_query).show()

+--------------------+--------------------+
|             license|            abstract|
+--------------------+--------------------+
|             unknown|  We have success...|
|             unknown|  We characterize...|
|             unknown|  The existence o...|
|             unknown|  The search for ...|
|             unknown|  We measure the ...|
|             unknown|  AIMS: We invest...|
|             unknown|  We present a hi...|
|             unknown|  General relativ...|
|             unknown|  Let R be a comm...|
|             unknown|  Controlled Lagr...|
|             unknown|  The aim of the ...|
|             unknown|  In this paper w...|
|             unknown|  Necessary and s...|
|             unknown|  We show the asy...|
|             unknown|  It is well acce...|
|             unknown|  We present cosm...|
|             unknown|  This paper revi...|
|             unknown|  Neutron- and pr...|
|             unknown|  We address the ...|
|http://arxiv.org/...|  We discu

In [15]:
# Extract the statistic of the number of pages for unknown licenses
Df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             authors|          categories|             license|            comments|            abstract|            versions|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|C. Bal\'azs, E. L...|              hep-ph|             unknown|37 pages, 15 figu...|  A fully differe...|[{"version":"v1",...|
|Ileana Streinu an...|       math.CO cs.CG|http://arxiv.org/...|To appear in Grap...|  We describe a n...|[{"version":"v1",...|
|         Hongjun Pan|      physics.gen-ph|             unknown| 23 pages, 3 figures|  The evolution o...|[{"version":"v1",...|
|        David Callan|             math.CO|             unknown|            11 pages|  We show that a ...|[{"version":"v1",...|
|Y. H. Pong and C....|   cond-mat.mes-hall|             unknown|6 pages, 4 figure...|  We study the tw..

In [30]:
Df.createOrReplaceTempView("Arxiv")

import re
def extract_pages(line):
    result = re.match(r"^(\d+)\s+pages", line)
    if result:
        return int(result.group(1))
    else:
        return 0
    
extract_pages("17 pages")
    
spark.udf.register("pageExtraction",extract_pages)
    

<function __main__.extract_pages(line)>

In [39]:
sql_query = """
            SELECT min(pageExtraction(comments)) AS min,max(pageExtraction(comments)) as max, 
            sum(pageExtraction(comments)) as sum,avg(pageExtraction(comments)) as avg,count(pageExtraction(comments)) as count,
            std(pageExtraction(comments)) as std
            FROM Arxiv
            WHERE license == "unknown"
"""

In [40]:

spark.sql(sql_query).show()

+---+---+---------+------------------+------+------------------+
|min|max|      sum|               avg| count|               std|
+---+---+---------+------------------+------+------------------+
|  0| 99|4769387.0|11.299294473295175|422096|15.001101909711023|
+---+---+---------+------------------+------+------------------+

