In [1]:
%%init_spark
# Spark launcher configuration applied before the Scala interpreter starts.
# Run Spark in local mode with 6 threads (the kernel banner reports master = local[6]).
launcher.master = "local[6]"
# Memory given to the driver JVM.
launcher.driver_memory = '8g'

In [2]:
import org.apache.spark.sql.functions._

Intitializing Scala interpreter ...

Spark Web UI available at http://pc:4040
SparkContext available as 'sc' (version = 3.1.2, master = local[6], app id = local-1631801415406)
SparkSession available as 'spark'


import org.apache.spark.sql.functions._


In [3]:
// Root directory of the raw Open Academic Graph inputs (the authors parquet is read from here).
val rawDataPath: String = "/mnt/a/Projects/101DataSets/Open-Academic-Graph/raw-data/"
// Root directory for derived data: the CSV outputs are written here and affiliation.csv is read from here.
val dataPath: String = "/mnt/a/Projects/101DataSets/Open-Academic-Graph/data/"

rawDataPath: String = /mnt/a/Projects/101DataSets/Open-Academic-Graph/raw-data/
dataPath: String = /mnt/a/Projects/101DataSets/Open-Academic-Graph/data/


# Read Data

In [4]:
// Authors table, loaded from the parquet copy stored under the raw-data directory.
val authors = spark.read.parquet(s"${rawDataPath}spark/authors.parquet")

authors: org.apache.spark.sql.DataFrame = [id: string, n_citation: bigint ... 7 more fields]


In [5]:
// Print the column tree of the authors table (side-effecting call, hence the explicit parentheses).
authors.printSchema()

root
 |-- id: string (nullable = true)
 |-- n_citation: long (nullable = true)
 |-- n_pubs: long (nullable = true)
 |-- name: string (nullable = true)
 |-- normalized_name: string (nullable = true)
 |-- org: string (nullable = true)
 |-- orgs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pubs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- i: string (nullable = true)
 |    |    |-- r: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- t: string (nullable = true)



In [7]:
// Preview the first two author rows.
authors.show(numRows = 2)

+----------+----------+------+------------------+------------------+----+----+-----------------+----+
|        id|n_citation|n_pubs|              name|   normalized_name| org|orgs|             pubs|tags|
+----------+----------+------+------------------+------------------+----+----+-----------------+----+
|2736393860|         0|     1|            马晓东|            马晓东|null|null|[{2738068945, 2}]|null|
|2736393861|         0|     1|Farras Amany Husna|farras amany husna|null|null|[{2738013386, 0}]|null|
+----------+----------+------+------------------+------------------+----+----+-----------------+----+
only showing top 2 rows



## n_citation and n_pubs frequency tables

In [15]:
// Frequency table: how many authors share each (n_citation, n_pubs) pair.
//
// repartition(1) is used instead of coalesce(1): a coalesce to a single
// partition is folded into the preceding stage, so the final aggregation
// would run as one task. repartition adds a shuffle of the (already
// aggregated) result, keeps the aggregation itself parallel, and still
// produces a single output part file.
authors
  .groupBy($"n_citation", $"n_pubs")
  .count()
  .repartition(1)
  .write
  .option("header", "true")
  .mode("overwrite") // replace the output of any previous run
  .csv(dataPath + "n_citation_n_pubs_count.csv")

## Find tags frequency

In [35]:
// Print the raw `tags` value of the first two authors that have tags at all.
authors
  .where($"tags".isNotNull)
  .select($"tags")
  .head(2)
  .foreach(row => println(row))

[WrappedArray([Magazine])]
[WrappedArray([Technology], [Entrepreneurship], [Design methods], [Risk aversion], [Impact], [Public sector], [Commerce], [Social network], [Technology management], [Perception], [Service provider], [Intervention], [Social environment], [Venture capital], [Social science], [Best practice], [Economics], [Innovation], [Management], [Organization development], [Public policy], [Superconducting magnetic energy storage], [Comparative research], [Futures Studies], [Ageing], [Primary source], [Finance], [Social capital], [Ethnic group], [Open learning], [Operations research])]


In [51]:
// Frequency table: number of authors carrying each tag.
//
// Each element of the `tags` array becomes its own row, then the tag text
// (`t`) is extracted and aliased straight to `tags`, so the CSV header is
// the same as with the former select + rename pair.
//
// repartition(1) is used instead of coalesce(1): a coalesce to a single
// partition is folded into the preceding stage, so the final aggregation
// would run as one task. repartition keeps the aggregation parallel and
// still produces a single output part file.
authors
  .filter($"tags".isNotNull)
  .select(explode($"tags").as("tag"))
  .select($"tag.t".as("tags"))
  .groupBy("tags")
  .count()
  .repartition(1)
  .write
  .option("header", "true")
  .mode("overwrite") // replace the output of any previous run
  .csv(dataPath + "tags_count.csv")

## Add location to authors data

In [66]:
// Affiliation lookup table (coordinates, id, country, ISO code per organisation).
// `DisplayName` is renamed to `org`, the name of the organisation column in `authors`.
// NOTE(review): the preview shows literal "NA" values in country/iso3c; they are
// read as plain strings here -- confirm whether they should be treated as nulls.
val aff = spark.read
  .option("header", "true")
  .csv(s"${dataPath}affiliation.csv")
  .withColumnRenamed("DisplayName", "org")

aff: org.apache.spark.sql.DataFrame = [org: string, Latitude: string ... 4 more fields]


In [67]:
// Preview the first ten affiliation rows.
aff.show(numRows = 10)

+--------------------+---------+---------+-----+-----------+-----+
|                 org| Latitude|Longitude|   id|    country|iso3c|
+--------------------+---------+---------+-----+-----------+-----+
|Illinois College ...| 41.83574|-87.62341| 4605|        USA|  USA|
|   Sangji University| 37.36995|127.92861| 9507|South Korea|  KOR|
|Manchester Instit...|   53.468|   -2.236|15855|         UK|  GBR|
|Ateneo de Manila ...| 14.63889|121.07778|19722|Philippines|  PHL|
|Instituto Militar...|-22.95589|-43.16614|41870|         NA|   NA|
|Sapientia University|  46.7677|  23.5911|43886|    Romania|  ROU|
|Kahramanmaraş Süt...|  37.5856|  36.8212|46017|     Turkey|  TUR|
|Al Akhawayn Unive...| 33.53925| -5.10555|47844|    Morocco|  MAR|
|Oswaldo Cruz Foun...|-22.87488|-43.24544|52325|     Brazil|  BRA|
|     Curie Institute| 48.84333|  2.34417|80043|     France|  FRA|
+--------------------+---------+---------+-----+-----------+-----+
only showing top 10 rows



In [6]:
// Per-organisation totals: summed citations and publications over all
// authors that have a non-null `org`.
//
// repartition(1) is used instead of coalesce(1): a coalesce to a single
// partition is folded into the preceding stage, so the final aggregation
// would run as one task. repartition keeps the aggregation parallel and
// still produces a single output part file.
authors
  .filter($"org".isNotNull)
  .groupBy("org")
  .agg(
    sum($"n_citation").as("total_citation"),
    sum($"n_pubs").as("total_pubs")
  )
  .repartition(1)
  .write
  .option("header", "true")
  .mode("overwrite") // replace the output of any previous run
  .csv(dataPath + "pubs_citation_org_count1.csv")

In [7]:
// Frequency table: number of authors per (tag, organisation) pair, limited
// to authors that have both tags and an organisation.
//
// The tag text (`t`) is aliased straight to `tags`, so the CSV header is
// the same as with the former select + rename pair.
//
// repartition(1) is used instead of coalesce(1): a coalesce to a single
// partition is folded into the preceding stage, so the final aggregation
// would run as one task. repartition keeps the aggregation parallel and
// still produces a single output part file.
authors
  .filter($"tags".isNotNull && $"org".isNotNull)
  .select($"org", explode($"tags").as("tag"))
  .select($"org", $"tag.t".as("tags"))
  .groupBy("tags", "org")
  .count()
  .repartition(1)
  .write
  .option("header", "true")
  .mode("overwrite") // replace the output of any previous run
  .csv(dataPath + "tags_org_count.csv")

In [48]:
// One row per (author, publication): the author's id and counters, plus the
// publication id (`i` -> paperId) and the `r` field (-> order) taken from
// each element of the `pubs` array.
val authorPaper = {
  // Step 1: unnest `pubs` so that every array element gets its own row.
  val perPublication =
    authors.select($"id", $"n_citation", $"n_pubs", explode($"pubs").as("pc"))

  // Step 2: pull the two struct fields out into named top-level columns.
  perPublication.select(
    $"id",
    $"n_citation",
    $"n_pubs",
    $"pc.i".as("paperId"),
    $"pc.r".as("order")
  )
}

authorPaper: org.apache.spark.sql.DataFrame = [id: string, n_citation: bigint ... 3 more fields]


In [49]:
// Frequency table: number of (author, publication) rows for every
// (n_citation, n_pubs, order) combination.
//
// repartition(1) is used instead of coalesce(1): a coalesce to a single
// partition is folded into the preceding stage, so the final aggregation
// would run as one task. repartition keeps the aggregation parallel and
// still produces a single output part file.
authorPaper
  .groupBy($"n_citation", $"n_pubs", $"order")
  .count()
  .repartition(1)
  .write
  .option("header", "true")
  .mode("overwrite") // replace the output of any previous run
  .csv(dataPath + "n_citation_n_pubs_order_count.csv")