# DATA EXTRACTION

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
BooleanType, LongType, DoubleType, ArrayType, FloatType)
from pyspark.sql import functions as F

In [2]:
# Reuse the active Spark session if there is one; otherwise start a session
# with the 'local[*]' master.
session_builder = SparkSession.builder.master('local[*]')
spark = session_builder.getOrCreate()

This schema was derived by inspecting a loaded dataset and from the OpenAlex Work object documentation: https://docs.openalex.org/api-entities/works/work-object

In [3]:
# Nested record types, declared separately so the top-level schema stays flat.
author_struct = StructType([
    StructField("display_name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("orcid", StringType(), True),
])

institution_struct = StructType([
    StructField("country_code", StringType(), True),
    StructField("display_name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("ror", StringType(), True),
    StructField("type", StringType(), True),
])

authorship_struct = StructType([
    StructField("author", author_struct, True),
    StructField("author_position", StringType(), True),
    StructField("institutions", ArrayType(institution_struct), True),
    StructField("raw_affiliation_string", StringType(), True),
])

concept_struct = StructType([
    StructField("display_name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("level", LongType(), True),
    StructField("score", DoubleType(), True),
    StructField("wikidata", StringType(), True),
])

yearly_count_struct = StructType([
    StructField("cited_by_count", LongType(), True),
    StructField("year", LongType(), True),
])

host_venue_struct = StructType([
    StructField("display_name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("is_oa", BooleanType(), True),
    StructField("issn", ArrayType(StringType()), True),
    StructField("issn_l", StringType(), True),
    StructField("license", StringType(), True),
    StructField("publisher", StringType(), True),
    StructField("type", StringType(), True),
    StructField("url", StringType(), True),
    StructField("version", StringType(), True),
])

# Top-level schema of a work record; every field is nullable.
schema = StructType([
    StructField("authorships", ArrayType(authorship_struct), True),
    StructField("cited_by_count", LongType(), True),
    StructField("concepts", ArrayType(concept_struct), True),
    StructField("counts_by_year", ArrayType(yearly_count_struct), True),
    StructField("created_date", StringType(), True),
    StructField("display_name", StringType(), True),
    StructField("doi", StringType(), True),
    StructField("host_venue", host_venue_struct, True),
    StructField("id", StringType(), True),
    StructField("publication_date", StringType(), True),
    # Declared as a plain string here; the notebook parses it into an array
    # later with F.from_json.
    StructField("corresponding_institution_ids", StringType(), True),
    StructField("publication_year", LongType(), True),
    StructField("title", StringType(), True),
    StructField("type", StringType(), True),
    StructField("updated_date", StringType(), True),
    StructField("language", StringType(), True),
])


In [4]:
# An (id, display_name) pair; used for the subfield, field and domain levels.
subfield_schema = StructType([
    StructField(name, StringType(), True) for name in ("id", "display_name")
])


def _scored_fields():
    """Return fresh ``id``, ``display_name`` and ``score`` field definitions."""
    return [
        StructField("id", StringType(), True),
        StructField("display_name", StringType(), True),
        StructField("score", FloatType(), True),
    ]


def _topic_fields():
    """Return the scored fields followed by the three nested hierarchy levels."""
    hierarchy = [
        StructField(level, subfield_schema, True)
        for level in ("subfield", "field", "domain")
    ]
    return _scored_fields() + hierarchy


# The primary topic and each entry of `topics` have the same layout.
primary_topic_schema = StructType(_topic_fields())
topic_schema = StructType(_topic_fields())

# Sustainable development goals and keywords are flat scored entities.
sdg_schema = StructType(_scored_fields())
key_schema = StructType(_scored_fields())


In [5]:
# Topic-related fields appended after the base work fields.
topic_related_fields = [
    StructField("primary_topic", primary_topic_schema, True),
    StructField("sustainable_development_goals", ArrayType(sdg_schema), True),
    StructField("topics", ArrayType(topic_schema), True),
    StructField("keywords", ArrayType(key_schema), True),
]
new_schema = StructType(schema.fields + topic_related_fields)

For the sake of faster run time, we will use only the works updated on March 27, 2024. Moreover, we will use this single dataset as a representative sample for understanding and preprocessing the data later on.

In [6]:
# Read the 2024-03-27 `updated_date` partition of the works dump as JSON,
# using the explicit schema defined above.
works_path = ('/mnt/data/public/openalex/data/'
              'works/updated_date=2024-03-27/*')
oa = spark.read.schema(new_schema).json(works_path)

Based on our problem statement, we will only be using the following columns:

| Variable                                         | Description                                            |
|--------------------------------------------------|--------------------------------------------------------|
| title                                            | Title of the work                                      |
| authorships.author.id                            | Identification of the authors                          |
| authorships.author_position                      | Position of the author                                 |
| publication_year                                 | Publication year of the work                           |
| corresponding_institution_ids                    | Institutions of the corresponding authors              |
| type                                             | Type of work                                           |
| primary_topic.field.display_name                 | Field of the published work                            |
| primary_topic.domain.display_name                | Domain of the published work                           |
| primary_topic.score                              | Score of the published work                            |
| sustainable_development_goals.display_name       | Corresponding Sustainable Development Goals            |
| sustainable_development_goals.score              | Score of Sustainable Development Goals                 |
| keywords.display_name                            | Keywords related to this study                         |
| updated_date                                     | Date of the update of this work                        |

In [10]:
# (source path, output column name) pairs, listed in output column order.
selected_columns = [
    ('title', 'title'),
    ('authorships.author.id', 'author_id'),
    ('authorships.author_position', 'author_pos'),
    ('publication_year', 'publication_year'),
    ('corresponding_institution_ids', 'inst_id'),
    ('type', 'type'),
    ('primary_topic.field.display_name', 'field'),
    ('primary_topic.domain.display_name', 'domain'),
    ('primary_topic.score', 'primary_score'),
    ('sustainable_development_goals.display_name', 'sgd_name'),
    ('sustainable_development_goals.score', 'sgd_score'),
    ('keywords.display_name', 'keywords'),
    ('updated_date', 'updated_date'),
    ('language', 'language'),
    ('cited_by_count', 'cited_by_count'),
]

# Project the needed fields, flattening nested paths into top-level columns.
df_oa = oa.select(
    [F.col(source).alias(output) for source, output in selected_columns]
)

Save to parquet <br>
Note: the code below is commented out because it writes a parquet file; uncomment it if you want to create the parquet file.

In [20]:
# NOTE(review): `df_final` is only defined in a later cell; presumably `df_oa`
# was intended here, since the next cell reads 'oa' back as `df_oa` -- confirm
# before uncommenting.
# (df_final.write.parquet('oa', mode='overwrite',
#                      partitionBy=['publication_year', 'domain']))

### Data Understanding and Preprocessing

In [6]:
# Load the 'oa' parquet dataset; this rebinds the name `df_oa`.
df_oa = spark.read.parquet('oa')

Let's look at the column `title`

In [4]:
# Count rows per title and list the most frequent titles first.
title_counts = df_oa.groupBy('title').count()
title_counts.orderBy(F.col('count').desc()).show()

+-----------------+------+
|            title| count|
+-----------------+------+
|             NULL|116504|
|     Introduction| 16957|
|      Frontmatter| 13287|
|  Editorial Board| 11702|
|         Contents|  8593|
|            Index|  8112|
|        Editorial|  7275|
|     Front Matter|  7083|
|          Preface|  6311|
|Table of Contents|  6137|
| [Not Available].|  5720|
|Issue Information|  4554|
|         Masthead|  3612|
|          Erratum|  3518|
|  Acknowledgments|  3434|
|     Book Reviews|  3409|
|            Notes|  3307|
|       Conclusion|  3156|
|     Bibliography|  2995|
|           Inhalt|  2840|
+-----------------+------+
only showing top 20 rows



Looking at the result above, it turns out that not all titles are real work titles. Since a title should be unique, we will drop all duplicates. Moreover, as per answers from a ResearchGate forum, a title should have at least 5 words; therefore we will drop all rows whose title has fewer than 5 words.

Let's look at the `publication_year`

In [5]:
# Row counts for the three most recent publication years.
(df_oa.groupBy('publication_year')
 .count()
 .orderBy(F.col('publication_year').desc())
 .show(3))

+----------------+------+
|publication_year| count|
+----------------+------+
|            2025|    49|
|            2024| 59570|
|            2023|384246|
+----------------+------+
only showing top 3 rows



In [9]:
# Row counts for the five earliest publication years.
(df_oa.groupBy('publication_year')
 .count()
 .orderBy(F.col('publication_year').asc())
 .show(5))

+----------------+-----+
|publication_year|count|
+----------------+-----+
|              21|    1|
|            1400|    2|
|            1451|    1|
|            1482|    1|
|            1487|    1|
+----------------+-----+
only showing top 5 rows



It seems that we have a wide range of publication years; let us look into the earlier years.

In [11]:
# Show the full titles of works recorded with publication year 1500.
(df_oa.filter(F.col('publication_year') == 1500)
 .select('title')
 .show(truncate=False))

+------------------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                               |
+------------------------------------------------------------------------------------------------------------------------------------+
|dummyReport of the ISC2006 Open Symposium “The Sumatra earthquake and tsunami in the Indian Ocean and tsunami-related sedimentology”|
|dummyNovel purification of vitronectin from human plasma by heparin affinity chromatography.                                        |
|dummyUpper Triassic and Lower Jurassic fluvial and eolian deposits, Zion National Park, Utah, U.S.A.                                |
|dummyThe 17th International Sedimentological Congress (ISC2006) was held in Fukuoka Japan from August 27 to September 1, 2006       |
|[رباعی حافظ] /                                        

It seems that the earlier publication years do not make sense, since they contain mixed-up or erroneous records. We will therefore only consider publication years from the 20th century onwards.

In [15]:
# Print the column names, types and nullability of `df_oa`.
df_oa.printSchema()

root
 |-- title: string (nullable = true)
 |-- author_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- author_pos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- inst_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- field: string (nullable = true)
 |-- primary_score: float (nullable = true)
 |-- sgd_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sgd_score: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- updated_date: string (nullable = true)
 |-- language: string (nullable = true)
 |-- publication_year: integer (nullable = true)
 |-- domain: string (nullable = true)



Let's look at the columns `author_id` and `inst_id`

In [19]:
# Show five untruncated rows, one field per line.
df_oa.select('author_id', 'inst_id').show(n=5, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 author_id | [https://openalex.org/A5028090784, https://openalex.org/A5031543940, https://openalex.org/A5031530665, https://openalex.org/A5059057970, https://openalex.org/A5044347263]                                                                                                                                         
 inst_id   | ["https://openalex.org/I29457043"]                                                                                                                                                                                                                                                                                 
-RECORD 1----------------------------

From the schema, `inst_id` has a string type despite holding array values, so we will convert it to an array type. Moreover, the values above are URLs pointing to the corresponding profiles. For simplicity, we will keep only the identifier, which is the part after `.org/`.

In [22]:
# Row counts per language (first 10 groups, in no particular order).
df_oa.groupBy('language').count().show(10)

+--------+--------+
|language|   count|
+--------+--------+
|      en|10128253|
|      vi|   16416|
|      ne|     118|
|      ro|   22267|
|      sl|    5129|
|      ur|     129|
|      lv|    1199|
|      pl|   56484|
|      sk|    2685|
|      pt|  225402|
+--------+--------+
only showing top 10 rows



From the results above, it seems that there are published works in many different languages. Since we are not familiar with some of them, we will limit the works to English only.

In [10]:
# Array-of-strings type; passed to F.from_json below to parse `inst_id`.
sch = ArrayType(StringType())

In [11]:
def extract_id(url):
    """Build a column expression that strips a URL down to its identifier.

    The identifier is whatever follows ``org/`` in ``url``. When the pattern
    yields an empty string, the original ``url`` value is returned unchanged.

    Args:
        url: Column expression holding the URL; in this notebook it is the
            element passed in by ``F.transform``.

    Returns:
        A Column expression with the extracted identifier or the original
        value.
    """
    identifier = F.regexp_extract(url, "org/(.*)", 1)
    return F.when(identifier != "", identifier).otherwise(url)

In [14]:
# Cleaning rules applied to the reloaded extract:
#   * keep publication years from 1901 to 2024 inclusive;
#   * keep English-language works only;
#   * keep titles with at least 5 words;
#   * keep one row per title.
df_final = (
    df_oa.filter(F.col('title').isNotNull()
                 & (F.col('publication_year') >= 1901)
                 & (F.col('publication_year') <= 2024)
                 & (F.col('language') == 'en')
                 # Trim the title and split on runs of whitespace so that
                 # leading, trailing or repeated spaces are not counted as
                 # extra (empty) words.
                 & (F.size(F.split(F.trim(F.col('title')), r'\s+')) >= 5))
    # `inst_id` is a string holding a JSON array; parse it into array<string>.
    .withColumn('inst_id', F.from_json(F.col('inst_id'), sch))
    # Keep only the identifier part of each URL.
    .withColumn('inst_id', F.transform(F.col('inst_id'), extract_id))
    .withColumn('author_id', F.transform(F.col('author_id'), extract_id))
    # NOTE: which of several rows sharing a title is kept is not specified.
    .dropDuplicates(['title'])
    # Only 'en' remains after the filter, so the column is no longer needed.
    .drop('language')
    )

In [15]:
# Preview two cleaned rows, untruncated, one field per line.
df_final.show(n=2, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------
 title            |  A Closer Look at Non-Uniqueness During Dynamic Data Integration into Reservoir Characterization                   
 author_id        | [A5024941030, A5031014994, A5072724263]                                                                            
 author_pos       | [first, middle, last]                                                                                              
 publication_year | 1998                                                                                                               
 inst_id          | []                                                                                                                 
 type             | article                                                                                                            
 field            | Engineering                 

Finally, we can drop the `language` column since it now has only one value.

Save to parquet <br>
Note: the code below is commented out because it writes a parquet file; uncomment it if you want to create the parquet file.

In [4]:
# (df_final.write.parquet('FINAL_OA_1', mode='overwrite',
#                   partitionBy=['publication_year', 'domain']))