In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import format_number, mean, min, max, corr, stddev
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format, asc, desc)
from pyspark.sql.functions import explode, col, element_at, size, split
from pyspark.sql.functions import udf

In [2]:
# Build a SparkSession named as "test123"
spark = SparkSession.builder \
    .appName('test_123') \
    .master('local[*]') \
    .config('spark.sql.execution.arrow.pyspark.enabled', True) \
    .config('spark.sql.session.timeZone', 'UTC') \
    .config('spark.driver.memory','8g') \
    .config('spark.ui.showConsoleProgress', True) \
    .config('spark.sql.repl.eagerEval.enabled', True) \
    .getOrCreate()

## Loading and Cleaning the Data

In [3]:
papers_ = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").option("encoding", "ascii").json("../data/AL_papers.json")
papers = papers_.select(explode(col("hits.hits")).alias("paper"))

In [4]:
#papers.printSchema()

In [5]:
# short_papers = papers.select(
#     col("paper.created"),
#     element_at(col("paper.metadata.titles.title"),1).alias("title"),
#     element_at(col("paper.metadata.abstracts.value"),1).alias("abstract"),
#     col("paper.metadata.citation_count").alias("citation_count"),
#     col("paper.metadata.number_of_pages"),
#     col("paper.metadata.keywords"),
#     col("paper.metadata.authors.full_name").alias("authors"),
#     size(col("paper.metadata.references")).alias("num_refs")
# );
# short_papers.printSchema()

In [4]:
def ascii_ignore(x):
    return x.encode('ascii', 'ignore').decode('ascii')
ascii_udf = udf(ascii_ignore)


In [5]:
short_papers = papers.select(
    element_at(col("paper.metadata.titles.title"), 1).alias("title"),
    element_at(col("paper.metadata.abstracts.value"), 1).alias("abstract"),
    col("paper.created"), col("paper.metadata.number_of_pages"),
    col("paper.metadata.keywords"), size(
        col("paper.metadata.references")).alias("num_refs"),
    col("paper.metadata.authors.full_name").alias("authors")
).withColumn("title", ascii_udf("title"));
short_papers.printSchema()

root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- created: string (nullable = true)
 |-- number_of_pages: long (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- schema: string (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- num_refs: integer (nullable = false)
 |-- authors: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Keywords Extraction

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

### Assigned

Extracting assigned keywords

In [7]:
# returns first keyword
@udf(returnType=ArrayType(StringType()))
def list_keywords(a):
    return [aa["value"] for aa in a] if a else ["None"]

aaa = short_papers.withColumn("keywords_", list_keywords(col("keywords")))
aaa

title,abstract,created,number_of_pages,keywords,num_refs,authors,keywords_
Charmonia product...,"In this paper, pr...",2021-03-26T00:00:...,15,"[{PACS, null, 14....",35,"[Luchinsky, A.V.,...","[14.40.Pq, 12.39...."
Exclusive decays ...,Exclusive decays ...,2020-07-09T00:00:...,10,"[{null, publisher...",42,"[Luchinsky, A.V.,...",[Strong Interacti...
Doubly heavy bary...,The theoretical a...,2019-12-11T00:00:...,8,"[{INSPIRE, null, ...",40,"[Berezhnoy, A.V.,...","[talk, baryon: he..."
Weak decays of do...,We consider exclu...,2019-05-29T00:00:...,10,"[{null, publisher...",21,"[Gerasimov, A.S.,...",[Electroweak inte...
$B_c$ excitations...,Status of the Bc ...,2019-11-21T00:00:...,8,"[{INSPIRE, null, ...",49,"[Berezhnoy, A.V.,...","[talk, B/c: excit..."
Excited $\rho$ me...,"In this paper, ex...",2018-12-27T00:00:...,7,"[{null, publisher...",22,"[Luchinsky, A.V.]",[Phenomenological...
Doubly heavy bary...,The theoretical a...,2018-09-27T00:00:...,14,"[{null, publisher...",56,"[Berezhnoy, A.V.,...",[Electroweak inte...
Charmonia Product...,In the presented ...,2018-01-30T00:00:...,9,"[{PACS, null, 14....",30,"[Luchinsky, A.V.]","[14.40.Pq, 12.39...."
Lifetimes of Doub...,The inclusive dec...,2019-02-25T00:00:...,11,"[{INSPIRE, null, ...",49,"[Likhoded, A.K., ...","[baryon: heavy, b..."
Double Charmonia ...,This paper is dev...,2017-12-11T00:00:...,14,"[{PACS, null, 13....",36,"[Likhoded, A.K., ...","[13.38.Dg, 14.40...."


Which of them are most popular?

In [8]:
aaa.select(explode("keywords_").alias("K")).groupby("K").count().sort(desc("count"))

K,count
numerical calcula...,31
CERN LHC Coll,21
quantum chromodyn...,17
LHC-B,14
14.40.Pq,13
charmonium,12
electron positron...,10
12.38.Bx,10
13.66.Bc,10
quarkonium: heavy,10


Search papers with **charmonium** keyword

In [9]:
from pyspark.sql.functions import array_contains

aaa.filter( array_contains(col("keywords_"), "charmonium"))

title,abstract,created,number_of_pages,keywords,num_refs,authors,keywords_
Production of hea...,Processes of sing...,2017-08-24T00:00:...,13,"[{INSPIRE, null, ...",57,"[Likhoded, A.K., ...",[quarkonium: heav...
BC_NPI module for...,The module for th...,2011-04-05T00:00:...,15,"[{INSPIRE, null, ...",19,"[Berezhnoy, A.V.,...",[B/c: hadronic de...
Light hadron prod...,Decays of ground ...,2009-10-18T00:00:...,10,"[{PACS, null, 13....",26,"[Likhoded, A.K., ...","[13.35.Dx, 13.20...."
Leading twist dis...,This paper is dev...,2008-10-20T00:00:...,8,"[{PACS, null, 13....",19,"[Braguta, V.V., L...","[13.25.Gv, 12.38...."
Lepton pair produ...,Coherent producti...,2007-09-04T00:00:...,15,"[{PACS, null, 13....",10,"[Berezhnoy, A.V.,...","[13.60.-r, 13.60...."
Charmonium produc...,The production of...,2007-03-08T00:00:...,28,"[{INSPIRE, null, ...",30,"[Likhoded, A.K., ...",[p p: inclusive r...
Systematics of he...,"It is shown that,...",2007-06-15T00:00:...,10,"[{PACS, null, 14....",37,"[Gershtein, S.S.,...","[14.40.Gx, 12.40...."
The Processes e+ ...,In this paper we ...,2006-02-08T00:00:...,8,"[{PACS, null, 13....",13,"[Braguta, V.V., L...","[13.25.Gv, 12.38...."
Systematics of he...,In this paper we ...,2006-02-08T00:00:...,9,"[{PACS, null, 14....",33,"[Gershtein, S.S.,...","[14.40.Gx, 12.40...."
Excited charmoniu...,In this paper the...,2005-07-25T00:00:...,7,"[{PACS, null, 13....",16,"[Braguta, V.V., L...","[13.25.Gv, 13.66...."


In [None]:
aaa.

### From titles

In [10]:
from pyspark.sql.functions import split

In [11]:
short_papers.printSchema()

root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- created: string (nullable = true)
 |-- number_of_pages: long (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- schema: string (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- num_refs: integer (nullable = false)
 |-- authors: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [12]:
tits = papers_.select(explode(col("hits.hits.metadata.titles")).alias("titles"));
tits = tits.select( element_at(col("titles.title"), 0).alias("title"))
tits.printSchema()
#tits

root
 |-- title: string (nullable = true)



In [13]:
tits.limit(1).toPandas()

Py4JJavaError: An error occurred while calling o169.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (9b184f827618 executor driver): java.lang.ArrayIndexOutOfBoundsException: SQL array indices start at 1
	at org.apache.spark.sql.errors.QueryExecutionErrors$.sqlArrayIndexNotStartAtOneError(QueryExecutionErrors.scala:1088)
	at org.apache.spark.sql.errors.QueryExecutionErrors.sqlArrayIndexNotStartAtOneError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:476)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ArrayIndexOutOfBoundsException: SQL array indices start at 1
	at org.apache.spark.sql.errors.QueryExecutionErrors$.sqlArrayIndexNotStartAtOneError(QueryExecutionErrors.scala:1088)
	at org.apache.spark.sql.errors.QueryExecutionErrors.sqlArrayIndexNotStartAtOneError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [14]:
short_papers.limit(1).select("title")

AnalysisException: Invalid call to exprId on unresolved object

AnalysisException: Invalid call to exprId on unresolved object

In [54]:
kwds_tit = short_papers.select(split(col("title"), " ").alias("W")).select(explode("W").alias("W")).groupby("W").count().sort(desc("count"))
kwds_tit.printSchema()

root
 |-- W: string (nullable = true)
 |-- count: long (nullable = false)



In [56]:
kws_title = short_papers.select("title").                \
    withColumn("title", split(col("title"), " ")).     \
    select(explode(col("title")).alias("K")).          \
    groupBy("K").count().sort(asc("count"))


In [68]:
short_papers.limit(3).select(col("abstract"))

AnalysisException: Invalid call to exprId on unresolved object

AnalysisException: Invalid call to exprId on unresolved object

In [61]:
kws_title = short_papers.select(col("title"))


AnalysisException: Invalid call to exprId on unresolved object

AnalysisException: Invalid call to exprId on unresolved object

In [55]:
kwds_tit.limit(10)

AnalysisException: Invalid call to nullable on unresolved object

AnalysisException: Invalid call to nullable on unresolved object