In [1]:
import pandas as pd
import pyspark as spark
from pyspark.sql import SparkSession
import re
from pyspark.sql.types import ArrayType, StringType, FloatType
import struct
from pyspark.sql.functions import *

In [2]:
# Create (or reuse an already running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Spark - Text Processing")
    .getOrCreate()
)

In [3]:
df = spark.read.csv("/FileStore/tables/hacker_news_sample.csv", header=True)

# Keep rows of type 'story' whose text is present, projected to the `text` and `by` columns.
# Rows with a null text are dropped too: `null != "null"` evaluates to null, which filter() treats as false.
rawdata = (
    df
    .filter((df['type'] == 'story') & (df['text'] != "null"))
    .select("text", "by")
)

In [4]:
# Number of story rows with text that survived the filter above.
n_stories = rawdata.count()
n_stories

In [5]:
# Default list of stopwords.
_STOPWORDS_CORE = [
    'a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', 'arent', 'as', 'at',
    'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by',
    'can', 'cant', 'come', 'could', 'couldnt',
    'd', 'did', 'didn', 'do', 'does', 'doesnt', 'doing', 'dont', 'down', 'during',
    'each',
    'few', 'finally', 'for', 'from', 'further',
    'had', 'hadnt', 'has', 'hasnt', 'have', 'havent', 'having', 'he', 'her', 'here', 'hers', 'herself', 'him', 'himself', 'his', 'how',
    'i', 'if', 'in', 'into', 'is', 'isnt', 'it', 'its', 'itself',
    'just',
    'll',
    'm', 'me', 'might', 'more', 'most', 'must', 'my', 'myself',
    'no', 'nor', 'not', 'now',
    'o', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'our', 'ours', 'ourselves', 'out', 'over', 'own',
    'r', 're',
    's', 'said', 'same', 'she', 'should', 'shouldnt', 'so', 'some', 'such',
    't', 'than', 'that', 'thats', 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there', 'these', 'they', 'this', 'those', 'through', 'to', 'too',
    'under', 'until', 'up',
    'very',
    'was', 'wasnt', 'we', 'were', 'werent', 'what', 'when', 'where', 'which', 'while', 'who', 'whom', 'why', 'will', 'with', 'wont', 'would',
    'y', 'you', 'your', 'yours', 'yourself', 'yourselves',
]

# Custom list of stopwords - add your own here.
_STOPWORDS_CUSTOM = ['']

# Built once (not on every row) and stored as a frozenset for O(1) membership tests.
_STOPWORDS = frozenset(word.lower() for word in _STOPWORDS_CORE + _STOPWORDS_CUSTOM)

_HTML_TAG_RE = re.compile(r'<[^>]*>')
# Any token starting with "http:" or "https:", whether or not the slashes are intact.
_LINK_RE = re.compile(r'https?:\S+')
_NON_ALNUM_RE = re.compile(r'[^a-zA-Z0-9]')


def cleanup_text(record):
    """Tokenize one document into lower-case, stopword-free words.

    Pipeline (each stage feeds the next):
      1. strip HTML tags from the whole text,
      2. split on whitespace,
      3. drop link tokens (http:/https:),
      4. delete every character outside [a-zA-Z0-9],
      5. keep lower-cased words longer than two characters that are not stopwords.

    Args:
        record: the raw document text. None (a Spark null) is accepted.

    Returns:
        list of cleaned words; empty for None or blank input.
    """
    if record is None:
        # Null text: yield no tokens instead of failing on None.split().
        return []

    # Tags are stripped before splitting because a tag such as <a href="..."> contains
    # spaces, so per-word removal would leave its fragments behind.
    text = _HTML_TAG_RE.sub(' ', record)

    words = []
    for token in text.split():
        token = _LINK_RE.sub('', token)                      # remove links
        token = _NON_ALNUM_RE.sub('', token).lower()         # remove special characters
        if len(token) > 2 and token not in _STOPWORDS:       # remove stopwords and short words
            words.append(token)
    return words

# Wrap cleanup_text as a Spark UDF returning array<string>, then add its output as the "words" column.
udf_cleantext = udf(cleanup_text, returnType=ArrayType(StringType()))
clean_text = rawdata.withColumn(
    "words",
    udf_cleantext(rawdata["text"]),
)

In [6]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer

# Term-frequency vectorization with CountVectorizer (rather than HashingTF) so that each
# feature index can be mapped back to a word through the fitted vocabulary.
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize=1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)

vocab = cvmodel.vocabulary
# Broadcast the vocabulary so the term-index -> word UDF can read it on executors.
# Uses spark.sparkContext: the bare global `sc` is never defined in this notebook (it only
# exists where the environment pre-injects it), so the old call broke on a fresh kernel.
vocab_broadcast = spark.sparkContext.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)  # TF-IDF weighted vectors in "features"

In [7]:
# rescaledData.select('features').show(10)

In [8]:
from pyspark.ml.clustering import LDA

# Fit a 25-topic LDA model with the EM optimizer; the seed is fixed for repeatability.
# NOTE(review): Spark documents LDA's featuresCol as per-document term *counts*, but it is fed the
# TF-IDF weighted "features" column here. "rawFeatures" may be the intended input — confirm.
lda = LDA(featuresCol="features", k=25, optimizer="em", seed=123)
ldamodel = lda.fit(rescaledData)

# Top terms of each topic, expressed as vocabulary indices (termIndices).
ldatopics = ldamodel.describeTopics()

def map_termID_to_Word(termIndices):
    """Translate vocabulary indices into words via the broadcast CountVectorizer vocabulary.

    Args:
        termIndices: iterable of integer positions into ``vocab_broadcast.value``.

    Returns:
        list of the corresponding vocabulary words, in input order.
    """
    return [vocab_broadcast.value[term_id] for term_id in termIndices]

# Replace each topic's term indices with readable words in a new "topic_desc" column.
udf_map_termID_to_Word = udf(map_termID_to_Word, returnType=ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn(
    "topic_desc",
    udf_map_termID_to_Word(ldatopics["termIndices"]),
)

# Show up to 50 topics without truncating the word lists.
ldatopics_mapped.select("topic", "topic_desc").show(50, truncate=False)

In [9]:
# Score every document with the fitted model; the result carries a topicDistribution column.
ldaResults = ldamodel.transform(rescaledData)
ldaResults.show(n=20)

In [10]:
def breakout_array(index_number, record):
    """Return element ``index_number`` of a vector-like ``record`` as a plain Python value.

    ``record`` must provide ``tolist()``. Numpy arrays do; the topicDistribution vectors this is
    applied to are presumably Spark DenseVectors forwarding it to their array — confirm.
    """
    return record.tolist()[index_number]

udf_breakout_array = udf(breakout_array, returnType=FloatType())

# Copy the document weights for topics 12 and 20 out of topicDistribution into their own columns.
# NOTE(review): the column names differ in capitalisation ("Topic_12" vs "topic_20"); kept as-is
# in case other cells reference them.
enrichedData = (
    ldaResults
    .withColumn("Topic_12", udf_breakout_array(lit(12), ldaResults.topicDistribution))
    .withColumn("topic_20", udf_breakout_array(lit(20), ldaResults.topicDistribution))
)

In [11]:
# The five documents with the highest Topic_12 weight.
(
    enrichedData
    .select("text", "by", "Topic_12")
    .orderBy(desc("Topic_12"))
    .show(5)
)

In [12]:
# enrichedData.select("text", "Topic_12").filter(enrichedData['by'] == 'karl_gluck').sort(desc("Topic_12")).show(5)

In [13]:
# Collect the enriched Spark DataFrame into a pandas DataFrame on the driver.
# NOTE(review): toPandas() materialises every row and column (including raw text) in driver memory.
data = (
    enrichedData
    .toPandas()
)

In [14]:
def dominant_topic(distribution):
    """Return the index of the largest weight in ``distribution``.

    Ties resolve to the first (lowest) index. An empty distribution yields -1.
    """
    return max(range(len(distribution)), key=distribution.__getitem__, default=-1)


# NOTE(review): `data2` used to be read here without being defined in any cell of this notebook
# (leftover kernel state), so Restart & Run All failed. It is now derived explicitly from `data`
# (the pandas copy of enrichedData) without mutating it — confirm this matches what `data2` was.
data2 = data.assign(maxTopic=data['topicDistribution'].map(dominant_topic))

In [15]:
# Number of documents per dominant topic, most frequent first.
topic_counts = data2['maxTopic'].value_counts()

# The four most frequent dominant topics (plotted below). This replaces the hard-coded
# [1, 4, 0, 15], which goes stale whenever the data or the LDA parameters change.
mostPopular = topic_counts.head(4).index.tolist()

# Last expression, so the counts are actually displayed (previously they were computed and discarded).
topic_counts

In [16]:
# Visualization

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.colors as mcolors
import numpy as np

cols = [color for name, color in mcolors.TABLEAU_COLORS.items()]  # more colors: 'mcolors.XKCD_COLORS'

fig, axes = plt.subplots(2, 2, figsize=(16, 14), dpi=160, sharex=True, sharey=True)
flat_axes = axes.flatten()

# zip() stops at the shorter sequence, so fewer than four topics no longer raises IndexError.
for i, (ax, topic) in enumerate(zip(flat_axes, mostPopular)):
    color = cols[i % len(cols)]
    df_dominant_topic_sub = data2.loc[data2.maxTopic == topic, :]
    doc_lens = [len(d) for d in df_dominant_topic_sub.words]

    ax.hist(doc_lens, bins=1000, color=color)
    ax.tick_params(axis='y', labelcolor=color, color=color)
    # A KDE needs at least two points. `shade=False` is omitted: no fill is already the default,
    # and the `shade` keyword is deprecated in recent seaborn releases (replaced by `fill`).
    if len(doc_lens) > 1:
        sns.kdeplot(doc_lens, color="black", ax=ax.twinx())
    # Ticks are set on the explicit axes instead of via plt.xticks (which targets whichever axes is "current").
    ax.set(xlim=(0, 1000), xticks=np.linspace(0, 1000, 9), xlabel='Document Word Count')
    ax.set_ylabel('Number of Documents', color=color)
    ax.set_title('Topic: ' + str(topic), fontdict=dict(size=16, color=color))

# Hide panels that have no topic assigned.
for ax in flat_axes[len(mostPopular):]:
    ax.set_visible(False)

fig.tight_layout()
fig.subplots_adjust(top=0.90)
fig.suptitle('Distribution of Document Word Counts by Dominant Topic', fontsize=22)
# Render this figure explicitly, then close it so an inline backend does not draw it a second time.
display(fig)
plt.close(fig)