# Environment Setup

In [2]:
#!pip install spark-nlp
#!pip install fastparquet 
#!pip install spark-nlp==2.6.1

In [3]:
import re
import numpy as np
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *

from pyspark.sql.functions import udf
import pyspark.sql.types as T

import pyspark.sql.functions as F
from pyspark.sql.types import *

from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.ml.clustering import LDA

from utils import CUSTOM_STOP_WORDS

In [None]:
# Build (or reuse) a local 4-thread Spark session with the Spark NLP 2.6.1
# package requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Spark NLP")
    .master("local[4]")
    .config("spark.driver.memory", "16G")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.1")
    .config("spark.kryoserializer.buffer.max", "1000M")
    .getOrCreate()
)

spark

In [3]:
"""
from pyspark.sql import SparkSession

# start spark session configured for spark nlp
spark = SparkSession.builder \
     .master('local[*]') \
     .appName('Spark NLP') \
     .config('spark.jars.packages') \
     .getOrCreate()

spark
"""
print()




In [7]:
#spark = sparknlp.start()

#spark

In [8]:
# SQLContext takes a SparkContext as its first argument. Hand it the session's
# underlying context and pass the session explicitly, instead of passing the
# SparkSession object where a SparkContext is expected.
sc = spark.sparkContext
sqlContext = SQLContext(sc, spark)
sqlContext

<pyspark.sql.context.SQLContext at 0x7f5d55379ba8>

In [9]:
#sc = SparkContext('local', 'PySPARK LDA Example')
#sqlContext = SQLContext(sc)

# Load data

In [10]:
# Read the newsgroup parquet file into a Spark DataFrame and report its row count.
df_spark = sqlContext.read.format("parquet").load("newsgroup_20_data.parquet")
print(df_spark.count())
#df_spark.head(2)

18846


In [11]:
# Peek at the label of the first record.
d = df_spark.head(1)[0]

print(d["category"])
#print(d["news"])

rec.sport.hockey


In [None]:
#(trainingData, testData) = df_spark.randomSplit([0.7, 0.3], seed = 100)
# Spark DataFrames have no pandas-style .copy() (calling it raises
# AttributeError). They are immutable, so binding a second name is enough.
trainingData = df_spark

# Pre-Process Data

## 1. data cleaning

In [None]:
# Column names used by the preprocessing cells: raw text, label, cleaned text.
col_input, col_label, col_nlp = "news", "category", "col_nlp"

In [None]:
def text_cleaner(sentence):
    """Normalise a raw document into a space-separated string of clean words.

    Every character other than ASCII letters, digits, spaces and ``&`` is
    replaced by a space. The text is then split on whitespace, and only
    purely alphabetic tokens of at least four characters are kept,
    lower-cased.

    Parameters
    ----------
    sentence : str or None
        Raw document text. ``None`` (a null cell when this runs as a Spark
        UDF) is treated as an empty document instead of raising.

    Returns
    -------
    str
        The kept tokens joined by single spaces; ``""`` when nothing survives.
    """
    # A null row would otherwise make re.sub raise TypeError and fail the job.
    if sentence is None:
        return ""

    # clean the punctuations
    punc_re = r'[^a-zA-Z0-9 &]'
    sentence = re.sub(punc_re, ' ', sentence)

    # Tokenise on whitespace, lowercase, and drop tokens that are not purely
    # alphabetic or have length <= 3. split() already strips surrounding
    # whitespace, so no extra strip() is needed.
    words = [word.lower() for word in sentence.split() if word.isalpha() and len(word) >= 4]

    return " ".join(words)

In [17]:
#data = trainingData.limit(1000)
data = trainingData

# Expose the Python cleaner as a Spark UDF that yields a string column.
udf_text_cleaner = F.udf(text_cleaner, T.StringType())

# Add the cleaned text next to the raw text.
cleaned_news = udf_text_cleaner(F.col(col_input))
data_train_clean = data.withColumn(col_nlp, cleaned_news)

print(data_train_clean.count())
data_train_clean.limit(2).show()

1000
+--------------------+------------+--------------------+
|                news|    category|             col_nlp|
+--------------------+------------+--------------------+
| agate!ames!purdu...|misc.forsale|agate ames purdue...|
| agate!iat.holone...|   rec.autos|agate holonet psi...|
+--------------------+------------+--------------------+



## 2. vectorizer

In [25]:
def type_changer(sentence):
    """Split a cleaned, space-separated document into a list of tokens.

    Parameters
    ----------
    sentence : str or None
        Space-separated words; ``None`` or an empty string means "no tokens".

    Returns
    -------
    list of str
        The tokens, never containing empty strings.
    """
    # "".split(" ") would yield [""], i.e. a spurious empty-string token that
    # the vectorizer then counts. Return a truly empty token list instead.
    if not sentence:
        return []
    # Whitespace-splitting also collapses accidental repeated spaces.
    return sentence.split()
udf_type_changer = F.udf(type_changer, T.ArrayType(T.StringType()))

# Tokenise the cleaned text into an array<string> column.
data_arr = data_train_clean.withColumn("col_nlp_arr", udf_type_changer(F.col(col_nlp)))

# Term frequencies: vocabulary capped at 5000 terms, each required to appear
# in at least 10 documents.
cv = CountVectorizer(
    inputCol="col_nlp_arr",
    outputCol="raw_features",
    vocabSize=5000,
    minDF=10.0,
)
cvmodel = cv.fit(data_arr)
result_cv = cvmodel.transform(data_arr)

# Re-weight the raw counts by inverse document frequency.
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv)

# Keep only what the topic model needs: label, raw text and feature vector.
pp_train_data = result_tfidf.select(["category", col_input, "features"])

print(type(pp_train_data))
pp_train_data.limit(2).show()

<class 'pyspark.sql.dataframe.DataFrame'>
+------------+--------------------+--------------------+
|    category|                news|            features|
+------------+--------------------+--------------------+
|misc.forsale| agate!ames!purdu...|(2341,[1,5,9,10,1...|
|   rec.autos| agate!iat.holone...|(2341,[2,4,5,8,9,...|
+------------+--------------------+--------------------+



# LDA Model

## 1. train model

In [29]:
numTopics = 20  # number of topics

# Online variational LDA.
# NOTE(review): Spark's LDA docs recommend maxIter * subsamplingRate >= 1 so
# the whole corpus is visited; here it is 10 * 0.05 = 0.5 — confirm intended.
lda = LDA(
    k=numTopics,
    optimizer="online",
    optimizeDocConcentration=True,
    seed=1,
    maxIter=10,            # number of iterations
    learningDecay=0.51,    # kappa
    learningOffset=64.0,   # tau_0, larger values downweigh early iterations
    subsamplingRate=0.05,  # mini batch fraction
)

model = lda.fit(pp_train_data)

# Evaluate the fitted model on the same data it was trained on.
ll = model.logLikelihood(pp_train_data)
lp = model.logPerplexity(pp_train_data)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -2617672.0557389343
The upper bound on perplexity: 7.414561064286616


## 2. topic insights

In [34]:
# Number of terms in the fitted model's vocabulary.
model.vocabSize()

2341

In [37]:
# Inspect the first topic row: topic id, its top term indices and their weights.
model.describeTopics().first()

Row(topic=0, termIndices=[100, 152, 270, 173, 141, 143, 652, 79, 1333, 1207], termWeights=[0.024727203510017844, 0.023794269140414222, 0.019429802217925642, 0.01463371667707337, 0.014203720926158618, 0.011568025166145409, 0.0077366301886060765, 0.007355781593427101, 0.00674755516396202, 0.006632266944292659])

In [107]:
print("The topics described by their top-weighted terms:")
# Five strongest terms per topic; display only the first six topics.
top_terms = model.describeTopics(maxTermsPerTopic=5)
top_terms.limit(6).show()

The topics described by their top-weighted terms:
+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[100, 152, 270, 1...|[0.02472720351001...|
|    1|[464, 1198, 516, ...|[0.03841356958562...|
|    2|[29, 15, 0, 12, 471]|[0.01256180374072...|
|    3|[149, 42, 16, 544...|[0.01433550461279...|
|    4|[606, 1619, 362, ...|[0.02711100743922...|
|    5|[622, 469, 752, 1...|[0.04554353468421...|
+-----+--------------------+--------------------+



In [38]:
# Inspect the model's topics matrix (shown as 2341 x 20 in the saved output).
model.topicsMatrix()

DenseMatrix(2341, 20, [66.9965, 18.5971, 28.7697, 20.0368, 51.356, 0.3535, 49.3757, 49.3248, ..., 0.3328, 0.3344, 0.4823, 0.3447, 0.3529, 0.6234, 0.3979, 0.3094], 0)

## 3. topic assignment

In [104]:
def _argmax_topic(x):
    """Return the position of the largest weight in a topic-distribution vector."""
    return x.tolist().index(max(x))


max_index = F.udf(_argmax_topic, IntegerType())

# Score every document, then label it with its highest-weighted topic.
lda_train_data = (
    model.transform(pp_train_data)
    .withColumn("topicID", max_index("topicDistribution"))
)

print(lda_train_data.count())
lda_train_data.limit(2).show()

1000
+------------+--------------------+--------------------+--------------------+-------+
|    category|                news|            features|   topicDistribution|topicID|
+------------+--------------------+--------------------+--------------------+-------+
|misc.forsale| agate!ames!purdu...|(2341,[1,5,9,10,1...|[0.00125609086439...|     11|
|   rec.autos| agate!iat.holone...|(2341,[2,4,5,8,9,...|[0.23104235605356...|      0|
+------------+--------------------+--------------------+--------------------+-------+



In [140]:
"""
topicDistribution : list of topic weights (len==num_topics)
"""
print()




## 4. topic model assessment

In [105]:
#lda_train_data.first()

In [108]:
# Bring only the label and assigned topic to the driver as a pandas frame.
X_topics = lda_train_data.select(["category", "topicID"]).toPandas()

print(X_topics.shape)
X_topics.head(2)

(1000, 2)


Unnamed: 0,category,topicID
0,misc.forsale,11
1,rec.autos,0


In [134]:
def topic_metrics(df):
    """Summarise which topic dominates a group of documents.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows of one group; must contain a ``topicID`` column.

    Returns
    -------
    pandas.Series
        ``category_pred``: the most frequent topic id in the group;
        ``perc_dominance``: the fraction of the group's rows with that topic.
    """
    # value_counts() sorts by frequency, so the first entry is the winner.
    topic_counts = df["topicID"].value_counts()
    dominant_topic = topic_counts.index[0]
    dominance_share = topic_counts.iloc[0] / topic_counts.sum()

    return pd.Series(
        [int(dominant_topic), dominance_share],
        index=["category_pred", "perc_dominance"],
    )

In [136]:
X = X_topics.copy()
#X = X_topics.head(10)

# One row per true category: its dominant topic and that topic's share,
# with the topic id cast to int and the share rounded to two decimals.
X_label_mapping = (
    X.groupby("category")
    .apply(topic_metrics)
    .reset_index()
    .assign(
        category_pred=lambda frame: frame["category_pred"].astype("int"),
        perc_dominance=lambda frame: frame["perc_dominance"].round(2),
    )
)
X_label_mapping

Unnamed: 0,category,category_pred,perc_dominance
0,alt.atheism,18,0.37
1,comp.graphics,12,0.46
2,comp.os.ms-windows.misc,12,0.74
3,comp.sys.ibm.pc.hardware,12,0.39
4,comp.sys.mac.hardware,12,0.39
5,comp.windows.x,12,0.38
6,misc.forsale,7,0.18
7,rec.autos,12,0.24
8,rec.motorcycles,14,0.22
9,rec.sport.baseball,14,0.27


In [None]:
# Look-up table: true category name -> id of its dominant LDA topic.
dict_mapper = {
    category: int(topic)
    for category, topic in zip(X_label_mapping["category"], X_label_mapping["category_pred"])
}

X = X_topics.copy()
# The mapper is keyed by category name, so it has to be applied to the
# "category" column. Applying it to the integer "topicID" column matches no
# keys and merely copies topicID into category_pred.
X["category_pred"] = X["category"].map(dict_mapper)

print(X.shape)
X.head(2)

In [None]:
# Print the pandas frame's column dtypes, non-null counts and memory usage.
X_topics.info()