In [1]:
# general pyspark
from pyspark import SparkContext
from pyspark import SparkConf

# conf = SparkConf().setMaster("local").setAppName("svd_cluster.py")
# sc = SparkContext(conf = conf)

#import mllib
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import StandardScalerModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix, BlockMatrix
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.clustering import BisectingKMeans, BisectingKMeansModel

# python imports
from math import sqrt
import numpy as np
from numpy import array
import os, csv, sys, time
from random import randint
from itertools import izip, izip_longest
import string
import translitcodec

sc = SparkContext.getOrCreate()

In [2]:
def get_tfidf(rdd, verbose=True):
    if verbose: print("in get_tfidf")
    # While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
    # First to compute the IDF vector and second to scale the term frequencies by IDF.
    hashingTF = HashingTF()
    tf = hashingTF.transform(rdd)
    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
    # spark.mllib's IDF implementation provides an option for ignoring terms
    # which occur in less than a minimum number of documents.
    # In such cases, the IDF for these terms is set to 0.
    # This feature can be used by passing the minDocFreq value to the IDF constructor.
    idfIgnore = IDF(minDocFreq=1).fit(tf)
    tfidf_rdd = idfIgnore.transform(tf)
    # rdd of SparseVectors [(doc_id_i: {word_id_j: tfidfscore_j, ...}), ... }]
    # or m docs x n counts
    return tfidf_rdd

def get_svd(tfidf_rdd, n_topics=3, verbose=True):
    if verbose: print("in get_svd")
    # distributed matrix
    matrix_rdd = RowMatrix(tfidf_rdd)
#     left singular vectors
#     type = RowMatrix
#     svd_u = svd.U
#     array of DenseVectors, m_documents x n_topics
#     [[topic_i, ...], ...]
#     return svd_u.rows.collect()
    svd = matrix_rdd.computeSVD(n_topics, computeU=True)
    return svd

def save_svd_U(svd_U_rdd, fn="svd_u_results.npz"):
    np.savez(fn,np.array(svd_U_rdd))

#     sentence = "aa bb ab" * 10 + "a cd " * 10
#     localDoc = [sentence, sentence]
#     doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
#     model = Word2Vec().setVectorSize(10).setSeed(42).fit(doc)save_cluster_metrics
# i think it is expecting a list of document lists [[word1, word2,...], ...]
def get_word2vec(rdd):
    word2vec = Word2Vec()
    model = word2vec.fit(ads_rdd)

def save_cluster_predictions(cluster_results, model="km", fn="cluster_results.pkl"):
    results_fn = "{}_{}".format(model, fn)
    np.savez(results_fn, cluster_results)

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))


# Save and load model
def save_cluster_model(clusters, fn="test_model"):
    clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")

def load_cluster_model(clusters, fn="test_model"):
    sameModel = KMeansModel.load(sc, fn)


def save_cluster_metrics(model, score, svd_dims, k=None, max_iters=None, clust_size=None, doc_concept=None, topic_concept=None):
    print('saving cluster metrics to csv')
    row = [model, score, svd_dims, k, max_iters, clust_size, doc_concept, topic_concept]
    with open(fn, 'a+') as f:
        writer = csv.writer(f)
        writer.writerow(row)

In [6]:
# def get_input(fn):

# fn = "cl_tiny.csv"
fn = "combined_ads.csv"
cur_dir = os.path.abspath(os.curdir)
input_file_path = os.path.normpath(os.path.join(cur_dir, "..", "data", fn))
print(input_file_path)

os.path.isfile(input_file_path)

C:\Users\shane\programming\cs657_mining_massive_datasets\craigslist_clustering\data\combined_ads.csv


True

In [7]:
# [(postTitle, postingURL, postLocation, time, lat, long, address, dateRetrieved, post_date, ad), ...]
# tiny input has 30 reviews
raw_ads = sc.textFile(input_file_path)
# processed_rdd.take(2)

In [8]:
raw_ads.first()

u'ad,title,url,loc,scrape stamp,lat,long,addr,scrape date,source'

In [None]:
def check_ascii_text(text):
    for c in 
    

In [127]:
import translitcodec
str('ldkjfk'.encode('translit/short').encode('ascii'))

'ldkjfk'

In [9]:
header = raw_ads.first()
# just pull ads
raw_ads = raw_ads.filter(lambda line: line != header)
raw_ads.first()
# raw_ads = raw_ads.filter(lambda line: line !=header).map(lambda x: (x[0], x[1]))
processed_rdd = raw_ads.map(lambda x: str(x.encode('utf-8'))).map(lambda x: x.split(",",2))
processed_rdd.take(3)

[['Best Asian Masseuse!!Professional Sweet Asian StaffSuper Clean and Cozy Private RoomsAmazing Price!! Come and Try!!Real Experienced Asian Magic Hands!!!!!Deep TissueSwedish MassageOpen 7 days a week 9:30am-9:30pm715 East Glenn Ave Suite 101 Auburn AL 36830Call  show contact info',
  '\xe2\x9d\xa4\xe2\x9d\xa4Grand Opening Oasis Massage \xe2\x9d\xa4\xe2\x9d\xa4 3345215388',
  'https://auburn.craigslist.org/thp/d/grand-opening-oasis-massage/6417553605.html,Auburn,12/8/2017 11:32,32.609014,-85.467371,715 east glenn Ave suite 101,20:10.8,CL1'],
 ['"Mon-Sat 9am-7:30pmSun-11am-5pmDeep tissue',
  'Swedish',
  'Relaxation mMili-Mon and Sat -$FireFight-Fri-$Call 4aptText 4infoLic Info only in person",Sun mas,https://montgomery.craigslist.org/thp/d/sun-mas/6420838785.html,montgomery,12/11/2017 5:08,32.3129,-86.2421,east blvd at vaughn,20:11.5,CL1'],
 [' ',
  '',
  'https://montgomery.craigslist.org/thp/d/wbest-asian-massagecall/6420405684.html,, ,,,,20:12.4,CL1']]

In [10]:
kv_rdd = processed_rdd.map(lambda x: ((x[0], x[1]), x[2]))
print(kv_rdd.take(3))
only_ads_rdd = kv_rdd.map(lambda x: x[1]).map(lambda text: ''.join(x for x in text if ord(x) < 128))
only_ads_rdd.take(3)



[(('Best Asian Masseuse!!Professional Sweet Asian StaffSuper Clean and Cozy Private RoomsAmazing Price!! Come and Try!!Real Experienced Asian Magic Hands!!!!!Deep TissueSwedish MassageOpen 7 days a week 9:30am-9:30pm715 East Glenn Ave Suite 101 Auburn AL 36830Call  show contact info', '\xe2\x9d\xa4\xe2\x9d\xa4Grand Opening Oasis Massage \xe2\x9d\xa4\xe2\x9d\xa4 3345215388'), 'https://auburn.craigslist.org/thp/d/grand-opening-oasis-massage/6417553605.html,Auburn,12/8/2017 11:32,32.609014,-85.467371,715 east glenn Ave suite 101,20:10.8,CL1'), (('"Mon-Sat 9am-7:30pmSun-11am-5pmDeep tissue', 'Swedish'), 'Relaxation mMili-Mon and Sat -$FireFight-Fri-$Call 4aptText 4infoLic Info only in person",Sun mas,https://montgomery.craigslist.org/thp/d/sun-mas/6420838785.html,montgomery,12/11/2017 5:08,32.3129,-86.2421,east blvd at vaughn,20:11.5,CL1'), ((' ', ''), 'https://montgomery.craigslist.org/thp/d/wbest-asian-massagecall/6420405684.html,, ,,,,20:12.4,CL1')]


['https://auburn.craigslist.org/thp/d/grand-opening-oasis-massage/6417553605.html,Auburn,12/8/2017 11:32,32.609014,-85.467371,715 east glenn Ave suite 101,20:10.8,CL1',
 'Relaxation mMili-Mon and Sat -$FireFight-Fri-$Call 4aptText 4infoLic Info only in person",Sun mas,https://montgomery.craigslist.org/thp/d/sun-mas/6420838785.html,montgomery,12/11/2017 5:08,32.3129,-86.2421,east blvd at vaughn,20:11.5,CL1',
 'https://montgomery.craigslist.org/thp/d/wbest-asian-massagecall/6420405684.html,, ,,,,20:12.4,CL1']

In [21]:
filtered_ads = only_ads_rdd.filter(lambda x: x!=" ")

In [22]:
hashingTF = HashingTF()
tf = hashingTF.transform(filtered_ads)


In [38]:
svd_n = 10
k = 4
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidf_rdd = idfIgnore.transform(tf)
matrix_rdd = RowMatrix(tfidf_rdd)
svd_i = matrix_rdd.computeSVD(svd_n, computeU=True)
rdd = svd_i.U.rows
clust_model = KMeans.train(rdd, k)
cost = clust_model.computeCost(rdd)
save_cluster_metrics("kmeans", cost, svd_n, k=k)
predictions_rdd = rdd.map(lambda x: (x, clust_model.predict(x)))
kmeans_predictions_fn="predictions_{}_topics{}_k{}.npy".format("kmeans", svd_n, k)
save_cluster_predictions(np.array(predictions_rdd.collect()), model="km", fn=kmeans_predictions_fn)


saving cluster metrics to csv


saving cluster metrics to csv


In [35]:

# [ad0, ad1, ..]
# processed = raw_ads.filter(lambda line: line !=header).map(lambda x: (x[0], x[1]))
# ads_rdd = raw_ads.map(lambda x: x.split(",", 2)).map(lambda x: ((x[0], x[1]),x[2].encode('utf-8', 'ignore')))
# ads_rdd.take(3)
# processed_rdd = ads_rdd.map(lambda x: x.split(",", 2))#.map(lambda x: ((x[0], x[1]), x[2]))
# processed_rdd = ads_rdd.map(lambda x: x.split(",", 1)).map(lambda x: (int(x[0]), x[1]))
# processed_rdd = ads_rdd.map(lambda x: [(int(i[0]), i[1]) for i in x.split(",", 1)])
# processed_rdd.take(3)
# processed_rdd.first()
# ads_rdd.first()

In [62]:
run_kmeans_gs(filtered_rdd)

k:2, topic:2
in get_tfidf


Py4JJavaError: An error occurred while calling o716.fitIDF.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 128.0 failed 1 times, most recent failure: Lost task 0.0 in stage 128.0 (TID 226, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:67)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.fitIDF(PythonMLLibAPI.scala:672)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Unknown Source)


In [18]:
def run_kmeans_gs(rdd, verbose=True):
    model = "bimeans"
    ks = [x for x in range(2, 5, 2)]
    svd_topics = [x for x in range(2, 5, 2)]
    for topic in svd_topics:
        for k in ks:
            if verbose: print("k:{}, topic:{}".format(k, topic))
            # calculate tfidf scores
            tfidf_rdd = get_tfidf(rdd)
            
            # transform bag of words to svd
            # the svd object has U, Sigma, V
            # 
            svd = get_svd(tfidf_rdd, topic)
            
            # run kmeans with left singular vectors 
            predictions_rdd = kmeans(svd, topic, k)
            kmeans_predictions_fn="predictions_{}_topics{}_k{}.npy".format(model, topic, k)
            save_cluster_predictions(np.array(predictions_rdd.collect()), model="km", fn=kmeans_predictions_fn)
    
# k_means
# Build the model (cluster the data)
# kmeans(rdd, k, maxIterations, runs, InitializationMode, seed, initializationSteps, epsilon, initialModel)
def kmeans(svd, svd_dims, k=2, n_iters=10, save_model=False, verbose=True):
    model="kmeans"
    if verbose: print("in kmeans")
        
    # left singular vectors, U
    # array of DenseVectors, m_documents x n_topics
    # [ doc_i, doc_i+1, ...]
    # [[topic_j_score, topic_j+1_score ...], ...]
    rdd = svd.U.rows
    
    # Build the model (cluster the data)
    model = KMeans.train(rdd, k, maxIterations=n_iters)

    # Evaluate clustering
    cost = model.computeCost(rdd)
    save_cluster_metrics("bimeans", cost, svd_dims, k=k)
    
    if save_model:
        save_cluster_model(model, fn)
    

    # returns an rdd of [(topic_values, cluster_id), ...]
    return rdd.map(lambda x: (x, model.predict(x)))


In [None]:
predictions = run_bimeans_gs(ads_rdd)

In [12]:
def run_bimeans_gs(rdd, verbose=True):
    model = "bimeans"
    predictions = []
    ks = [x for x in range(2, 10, 2)]
    minDivisibleClusterSize = [float(x/100.0) for x in range(1, 50, 101)], #percent
    svd_topics = [x for x in range(2, 10, 2)]
    for topic in svd_topics:
        for k in ks:
            for c_size in minDivisibleClusterSize:
                if verbose: print("k:{}, topic:{}, cluster_size:{}".format(k, topic, c_size))
                # calculate tfidf scores
                tfidf_rdd = get_tfidf(rdd)

                # transform bag of words to svd
                svd = get_svd(tfidf_rdd, topic)
                if verbose: print(svd)

                # run kmeans with left singular vectors 
                predictions_rdd = bimeans(svd, topic, k, c_size)
                
                predictions.append(predictions_rdd)
    return predictions    
#     bimeans_predictions_fn="predictions_{}_topics{}_k{}_csize{}.npy".format(model, topic, k, c_size)
#     save_cluster_predictions(np.array(predictions), model="bimeans", fn=bimeans_predictions_fn)

def bimeans(svd, svd_dims, k, cluster_size, verbose=True, save_model=False):
    model = "bimeans"
    if verbose: print("in {}".format(model))
        
    # left singular vectors, U
    # array of DenseVectors, m_documents x n_topics
    # [ doc_i, doc_i+1, ...]
    # [[topic_j_score, topic_j+1_score ...], ...]
    rdd = svd.U.rows
    
    # Build the model (cluster the data)
#     model = BisectingKMeans.train(rdd, k=k, minDivisibleClusterSize=cluster_size)
    model = BisectingKMeans.train(rdd)

    # Evaluate clustering
    cost = model.computeCost(rdd)
    save_cluster_metrics("bimeans", cost, svd_dims, k=k, clust_size=cluster_size)
    
    if save_model:
        save_cluster_model(model, fn)
        
    # returns an rdd of [(topic_values, cluster_id), ...]
    return rdd.map(lambda x: (x, model.predict(x)))

In [None]:
run_lds_gs(ads_rdd)


In [None]:
def run_lda_gs(rdd, verbose=True):

    model = "lda"
    predictions = None
    ks = [x for x in range(2, 3, 2)]
    doc_concepts = [float(x/100.0) for x in range(1, 10)], # default -1.0
    topic_concepts =[float(x/100.0) for x in range(1, 10)] # default -1.0
    for k in ks:
        for d_concept in doc_concepts:
            for t_concept in topic_concepts:
                if verbose: print("k:{}, d_concept:{}, t_concept:{}".format(k, d_concept, t_concept))
                # calculate tfidf scores
                tfidf_rdd = get_tfidf(rdd)

                # transform bag of words to svd
                svd = get_svd(tfidf_rdd, topic)
                if verbose: print(svd)

                # run kmeans with left singular vectors 
                predictions_rdd = bimeans(svd, k, cluster_size)
                # first run
                if predictions is None:
                    #
                    predictions = predictions_rdd.collect()
                    print(type(predictions))
                else:
                    predictions.append(predictions_rdd.collect())
    bimeans_predictions_fn="predictions_{}_topics{}_k{}_csize{}.npz".format(model, topic, k, c_size)
    save_cluster_predictions(np.array(predictions), model="bimeans", fn=kmeans_predictions_fn)

def lda(rdd, k, save_model=False):
    model = "lda"
    if verbose: print("in {}", model)
        
    # Index documents with unique IDs
    corpus = rdd.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

    # Cluster the documents into k topics using LDA
    model = LDA.train(corpus, k=3)

    # Output topics. Each is a distribution over words (matching word count vectors)
    print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
          + " words):")
    topics = model.topicsMatrix()
    for topic in range(3):
        print("Topic " + str(topic) + ":")
        for word in range(0, ldaModel.vocabSize()):
            print(" " + str(topics[word][topic]))

    # Evaluate clustering
    cost = model.computeCost(rdd)
    save_cluster_metrics("bimeans", cost, k=k, clust_size=cluster_size)
    
    if save_model:
        save_cluster_model(model, fn)
        
    # returns an rdd of [(topic_values, cluster_id), ...]
    return rdd.map(lambda x: (x, model.predict(x)))

In [None]:
run_gauss_gs(ads_rdd)

In [13]:
def run_gauss_gs(rdd, verbose=True):
    model = "bimeans"
    predictions = None
    ks = [x for x in range(2, 3, 2)]
    minDivisibleClusterSize = [float(x/100.0) for x in range(1, 50, 50)], #percent
    svd_topics = [x for x in range(2, 3, 2)]
    for topic in svd_topics:
        for k in ks:
            for c_size in minDivisibleClusterSize:
                if verbose: print("k:{}, topic:{}, cluster_size:{}".format(k, topic, c_size))
                # calculate tfidf scores
                tfidf_rdd = get_tfidf(rdd)

                # transform bag of words to svd
                svd = get_svd(tfidf_rdd, topic)
                if verbose: print(svd)

                # run kmeans with left singular vectors 
                predictions_rdd = bimeans(svd, k, c_size)
                # first run
                if predictions is None:
                    #
                    predictions = predictions_rdd.collect()
                    print(type(predictions))
                else:
                    predictions.append(predictions_rdd.collect())
    bimeans_predictions_fn="predictions_{}_topics{}_k{}_csize{}.npz".format(model, topic, k, c_size)
    save_cluster_predictions(np.array(predictions), model="bimeans", fn=kmeans_predictions_fn)

def gaussian(svd, k, cluster_size, verbose=True, save_model=False):
    model = "bimeans"
    if verbose: print("in kmeans")
        
    # left singular vectors, U
    # array of DenseVectors, m_documents x n_topics
    # [ doc_i, doc_i+1, ...]
    # [[topic_j_score, topic_j+1_score ...], ...]
    rdd = svd.U.rows
    
    # Build the model (cluster the data)
    model = BisectingKMeans.train(rdd, k=k, minDivisibleClusterSize=cluster_size)
    # Build the model (cluster the data)
    model = GaussianMixture.train(rdd, 2)
    
    
    # Evaluate clustering
    cost = model.computeCost(rdd)
    save_cluster_metrics("bimeans", cost, k=k, clust_size=cluster_size)
    
    if save_model:k
        save_cluster_model(model, fn)
        
    # returns an rdd of [(topic_values, cluster_id), ...]
    return rdd.map(lambda x: (x, model.predict(x)))


def gaussian_clustering(rdd):


    # Save and load model
    gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
    sameModel = GaussianMixtureModel\
        .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")

    # output parameters of model
    for i in range(2):
        print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
              "sigma = ", gmm.gaussians[i].sigma.toArray())


IndentationError: unexpected indent (<ipython-input-13-5ee1cdecf66a>, line 51)

In [None]:
"""
docConcentration or alpha, Concentration parameter (commonly named “alpha”) for the 
prior placed on documents’ distributions over topics (“theta”)

A high alpha-value will lead to documents being more similar in terms of what topics they contain     
The effect is based on topic distribution assumption
if symmetric distribution - high alpha means that each doc will contain a mix of most topics
if symmetric distribution - low alpha means that docs will contain a few topics
if asymmetric distribution - vice versa
--------------------

topicConcentration or beta – Concentration parameter (commonly named “beta” or “eta”)
for the prior placed on topics’ distributions over terms. (default: -1.0)

A high beta-value will lead to topics being more similar in terms of what words they contain.    
if symmetric distribution - high beta means that each doc will contain a mix of most words
if symmetric distribution - low alpha means that docs will contain a few words
if asymmetric distribution - vice versa
 
"""
cluster_model_params = {
   
    "lda":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(15, 30, 10)], #default 20  
        "doc_con":[float(x/100.0) for x in range(1, 10)], # default -1.0
        "topic_con": [float(x/100.0) for x in range(1, 10)] # default -1.0
    }
    "bimeans":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(15, 30, 10)], #default 20
        "minDivisibleClusterSize": [float(x/100.0) for x in range(1, 10)], #percent
    }
    "kmeans":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(15, 30, 10)], #default 20
        "initializationMode": ["random", "k-means||")] #default k-means        
    }
    "gaus":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(90, 150, 10)], #default 100
    }
    "pic":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(90, 150, 10)], #default 100
    }
        
}
cluster_models = ["lda", "bimeans", "kmeans", "gaus", "pic"]

In [65]:
tf_rdd.take(3)

['"Daziran Massage TherapyCall to make appointment.1314 Westgate Parkway, Suite 4 Dothan, AlabamaWe specialize in Back Walking, Deep Tissue, Sport Massage...etc$60/hour $65/hour deep tissue, we also have 90 Mins and 2 hours session.out call only within Dothan area.Open Monday to Saturday, 10am - 9pm, Sunday 1pm - 8pmshow contact infoAll major credit cards accepted    ***Daziran Massage***334-446-3721"',
 '"331 SPAWelcome to best asian massage. shiatsu and swedish, walk on your back , firm or relaxing massage.Great table shower.Our place is very clean and very comfortable.Every therapists are licensed and very professional and friendly.You\'ll be a New person,In callHalf Hr: $60.00 with table showerOne Hr: $80.00 with table showerwe do accept credit cardsopen 9 am to 10 pm , 7days a weekTel:  show contact info*from panama city or beach: one mile before to I-10 on highway 331 to your left* from Tallahassee: Left turn to highway 331 (Exit 85) , we are 1 mile from I-10 on your right side b

In [64]:
tf_rdd = filtered_rdd.map(lambda x: x[1])
hashingTF = HashingTF()
tf = hashingTF.transform(filtered_rdd)
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidf_rdd = idfIgnore.transform(tf)
matrix_rdd = RowMatrix(tfidf_rdd)
svd_i = matrix_rdd.computeSVD(5, computeU=True)
rdd = svd_i.U.rows
clusters = KMeans.train(rdd, 5)

#     left singular vectors
#     type = RowMatrix
#     svd_u = svd.U
#     array of DenseVectors, m_documents x n_topics
#     [[topic_i, ...], ...]
#     return svd_u.rows.collect()
# WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
# print("Within Set Sum of Squared Error = " + str(WSSSE))
# predicts = rdd.map(lambda x: (x, clusters.predict(x)))

Py4JJavaError: An error occurred while calling o838.fitIDF.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 130.0 failed 1 times, most recent failure: Lost task 0.0 in stage 130.0 (TID 230, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:67)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.fitIDF(PythonMLLibAPI.scala:672)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Unknown Source)


In [None]:
def run_bimeans_gs(rdd, verbose=True):
    predictions = None
    ks = [x for x in range(2, 20, 2)]
    minDivisibleClusterSize: [float(x/100.0) for x in range(1, 50,10)], #percent
    svd_topics = [x for x in range(2, 10, 2)]
    for topic in svd_topics:
        for k in ks:
            # calculate tfidf scores
            tfidf_rdd = get_tfidf(rdd)
            
            # transform bag of words to svd
            svd = get_svd(tfidf_rdd, topic)
            if verbose: print(svd)
            
            # run kmeans with left singular vectors 
            predictions_rdd = bimeans(svd, k)
            if predictions is None:
                predictions = predictions_rdd.collect()
                print(type(predictions))
            else:
                predictions.append(predictions_rdd.collect())
    kmeans_predictions_fn="predictions_k{}_topics{}.npz"
    save_cluster_predictions(np.array(predictions), model="km", fn=kmeans_predictions_fn)
    save_cluster_predictions(np.array(predictions), model="km", fn=kmeans_predictions_fn)

In [None]:
# def transpose_rdd()
# tfidf_rdd.flatMap(lambda x: x).take(3)
# flatMap by keeping the column position
# flat_rdd = tfidf_rdd.flatMap(lambda row: row.map(lambda col: (col, row.indexOf(col))))
# flat_rdd.take(3)
# .map(v => (v._2, v._1)) // key by column position
# .groupByKey.sortByKey   // regroup on column position, thus all elements from the first column will be in the first row
# .map(_._2)              // discard the key, keep only value
# df = rdd.toDF()
# # Grab data from first columns, since it will be transposed to new column headers
# new_header = [i[0] for i in dt.select("_1").rdd.map(tuple).collect()]

# # Remove first column from dataframe
# dt2 = dt.select([c for c in dt.columns if c not in ['_1']])

# # Convert DataFrame to RDD
# rdd = dt2.rdd.map(tuple)

# # Transpose Data
# rddT1 = rdd.zipWithIndex().flatMap(lambda (x,i): [(i,j,e) for (j,e) in enumerate(x)])
# rddT2 = rddT1.map(lambda (i,j,e): (j, (i,e))).groupByKey().sortByKey()
# rddT3 = rddT2.map(lambda (i, x): sorted(list(x), cmp=lambda (i1,e1),(i2,e2) : cmp(i1, i2)))
# rddT4 = rddT3.map(lambda x: map(lambda (i, y): y , x))

# # Convert back to DataFrame (along with header)
# df = rddT4.toDF(new_header)

# return df

In [None]:
"""
docConcentration or alpha, Concentration parameter (commonly named “alpha”) for the 
prior placed on documents’ distributions over topics (“theta”)

A high alpha-value will lead to documents being more similar in terms of what topics they contain     
The effect is based on topic distribution assumption
if symmetric distribution - high alpha means that each doc will contain a mix of most topics
if symmetric distribution - low alpha means that docs will contain a few topics
if asymmetric distribution - vice versa
--------------------

topicConcentration or beta – Concentration parameter (commonly named “beta” or “eta”)
for the prior placed on topics’ distributions over terms. (default: -1.0)

A high beta-value will lead to topics being more similar in terms of what words they contain.    
if symmetric distribution - high beta means that each doc will contain a mix of most words
if symmetric distribution - low alpha means that docs will contain a few words
if asymmetric distribution - vice versa
 
"""
cluster_model_params = {
   
    "lda":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(15, 30, 10)], #default 20  
        "doc_con":[float(x/100.0) for x in range(1, 10)], # default -1.0
        "topic_con": [float(x/100.0) for x in range(1, 10)] # default -1.0
    }
    "bimeans":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(15, 30, 10)], #default 20
        "minDivisibleClusterSize": [float(x/100.0) for x in range(1, 10)], #percent
    }
    "kmeans":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(15, 30, 10)], #default 20
        "initializationMode": ["random", "k-means||")] #default k-means        
    }
    "gaus":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(90, 150, 10)], #default 100
    }
    "pic":{
        "k":[x for x in range(2, 20, 2)],
        "max_iters": [x for x in range(90, 150, 10)], #default 100
    }
        
}
cluster_models = ["lda", "bimeans", "kmeans", "gaus", "pic"]