In [1]:
import os
# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip3 install --ignore-installed pyspark==2.4.5

# Install Spark NLP
! pip3 install --ignore-installed spark-nlp==2.4.5

# Install nltk
! pip3 install nltk

openjdk version "1.8.0_275"
OpenJDK Runtime Environment (build 1.8.0_275-8u275-b01-0ubuntu1~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)
Collecting pyspark==2.4.5
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 65kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 39.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257928 sha256=03edaa7b261723adc428cf83e8964cac9bbdcd2d0d7b5a9ff29550913d90f530
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939

In [2]:
import sparknlp

spark = sparknlp.start()

In [3]:
from pyspark.sql import functions as F

data_path = './Part9.csv'
data = spark.read.csv(data_path, header=True)

In [4]:
data.columns

['_c0',
 'Unnamed: 0',
 'date',
 'year',
 'month',
 'day',
 'author',
 'title',
 'article',
 'url',
 'section',
 'publication']

In [5]:
text_col = 'article'
article_text = data.select(text_col).filter(F.col(text_col).isNotNull())

In [6]:
article_text.limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                   article|
+------------------------------------------------------------------------------------------+
|big ticket The biggest sales in October, though, were once again at 220 Central Park So...|
|George P. Kent testified that he saw President Trump’s demands for Ukraine to “initiate...|
|The top American diplomat in Ukraine, who is to be the first witness in public House im...|
|“The Minutes,” which aims to capture fractious American politics by focusing on a City ...|
|(Want to get this briefing by email? Here’s the sign-up.) Good evening. Here’s the late...|
+------------------------------------------------------------------------------------------+



In [7]:
from sparknlp.base import DocumentAssembler

documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol('document')

In [8]:
from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

In [9]:
from sparknlp.annotator import Normalizer

normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)

In [10]:
from sparknlp.annotator import LemmatizerModel

lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [11]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [12]:
from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)

In [13]:
from sparknlp.annotator import NGramGenerator

ngrammer = NGramGenerator() \
    .setInputCols(['lemmatized']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

In [14]:
from sparknlp.annotator import PerceptronModel

pos_tagger = PerceptronModel.pretrained('pos_anc') \
    .setInputCols(['document', 'lemmatized']) \
    .setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 4.3 MB
[OK!]


In [15]:
from sparknlp.base import Finisher

finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams', 'pos']) \

In [16]:
from pyspark.ml import Pipeline

pipeline = Pipeline() \
     .setStages([documentAssembler,                  
                 tokenizer,
                 normalizer,                  
                 lemmatizer,                  
                 stopwords_cleaner, 
                 pos_tagger,
                 ngrammer,  
                 finisher])

In [17]:
processed_review = pipeline.fit(article_text).transform(article_text)

In [18]:
processed_review.limit(5).show()

+--------------------+--------------------+--------------------+--------------------+
|             article|   finished_unigrams|     finished_ngrams|        finished_pos|
+--------------------+--------------------+--------------------+--------------------+
|big ticket The bi...|[big, ticket, big...|[big, ticket, the...|[JJ, NN, DT, JJ, ...|
|George P. Kent te...|[george, p, kent,...|[george, p, kent,...|[NNP, NN, NN, NN,...|
|The top American ...|[top, american, d...|[the, top, americ...|[DT, JJ, JJ, NN, ...|
|“The Minutes,” wh...|[minute, aim, cap...|[the, minute, whi...|[DT, NN, WDT, NN,...|
|(Want to get this...|[want, get, brief...|[want, to, get, t...|[VB, TO, VB, DT, ...|
+--------------------+--------------------+--------------------+--------------------+



In [19]:
from pyspark.sql import types as T

udf_join_arr = F.udf(lambda x: ' '.join(x), T.StringType())
processed_review  = processed_review.withColumn('finished_pos', udf_join_arr(F.col('finished_pos')))

In [20]:
pos_documentAssembler = DocumentAssembler() \
     .setInputCol('finished_pos') \
     .setOutputCol('pos_document')

In [21]:
pos_tokenizer = Tokenizer() \
     .setInputCols(['pos_document']) \
     .setOutputCol('pos')

In [22]:
pos_ngrammer = NGramGenerator() \
    .setInputCols(['pos']) \
    .setOutputCol('pos_ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

In [23]:
pos_finisher = Finisher() \
     .setInputCols(['pos', 'pos_ngrams']) \

In [24]:
pos_pipeline = Pipeline() \
     .setStages([pos_documentAssembler,                  
                 pos_tokenizer,
                 pos_ngrammer,  
                 pos_finisher])

In [25]:
processed_review = pos_pipeline.fit(processed_review).transform(processed_review)

In [26]:
processed_review.columns

['article',
 'finished_unigrams',
 'finished_ngrams',
 'finished_pos',
 'finished_pos_ngrams']

In [27]:
processed_review.select('finished_ngrams', 'finished_pos_ngrams').limit(5).show()

+--------------------+--------------------+
|     finished_ngrams| finished_pos_ngrams|
+--------------------+--------------------+
|[big, ticket, the...|[JJ, NN, DT, JJ, ...|
|[george, p, kent,...|[NNP, NN, NN, NN,...|
|[the, top, americ...|[DT, JJ, JJ, NN, ...|
|[the, minute, whi...|[DT, NN, WDT, NN,...|
|[want, to, get, t...|[VB, TO, VB, DT, ...|
+--------------------+--------------------+



In [28]:
def filter_pos(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if pos in ['JJ', 'NN', 'NNS', 'VB', 'VBP']]

udf_filter_pos = F.udf(filter_pos, T.ArrayType(T.StringType()))

In [29]:
processed_review = processed_review.withColumn('filtered_unigrams',
                                               udf_filter_pos(F.col('finished_unigrams'), 
                                                              F.col('finished_pos')))

In [30]:
processed_review.select('filtered_unigrams').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                         filtered_unigrams|
+------------------------------------------------------------------------------------------+
|[big, ticket, sales, october, south, million, fashion, hilfiger, wife, duplex, atop, la...|
|[p, kent, testify, trump, demand, ukraine, initiate, motivate, corrupt, senior, state, ...|
|[american, diplomat, ukraine, witness, house, hearings, investigator, rudolph, giuliani...|
|[aim, fractious, politics, focus, city, council, also, hammer, jessie, mueller, city, c...|
|[want, brief, signup, even, soar, hope, country, agree, roll, tariff, final, agreement,...|
+------------------------------------------------------------------------------------------+



In [31]:
def filter_pos_combs(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if (len(pos.split('_')) == 2 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS']) \
            or (len(pos.split('_')) == 3 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                  pos.split('_')[2] in ['NN', 'NNS'])]
    
udf_filter_pos_combs = F.udf(filter_pos_combs, T.ArrayType(T.StringType()))

In [32]:
processed_review = processed_review.withColumn('filtered_ngrams',
                                               udf_filter_pos_combs(F.col('finished_ngrams'),
                                                                    F.col('finished_pos_ngrams')))

In [33]:
processed_review.select('filtered_ngrams').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                           filtered_ngrams|
+------------------------------------------------------------------------------------------+
|[big_ticket, big_sales, central_park, fashion_designer, designer_tommy, tommy_hilfiger,...|
|[p_kent, kent_testify, see_president, president_trump, trump_demand, motivate_prosecuti...|
|[top_american, american_diplomat, first_witness, public_house, house_impeachment, impea...|
|[capture_fractious, fractious_american, american_politics, city_council, council_meet, ...|
|[signup_good, late_stock, stock_soar, uschina_trade, trade_deal, be_part, final_agreeme...|
+------------------------------------------------------------------------------------------+



In [34]:
from pyspark.sql.functions import concat

processed_review = processed_review.withColumn('final', 
                                               concat(F.col('filtered_unigrams'), 
                                                      F.col('filtered_ngrams')))

In [35]:
processed_review.select('final').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                     final|
+------------------------------------------------------------------------------------------+
|[big, ticket, sales, october, south, million, fashion, hilfiger, wife, duplex, atop, la...|
|[p, kent, testify, trump, demand, ukraine, initiate, motivate, corrupt, senior, state, ...|
|[american, diplomat, ukraine, witness, house, hearings, investigator, rudolph, giuliani...|
|[aim, fractious, politics, focus, city, council, also, hammer, jessie, mueller, city, c...|
|[want, brief, signup, even, soar, hope, country, agree, roll, tariff, final, agreement,...|
+------------------------------------------------------------------------------------------+



In [36]:
from pyspark.ml.feature import CountVectorizer

import time
start = time.time()

tfizer = CountVectorizer(inputCol='final', outputCol='tf_features')
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)

end = time.time()
print('Train time: ', end - start)

Train time:  679.9560749530792


In [37]:
from pyspark.ml.feature import IDF

import time
start = time.time()

idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

end = time.time()
print('Train time: ', end - start)

Train time:  669.8000109195709


In [38]:
tfidf_result.show(1)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             article|   finished_unigrams|     finished_ngrams|        finished_pos| finished_pos_ngrams|   filtered_unigrams|     filtered_ngrams|               final|         tf_features|     tf_idf_features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|big ticket The bi...|[big, ticket, big...|[big, ticket, the...|[JJ, NN, DT, JJ, ...|[JJ, NN, DT, JJ, ...|[big, ticket, sal...|[big_ticket, big_...|[big, ticket, sal...|(262144,[1,4,6,8,...|(262144,[1,4,6,8,...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----

In [39]:
from pyspark.ml.clustering import LDA

num_topics = 10
max_iter = 10

import time
start = time.time()

lda = LDA(k=num_topics, maxIter=max_iter, featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_result)

end = time.time()
print('Train time: ', end - start)

Train time:  7726.943974733353


In [40]:
vocab = tf_model.vocabulary

def get_words(token_list):
     return [vocab[token_id] for token_id in token_list]
       
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [41]:
num_top_words = 20

topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=90)

+-----+------------------------------------------------------------------------------------------+
|topic|                                                                                topicWords|
+-----+------------------------------------------------------------------------------------------+
|    0|[yard, goal, score, game, touchdown, ornament, pass, second, tide, third_period, period...|
|    1|[mr, trump, new, year, president, like, make, say, would, company, get, think, wine, wa...|
|    2|[world_cup, hun_sen, q, prim, sokha, manuka, m_share, rapinoe, ipo_m, m_ipo_m, share_pr...|
|    3|[trade, oil, deal, market, prime_day, say, price, index, million, china, year, rise, cr...|
|    4|[sexy_man, man_alive, taliban, million, legend, sexy, company_coverage, source_text, it...|
|    5|[billion, million, revenue, quarter, profit, big_machine, net, year_early, company, swi...|
|    6|[la, epstein, los, patient, que, con, una, sex, garrett, de, crime, police, un, murder,...|
|    7|[de