In [1]:
#initial imports
import csv
import pandas as pd
import numpy as np

import pyspark as ps    # for the pyspark suite


#NLP imports
from src.nltk_pipe import indexing_pipeline
from pyspark.ml.clustering import LDA

In [2]:
#setting up spark
spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("df JulesVerne") \
            .getOrCreate()

sc = spark.sparkContext

In [14]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
import string
import unicodedata

import nltk

from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.stem.snowball import SnowballStemmer
from nltk.util import ngrams
from nltk import pos_tag
from nltk import RegexpParser

from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

import sys

def extract_bow_from_raw_text(text_as_string):
    """Extracts bag-of-words from a raw text string.
    Parameters
    ----------
    text (str): a text document given as a string
    Returns
    -------
    list : the list of the tokens extracted and filtered from the text
    """
    if (text_as_string == None):
        return []

    if (len(text_as_string) < 1):
        return []

    if sys.version_info[0] < 3:
        nfkd_form = unicodedata.normalize('NFKD', unicode(text_as_string))
    else:
        nfkd_form = unicodedata.normalize('NFKD', str(text_as_string))

    text_input = str(nfkd_form.encode('ASCII', 'ignore'))

    sent_tokens = sent_tokenize(text_input)

    tokens = list(map(word_tokenize, sent_tokens))

    sent_tags = list(map(pos_tag, tokens))

    grammar = r"""
        SENT: {<(J|N).*>}                # chunk sequences of proper nouns
    """

    cp = RegexpParser(grammar)
    ret_tokens = list()
    stemmer_snowball = SnowballStemmer('english')

    for sent in sent_tags:
        tree = cp.parse(sent)
        for subtree in tree.subtrees():
            if subtree.label() == 'SENT':
                t_tokenlist = [tpos[0].lower() for tpos in subtree.leaves()]
                t_tokens_stemsnowball = list(map(stemmer_snowball.stem, t_tokenlist))
                #t_token = "-".join(t_tokens_stemsnowball)
                #ret_tokens.append(t_token)
                ret_tokens.extend(t_tokens_stemsnowball)
            #if subtree.label() == 'V2V': print(subtree)
    #tokens_lower = [map(string.lower, sent) for sent in tokens]

    return(ret_tokens)


def indexing_pipeline(input_df, **kwargs):
    """Runs a full text indexing pipeline on a collection of texts contained in a DataFrame.
    Parameters
    ----------
    input_df (DataFrame): a DataFrame that contains a field called 'text'
    Returns
    -------
    df : the same DataFrames with a column called 'features' for each document
    wordlist : the list of words in the vocabulary with their corresponding IDF
    """
    inputCol_ = kwargs.get("inputCol", "reviews")
    vocabSize_ = kwargs.get("vocabSize", 5000)
    minDF_ = kwargs.get("minDF", 2.0)

    # ugly: to add that to our slave nodes so that it finds the bootstrapped nltk_data
    nltk.data.path.append('/home/hadoop/nltk_data')

    extract_bow_from_raw_text("")  # ugly: for instanciating all dependencies of this function
    tokenizer_udf = udf(extract_bow_from_raw_text, ArrayType(StringType()))
    df_tokens = input_df.withColumn("bow", tokenizer_udf(col(inputCol_)))

    cv = CountVectorizer(inputCol="bow", outputCol="vector_tf", vocabSize=vocabSize_, minDF=minDF_)
    cv_model = cv.fit(df_tokens)
    df_features_tf = cv_model.transform(df_tokens)

    idf = IDF(inputCol="vector_tf", outputCol="features")
    idfModel = idf.fit(df_features_tf)
    df_features = idfModel.transform(df_features_tf)

    return(df_features, cv_model.vocabulary)

In [4]:
df = pd.read_csv('data/reviews_unique.csv')

In [8]:
reviews = df.reviews
ls_attr = df.attractions

In [5]:
spark_df = spark.createDataFrame(df)

In [31]:
df_total, ls_total = indexing_pipeline(spark_df)

In [41]:
len(df_total.columns)

6

In [48]:
ls_total[200:300]

[u'detail',
 u'center',
 u'summer',
 u'right',
 u'servic',
 u'light',
 u'happi',
 u'lunch',
 u'highlight',
 u'war',
 u'step',
 u'station',
 u'ancient',
 u'centr',
 u'varieti',
 u'stun',
 u'onli',
 u'money',
 u'magnific',
 u'even',
 u'least',
 u'entri',
 u'opera',
 u'trail',
 u'set',
 u'drink',
 u'paint',
 u'sun',
 u'noth',
 u'adult',
 u'insid',
 u'same',
 u'second',
 u'opportun',
 u'piec',
 u'help',
 u'clear',
 u'look',
 u'quiet',
 u'avail',
 u'reason',
 u'perform',
 u'space',
 u'path',
 u'bar',
 u'number',
 u'sculptur',
 u'hard',
 u'feel',
 u'sceneri',
 u'plant',
 u'eleph',
 u'gorgeous',
 u'week',
 u'modern',
 u'floor',
 u'flower',
 u'famous',
 u'middl',
 u'waterfal',
 u'make',
 u'cruis',
 u'queue',
 u'dolphin',
 u'travel',
 u'ship',
 u'import',
 u'young',
 u'various',
 u'relax',
 u'home',
 u'villag',
 u'audio',
 u'fall',
 u'close',
 u'countri',
 u'structur',
 u'cave',
 u'theatr',
 u'pleas',
 u'aquarium',
 u'cours',
 u'wife',
 u'disappoint',
 u'must',
 u'seat',
 u'front',
 u'fountain'

In [None]:
lda_total = LDA(k=20, optimizer="em")
model_total = lda_total.fit(df_total)
topics_total = model_total.describeTopics().collect()

In [33]:
for topic in topics:
    print("- TOPIC {} -".format(topic[0]))
    topickeys = topic[1]
    topicvalues = topic[2]
    for i in range(len(topickeys)):
        print("  - word '{}': {}".format(ls_total[topickeys[i]],topicvalues[i]))
#print("--- [time elapsed: {}]".format(time.time()-t_begin))

- TOPIC 0 -
  - word 'build': 0.0623157180202
  - word 'new': 0.0424890351302
  - word 'island': 0.024226885583
  - word 'audio': 0.0184868332883
  - word 'children': 0.0178801621863
  - word 'hotel': 0.0175890600618
  - word 'dear': 0.0160662963942
  - word 'boat': 0.0136781364991
  - word 'close': 0.0132073301112
  - word 'mani': 0.0107445206337
- TOPIC 1 -
  - word 'templ': 0.0781235369527
  - word 'zoo': 0.0757329021635
  - word 'sever': 0.0370382179936
  - word 'collect': 0.0283182115014
  - word 'fantast': 0.019732223729
  - word 'road': 0.0163277550923
  - word 'much': 0.0156146280723
  - word 'climb': 0.0150178690864
  - word 'heart': 0.0145702458646
  - word 'chanc': 0.0107870421279
- TOPIC 2 -
  - word 'inform': 0.0330075223316
  - word 'big': 0.0181275971522
  - word 'photo': 0.0153232248022
  - word 'facil': 0.0104507422189
  - word 'full': 0.00982983631205
  - word 'trip': 0.00903992715895
  - word 'memori': 0.00899826595316
  - word 'stroll': 0.0065412915939
  - word 'hou

# Test Model

## Test dataframe

In [23]:
r_test = reviews[0:len(reviews):50]
a_test = ls_attr[0:len(ls_attr):50]
df_test = pd.DataFrame({'attractions': a_test, 'reviews': r_test})

## Testing LDA

In [24]:
spark_df_test = spark.createDataFrame(df_test)

In [25]:
df_test.shape

(38, 2)

In [26]:
df_test_nlp, ls_test_nlp = indexing_pipeline(spark_df_test)

In [27]:
lda = LDA(k=6, optimizer="em")
model = lda.fit(df_test_nlp)
topics = model.describeTopics().collect()

In [None]:
for topic in topics:
    print("- TOPIC {} -".format(topic[0]))
    topickeys = topic[1]
    topicvalues = topic[2]
    for i in range(len(topickeys)):
        print("  - word '{}': {}".format(ls_test_nlp[topickeys[i]],topicvalues[i]))
#print("--- [time elapsed: {}]".format(time.time()-t_begin))

In [30]:
len(topics)

6