# Lemmatize CORDIS projects using Spark NLP

In [1]:
from sparknlp.base import *
from sparknlp.annotator import *
import sparknlp
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pathlib import Path

In [2]:
from pyspark.ml.clustering import LDA as pysparkLDA

corpusFile = Path("/export/usuarios_ml4ds/jarenas/github/IntelComp/ITMT/topicmodeler/testproject/TMmodels/sparkkk/corpus.parquet")
df = spark.read.parquet(f"file://{corpusFile.resolve().as_posix()}")
lda = pysparkLDA(featuresCol="bow", maxIter=20, k=25,
            optimizer='online', subsamplingRate=0.05,
            optimizeDocConcentration=True,
            docConcentration=25*[0.2])
ldaModel = lda.fit(df)

df.show(n=3, vertical=True, truncate=1000)

22/08/11 13:33:38 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 8.7 MiB
22/08/11 13:33:43 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 13.1 MiB
22/08/11 13:33:54 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 8.7 MiB
22/08/11 13:33:57 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 13.1 MiB
22/08/11 13:34:01 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 8.7 MiB
22/08/11 13:34:03 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 13.1 MiB
22/08/11 13:34:06 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 8.7 MiB
22/08/11 13:34:07 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 13.1 MiB
22/08/11 13:34:10 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 8.7 MiB
22/08/11 13:34:11 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 13.1 MiB
22/08/11 13:34:14 WARN scheduler.DA

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id     | 115843                                                                          

                                                                                

In [46]:
from pyspark.ml.functions import vector_to_array
import numpy as np

df = spark.read.parquet(f"file://{corpusFile.resolve().as_posix()}")
df = ldaModel.transform(df).select("topicDistribution").withColumn('topicDistribution', vector_to_array('topicDistribution'))
thetas32 = np.array([el[0] for el in df.toPandas().values.tolist()])
thetas32.sum(axis=1)
print(thetas32.shape)


22/08/11 15:35:07 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
	at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
	at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
	at org.apache.spark.scheduler.EventLoggingListener.r

(60596, 25)


In [49]:
ldaModel.describeTopics().show(n=3, vertical=True, truncate=1000)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 topic       | 0                                                                                                                                                                                                                                 
 termIndices | [4, 3, 52, 65, 31, 17, 27, 8, 43, 47]                                                                                                                                                                                             
 termWeights | [0.010632095659496191, 0.007795066999781101, 0.006849610171850877, 0.006711525464863618, 0.0061415744490459015, 0.005932557592324106, 0.005390082172262677, 0.005039874745339203, 0.00489998538561223, 0.004594162372837045]      
-RECORD 1-----------------------

In [62]:
from sklearn.preprocessing import normalize
betas = normalize(ldaModel.topicsMatrix().toArray().T, axis=1, norm='l1')
print(betas[0,:10])
print(betas.shape)

[0.00261458 0.00297528 0.00015629 0.00779507 0.0106321  0.0030569
 0.00148275 0.00041544 0.00503987 0.00031139]
(25, 22797)


## 1. Read project information and concatenate the `title` and `objective` fields

In [3]:
%%time

# Loading papers table text fields, and concatenating them for lemmatization
projects = spark.sql("SELECT projectID, title, objective FROM parquet.`/export/ml4ds/IntelComp/Datalake/CORDIS/20220221/new_parquet/projects.parquet`")
#projects = spark.read.parquet('/export/ml4ds/IntelComp/Datalake/CORDIS/20220221/new_parquet/projects.parquet')
projects = projects.repartition(numPartitions=1000)

#Concatenate text fields to lemmatize
projects = (
    projects.withColumn("rawtext",F.concat_ws('. ', "title", "objective"))
    .select("projectID","rawtext")
)

print('Number of projects before language filtering:', projects.count())

22/08/10 14:11:21 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/08/10 14:11:21 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/08/10 14:11:24 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/08/10 14:11:24 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore jarenas@192.168.148.225
22/08/10 14:11:24 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
22/08/10 14:11:24 WARN metastore.ObjectStore: Failed to get database parquet, returning NoSuchObjectException

Number of projects before language filtering: 61163
CPU times: user 25.7 ms, sys: 5.95 ms, total: 31.7 ms
Wall time: 16.5 s


                                                                                

## 2. Filter project abstracts that are not in English Language

In [4]:
%%time

#Pipeline for language detection
documentAssembler = DocumentAssembler() \
    .setInputCol("rawtext") \
    .setOutputCol("document")

languageDetector = LanguageDetectorDL.pretrained() \
    .setInputCols("document") \
    .setOutputCol("language")

pipeline = Pipeline() \
    .setStages([
      documentAssembler,
      languageDetector
    ])

#Apply language detection pipeline
projects = pipeline.fit(projects).transform(projects)
projects = (
    projects.filter(F.col("language.result")[0]=="en")
    .drop("language")
)

print('Number of projects in English:', projects.count())

ld_wiki_tatoeba_cnn_21 download started this may take some time.
Approximate size to download 7.1 MB
[ | ]ld_wiki_tatoeba_cnn_21 download started this may take some time.
Approximate size to download 7.1 MB
Download done! Loading the resource.
[ / ]



[ — ]



[ \ ]

                                                                                

[ | ]

2022-08-10 14:12:07.355070: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-08-10 14:12:07.425992: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:354] MLIR V1 optimization pass is not enabled


[OK!]




Number of projects in English: 60626
CPU times: user 68.7 ms, sys: 17.1 ms, total: 85.8 ms
Wall time: 54.9 s


                                                                                

## 3. Define and Run Lemmatization Pipeline

   - We work on documents created in Subsection 2
   - Sentence Detection and Tokenizer applied to detect tokens
   - Lemmatization is carried out
   - Stopwords are applied
   - Punctuation symbols are removed
   - Result is converted back from Spark NLP annotations to string format

In [5]:
%%time 

#Next, we carry out the lemmatization pipeline

sentenceDetector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["token"]) \
    .setOutputCol("lemma")

stopWords = StopWordsCleaner() \
    .setInputCols(["lemma"]) \
    .setOutputCol("cleanlemma")

normalizer = Normalizer() \
    .setInputCols(["cleanlemma"]) \
    .setOutputCol("normalizedlemma") \
    .setLowercase(True) \
    .setCleanupPatterns(["""[^\w\d\s]"""])

finisher = Finisher() \
     .setInputCols(['normalizedlemma'])

pipeline = Pipeline() \
    .setStages([
#      documentAssembler,
      sentenceDetector,
      tokenizer,
      lemmatizer,
      stopWords,
      normalizer,
      finisher
])

#We apply pipeline and recover lemmas as string
projects = pipeline.fit(projects).transform(projects)

udf_back2str = F.udf(lambda x:' '.join(list(x)), StringType() )
projects = (
    projects.withColumn("lemmas",udf_back2str(F.col("finished_normalizedlemma")))
    .drop("finished_normalizedlemma")
)

#Show results of validation for n papers
projects.show(n=10, truncate=120, vertical=True)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
Download done! Loading the resource.




[ — ]

                                                                                

[OK!]


[Stage 14:>                                                         (0 + 1) / 1]

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------
 projectID | 963941                                                                                                                   
 rawtext   | Acquiring assembly skills by robot learning. Present-day industrial robots are made for the purpose of repeating seve... 
 lemmas    | acquiring assembly skill robot learn presentday industrial robot make purpose repeat several task thousands time them... 
-RECORD 1-----------------------------------------------------------------------------------------------------------------------------
 projectID | 268351                                                                                                                   
 rawtext   | Manycore Application Development and Modeling Environment. The multicore revolution (parallel computing on a chip) is... 
 lemmas    | manycore application development modeling 

                                                                                

## 4. Save a table with `projectID`, `rawtext` and `lemmas` to HDFS

In [6]:
%%time

#Save calculated lemmas to HDFS
dir_parquet = Path("/export/ml4ds/IntelComp/Datalake/CORDIS/20220221/new_parquet")

projects.coalesce(40).write.parquet(
    dir_parquet.joinpath(f"projects_NLP.parquet").as_posix(),
    mode="overwrite",
)

                                                                                

CPU times: user 55.4 ms, sys: 2.32 ms, total: 57.8 ms
Wall time: 33 s


## 5. Optional: Check that the generated table looks OK

In [21]:
%%time

#Test that the saved table is correct
projects = spark.read.parquet('/export/ml4ds/IntelComp/Datalake/CORDIS/20220221/new_parquet/projects_NLP.parquet')
print('Number of lemmatized projects:', projects.count())
projects.show(n=10, truncate=120, vertical=True)



Number of lemmatized projects: 61163
-RECORD 0-----------------------------------------------------------------------------------------------------------------------------
 projectID | 689074                                                                                                                   
 lemmas    | holistic optimisation ship design operation life cycle maritime product typically associate large investment seldom b... 
-RECORD 1-----------------------------------------------------------------------------------------------------------------------------
 projectID | 251681                                                                                                                   
 lemmas    | development nitric oxide releasing stent treatment coronary artery disease coronary artery disease one lead cause dea... 
-RECORD 2-----------------------------------------------------------------------------------------------------------------------------
 projectID | 75605

                                                                                

In [22]:
%%time

projects = spark.read.parquet('/export/ml4ds/IntelComp/Datalake/CORDIS/20220221/new_parquet/projects.parquet')
lemmas = spark.read.parquet('/export/ml4ds/IntelComp/Datalake/CORDIS/20220221/new_parquet/projects_NLP.parquet')

projects_lemmas = (projects.join(lemmas, projects.projectID ==  lemmas., "right")
                      .drop(lemmas.id)
                )

print('Number of projects in joint table:', projects_lemmas.count())
projects_lemmas.show(n=10, truncate=120, vertical=True)

AttributeError: 'DataFrame' object has no attribute 'id'

In [18]:
%%time

#Using SQL Expression
papers_lemmas = papers_lemmas.filter(F.array_contains("fieldsOfStudy", 'Computer Science'))
print('Number of papers in Computer Science:', papers_lemmas.count())
papers_lemmas.show(n=10, truncate=120, vertical=True)

                                                                                

Number of papers in Computer Science: 13654631


[Stage 70:>                                                         (0 + 1) / 1]

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------
 id            | 00004ddfe8089303589fb12cddc05fefc7a0bd96                                                                                 
 fieldsOfStudy | [Computer Science]                                                                                                       
 lemmas        | using static total causal ordering protocols achieve ordered view synchrony view synchronous communication vsc servic... 
-RECORD 1---------------------------------------------------------------------------------------------------------------------------------
 id            | 000051c2d8eff18654e5eaf3e636c02028ef96bb                                                                                 
 fieldsOfStudy | [Computer Science]                                                                                                       
 lemmas        | author thi

                                                                                