# Lemmatize Semantic Scholar papers using Spark NLP

In [7]:
from sparknlp.base import *
from sparknlp.annotator import *
import sparknlp
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pathlib import Path

## 1. Read papers and concatenate the `title` and `paperAbstract` fields

In [8]:
%%time

# Loading papers table text fields, and concatenating them for lemmatization
S2papers = spark.sql("SELECT id, title, paperAbstract FROM parquet.`/export/ml4ds/IntelComp/Datalake/SemanticScholar/20220201/papers.parquet`")
S2papers = S2papers.repartition(numPartitions=20000)
##For development purposes only
#S2papers = S2papers.sample(fraction=0.0001)

#Concatenate text fields to lemmatize
S2papers = (
    S2papers.withColumn("rawtext",F.concat_ws('. ', "title", "paperAbstract"))
    .drop("title")
    .drop("paperAbstract")
)

print('Number of papers before language filtering:', S2papers.count())

22/03/22 01:10:18 WARN metastore.ObjectStore: Failed to get database parquet, returning NoSuchObjectException

Number of papers before language filtering: 204457855
CPU times: user 62.5 ms, sys: 5.27 ms, total: 67.8 ms
Wall time: 19.5 s


[Stage 17:>                                                         (0 + 1) / 1]                                                                                

## 2. Filter out papers that are not in English

In [9]:
%%time

#Pipeline for language detection
documentAssembler = DocumentAssembler() \
    .setInputCol("rawtext") \
    .setOutputCol("document")

languageDetector = LanguageDetectorDL.pretrained() \
    .setInputCols("document") \
    .setOutputCol("language")

pipeline = Pipeline() \
    .setStages([
      documentAssembler,
      languageDetector
    ])

#Apply language detection pipeline
S2papers = pipeline.fit(S2papers).transform(S2papers)
S2papers = (
    S2papers.filter(F.col("language.result")[0]=="en")
    .drop("language")
)

print('Number of papers in English:', S2papers.count())

ld_wiki_tatoeba_cnn_21 download started this may take some time.
Approximate size to download 7.1 MB
[OK!]


[Stage 20:>                                                         (0 + 1) / 1]

Number of papers in English: 150077538
CPU times: user 1.37 s, sys: 236 ms, total: 1.61 s
Wall time: 2h 12min 1s


                                                                                

## 3. Define and Run Lemmatization Pipeline

   - We work on the `document` annotations created in Section 2
   - Sentence detection and tokenization are applied to obtain tokens
   - Lemmatization is carried out
   - Stopwords are removed
   - Punctuation symbols are removed
   - The result is converted back from Spark NLP annotations to a string

In [10]:
%%time 

#Next, we carry out the lemmatization pipeline

sentenceDetector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["token"]) \
    .setOutputCol("lemma")

stopWords = StopWordsCleaner() \
    .setInputCols(["lemma"]) \
    .setOutputCol("cleanlemma")

normalizer = Normalizer() \
    .setInputCols(["cleanlemma"]) \
    .setOutputCol("normalizedlemma") \
    .setLowercase(True) \
    .setCleanupPatterns(["""[^\w\d\s]"""])

finisher = Finisher() \
     .setInputCols(['normalizedlemma'])

pipeline = Pipeline() \
    .setStages([
      sentenceDetector,
      tokenizer,
      lemmatizer,
      stopWords,
      normalizer,
      finisher
])

#We apply pipeline and recover lemmas as string
S2papers = pipeline.fit(S2papers).transform(S2papers)

udf_back2str = F.udf(lambda x:' '.join(list(x)), StringType() )
S2papers = (
    S2papers.withColumn("lemmas",udf_back2str(F.col("finished_normalizedlemma")))
    .drop("rawtext")
    .drop("finished_normalizedlemma")
)

#Show results of validation for n papers
#S2papers.show(n=10, truncate=120, vertical=True)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
CPU times: user 94.8 ms, sys: 7.38 ms, total: 102 ms
Wall time: 4.17 s


## 4. Save a table with `id` and `lemmas` to HDFS

In [11]:
%%time

#Save calculated lemmas to HDFS
dir_parquet = Path("/export/ml4ds/IntelComp/Datalake/SemanticScholar/20220201")

S2papers.coalesce(1000).write.parquet(
    dir_parquet.joinpath(f"papers_NLP.parquet").as_posix(),
    mode="overwrite",
)

                                                                                

CPU times: user 2.33 s, sys: 403 ms, total: 2.73 s
Wall time: 3h 8min 48s


## 5. Optional: Check that the generated table looks OK

In [12]:
%%time

#Test that the saved table is correct
S2papers = spark.sql("SELECT * FROM parquet.`/export/ml4ds/IntelComp/Datalake/SemanticScholar/20220201/papers_NLP.parquet`")
print('Number of lemmatized papers:', S2papers.count())
S2papers.show(n=10, truncate=120, vertical=True)

22/03/22 09:20:40 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/03/22 09:20:40 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/03/22 09:20:40 WARN metastore.ObjectStore: Failed to get database parquet, returning NoSuchObjectException
                                                                                

Number of lemmatized papers: 150077538
-RECORD 0--------------------------------------------------------------------------------------------------------------------------
 id     | 64706fbc01a509210366aedac123fe534159e3b7                                                                                 
 lemmas | exhibition gustave courbet                                                                                               
-RECORD 1--------------------------------------------------------------------------------------------------------------------------
 id     | b16390c663cf43db0e9301cb81ddb00e894447d5                                                                                 
 lemmas | role k channel alveolar bronchial epithelial repair process                                                              
-RECORD 2--------------------------------------------------------------------------------------------------------------------------
 id     | 1ce5def24ba497a27c8742cb8b0

In [17]:
%%time

papers = spark.sql("SELECT id, fieldsOfStudy FROM parquet.`/export/ml4ds/IntelComp/Datalake/SemanticScholar/20220201/papers.parquet`")
lemmas = spark.sql("SELECT id, lemmas FROM parquet.`/export/ml4ds/IntelComp/Datalake/SemanticScholar/20220201/papers_NLP.parquet`")

papers_lemmas = (papers.join(lemmas, papers.id ==  lemmas.id, "right")
                      .drop(lemmas.id)
                )

print('Number of papers in joint table:', papers_lemmas.count())
papers_lemmas.show(n=10, truncate=120, vertical=True)

22/03/22 09:39:43 WARN metastore.ObjectStore: Failed to get database parquet, returning NoSuchObjectException
22/03/22 09:39:43 WARN metastore.ObjectStore: Failed to get database parquet, returning NoSuchObjectException
                                                                                

Number of papers in joint table: 150077680


[Stage 63:>                                                         (0 + 1) / 1]

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------
 id            | 000007d87901458ae6f88092ab0ac01388b11fcf                                                                                 
 fieldsOfStudy | []                                                                                                                       
 lemmas        | philippines people country call vernacular architecture call folk architecture mostly identify rural bahay kubo liter... 
-RECORD 1---------------------------------------------------------------------------------------------------------------------------------
 id            | 00001c5efb21112a47918810af4281e3922803b8                                                                                 
 fieldsOfStudy | []                                                                                                                       
 lemmas        | reason bec

                                                                                

In [18]:
%%time

#Using SQL Expression
papers_lemmas = papers_lemmas.filter(F.array_contains("fieldsOfStudy", 'Computer Science'))
print('Number of papers in Computer Science:', papers_lemmas.count())
papers_lemmas.show(n=10, truncate=120, vertical=True)

                                                                                

Number of papers in Computer Science: 13654631


[Stage 70:>                                                         (0 + 1) / 1]

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------
 id            | 00004ddfe8089303589fb12cddc05fefc7a0bd96                                                                                 
 fieldsOfStudy | [Computer Science]                                                                                                       
 lemmas        | using static total causal ordering protocols achieve ordered view synchrony view synchronous communication vsc servic... 
-RECORD 1---------------------------------------------------------------------------------------------------------------------------------
 id            | 000051c2d8eff18654e5eaf3e636c02028ef96bb                                                                                 
 fieldsOfStudy | [Computer Science]                                                                                                       
 lemmas        | author thi

                                                                                