In [1]:
import sys
print(sys.version)

2.7.12 |Anaconda 4.2.0 (64-bit)| (default, Jul  2 2016, 17:42:40) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]


In [2]:
spark

<pyspark.sql.session.SparkSession at 0x7f465ea1ec10>

In [4]:
import spacy

## Download and Subset Data

We will first subset down the dataset of Amazon Book reviews located at [this link](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books_5.json.gz).  This dataset contains 8,898,041 book reviews.

This dataset includes reviews (ratings, text, helpfulness votes), product metadata (descriptions, category information, price, brand, and image features), and links (also viewed/also bought graphs).  For more information please refer to [this page](http://jmcauley.ucsd.edu/data/amazon/).

In [3]:
url = "s3n://galvanize-ds/reviews_Books_5.json.gz"

full_review_df = spark.read.json(url)

In [4]:
full_review_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



Let's subset our dataframe using the [sample DataFrame method](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sample) to only include 0.2% of the review text which will leave us with approximately 17,700 reviews.

In [5]:
review_subset = full_review_df.select('reviewText', 'overall') \
                              .sample(False, 0.002, 42)

# View how many reviews are left
print(review_subset.count())

17513


In [6]:
review_subset.show(10, truncate=True)

+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|Set in the Great ...|    4.0|
|I don't know how ...|    5.0|
|"Water for Elepha...|    5.0|
|I wanted to see h...|    5.0|
|I found this very...|    4.0|
|The ur-text for t...|    5.0|
|I was told would ...|    5.0|
|Too many threads ...|    3.0|
|Totally great wri...|    5.0|
|I won't rehash th...|    3.0|
+--------------------+-------+
only showing top 10 rows



In [7]:
import pyspark as ps    # for the pyspark suite
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
import string
import unicodedata

from sklearn.feature_extraction.stop_words import ENGLISH_STOP_WORDS
import spacy
import numpy as np

from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

import sys


def extract_bow_from_raw_text(text_as_string):
    """ Extracts bag-of-words from a raw text string.

    Parameters
    ----------
    text (str): a text document given as a string

    Returns
    -------
    list : the list of the tokens extracted and filtered from the text
    """
    if (text_as_string == None):
        return []

    if (len(text_as_string) < 1):
        return []

    # Load nlp object if it isn't accessible
    if 'nlp' not in globals():
        global nlp
        try:
            # When running locally
            nlp = spacy.load('en')
        except RuntimeError:
            # When running on AWS EMR Cluster
            nlp = spacy.load('en', via='/mnt/spacy_en_data/')

    # Run through spacy English module
    doc = nlp(text_as_string)

    # Part's of speech to keep in the result
    pos_lst = ['ADJ', 'ADV', 'NOUN', 'PROPN', 'VERB']

    # Lemmatize text and split into tokens
    tokens = [token.lemma_.lower() for token in doc if token.pos_ in pos_lst]
    
    stop_words = {'book', 'author', 'read', "'", 'character'}.union(ENGLISH_STOP_WORDS)

    # Remove stop words
    no_stop_tokens = [token for token in tokens if token not in stop_words]

    return(no_stop_tokens)


def indexing_pipeline(input_df, **kwargs):
    """ Runs a full text indexing pipeline on a collection of texts contained
    in a DataFrame.

    Parameters
    ----------
    input_df (DataFrame): a DataFrame that contains a field called 'text'

    Returns
    -------
    df : the same DataFrame with a column called 'features' for each document
    wordlist : the list of words in the vocabulary with their corresponding IDF
    """
    inputCol_ = kwargs.get("inputCol", "text")
    vocabSize_ = kwargs.get("vocabSize", 5000)
    minDF_ = kwargs.get("minDF", 2.0)

    tokenizer_udf = udf(extract_bow_from_raw_text, ArrayType(StringType()))
    df_tokens = input_df.withColumn("bow", tokenizer_udf(col(inputCol_)))

    cv = CountVectorizer(inputCol="bow", outputCol="vector_tf", vocabSize=vocabSize_, minDF=minDF_)
    cv_model = cv.fit(df_tokens)
    df_features_tf = cv_model.transform(df_tokens)

    idf = IDF(inputCol="vector_tf", outputCol="features")
    idfModel = idf.fit(df_features_tf)
    df_features = idfModel.transform(df_features_tf)

    return(df_features, np.array(cv_model.vocabulary))

In [8]:
review_df, vocab = indexing_pipeline(review_subset, inputCol='reviewText')

review_df.printSchema()
review_df.persist()

print(vocab[:20])

root
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- bow: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vector_tf: vector (nullable = true)
 |-- features: vector (nullable = true)

[u'book' u'read' u'story' u'love' u"'" u'character' u'make' u'just' u'time'
 u'good' u'really' u'life' u'think' u'know' u'great' u'author' u'write'
 u'way' u'want' u'series']


## Train LDA Model

Now that we have a DataFrame with column `features` containing a vector object representing the [Tf-Idf](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) values for our words, we can apply the [Latent Dirichlet allocation algorithm contained in the `ml` package](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.clustering.LDA).

For the sake of this demonstration we will be specifying 3 clusters.

In [9]:
from pyspark.ml.clustering import LDA

lda = LDA(k=10, seed=42, optimizer='em', featuresCol='features')
model = lda.fit(review_df)

In [10]:
sc.defaultParallelism

3

In [16]:
type(model)

pyspark.ml.clustering.DistributedLDAModel

In [11]:
import pandas as pd

model_description = model.describeTopics(20).toPandas()

In [12]:
vocab = np.array(vocab)

In [13]:
for idx, row in model_description.iterrows():
    print("Top words associated with topic {}:".format(row['topic']))
    print("{}\n".format(vocab[row['termIndices']]))

Top words associated with topic 0:
[u"'" u'story' u'book' u'make' u'read' u'character' u'love' u'just' u'time'
 u'know' u'world' u'think' u'life' u'really' u'author' u'good' u'great'
 u'people' u'write' u'want']

Top words associated with topic 1:
[u'love' u'story' u'character' u"'" u'read' u'just' u'life' u'time'
 u'novel' u'feel' u'really' u'make' u'want' u'know' u'series' u'book'
 u'woman' u'think' u'man' u'enjoy']

Top words associated with topic 2:
[u'book' u"'" u'work' u'use' u'author' u'read' u'make' u'story' u'series'
 u'time' u'good' u'chapter' u'way' u'write' u'great' u'just' u'people'
 u'life' u'character' u'think']

