In [1]:
sc.version

'3.4.0'

### Imports

In [3]:
#from configparser import ConfigParser
from pathlib import Path
import pyspark.sql.functions as F
#import requests
from pyspark.sql.types import ArrayType, StringType


### Define directories

In [4]:
# Define directories
#
# Relevant directories are read from the config file:
# dir_data:    full path to hdfs directory where the raw data .gz files are stored
# dir_parquet: full path to hdfs directory where the parquet tables will be stored
# version:     Version of Semantic Scholar that is being processed
#              for information purposes only

# cf = ConfigParser()
# cf.read("../config.cf")

# dir_data = Path(cf.get("spark", "dir_data"))
# dir_parquet = Path(cf.get("spark", "dir_parquet"))
# version = cf.get("spark", "version")
# dir_pdfs = Path(cf.get("spark", "dir_pdfs"))

dir_data = Path('/export/clusterdata/jarenas/Datasets/semanticscholar/20230418/rawdata')
dir_parquet = Path('/export/ml4ds/IntelComp/Datalake/semanticscholar/20230418/parquet')

### Configuration hdfs

It is not possible to listdir() directly using Path as it is a hdfs

In [5]:
# Configuration hdfs
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
hdfs_dir_parquet = spark._jvm.org.apache.hadoop.fs.Path(dir_parquet.as_posix())
# Create output directories if they do not exist
# !hadoop dfs ...
# !hadoop dfs -put 20220201 /export/ml4ds/IntelComp/Datalake/SemanticScholar/

if not fs.exists(hdfs_dir_parquet):
    fs.mkdirs(hdfs_dir_parquet)

In [None]:
"""
# Configuration hdfs
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
hdfs_dir_data = spark._jvm.org.apache.hadoop.fs.Path(dir_data.as_posix())

print(hdfs_dir_data)

# Get selected version
releases = sorted(
    [
        f.getPath().getName()
        for f in fs.listStatus(hdfs_dir_data)
        if f.isDirectory() and f.getPath().getName().isdigit()
    ]
)
version = version.replace("-", "")
if version == "last":
    version = releases[-1]
if version not in releases:
    print(f"Version {version} not found")
    print(f"Available versions: {releases}")

hdfs_dir_data_files = spark._jvm.org.apache.hadoop.fs.Path(
    dir_data.joinpath(version).as_posix()
)
hdfs_dir_parquet = spark._jvm.org.apache.hadoop.fs.Path(dir_parquet.as_posix())
hdfs_dir_version = spark._jvm.org.apache.hadoop.fs.Path(
    dir_parquet.joinpath(version).as_posix()
)

# Create output directories if they do not exist
# !hadoop dfs ...
# !hadoop dfs -put 20220201 /export/ml4ds/IntelComp/Datalake/SemanticScholar/

if not fs.exists(hdfs_dir_parquet):
    fs.mkdirs(hdfs_dir_parquet)

if not fs.exists(hdfs_dir_version):
    fs.mkdirs(hdfs_dir_version)
"""

### Auxiliary functions

In [6]:
def normalize(text):
    """
    Removes extra spaces in text
    """
    if isinstance(text, str):
        text = " ".join(text.split())
    return text


def get_pdf(pdf_list):
    """
    Gets the first valid pdf url for a paper
    """
    pdf_list = [pdf for pdf in pdf_list if pdf.endswith(".pdf")]
    if len(pdf_list) > 0:
        return pdf_list[0]
    else:
        return None


#
# Create user defined functions to apply in dataframes
#

# Obtain ID from author
take_id = F.udf(lambda x: normalize(x[0] if len(x) > 0 else None), StringType())

# For each paper get all authors
take_authors_ids = F.udf(
    lambda x: [normalize(el[0] if len(el) > 0 else None) for el in x],
    ArrayType(StringType()),
)

# Remove extra spaces
norm_string = F.udf(normalize, StringType())

# Get first valid pdf url
get_first_pdf = F.udf(get_pdf, StringType())


### Read data files

In [18]:
dir_abstracts = dir_data.joinpath('abstracts')
df_abstracts = spark.read.json('file:///' + dir_abstracts.as_posix()).drop('updated')

print('Number of abstracts available:', df_abstracts.count())
df_abstracts.printSchema()
df_abstracts.show(n=2, truncate=120, vertical=True)

                                                                                

Number of abstracts available: 100048909
root
 |-- _corrupt_record: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- corpusid: long (nullable = true)
 |-- openaccessinfo: struct (nullable = true)
 |    |-- externalids: struct (nullable = true)
 |    |    |-- ACL: string (nullable = true)
 |    |    |-- ArXiv: string (nullable = true)
 |    |    |-- DOI: string (nullable = true)
 |    |    |-- MAG: string (nullable = true)
 |    |    |-- PubMedCentral: string (nullable = true)
 |    |-- license: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- url: string (nullable = true)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 _corrupt_record | null                                                                                                  

In [19]:
dir_papers = dir_data.joinpath('papers')
df_papers = spark.read.json('file:///' + dir_papers.as_posix()).select('corpusid', 'externalids', 'title', 'year', 'referencecount', 'citationcount', 'influentialcitationcount', 'S2fieldsofstudy')

print('Number of papers available:', df_papers.count())
df_papers.printSchema()
df_papers.show(n=2, truncate=120, vertical=True)

                                                                                

Number of papers available: 211633059
root
 |-- corpusid: long (nullable = true)
 |-- externalids: struct (nullable = true)
 |    |-- ACL: string (nullable = true)
 |    |-- ArXiv: string (nullable = true)
 |    |-- CorpusId: string (nullable = true)
 |    |-- DBLP: string (nullable = true)
 |    |-- DOI: string (nullable = true)
 |    |-- MAG: string (nullable = true)
 |    |-- PubMed: string (nullable = true)
 |    |-- PubMedCentral: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
 |-- referencecount: long (nullable = true)
 |-- citationcount: long (nullable = true)
 |-- influentialcitationcount: long (nullable = true)
 |-- S2fieldsofstudy: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- source: string (nullable = true)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------

In [23]:
dataset = (df_abstracts.join(df_papers, df_abstracts.corpusid ==  df_papers.corpusid, "left")
                      .drop(df_papers.corpusid)
                )

print('Number of documents in dataset:', dataset.count())
dataset.show(n=10, truncate=120, vertical=True)


                                                                                

Number of documents in dataset: 100048997


[Stage 42:>                                                         (0 + 1) / 1]

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
 _corrupt_record          | null                                                                                                                     
 abstract                 | ABSTRACT We investigated seasonal differences in community structure and activity (leucine incorporation) of the plan... 
 corpusid                 | 100031                                                                                                                   
 openaccessinfo           | {{null, null, 10.1128/AEM.01089-06, 2137258175, null}, null, null, null}                                                 
 externalids              | {null, null, 100031, null, 10.1128/AEM.01089-06, 2137258175, 17021206, null}                                             
 title                    | Blooms of Single Bacterial Species in a Coastal Lagoon of the Southweste

                                                                                

In [24]:
dataset.coalesce(1000).write.parquet(
    dir_parquet.joinpath(f"papers_IMT.parquet").as_posix(),
    mode="overwrite",
)

                                                                                

In [27]:
dataset_small = dataset.sample(fraction=0.001).select("externalids", "S2fieldsofstudy")
dataset_small.show(n=2, truncate=120, vertical=True)

[Stage 66:>                                                         (0 + 1) / 1]

-RECORD 0--------------------------------------------------------------------------------------------------
 externalids     | {null, null, 176711, null, 10.1590/S0004-28032015000100014, 1957784724, 26017086, null} 
 S2fieldsofstudy | [{Biology, s2-fos-model}, {Medicine, s2-fos-model}, {Medicine, external}]               
-RECORD 1--------------------------------------------------------------------------------------------------
 externalids     | {null, null, 290964, null, null, 2407907656, 25185378, null}                            
 S2fieldsofstudy | [{Medicine, s2-fos-model}, {Medicine, external}]                                        
only showing top 2 rows



                                                                                

In [28]:
dfPersist = dataset_small.persist()

In [29]:
dfPersist.show(n=2, truncate=120, vertical=True)

ERROR:root:KeyboardInterrupt while sending command.               (0 + 13) / 31]
Traceback (most recent call last):
  File "/opt/spark-3.4.0-bin-3.3.1/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark-3.4.0-bin-3.3.1/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

[Stage 67:====>          (10 + 21) / 31][Stage 69:>               (0 + 19) / 31]

In [None]:
udf_FOS = F.udf(lambda x:list(x), StringType() )
dataset_small = (
    dataset_small.withColumn("FOS",udf_FOS(F.col("S2fieldsofstudy")))
)

dataset_small.show(n=2, truncate=120, vertical=True)

In [12]:
%%time

# Read data files
#
# Create a spark df with all the papers in all json files

df = spark.read.json(dir_data.joinpath(version).as_posix())

22/02/09 23:54:15 WARN datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

CPU times: user 1.9 s, sys: 279 ms, total: 2.18 s
Wall time: 10min 18s


### Create papers dataframe and save as parquet file

In [None]:
%%time

# Create papers dataframe and save as parquet file
#
# Papers table will be created keeping only a subset of desired columns
# It is then stored in disk as a parquet file

# Columns to save
columns = [
    "id",
    "title",
    "paperAbstract",
    "s2Url",
    "pdfUrls",
    "year",
    "sources",
    "doi",
    "doiUrl",
    "pmid",
    "magId",
    "fieldsOfStudy",
    "journalName",
    "journalPages",
    "journalVolume",
    "venue",
]
# Select papers info
df_papers = df.select(columns)

# Clean info
for c in columns:
    if df.select(c).dtypes[0][1] == "string":
        df_papers = df_papers.withColumn(c, norm_string(c))

# Save dataframe as parquet
df_papers.write.parquet(
    dir_parquet.joinpath(version).joinpath("papers.parquet").as_posix(),
    mode="overwrite",
)

print('Number of papers in S2 version ' + version + ':', df_papers.count())

### Create authors dataframe and save as parquet file

In [None]:
%%time

# Create authors dataframe and save as parquet file
#
# Authors table will be created from all authors listed in every paper
# - Duplicates will be removed keeping only one row for each author id
# - Authors with empty ids will also be removed from dataframe

# Select only the authors
df_authors = df.select(F.explode("authors").alias("authors"))

# Convert dataframe into two columns (id, author name)
df_authors = (
    df_authors.select("authors.ids", "authors.name")
    .withColumn("ids", take_id("ids"))
    .withColumn("name", norm_string("name"))
    .withColumnRenamed("ids", "id")
    .drop_duplicates(subset=["id"])
    .dropna(subset=["id"])
)

# Save dataframe as parquet
df_authors.write.parquet(
    dir_parquet.joinpath(version).joinpath("authors.parquet").as_posix(), mode="overwrite"
)

print('Number of authors in S2 version ' + version + ':', df_authors.count())

### Create citations dataframe and save as parquet file

In [None]:
%%time

# Create citations dataframe and save as parquet file
#
# We create a row paper_source_id -> paper_destination_id
# by exploding all citations of all papers in the version

# Select paper-authors info
df_citations = df.select(["id", "outCitations"])
df_citations = (
    df_citations.withColumn("outCitations", F.explode("outCitations"))
    .withColumnRenamed("id", "source")
    .withColumnRenamed("outCitations", "dest")
)

# Save dataframe as parquet
df_citations.write.parquet(
    dir_parquet.joinpath(version).joinpath("citations.parquet").as_posix(),
    mode="overwrite",
)

print('Number of citations in S2 version ' + version + ':', df_citations.count())

### Create paper_author dataframe and save as parquet file

In [14]:
%%time

# Create paper_author dataframe and save as parquet file
#
# We create a row paper_id -> author_id
# by exploding all authors of all papers in the version

# Select paper-authors info
df_paperAuthor = df.select(["id", "authors"])
df_paperAuthor = df_paperAuthor.dropna()
df_paperAuthor = (
    df_paperAuthor.withColumn("authors", F.explode(take_authors_ids("authors.ids")))
    .withColumnRenamed("id", "paper_id")
    .withColumnRenamed("authors", "author_id")
    .dropna()
)

# Save dataframe as parquet
df_paperAuthor.write.parquet(
    dir_parquet.joinpath(version).joinpath("paper_author.parquet").as_posix(),
    mode="overwrite",
)

print('Number of authorships in S2 version ' + version + ':', df_paperAuthor.count())

[Stage 4:>                                                          (0 + 1) / 1]

Number of authorships in S2 version 20220201: 551656008
CPU times: user 4.01 s, sys: 714 ms, total: 4.72 s
Wall time: 21min 10s


                                                                                

### Download PDFs (IN PROGRESS)

In [15]:
# Get previously downloaded pdfs
list_pdfs = set(
    [x.stem for x in dir_pdfs.iterdir() if x.is_file()]
)

# Select pdfs to download
pdf_urls = (
    df.select(["id", "pdfUrls"])
    .withColumn("pdfUrls", get_first_pdf("pdfUrls"))
    .filter(F.length("pdfUrls") > 0)
)
pdf_urls = pdf_urls.where(~F.col("id").isin(list_pdfs))

pdf_test = pdf_urls.limit(5)


FileNotFoundError: [Errno 2] No such file or directory: '/export/ml4ds/IntelComp/Datalake/SemanticScholar/rawdata/20220201/pdfs'

In [None]:
## Download PDFs
#
# We download PDFs for all papers with valid a valid pdfUrl
# This option is not activated by default, since the number
# of papers to download would be huge
paper_download = 1

if paper_download:

    def download_pdfs(row, dir_pdfs=dir_pdfs):
        try:
            r = requests.get(row["pdfUrls"], stream=True)
            with dir_pdfs.joinpath(f"{row['id']}.pdf").open("wb") as f:
                f.write(r.content)
        except:
            pass

    pdf_test.foreach(download_pdfs)
