In [1]:
import io
import json
import random
import re
from collections import defaultdict

from pyspark import SparkContext
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

In [2]:
# Locations of the two JSON Lines inputs: the instances file and the matching
# truth (label) file.
# NOTE(review): the variables are named *_train but both paths point into the
# "validation" release directory -- confirm this is intended.
instances_file_train, truth_file_train = (
    "clickbait17-validation-170630/instances.jsonl",
    "clickbait17-validation-170630/truth.jsonl",
)

In [3]:
# Read the data from the two JSON Lines files (one JSON object per line).
def _read_jsonl(path):
    """Parse a JSON Lines file into a list of decoded objects.

    The file is decoded explicitly as UTF-8 (instead of relying on the
    platform's default encoding) and blank lines -- e.g. a trailing empty
    line at the end of the file -- are skipped rather than passed to
    ``json.loads``, which would raise on them.

    Args:
        path: path of the ``.jsonl`` file to read.

    Returns:
        A list with one decoded JSON value per non-blank line, in file order.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if a non-blank line is not valid JSON.
    """
    # io.open accepts `encoding` on both Python 2 and Python 3.
    with io.open(path, "r", encoding="utf-8") as handle:
        # Iterate the file lazily; there is no need to materialise readlines().
        return [json.loads(line) for line in handle if line.strip()]


instances = _read_jsonl(instances_file_train)
truth = _read_jsonl(truth_file_train)

In [4]:
# Collect the relevant fields of every instance into one dict of dicts,
# keyed by the instance id.
# Per the original note, postText, targetParagraphs and targetCaptions are
# list-valued fields.
_INSTANCE_FIELDS = (
    'postText',
    'targetTitle',
    'targetDescription',
    'targetKeywords',
    'targetParagraphs',
    'targetCaptions',
)

dataset = {
    instance['id']: {field: instance[field] for field in _INSTANCE_FIELDS}
    for instance in instances
}

# Attach the mean truth score to the matching instance (looked up by id).
for label in truth:
    dataset[label['id']]['truthMean'] = label['truthMean']

In [5]:
# `texts` holds one paragraph list per instance; `flattened` is the
# concatenation of all those lists, so each paragraph becomes one entry.
texts = []
flattened = []
for entry in dataset.values():
    paragraphs = entry['targetParagraphs']
    texts.append(paragraphs)
    flattened.extend(paragraphs)

In [6]:
# Map a running integer index (0, 1, 2, ...) to each paragraph.
text_dict = dict(enumerate(flattened))

In [7]:
# `sc` is only predefined when the kernel is started through the pyspark
# shell/launcher. Fetch the active SparkContext explicitly (creating one if
# none exists yet) so this cell also works after "Restart & Run All" in a
# plain Python kernel instead of failing with a NameError.
sc = SparkContext.getOrCreate()
sqlContext = HiveContext(sc)

In [8]:
# Wrap every paragraph in a 1-tuple so it becomes a single-column row, then
# build a DataFrame whose only column is named 'text'.
text_rows = [(paragraph,) for paragraph in text_dict.values()]
df = sqlContext.createDataFrame(text_rows, ['text'])

In [9]:
# Use the DataFrame's underlying RDD; the tokenisation below accesses the
# 'text' field of each element and uses RDD-style map calls.
data = df.rdd

In [10]:
# Tokenise every document:
#   1. take the 'text' field, strip surrounding whitespace and lower-case it;
#   2. split on any single whitespace character, ';', ',' or '#';
#   3. keep only purely alphabetic pieces (this also drops the empty strings
#      that splitting on consecutive delimiters produces).
# The pattern is a raw string: "\s" in a normal string literal is an invalid
# escape sequence and triggers a Deprecation/SyntaxWarning on Python 3.
tokens = (
    data
    .map(lambda row: row['text'].strip().lower())
    .map(lambda text: re.split(r"[\s;,#]", text))
    .map(lambda pieces: [piece for piece in pieces if piece.isalpha()])
)

In [11]:
# Count how often each term occurs over all documents and order the result
# by descending count. Elements are (count, term) pairs; the count is the
# only sort key, so terms with equal counts have no defined relative order.
termCounts = (
    tokens
    .flatMap(lambda doc_tokens: doc_tokens)
    .map(lambda term: (term, 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)

In [12]:
# Drop the counts, number the terms in their sorted order (most frequent
# first) and collect the result on the driver as a {term: index} dict.
ranked_terms = termCounts.map(lambda count_and_term: count_and_term[1])
vocabulary = ranked_terms.zipWithIndex().collectAsMap()

In [13]:
# Reverse lookup table: vocabulary index -> term.
inv_voc = dict((index, term) for term, index in vocabulary.items())

In [14]:
def document_vector(document):
    """Turn one tokenised document into a sparse bag-of-words vector.

    Args:
        document: a ``(tokens, index)`` pair, i.e. the token list of one
            document together with its position as produced by
            ``zipWithIndex``.

    Returns:
        A ``(index, vector)`` tuple where ``vector`` is a sparse vector of
        size ``len(vocabulary)`` holding, for every vocabulary term found in
        the document, how many times it occurs. Tokens that are not in
        ``vocabulary`` are ignored.
    """
    doc_tokens, doc_index = document[0], document[1]

    tallies = defaultdict(int)
    for term in doc_tokens:
        if term not in vocabulary:
            continue
        tallies[vocabulary[term]] += 1

    # Sort by term index so indices and counts line up in ascending order.
    ordered = sorted(tallies.items())
    indices = [entry[0] for entry in ordered]
    weights = [entry[1] for entry in ordered]
    return (doc_index, Vectors.sparse(len(vocabulary), indices, weights))

# Pair every token list with its position, convert each pair to an
# (index, sparse count vector) tuple with `document_vector`, and finally turn
# each tuple into a two-element list.
documents = (
    tokens
    .zipWithIndex()
    .map(document_vector)
    .map(list)
)

In [None]:
# Train an LDA topic model on the bag-of-words vectors and dump the terms of
# every topic, with their weights, to "lda_model.txt".
num_topics = 50             # number of topics (k) to fit
max_iterations = 1000       # LDA training iterations
max_terms_per_topic = 1000  # upper bound on terms reported per topic

# Train *before* opening the output file, so that a failed or interrupted
# training run does not leave behind an empty, truncated file.
lda_model = LDA.train(documents, k=num_topics, maxIterations=max_iterations)

# One entry per topic; element [0] holds term indices and element [1] the
# corresponding term weights.
topic_indices = lda_model.describeTopics(maxTermsPerTopic=max_terms_per_topic)

# Write text as UTF-8 explicitly. Terms are written as text: calling
# .encode('utf-8') on them before str.format() would emit b'...' reprs on
# Python 3. io.open + unicode literals behave the same on Python 2 and 3.
with io.open("lda_model.txt", "w", encoding="utf-8") as f:
    # For each topic, list up to `max_terms_per_topic` terms with their weights.
    for topic_number, topic in enumerate(topic_indices, 1):
        term_ids, term_weights = topic[0], topic[1]
        f.write(u"Topic #{0}\n".format(topic_number))
        for term_id, weight in zip(term_ids, term_weights):
            f.write(u"{0}\t{1}\n".format(inv_voc[term_id], weight))

    # `num_topics` was previously undefined here, which raised a NameError
    # only after the whole training run had completed.
    f.write(u"{0} topics distributed over {1} documents and {2} unique words\n"
            .format(num_topics, documents.count(), len(vocabulary)))