# Test on Dataproc PySpark

1. https://medium.com/trustyou-engineering/topic-modelling-with-pyspark-and-spark-nlp-
2. https://github.com/JohnSnowLabs/spark-nlp/issues/232
    - issue: TypeError because the path for sparknlp is missing, so the jar path is never picked up
    - solution: create a new conda environment named sparknlp

# Set-up

In [1]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [2]:
from google.cloud import storage

In [3]:
import sparknlp
# The object returned by sparknlp.start() is used as the Spark session for the rest of the notebook.
spark = sparknlp.start()

# Display the Spark version as a sanity check that the notebook is running on the intended kernel.
spark.version

23/05/15 04:22:45 INFO SparkEnv: Registering MapOutputTracker
23/05/15 04:22:45 INFO SparkEnv: Registering BlockManagerMaster
23/05/15 04:22:45 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/05/15 04:22:45 INFO SparkEnv: Registering OutputCommitCoordinator


'3.3.0'

In [4]:
import os

# Google Cloud Storage bucket folder that holds the input data.
# Another file name noted here: checkpoint_0512_sent_split.parquet
path_bucket = 'gs://msca-sp23-bucket/nlp_data'
dataPath = f'{path_bucket}/df_cleaned_0514.parquet'
# Disabled local alternative: runtime_path = '/home/jupyter/data/nlp_final'

In [5]:
import warnings

import pandas as pd

# pandas display limits used when Spark results are converted with toPandas().
for option_name, option_value in (
    ('display.max_rows', 100),
    ('display.max_columns', None),
    ('display.max_colwidth', 500),
):
    pd.set_option(option_name, option_value)

# Silence every Python warning for the remainder of the session.
warnings.filterwarnings("ignore")

In [6]:
# Turn on Spark's REPL eager evaluation so that a bare DataFrame expression at
# the end of a cell (e.g. `df_raw.limit(5)`) is rendered as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

## Build pipeline

In [7]:
%%time

# Read the parquet dataset located at `dataPath` into a Spark DataFrame.
df_raw = spark.read.parquet(dataPath)

                                                                                

CPU times: user 7.6 ms, sys: 0 ns, total: 7.6 ms
Wall time: 4.5 s


In [8]:
# Show the column names and types of the loaded DataFrame.
df_raw.printSchema()

root
 |-- url: string (nullable = true)
 |-- date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- text_split: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)



In [9]:
# Preview five rows of the raw data.
df_raw.limit(5)

                                                                                

url,date,title,text_split,text
http://en.people....,2021-03-18,Artificial intell...,[Chinese Japanese...,Chinese Japanese ...
http://newsparlia...,2020-02-27,Children With Aut...,[ Children With A...,Children With Au...
http://www.datawe...,2021-03-26,"Forget ML, AI and...","[Forget ML, AI an...","Forget ML, AI and..."
http://www.homeof...,2021-03-10,Strategy Analytic...,[Strategy Analyti...,Strategy Analytic...
http://www.itbusi...,2020-10-20,Olympus to Suppor...,[Search for: Ho...,Search for: Hom...


In [10]:
import re  # only referenced by the disabled column-renaming snippet at the end of this cell

# Keep only the text column and drop the rows where it is null.
text_col = 'text'
has_text = F.col(text_col).isNotNull()
news_df = df_raw.select(text_col).filter(has_text)

# Disabled: remove spaces from column names
#new_cols = [F.col(column).alias(re.sub('\s*', '', column)) for column in news_df.columns]

In [11]:
# Confirm that only the selected text column remains.
news_df.printSchema()

root
 |-- text: string (nullable = true)



In [12]:
from sparknlp.base import *
from sparknlp.annotator import *
from nltk.corpus import stopwords

In [13]:
# Spark NLP stages, chained through their column names:
#   text_col -> 'document' -> 'tokenized' -> 'normalized' -> 'lemmatized'

# Wraps the raw text column into the 'document' column.
documentAssembler = (
    DocumentAssembler()
    .setInputCol(text_col)
    .setOutputCol('document')
)

# Splits each document into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('tokenized')
)

# Normalizes the tokens, with lowercasing switched on.
normalizer = (
    Normalizer()
    .setInputCols(['tokenized'])
    .setOutputCol('normalized')
    .setLowercase(True)
)

# Default pretrained lemmatizer model (downloaded on first use).
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(['normalized'])
    .setOutputCol('lemmatized')
)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
Download done! Loading the resource.
[ / ]

                                                                                

[OK!]


In [14]:
import nltk
from nltk.corpus import stopwords

# Make sure the NLTK stop-word corpus is available before reading it.
nltk.download('stopwords')
eng_stopwords = stopwords.words('english')

# Removes the English stop words from the lemmatized tokens.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['lemmatized'])
    .setOutputCol('no_stop_lemmatized')
    .setStopWords(eng_stopwords)
)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [15]:
# Part-of-speech tags computed on the lemmatized tokens with the pretrained 'pos_anc' model.
pos_tagger = (
    PerceptronModel.pretrained('pos_anc')
    .setInputCols(['document', 'lemmatized'])
    .setOutputCol('pos')
)

# Tag patterns that a chunk must match to be kept as an n-gram
# (presumably adjective(s)+noun and noun(s)+noun — confirm against the tag set of 'pos_anc').
allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']

# Extracts the chunks whose POS sequence matches one of the patterns above.
chunker = (
    Chunker()
    .setInputCols(['document', 'pos'])
    .setOutputCol('ngrams')
    .setRegexParsers(allowed_tags)
)

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[ | ]pos_anc download started this may take some time.
Approximate size to download 3.9 MB
Download done! Loading the resource.
[ / ]



[ — ]

                                                                                

[OK!]


In [16]:
# Only the n-gram chunks are finished; unigrams are left out because of some bugs with them.
finisher = Finisher().setInputCols(['ngrams'])

In [17]:
from pyspark.ml import Pipeline

# Assemble every stage, in execution order, into one Spark ML pipeline.
# NOTE(review): the output column of `stopwords_cleaner` ('no_stop_lemmatized') is not
# consumed by any later stage — the POS tagger reads 'lemmatized' and the finisher
# reads only 'ngrams'. Confirm whether that stage is still needed.
pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    normalizer,
    lemmatizer,
    stopwords_cleaner,
    pos_tagger,
    chunker,
    finisher,
])

In [18]:
%%time
# Fit the pipeline on the news text and attach the extracted n-grams.
# NOTE(review): the short wall time recorded for this cell suggests the rows are not
# processed here but later, when an action such as toPandas() runs — confirm.
processed_news = pipeline.fit(news_df).transform(news_df)



CPU times: user 101 ms, sys: 16.9 ms, total: 118 ms
Wall time: 1.28 s


In [19]:
# Pull five processed rows to the driver as a pandas frame to inspect the n-grams.
processed_news.limit(5).toPandas()

                                                                                

Unnamed: 0,text,finished_ngrams
0,"Chinese Japanese French Spanish Russian Arabic Korean German Portuguese Thursday, March 18, 2021 Home Artificial intelligence improves parking efficiency in Chinese cities By Liu Shiyao People's Daily 09:16, March 18, 2021 Photo taken on July 1, 2019, shows a sign for electronic toll collection ETC newly set up at a roadside parking space on Yangzhuang road, Shijingshan district, Beijing. Some urban areas of the city started to use ETC system for roadside parking spaces since July 1,...","[Chinese Japanese French Spanish Russian Arabic, Korean German, Artificial intelligence, Chinese cities, Daily 09:16, March, electronic toll, Shijingshan district, urban areas, Daily Online/Li Wenming, artificial intelligence, electronic toll, significant improvement, normal lanes, mute time, wider, Wang, smart roadside, smart parking, intelligent system, full play, integrate AI, real economy, Traditional parking, actual needs, technical capacity, many deficiencies, traditional parking, ma..."
1,"Children With Autism Saw Their Learning and Social Skills Boosted After Playing With This AI Robot News Parliament Children With Autism Saw Their Learning and Social Skills Boosted After Playing With This AI Robot Author Recent Posts admin Latest posts by admin see all Mansplaining in conferences: How can we get him to forestall? February 27, 2020 Coronavirus Could Explode in the U.S. Overnight Like it Did in Italy February 27, 2020 Levi Strauss marks the nex...","[Social Skills, Social Skills, Recent Posts, Latest posts, next phase, corporate paid, social talents, such era, average autism, assistive robotic, named, incorrect Kiwi, Robotics discovered, higher social talents, mentary. Cameras, engagement ranged, possible distractions, domestic home, real time, educational and/o, therapeutic activity, proper, lead, assistive robotic. Haotian, similar manner, neurotypical other folks, individualized products, such youngsters, social talents, outstanding ..."
2,"Forget ML, AI and Industry 4.0 obsolescence should be your focus 26 February 2021 Test Rework Solutions Dataweek Home About us Back issues / E book / PDF EMP Handbook Subscribe Advertise Editor's Choice Multimedia, Videos Analogue, Mixed Signal, LSI Circuit System Protection Computer/Embedded Technology Design Automation DSP, Micros Memory Electronics Technology Enclosures, Racks, Cabinets Panel Products Events Interconnection Manufacturing / Productio...","[Micros Memory, Passive Components, Programmable Logic, Smart Home, Wireless, IoT, friendly version, new era, accelerated transformation, last eighteen, new timeline, careful planning, longer exists, pompous meeting, selected leading, green energy, due course.Whomeve, big businesses, automotive, defence, new installations, smaller turbines, multiple suppliers, biggest change, industrial asset, recent years, many rail, chief mechanical engineer, electronic failures, ideal means, cheap solut..."
3,"Strategy Analytics: 71 of Smartphones Sold Globally in 2021 will be AI Powered Consumer Electronics Net Skip to content Search for: HomeNewsStrategy Analytics: 71 of Smartphones Sold Globally in 2021 will be AI Powered News Strategy Analytics: 71 of Smartphones Sold Globally in 2021 will be AI Powered 7 hours ago BOSTON BUSINESS WIRE Strategy Analytics in a newly published report, S...","[AI Powered, Net Skip, Global Artificial Intelligence, Artificial Intelligence, rapidly implemented, various functions, intelligent power, virtual assistants, important technology, putational power, Analytics estimates, Associate Director, Ukonaho. Advantages, lower latency, overall lower power, Artificial Intelligence, deep learning, Artificial Intelligence, key technology, higher end, important tasks, longer battery, efficient power, digital assistants, other tasks, useful tools, own pe..."
4,"Search for: Home2020OctoberOlympus to Support Endoscopic AI Diagnosis Education for Doctors in India and to Launch AI Diagnostic Support Application News Olympus to Support Endoscopic AI Diagnosis Education for Doctors in India and to Launch AI Diagnostic Support Application TOKYO, Oct 20, 2020 ACN Newswire Olympus Corporation took part in a ground breaking project as a business promoter, in cooperation with the Min...","[Endoscopic AI, Diagnostic Support, Endoscopic AI, Diagnostic Support, Internal Affairs, MIC , entitled, International Expansion, High Magnifying, few endoscopists, diagnostic support, major medical institution, Asian Institute, Northern Yokohama, differential diagnosis, next generation, Internal Affairs, medical care, future leaders, last year, diagnostic support, endoscopic diagnostic support, enables observation, real time, optical magnification, real time, ultra high magnifying, neoplast..."


## Vectorization with PySpark

In [None]:
%%time
from pyspark.ml.feature import CountVectorizer
tfizer = CountVectorizer(inputCol='finished_ngrams',
                         outputCol='tf_features')
tf_model = tfizer.fit(processed_news)
tf_result = tf_model.transform(processed_news)

In [None]:
from pyspark.ml.feature import IDF

# Re-weight the term-frequency vectors by inverse document frequency.
idfizer = (
    IDF()
    .setInputCol('tf_features')
    .setOutputCol('tf_idf_features')
)
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [None]:
from pyspark.ml.clustering import LDA

# Topic-model hyperparameters.
num_topics = 6
max_iter = 10
# Fixed seed so that re-running the notebook starts the LDA fit from the same
# random initialisation instead of producing different topics on every run.
lda_seed = 42

lda = LDA(k=num_topics,
          maxIter=max_iter,
          seed=lda_seed,
          featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_result)


In [None]:
# `T` is used for the UDF return type below; the notebook's set-up cell only does
# `from pyspark.sql.types import *`, so the alias has to be imported here.
from pyspark.sql import types as T

# Terms learned by the CountVectorizer model, indexed by position.
vocab = tf_model.vocabulary


def get_words(token_list):
    """Translate vocabulary indices into their terms.

    Args:
        token_list: iterable of integer positions into ``vocab``.

    Returns:
        A list with the term found at each position, in the same order.
    """
    return [vocab[token_id] for token_id in token_list]


# Spark UDF wrapper around get_words that returns an array of strings.
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [None]:
# Number of highest-weighted terms to list per topic.
num_top_words = 6

# The method chain is wrapped in parentheses so that it can span several lines;
# without them the indented continuation lines are a syntax error.
topics = (
    lda_model
    .describeTopics(num_top_words)
    .withColumn('topicWords', udf_to_words(F.col('termIndices')))
)
topics.select('topic', 'topicWords').show(truncate=100)