In [10]:
sc.version

'3.2.1'

[Stage 0:>                (0 + 31) / 31][Stage 1:>                 (0 + 1) / 31]

## 1. Imports

In [11]:
from pathlib import Path
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType, BooleanType, IntegerType
import pandas as pd
from pyspark.sql.functions import col, regexp_replace, when, coalesce, count

[Stage 0:>                (1 + 30) / 31][Stage 1:>                 (0 + 2) / 31]

## 2. Define directories

In [12]:
dir_data = Path('/export/data_ml4ds/AI4U/Datasets/semanticscholar/20231205/rawdata')
dir_parquet_sematicscholar = Path('/export/data_ml4ds/AI4U/Datasets/semanticscholar/20231205/parquet')
dir_parquet_researchportal = Path('/export/data_ml4ds/AI4U/Datasets/ResearchPortal/publications.parquet')
dir_parquet_sematicscholar = Path('/export/data_ml4ds/AI4U/Datasets/semanticscholar/20231205/parquet')

[Stage 0:>                (1 + 30) / 31][Stage 1:>                (0 + 10) / 31]

## 1. Import tables

### 1.1. Table **`papers`**

In [13]:
%%time
dir_papers = dir_data.joinpath('papers')
df_papers = spark.read.json('file:///' + dir_papers.as_posix())

#We drop corrupt records
df_papers = df_papers.cache()
df_papers = df_papers.where(F.col("_corrupt_record").isNull()).drop("_corrupt_record")

#print('Number of papers available:', df_papers.count())
#df_papers.printSchema()
#df_papers.show(n=2, truncate=120, vertical=True)



CPU times: user 147 ms, sys: 36 ms, total: 183 ms
Wall time: 9min 32s


                                                                                

In [14]:
%%time

# This function will be used for extracting only the Semantic Scholar FOS in string format
# Semantic Scholar uses several models, but we keep only FOS from s2-fos-model
def extractFOS(x):
    try:
        return [el['category'] for el in x
            if el['source'] == "s2-fos-model"]
    except:
        return None
    
extractFOS_UDF = F.udf(extractFOS, ArrayType(StringType()))

# Adapt columns names and formats for backwards compatibility
dataset = df_papers.select(F.col('corpusid').alias('id'), \
                           'title', \
                           F.col('url').alias('S2Url'), \
                           F.col('year').cast(IntegerType()), \
                           F.col('externalids.DOI').alias('doi'), \
                           F.col('externalids.PubMed').alias('pmid'), \
                           F.col('externalids.MAG').alias('magId'), \
                           'externalids', \
                           extractFOS_UDF(F.col('s2fieldsofstudy')).alias("fieldsOfStudy"), \
                           'publicationtypes', \
                           'publicationdate', \
                           F.col('journal.name').alias('journalName'), \
                           F.col('journal.pages').alias('journalPages'), \
                           F.col('journal.volume').alias('journalVolume'), \
                           'venue', \
                           'publicationvenueid', \
                           'isopenaccess', \
                           F.col('referencecount').cast(IntegerType()), \
                           F.col('citationcount').cast(IntegerType()), \
                           F.col('influentialcitationcount').cast(IntegerType()) \
                          )
#dataset.printSchema()
#dataset.show(n=2, truncate=120, vertical=True)

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 73.8 ms


In [15]:
%%time

dir_abstracts = dir_data.joinpath('abstracts')
df_abstracts = spark.read.json('file:///' + dir_abstracts.as_posix())

#We drop corrupt records
df_abstracts = df_abstracts.cache()
df_abstracts = df_abstracts.where(F.col("_corrupt_record").isNull()).drop("_corrupt_record")

#print('Number of abstracts available:', df_abstracts.count())
#df_abstracts.printSchema()
#df_abstracts.show(n=2, truncate=120, vertical=True)



CPU times: user 63.6 ms, sys: 8.35 ms, total: 72 ms
Wall time: 3min 53s


                                                                                

In [16]:
%%time

# Adapt columns names and formats for backwards compatibility
df_abstracts = df_abstracts.select(F.col('corpusid').alias('id'), \
                           F.col('abstract').alias('paperAbstract'), \
                           'openaccessinfo' \
                          )
#df_abstracts.printSchema()
#df_abstracts.show(n=2, truncate=120, vertical=True)

CPU times: user 2.01 ms, sys: 0 ns, total: 2.01 ms
Wall time: 11.2 ms


In [17]:
%%time

dataset = (dataset.join(df_abstracts, dataset.id ==  df_abstracts.id, "left")
                      .drop(df_abstracts.id)
                ).cache()

#print('Number of documents in dataset:', dataset.count())
#dataset.printSchema()
#dataset.show(n=2, truncate=120, vertical=True) 

CPU times: user 2.72 ms, sys: 272 µs, total: 2.99 ms
Wall time: 249 ms


In [None]:
print("Papers with PMID:", dataset.where(F.col("pmid").isNotNull()).count())
print("Papers with DOI:", dataset.where(F.col("doi").isNotNull()).count())
print("Unique DOIs:", dataset.where(F.col("doi").isNotNull()).select('doi').distinct().count())

In [11]:
dataset = dataset.select('id', 'title', 'doi', 'paperAbstract')
dataset.printSchema()

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- paperAbstract: string (nullable = true)



In [18]:
# Print number of publications in total in the Semantic Scholar database
print('Number of documents in Semantic Scholar:', dataset.count())

# Remove publications without Abstarct
dataset_no_na = dataset.filter(~(col("paperAbstract").isNull() | (col("paperAbstract") == '')))
print('Number of documents in Semantic Scholar: with abstract:', dataset_no_na.count())

### 1.2. Table **`publications`**

In [19]:
df_publications = spark.read.parquet(str(Path(dir_parquet_researchportal)))

In [None]:
df_publications.printSchema()

In [None]:
# Any duplicated? - NO
duplicates_df_publications = df_publications.groupBy("actID").count().filter(col("count") > 1)
print('Number of duplicated publications:', duplicates_df_publications.count())

[Stage 0:>                (0 + 28) / 31][Stage 1:>                 (0 + 0) / 31]

## 2. Matching

In [20]:
# Print number of publications in total in the Research Portal database
print('Number of publications in the Research Portal before the matching:', df_publications.count())

# Remove publications which already have Abstarct
df_publications_na = df_publications.filter("Abstract IS NULL OR Abstract = ''")
print('Number of publications in the Research Portal without abstract before the matching:', df_publications_na.count())


                                                                                

Number of publications in the Research Portal before the matching: 40244




Number of publications in the Research Portal without abstract before the matching: 24725


                                                                                

In [21]:
# Make the format of the DOI of publications in research portal match the format of Sematic Scholar
doi_pattern = "https://doi.org/"
# df_publications_na = df_publications_na.withColumn("clear_DOI", regexp_replace(col("DOI"), doi_pattern, ""))
df_publications_na = df_publications_na.withColumn("doi_pub", regexp_replace(col("DOI"), doi_pattern, ""))
df_publications_na = df_publications_na.withColumnRenamed("Title", "title_pub")


In [None]:
df_publications_na.printSchema()

In [22]:
dataset_no_na = dataset_no_na.withColumnRenamed("title", "title_ss")
dataset_no_na = dataset_no_na.withColumnRenamed("doi", "doi_ss")
#dataset_no_na.printSchema()


In [23]:
# perform a join of both tables in order of getting the number of matched publications
joined_df = df_publications_na.join(
    dataset_no_na,
    (df_publications_na.doi_pub == dataset_no_na.doi_ss) & (df_publications_na.title_pub == dataset_no_na.title_ss),
    "inner"
)
print('Number of matches:', joined_df.count())

24/01/24 09:59:12 ERROR scheduler.TaskSchedulerImpl: Lost executor 3 on node34.cluster.tsc.uc3m.es: Executor finished with state FAILED
24/01/24 09:59:12 WARN server.TransportChannelHandler: Exception in connection from /10.0.13.54:33438
java.io.IOException: Connection reset by peer
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:

Number of matches: 404


                                                                                

In [None]:
joined_df.printSchema()

In [24]:
joined_df_0 = joined_df.select('actID', 'id', 'title_pub', 'title_ss', 'doi_pub', 'doi_ss', 'paperAbstract')

In [25]:
# Unir las DataFrames en función de la columna actID
merged_df = df_publications.join(joined_df_0, "actID", "left_outer")

# Actualizar el valor de Abstract con el valor de paperAbstract donde esté disponible
result_df = merged_df.withColumn("Abstract", coalesce(col("paperAbstract"), col("Abstract")))

# Seleccionar las columnas necesarias
result_df = result_df.select("actID", "ActivityType", "Title", "Abstract", "Keywords", "Research_Areas",
                             "DOI", "Year", "Publisher", "ISSN", "EISSN", "ISBN")



In [26]:
print('Number of publications in the Research Portal without abstract after the matching:', result_df.filter("Abstract IS NULL OR Abstract = ''").count())


24/01/24 10:09:35 ERROR scheduler.TaskSchedulerImpl: Lost executor 14 on node69.cluster.tsc.uc3m.es: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/01/24 10:09:35 WARN scheduler.TaskSetManager: Lost task 10.0 in stage 23.0 (TID 501) (node69.cluster.tsc.uc3m.es executor 14): ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/01/24 10:09:35 WARN scheduler.TaskSetManager: Lost task 16.0 in stage 23.0 (TID 512) (node69.cluster.tsc.uc3m.es executor 14): ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/01/24 10:09:35 WARN scheduler.TaskSetManager: Lost task 19.0

Number of publications in the Research Portal without abstract after the matching: 24322


In [None]:
# check if there is any null value
null_check = result_df.select([F.count(F.when(F.col(column).isNull() | (F.trim(F.col(column)) == ""), column)).alias(column) for column in ['actID', 'Title', 'Abstract', 'DOI']])
null_check.show()

In [None]:
# Check if there is any duplicated
dup_check = result_df.groupBy(['actID', 'Title', 'Abstract', 'DOI']).agg(F.count("*").alias("count"))
dup_check = dup_check.filter("count > 1")
dup_check.show()

In [None]:
# delete this duplicated instance
result_df_no_duplicates = result_df.dropDuplicates(["actID", "Title", "Abstract", "DOI"])

In [None]:
# Save as a parquet the enriched datasets 
result_df_no_duplicates.write.parquet('export/usuarios_ml4ds/mbalairon/publications.parquet')