In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()



import numpy as np

from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk.stem.porter import PorterStemmer

from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import NMF as NMF_sklearn
import pickle
from nmf import NMF
import pymongo
from pymongo import MongoClient

def build_text_vectorizer(contents, use_tfidf=True, use_stemmer=False, max_features=None):
    '''
    Build and return a **callable** for transforming text documents to vectors,
    as well as a vocabulary to map document-vector indices to words from the
    corpus. The vectorizer will be trained from the text documents in the
    `contents` argument. If `use_tfidf` is True, then the vectorizer will use
    the Tf-Idf algorithm, otherwise a Bag-of-Words vectorizer will be used.
    The text will be tokenized by words, and each word will be stemmed iff
    `use_stemmer` is True. If `max_features` is not None, then the vocabulary
    will be limited to the `max_features` most common words in the corpus.
    '''
    
    Vectorizer = TfidfVectorizer if use_tfidf else CountVectorizer
    tokenizer = RegexpTokenizer(r"[\w']+")
    stem = PorterStemmer().stem if use_stemmer else (lambda x: x)
    stop_set = set(stopwords.words('english'))

    # Closure over the tokenizer et al.
    def tokenize(text):
        tokens = tokenizer.tokenize(text)
        stems = [stem(token) for token in tokens if token not in stop_set]
        return stems

    vectorizer_model = Vectorizer(tokenizer=tokenize, max_features=max_features)
    vectorizer_model.fit(contents)
    vocabulary = np.array(vectorizer_model.get_feature_names())

    # Closure over the vectorizer_model's transform method.
    def vectorizer(X):
        return vectorizer_model.transform(X).toarray()

    return vectorizer, vocabulary


def softmax(v, temperature=1.0):
    '''
    A heuristic to convert arbitrary positive values into probabilities.
    See: https://en.wikipedia.org/wiki/Softmax_function
    '''
    expv = np.exp(v / temperature)
    s = np.sum(expv)
    return expv / s


def hand_label_topics(H, vocabulary):
    '''
    Print the most influential words of each latent topic, and prompt the user
    to label each topic. The user should use their humanness to figure out what
    each latent topic is capturing.
    '''
    hand_labels = []
    for i, row in enumerate(H):
        top_five = np.argsort(row)[::-1][:20]
        print('topic', i)
        print('-->', ' '.join(vocabulary[top_five]))
        label = input('please label this topic: ')
        hand_labels.append(label)
        print()
    return hand_labels


def analyze_article(article_index, contents, web_urls, W, hand_labels):
    '''
    Print an analysis of a single NYT articles, including the article text
    and a summary of which topics it represents. The topics are identified
    via the hand-labels which were assigned by the user.
    '''
    #print(web_urls[article_index])
    #print(contents[article_index])
    probs = softmax(W[article_index], temperature=0.01)
    
    top_prob = 0
    top_cat = 0
    i = 0
    for prob, label in zip(probs, hand_labels):
        if prob > top_prob:
            top_prob = prob
            top_cat = i
        #print('--> {:.2f}% {}'.format(prob * 100, label))
        i = i + 1
    return top_cat, probs
    #print()
    #    gotta assign all of these to the correct bin and then tfidf them 


In [8]:
g_df1 = spark.read.csv('data/all-the-news/articles1.csv')
g_df2 = spark.read.csv('data/all-the-news/articles2.csv')
g_df3 = spark.read.csv('data/all-the-news/articles3.csv')
merge1 = g_df1.union(g_df2)
full_dataframe = merge1.union(g_df3)
#g_arr1 = g_df1.values
#g_arr2 = g_df2.values
#g_arr3 = g_df3.values



In [None]:
##############################################################################
#fix
#g_df1 = g_df_full #pd.read_csv('data/all-the-news/articles1.csv')

g_df = g_df_full #pd.read_pickle("data/articles.pkl")
g_contents = g_df.content
g_web_urls = g_df.url

# Build our text-to-vector vectorizer, then vectorize our corpus.
g_vectorizer, g_vocabulary = build_text_vectorizer(g_contents,
                             use_tfidf=True,
                             use_stemmer=False,
                             max_features=5000)
g_X = g_vectorizer(g_contents)

# We'd like to see consistent results, so set the seed.
np.random.seed(12345)

g_rand_articles = list(range(len(g_df_full)))
#for i in rand_articles:
#        analyze_article(i, contents, web_urls, W, hand_labels)

# Do it all again, this time using scikit-learn.
g_nmf = NMF_sklearn(n_components=7, max_iter=100, random_state=12345, alpha=0.0)
g_W = g_nmf.fit_transform(g_X)
g_H = g_nmf.components_
print('reconstruction error:', g_nmf.reconstruction_err_)
g_hand_labels = ['Garbage', 'GOP', 'Intl Politics', 'Econ', 'Dems', 'Crime', 'Election'] #hand_label_topics(g_H, g_vocabulary)



        
#pickle.dump(H,open('nyt_clusters_H.p','wb'))
#pickle.dump(W,open('nyt_clusters_W.p','wb'))


In [7]:
g_df1.

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string]

In [9]:
full_dataframe.count9

<bound method DataFrame.count of DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string]>