# PySpark NLP Topic Modeling - News Articles
---

In [1]:
%%time

# Import libraries
import findspark
findspark.init()
import pandas as pd
import nltk
import string
import re
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark import SparkContext
from pyspark.sql.types import *
from sparknlp.base import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.sql.functions import udf
from gensim import corpora, models, similarities
import os
import gensim
from gensim.models import CoherenceModel
import gensim.corpora as corpora
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.model_selection import GridSearchCV
import numpy as np
import networkx as nx
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import colorcet as cc
import ast
import umap
import re
import spacy
from nltk.corpus import stopwords
from pyspark.sql import functions as sf
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA
from pyspark.sql.functions import col
import operator

# Configure workspace
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

plt.style.use('fivethirtyeight')

# Function to parse documents
def process_text(x):
    
    x = x.lower()
    x = re.sub("[^a-zA-Z]", ' ', x)
    
    return x.split(' ')

# Set working directory
os.chdir('/Users/zxs/Documents/data/kaggle_articles/')

# Regex info
ws = re.compile(r'\s+')

# Load data for POS tagging
pos = spacy.load('en_core_web_sm')

# Initialize spark
spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "32g").appName('medium_nlp').getOrCreate()
sc = SparkContext.getOrCreate()
sc.setSystemProperty('spark.executor.memory', '32g')
sql = SQLContext(sc)

# Read data
data = {}

# Iterate
for file in os.listdir():
    
    data[file] = pd.read_csv(file)
    
# Combine results
df = pd.DataFrame()

for k, v in data.items():
    
    df = df.append(v)

# Subset columns
df = df[['title', 'publication', 'author', 'date', 'content']]
df.fillna('unknown', inplace = True)

# Pattern for parsing
ws = re.compile(r'\s+')

  from collections import Mapping, defaultdict


CPU times: user 7.83 s, sys: 1.66 s, total: 9.49 s
Wall time: 2min 8s


In [2]:
%%time

# Define data structure for pyspark
schema = StructType([StructField("title", StringType(), True),
                     StructField("publication", StringType(), True),
                     StructField('author', StringType(), True),
                     StructField('date', StringType(), True),
                     StructField('content', StringType(), True)])

# Convert data and process
df1 = spark.createDataFrame(df, schema)
df2 = df1.withColumn('words', sf.concat(sf.col('title'),sf.lit(' '), sf.col('content')))

# Tokenize text and sub unwanted chars
converter = udf(lambda x: process_text(x), ArrayType(StringType()))

# Apply pyspark func
df3 = df2.withColumn("parsed", converter(df2.words))

In [9]:
%%time

# TF
cv = CountVectorizer(inputCol = "parsed", outputCol = "raw", vocabSize = 1000, minDF = 100)
m = cv.fit(df3)
r = m.transform(df3)

# IDF
idf = IDF(inputCol = "raw", outputCol = "features")
idfm = idf.fit(r)
out = idfm.transform(r)

CPU times: user 17.5 ms, sys: 7.58 ms, total: 25.1 ms
Wall time: 48.5 s


In [10]:
%%time

# Train LDA
lda = LDA(k = 10, maxIter = 10)
model = lda.fit(out)

CPU times: user 20.3 ms, sys: 10.6 ms, total: 30.9 ms
Wall time: 48.1 s


In [59]:
%time print('Log Perplexity for LDA Topic Model: {}'.format(model.logPerplexity(out)))

CPU times: user 13.2 ms, sys: 8.19 ms, total: 21.4 ms
Wall time: 2min 13s


6.590102110786155

In [37]:
# Get topics from LDA model
tpcs = model.describeTopics(5).rdd  
tpcsw = tpcs.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

# Inspect
for idx, topic in enumerate(tpcsw):
    
    print("Topic # {}\n".format(idx))
    
    print([word for word in topic], '\n')

Topic # 0

['trump', 'house', 'senate', 'ryan', 'fbi'] 

Topic # 1

['sanders', 'i', 'you', 'clinton', 'students'] 

Topic # 2

['percent', 'korea', 'north', 'tax', 'oil'] 

Topic # 3

['russia', 'syria', 'isis', 'iran', 'u'] 

Topic # 4

['health', 'company', 'water', 'care', 'insurance'] 

Topic # 5

['trump', 'cruz', 'republican', 'rubio', 'campaign'] 

Topic # 6

['her', 'she', 'comey', 'i', 'clinton'] 

Topic # 7

['trump', 'china', 'clinton', 'obama', 'trade'] 

Topic # 8

['mr', 'court', 'law', 'refugees', 'judge'] 

Topic # 9

['i', 'police', 'my', 'he', 'you'] 



In [28]:
%%time

# Apply model to data for labeling
transformed = model.transform(out)
transformed.show(5)

+--------------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               title|   publication|              author|      date|             content|               words|              parsed|                 raw|            features|   topicDistribution|
+--------------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|House Republicans...|New York Times|          Carl Hulse|2016-12-31|WASHINGTON  —   C...|House Republicans...|[house, republica...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|[0.61421097680255...|
|Rift Between Offi...|New York Times|Benjamin Mueller ...|2017-06-19|After the bullet ...|Rift Between Offi...|[rift, between, o...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|[3.31000078164045...|
|Tyrus Wong, ‘Bamb..

In [77]:
# Function to select topic from weight
def select_topic(x):
    
    tpc, val = max(enumerate(x), key = operator.itemgetter(1))
    
    return tpc

In [78]:
topic_selector = udf(lambda x: select_topic(x), IntegerType())
t1 = transformed.withColumn('topic', topic_selector(transformed.topicDistribution))

In [79]:
%time t1.show()

+--------------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|               title|   publication|              author|      date|             content|               words|              parsed|                 raw|            features|   topicDistribution|topic|
+--------------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|House Republicans...|New York Times|          Carl Hulse|2016-12-31|WASHINGTON  —   C...|House Republicans...|[house, republica...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|[0.61421097680255...|    0|
|Rift Between Offi...|New York Times|Benjamin Mueller ...|2017-06-19|After the bullet ...|Rift Between Offi...|[rift, between, o...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|[3.31000078164045.