In [1]:
import gc
import glob
import re
import math
from nltk.corpus import stopwords
from pyspark.sql.functions import collect_list
from pyspark.sql import Row
from pyspark.sql.types import *

sc = spark.sparkContext

# Here I creates the fields and the schema for the inverted index table (Dataframe)
inverted_index_fields = [StructField("words", StringType(), True), StructField("docID", IntegerType(), True)]
inverted_index_schema = StructType(inverted_index_fields)

# 
result = sqlContext.createDataFrame(sc.emptyRDD(), inverted_index_schema)


def all_words_vector(all_books, return_list = False):
    '''
    all_words_vector function:
    extract all the words from my corpus and "clean" them
    input:  all_books - list of all the books in my corpus (full path in my project for each one)
            return_list - Boolean type to deterime if the function returns the words as vector (list) or RDD
    output: vector (list) / RDD of all the words after "cleaning"
    '''
    all_words_vector = []
    for book in all_books:
        # Read the book file (.txt) and split it
        raw_txt=sc.textFile(book).flatMap(lambda line: line.split(" "))

        # Convert to lowercase
        lower_txt = raw_txt.map(lambda word:word.lower() if type(word) == str else word)
        
        # Remove duplicates
        lower_txt = lower_txt.distinct()

        # Remove non-alphabetic chars
        regex = re.compile('[^a-z]')
        clean_lower_txt = lower_txt.map(lambda word:word if word.isalnum() else regex.sub('', word)).filter(lambda word: word != '')

        # Convert the piplinedRDD to list to remove stop words
        Data = clean_lower_txt.collect()

        # Remove stop words
        text = ([word for word in Data if word not in (stopwords.words('english'))])
        all_words_vector += text
        
    if return_list:
        all_words_vector = list(set(all_words_vector))
        return all_words_vector
    else:
        all_words_vector_rdd = sc.parallelize(all_words_vector)
        all_words_vector_rdd = all_words_vector_rdd.distinct()
        return all_words_vector_rdd
        

all_books = glob.glob("Data/*.txt")
# Dictionary of all the books - {"Book_id" : "title"}
book_dict = {}
# This is the ID of the books
book_index = 1
# list of all the words after filtering
all_words_list = all_words_vector(all_books, True)
# here convert all the words to dataframe
temp_rdd = sc.parallelize(all_words_list)
row_rdd = temp_rdd.map(lambda x: Row(x))
all_words_df=sqlContext.createDataFrame(row_rdd,['words'])

'''
The main loop of reading all books, filtering the words and creating our big 2 tables (dataframes):
1. inverted_index table (dataframe)- for each distinct word in the corpus we save a list of all the documents it appears
2. all_words_df table (dataframe) - with 2 columns for each book:   a. book_i_tf tf value for all the words in the book 
                                                                    b. book_i_coutner coutner value for each word in the book
'''
for book in all_books:
    # Add the book to dictionary
    book_dict[book_index] = book[5:]
    
    # Read the book file (.txt) and split it
    raw_txt=sc.textFile(book).flatMap(lambda line: line.split(" "))
    
    # Convert to lowercase
    lower_txt = raw_txt.map(lambda word:word.lower() if type(word) == str else word)
    
    # Remove non-alphabetic chars
    regex = re.compile('[^a-z]')
    clean_lower_txt = lower_txt.map(lambda word:word if word.isalnum() else regex.sub('', word))
    
    # Convert the piplinedRDD to list to remove stop words
    Data = clean_lower_txt.collect()
    text = ([word for word in Data if word not in (stopwords.words('english'))])
    
    # Convert the list back to RDD
    rdd = sc.parallelize(text)
    rdd = rdd.filter(lambda word: word != '')
    
    # ----------------------------- Here i create the tf table --------------------------
    # I count words in each book
    bookwordCounts = rdd.map(lambda word: (word, book_index)).reduceByKey(lambda a,b:a + b)
    book_count_dict = bookwordCounts.collectAsMap() # Book as dictionary {"word":counter}
    
    tf_data = []
    tf_fields = [StructField("word", StringType(), False), StructField("book_{}_counter".format(book_index), IntegerType(), False), StructField("book_{}_tf".format(book_index), FloatType(), False)]
    tf_schema = StructType(tf_fields)
    
    # Create row for each word: Row looks like --> (word,counter,tf_value)
    # add if to all_words_df table
    for word in all_words_list:
        if word in book_count_dict:
            log = math.log10(book_count_dict[word]) + 1
            counter = book_count_dict[word]
            tf_row = (word,counter,log)
        else:
            tf_row = (word,0,0.0)
        tf_data.append(tf_row)
    
    tf_df = sqlContext.createDataFrame(tf_data, tf_schema)
    all_words_df = all_words_df.join(tf_df, all_words_df.words == tf_df.word)
    # free unnecesary table
    del(tf_df)
    # ----------------------------------- END ------------------------------
    
    # now I can remove duplicate words in the book
    rdd = rdd.distinct()
    
    # Convert the rdd to dataframe and add it to my final inverted index dataframe
#     df = rdd.map(lambda x: (x, book_index)).toDF()
#     result = result.union(df)
    book_index += 1
    
# inverted_table = result.groupBy(result.words).agg(collect_list('docID').alias('docID'))

In [13]:
# Show my inverted index table - row 1 - 15
inverted_table.show(15)

+---------------+--------------------+
|          words|               docID|
+---------------+--------------------+
|            296|                [19]|
|      ammonites|                 [4]|
|antisocialistic|                [16]|
|  apprehensions|     [15, 19, 7, 11]|
|      arguments|[18, 19, 21, 1, 3...|
|            art|[13, 14, 15, 16, ...|
|        attackd|             [19, 7]|
|        barrier|             [12, 9]|
|          besom|             [14, 1]|
|         biting|[15, 17, 18, 21, ...|
|         blairs|                [12]|
|         bleeve|                 [8]|
|        blossom|             [20, 1]|
|       bowsprit|                [15]|
|       brackets|                 [7]|
+---------------+--------------------+
only showing top 15 rows



In [2]:
# Here I seperate all_words_df table to tf table and counter table
all_words_df_tf = all_words_df.select("words","book_1_tf", "book_2_tf", "book_3_tf","book_4_tf", "book_5_tf", "book_6_tf","book_7_tf", "book_8_tf", "book_9_tf","book_10_tf","book_11_tf", "book_12_tf", "book_13_tf","book_14_tf", "book_15_tf", "book_16_tf","book_17_tf", "book_18_tf", "book_19_tf","book_20_tf","book_21_tf")
all_words_df_counter = all_words_df.select("words","book_1_counter", "book_2_counter", "book_3_counter", "book_4_counter", "book_5_counter", "book_6_counter", "book_7_counter", "book_8_counter", "book_9_counter", "book_10_counter","book_11_counter", "book_12_counter", "book_13_counter", "book_14_counter", "book_15_counter", "book_16_counter","book_17_counter", "book_18_counter", "book_19_counter", "book_20_counter", "book_21_counter")

In [None]:
# show the tf value of the first 9 books
all_words_df_tf.select("words","book_1_tf", "book_2_tf", "book_3_tf","book_4_tf", "book_5_tf", "book_6_tf","book_7_tf", "book_8_tf", "book_9_tf").show(10)

In [23]:
# ---------------------------------------------------------------------
# Create the idf table with counter
idf_list = []
N_documents = len(all_words_df_tf.columns) - 1
all_words_list = all_words_df.collect()

for row in all_words_list:
    r_dict = row.asDict()
    counter = 0
    for column_name, value in r_dict.items():
        if isinstance(value, int) and value > 0:
            counter += 1
    word = r_dict["words"]
    new_row = Row(words = word, counter = counter,idf=math.log10(N_documents / counter))
    idf_list.append(new_row)

idf_rdd = spark.sparkContext.parallelize(idf_list)
idf_df = idf_rdd.toDF()

In [17]:
# This is just to free memory - not mandatory
# del(idf_list)
# idf_rdd.unpersist()
# del(r_dict)
# all_words_df.unpersist()
# gc.collect()

0

In [5]:
# Show my final idf and counter table - rows 1 - 20 
idf_df.show(20)

+-------+-------------------+---------------+
|counter|                idf|          words|
+-------+-------------------+---------------+
|      1| 1.3222192947339193|            296|
|      1| 1.3222192947339193|      ammonites|
|      1| 1.3222192947339193|antisocialistic|
|      4| 0.7201593034059569|  apprehensions|
|     10| 0.3222192947339193|      arguments|
|     16|0.11809931207799448|            art|
|      2|  1.021189299069938|        attackd|
|      2|  1.021189299069938|        barrier|
|      2|  1.021189299069938|          besom|
|      7|0.47712125471966244|         biting|
|      1| 1.3222192947339193|         blairs|
|      1| 1.3222192947339193|         bleeve|
|      2|  1.021189299069938|        blossom|
|      1| 1.3222192947339193|       bowsprit|
|      1| 1.3222192947339193|       brackets|
|      1| 1.3222192947339193|         brands|
|      1| 1.3222192947339193|        buggies|
|      1| 1.3222192947339193|    captainover|
|      1| 1.3222192947339193|     

In [66]:
# ---------------------------------------------------------------------
# Create tf-idf dataframe
from pyspark.sql import functions as F
tf_columns  = all_words_df_tf.columns[1:]
words_idf_df = idf_df.select("words","idf")
tf_idf_df = words_idf_df.join(all_words_df_tf,"words")
tf_columns = all_words_df_tf.columns[1:]

for col in tf_columns:
    tf_idf_df = tf_idf_df.withColumn("tf_idf_{}".format(col[:-3]), F.column('idf') * F.column(col))
    

tf_idf_df = tf_join_idf.select("words","tf_idf_book_1","tf_idf_book_2","tf_idf_book_3","tf_idf_book_4","tf_idf_book_5","tf_idf_book_6","tf_idf_book_7","tf_idf_book_8","tf_idf_book_9","tf_idf_book_10","tf_idf_book_11","tf_idf_book_12","tf_idf_book_13","tf_idf_book_14","tf_idf_book_15","tf_idf_book_16","tf_idf_book_17","tf_idf_book_18","tf_idf_book_19","tf_idf_book_20","tf_idf_book_21")

In [70]:
# Show the tf-idf of books 1 - 5 - rows 1 - 10
tf_idf_df.select("words","tf_idf_book_1","tf_idf_book_2","tf_idf_book_3","tf_idf_book_4","tf_idf_book_5").show(10)

+---------------+-------------------+------------------+-------------------+------------------+-------------------+
|          words|      tf_idf_book_1|     tf_idf_book_2|      tf_idf_book_3|     tf_idf_book_4|      tf_idf_book_5|
+---------------+-------------------+------------------+-------------------+------------------+-------------------+
|            296|                0.0|               0.0|                0.0|               0.0|                0.0|
|      ammonites|                0.0|               0.0|                0.0|2.1182745909156515|                0.0|
|antisocialistic|                0.0|               0.0|                0.0|               0.0|                0.0|
|  apprehensions|                0.0|               0.0|                0.0|               0.0|                0.0|
|      arguments| 0.3222192947339193|               0.0|0.47595696223668355|0.6699522980531204| 0.5474409023538536|
|            art|  0.256994873048431|0.1892021792268405|                

In [18]:
# ----------------------------Deprecated ----------------------------------
# Create tf-idf dataframe 
# from pyspark.sql import functions as F
# from pyspark.sql import SQLContext


# tf_columns = all_words_df_tf.columns[1:]
# words_idf_df = idf_df.select("words","idf")
# tf_idf_df = sqlContext.createDataFrame(row_rdd,['words']) # we use here row_rdd

# for col in tf_columns:
#     tf_book_df = all_words_df_tf.select("words",col)
#     df_joined = words_idf_df.join(tf_book_df, "words")
#     df_joined = df_joined.withColumn("tf_idf_{}".format(col[:-3]), F.column('idf') * F.column(col))
#     df_joined = df_joined.select("words","tf_idf_{}".format(col[:-3]))
#     tf_idf_df = tf_idf_df.join(df_joined, "words")
# -----------------------------------------------------------------------

In [25]:
print(all_words_df_counter.count())

36156


In [71]:
# ------------------------------------------------------------------------------------
# cos-similarity between 2 books
import math
def cosine_similarity_between_books(v1,v2):
    '''
    cosine_similarity_between_books function:
    compute cosine similarity of v1 to v2: (v1 dot v2)/{||v1||*||v2||)
    input: 2 *not normalied* vectors of words counter per book
    output: cosine similarity between 2 books
    '''
    sumxx, sumxy, sumyy = 0, 0, 0
    for i in range(len(v1)):
        x = v1[i]; y = v2[i]
        sumxx += x*x
        sumyy += y*y
        sumxy += x*y
    return sumxy/math.sqrt(sumxx*sumyy)

In [78]:
# calculating for each pair of books their cosine similarity
from pyspark.ml.linalg import DenseVector

inner_book_index = 1
outter_book_index = 1

counter_df_columns = all_words_df_counter.columns[1:]
for col1 in counter_df_columns:
    vec1 = DenseVector(all_words_df_counter.select(col1).rdd.map(lambda x: x[0]).collect())
    for col2 in counter_df_columns:
        vec2 = DenseVector(all_words_df_counter.select(col2).rdd.map(lambda x: x[0]).collect())
        print("{} and {} is:\t{}".format(book_dict[outter_book_index][5:],book_dict[inner_book_index][5:],cosine_similarity_between_books(vec2,vec1)))
        inner_book_index += 1
    inner_book_index = 1
    outter_book_index += 1

Bushido the Soul of Japan.txt and Bushido the Soul of Japan.txt is:	1.0
Bushido the Soul of Japan.txt and The Wonderful Wizard of Oz.txt is:	0.4115537829773219
Bushido the Soul of Japan.txt and Alice's Adventures in Wonderland.txt is:	0.38045329994877014
Bushido the Soul of Japan.txt and Theologico Political Treatise.txt is:	0.5067302823295493
Bushido the Soul of Japan.txt and The Wit and Humor of America Volume IX.txt is:	0.5319905534246382
Bushido the Soul of Japan.txt and Political Ideals.txt is:	0.6116500938856396
Bushido the Soul of Japan.txt and The Autobiography of Benjamin Franklin.txt is:	0.6426534320854053
Bushido the Soul of Japan.txt and The Adventures of Tom Sawyer.txt is:	0.42607317170971704
Bushido the Soul of Japan.txt and Pride and Prejudice a play founded on Jane Austen's novel.txt is:	0.28870204860418863
Bushido the Soul of Japan.txt and Autobiography of Makataimeshekiakiak or Black Hawk.txt is:	0.5027530276886656
Bushido the Soul of Japan.txt and The Call of the Wil

The Wit and Humor of America Volume IX.txt and Alice's Adventures in Wonderland.txt is:	0.6427355826632657
The Wit and Humor of America Volume IX.txt and Theologico Political Treatise.txt is:	0.3468177695132518
The Wit and Humor of America Volume IX.txt and The Wit and Humor of America Volume IX.txt is:	1.0
The Wit and Humor of America Volume IX.txt and Political Ideals.txt is:	0.4241554484784585
The Wit and Humor of America Volume IX.txt and The Autobiography of Benjamin Franklin.txt is:	0.6180214461983131
The Wit and Humor of America Volume IX.txt and The Adventures of Tom Sawyer.txt is:	0.7154869499996183
The Wit and Humor of America Volume IX.txt and Pride and Prejudice a play founded on Jane Austen's novel.txt is:	0.43166224516130436
The Wit and Humor of America Volume IX.txt and Autobiography of Makataimeshekiakiak or Black Hawk.txt is:	0.5194877219653496
The Wit and Humor of America Volume IX.txt and The Call of the Wild.txt is:	0.46734421937293275
The Wit and Humor of America V

Pride and Prejudice a play founded on Jane Austen's novel.txt and Alice's Adventures in Wonderland.txt is:	0.2215437976705146
Pride and Prejudice a play founded on Jane Austen's novel.txt and Theologico Political Treatise.txt is:	0.1924107774520058
Pride and Prejudice a play founded on Jane Austen's novel.txt and The Wit and Humor of America Volume IX.txt is:	0.43166224516130436
Pride and Prejudice a play founded on Jane Austen's novel.txt and Political Ideals.txt is:	0.2531804866937474
Pride and Prejudice a play founded on Jane Austen's novel.txt and The Autobiography of Benjamin Franklin.txt is:	0.3313058611294326
Pride and Prejudice a play founded on Jane Austen's novel.txt and The Adventures of Tom Sawyer.txt is:	0.25794944034210077
Pride and Prejudice a play founded on Jane Austen's novel.txt and Pride and Prejudice a play founded on Jane Austen's novel.txt is:	1.0
Pride and Prejudice a play founded on Jane Austen's novel.txt and Autobiography of Makataimeshekiakiak or Black Hawk.

The Man in the Brown Suit.txt and The Writings of Thomas Paine Volume 4 1794 to 1796 The Age of Reason.txt is:	0.5217437645655736
The Man in the Brown Suit.txt and Autobiography of Benjamin Franklin.txt is:	0.6000597138647783
The Man in the Brown Suit.txt and Up from Slavery An Autobiography.txt is:	0.6070748781724965
The Man in the Brown Suit.txt and Beautiful Joe An Autobiography.txt is:	0.7325604289634402
The Prince.txt and Bushido the Soul of Japan.txt is:	0.5753385985162813
The Prince.txt and The Wonderful Wizard of Oz.txt is:	0.38155903028990656
The Prince.txt and Alice's Adventures in Wonderland.txt is:	0.3364298094865816
The Prince.txt and Theologico Political Treatise.txt is:	0.45374302486871865
The Prince.txt and The Wit and Humor of America Volume IX.txt is:	0.4672968723963406
The Prince.txt and Political Ideals.txt is:	0.6141454256943906
The Prince.txt and The Autobiography of Benjamin Franklin.txt is:	0.6312110121011666
The Prince.txt and The Adventures of Tom Sawyer.txt i

Heart of Darkness.txt and Political Ideals.txt is:	0.504559802904877
Heart of Darkness.txt and The Autobiography of Benjamin Franklin.txt is:	0.656471253693405
Heart of Darkness.txt and The Adventures of Tom Sawyer.txt is:	0.6476562004768615
Heart of Darkness.txt and Pride and Prejudice a play founded on Jane Austen's novel.txt is:	0.33947212995788867
Heart of Darkness.txt and Autobiography of Makataimeshekiakiak or Black Hawk.txt is:	0.5870821820612089
Heart of Darkness.txt and The Call of the Wild.txt is:	0.5912087857521768
Heart of Darkness.txt and The Man in the Brown Suit.txt is:	0.7530038563738879
Heart of Darkness.txt and The Prince.txt is:	0.519848373936324
Heart of Darkness.txt and The Secret Garden.txt is:	0.6382597688669339
Heart of Darkness.txt and Treasure Island.txt is:	0.7685012032602614
Heart of Darkness.txt and Readings on Fascism and National Socialism.txt is:	0.3443863872776408
Heart of Darkness.txt and Heart of Darkness.txt is:	1.0
Heart of Darkness.txt and The Writ

Up from Slavery An Autobiography.txt and Up from Slavery An Autobiography.txt is:	1.0
Up from Slavery An Autobiography.txt and Beautiful Joe An Autobiography.txt is:	0.5876314043262643
Beautiful Joe An Autobiography.txt and Bushido the Soul of Japan.txt is:	0.44814226287980274
Beautiful Joe An Autobiography.txt and The Wonderful Wizard of Oz.txt is:	0.5847227631799059
Beautiful Joe An Autobiography.txt and Alice's Adventures in Wonderland.txt is:	0.6117598680478598
Beautiful Joe An Autobiography.txt and Theologico Political Treatise.txt is:	0.2921350636830157
Beautiful Joe An Autobiography.txt and The Wit and Humor of America Volume IX.txt is:	0.7669732708700917
Beautiful Joe An Autobiography.txt and Political Ideals.txt is:	0.34810230569358913
Beautiful Joe An Autobiography.txt and The Autobiography of Benjamin Franklin.txt is:	0.5980921117330686
Beautiful Joe An Autobiography.txt and The Adventures of Tom Sawyer.txt is:	0.644742395771293
Beautiful Joe An Autobiography.txt and Pride a

In [9]:
# ---------------------------------------------------------------------
# cos-similarity with query
from numpy import linalg
import numpy as np
from pyspark.ml.linalg import DenseVector
from sklearn import preprocessing
import re, string 

def normalize(v):
    '''
    normalize function:
    normalize vector v
    '''
    norm = np.linalg.norm(v, ord=2)
    if norm==0:
        norm=np.finfo(v.dtype).eps
    return v/norm

# get a query string and cleans it
query = input()

# Convert to lowercase
query = query.lower()
pattern = re.compile('[\W_]+')
query = pattern.sub(' ', query)

# Convert to list
query_list = query.split(' ')
query_list = list(set(query_list))

# Remove stop words
clean_query_list = []
text = ([word for word in query_list if word not in (stopwords.words('english'))])
clean_query_list += text

# Create the query idf dataframe
for i in range(len(book_dict)):
    query_idf_vec = []
    cur_col = "book_{}_counter".format(i + 1)
    book_counter_lst = all_words_df_counter.selectExpr("words as words","{} as counter".format(cur_col)).collect()
    for row in book_counter_lst:
        if row.words in clean_query_list and row.counter > 0:
            query_idf_vec.append(math.log10(row.counter) + 1)
        else:
            query_idf_vec.append(0)
    
    cur_col = "book_{}_counter".format(i + 1)
    book_counter_vec = all_words_df_counter.select(cur_col).rdd.map(lambda x: x[0]).collect()
    book_counter_vec = normalize(book_counter_vec)
    book_query_score = np.dot(book_counter_vec, query_idf_vec)
    print("{} the score is: {}".format(book_dict[i+1], book_query_score))

# --------------------------------------  Deprecated  -----------------------------------------
# query_idf_vec = []
# for row in idf_list:
#     if row.words in clean_query_list:
#         query_idf_vec.append(row.idf)
#     else:
#         query_idf_vec.append(0)

# for i in range(len(book_dict)):
#     cur_col = "book_{}_counter".format(i + 1)
#     # Convert the book tf values to list (i call it counter) 
# #     book_counter_vec = all_words_df_counter.select(cur_col).rdd.map(lambda x: x[0]).collect()
#     # normalize the book vector
#     book_counter_vec = normalize(book_counter_vec)
#     # calculating according to Standfort 
#     book_query_score = np.dot(book_counter_vec, query_idf_vec)
#     print("{} the score is: {}".format(book_dict[i+1][5:], book_query_score))
# ------------------------------------------------------------------------------------------------

black people history
Bushido the Soul of Japan.txt the score is: 0.37448244659833363
The Wonderful Wizard of Oz.txt the score is: 0.21037983625219814
Alice's Adventures in Wonderland.txt the score is: 0.07299548022863267
Theologico Political Treatise.txt the score is: 0.2403082441228443
The Wit and Humor of America Volume IX.txt the score is: 0.28806468135452734
Political Ideals.txt the score is: 0.11288890757529418
The Autobiography of Benjamin Franklin.txt the score is: 0.3301594601629955
The Adventures of Tom Sawyer.txt the score is: 0.13201759830517026
Pride and Prejudice a play founded on Jane Austen's novel.txt the score is: 0.06422677566941346
Autobiography of Makataimeshekiakiak or Black Hawk.txt the score is: 1.2412319176444084
The Call of the Wild.txt the score is: 0.04598742570010646
The Man in the Brown Suit.txt the score is: 0.2575032103632606
The Prince.txt the score is: 0.5935680728432118
The Secret Garden.txt the score is: 0.26165520570827316
Treasure Island.txt the sco

In [None]:
book_dict

In [56]:
# ---------------------------------------------------------------------
# K-means
import numpy as np  
import sys 
import time
from numpy.linalg import norm 
from matplotlib import pyplot as plt
from random import gauss, sample

K = 4

def normalize(v):
    '''
    normalize function:
    normalize vector v
    '''
    norm = np.linalg.norm(v, ord=1)
    if norm==0:
        norm=np.finfo(v.dtype).eps
    return v/norm

def read_books_counters():
    '''
    read the columns for the table I createrd earlier
    output: {"name of the book" : vector represent the book (np.array)}
    '''
    book_to_vect = {}
    for i in range(len(book_dict)):
            cur_col = "book_{}_counter".format(i + 1)
            book_counter_vec = all_words_df_counter.select(cur_col).rdd.map(lambda x: x[0]).collect()
#             book_counter_vec = normalize(book_counter_vec)
            book_to_vect[book_dict[i+1]] = book_counter_vec
    return book_to_vect

# -----------------  Deprecated  ------------------
def make_rand_vec(dims):
    vec = [gauss(0,1) for i in range(dims)]
    mag = sum(x**2 for x in vec) ** .5
    return np.array([x/mag for x in vec])
#--------------------------------------------------

def calc_distance(vec1, vec2):
    '''
    calculate the distance between vec1 and vec2
    '''
    return(np.linalg.norm(vec1 - vec2))

def init_centers(K, book_to_vector_dict):
    '''
    random choose K books to be the init centers
    input:  K - number of clusters
            book_to_vector_dict  -> {"name of the book" : vector represent the book (np.array)}
    output: centers_dict -> {"clusterID" : vector represent the center}
    '''
    centers_dict = {}
    index = 1
    random_book_numbers = sample(range(1, len(book_dict)), K)
    for i in random_book_numbers:
        centers_dict[index] = book_to_vector_dict[book_dict[i]]
        index += 1
    return centers_dict

def calc_nearest_center(centers_dict, book_to_vector_dict):
    '''
    for each book calculate the nearest center
    input:  centers_dict -> {"clusterID" : vector represent the center}
            book_to_vector_dict  -> {"name of the book" : vector represent the book (np.array)}
    output: {"clusterID" : list of books name}
    '''
    cluster_to_book_dict = {}
    for i in range(K):
        cluster_to_book_dict[i+1] = []

    for book_name, book_vector in book_to_vector_dict.items():
        min_distance = np.inf
        for cluster_num, center_vec in centers_dict.items():
            distance = calc_distance(center_vec, book_vector)
            if (distance < min_distance):
                min_distance = distance
                nearest_cluster_num = cluster_num
        cluster_to_book_dict[nearest_cluster_num].append(book_name)
            
    return cluster_to_book_dict

def calc_new_center(cluster_to_book_dict, book_to_vector_dict):
    '''
    calculate the new centers
    input:  cluster_to_book_dict -> {"clusterID" : "name of the book"}
            book_to_vector_dict  -> {"name of the book" : vector represent the book (np.array)}
    output: new center dictionary -> {"clusterID" : vector represent the center}
    '''
    new_center_dict = {}
    for cluster, books_name_list in cluster_to_book_dict.items():
        new_center = np.zeros(36156)
        for book_name in books_name_list:
            book_vec = book_to_vector_dict[book_name]
            new_center += book_vec
            
        new_center = new_center / len(books_name_list)
#         new_center_dict[cluster] = normalize(new_center)
        new_center_dict[cluster] = new_center
    return new_center_dict


# book_to_vector_dict = read_books_counters()
centers_dict = init_centers(4, books_to_vector_dict)
loop = 500
for i in range(loop):    
    cluster_to_book_dict = calc_nearest_center(centers_dict, book_to_vector_dict)
    
    # Before we update the centers
#     if i == 1 or i == 999:
#         print(centers_dict,"\n\n")
        
    centers_dict = calc_new_center(cluster_to_book_dict, book_to_vector_dict)
    
    # After we update the centers
#     if i == 1 or i == 999:
#         print(centers_dict)
    
    if i == 1 or i == 499:
        print(cluster_to_book_dict,"\n\n")



{1: ['The Autobiography of Benjamin Franklin.txt', 'The Prince.txt', 'Readings on Fascism and National Socialism.txt', 'The Writings of Thomas Paine Volume 4 1794 to 1796 The Age of Reason.txt', 'Autobiography of Benjamin Franklin.txt', 'Up from Slavery An Autobiography.txt'], 2: ['Autobiography of Makataimeshekiakiak or Black Hawk.txt'], 3: ['Bushido the Soul of Japan.txt', 'The Wonderful Wizard of Oz.txt', "Alice's Adventures in Wonderland.txt", 'Theologico Political Treatise.txt', 'The Wit and Humor of America Volume IX.txt', 'Political Ideals.txt', "Pride and Prejudice a play founded on Jane Austen's novel.txt", 'The Call of the Wild.txt', 'The Man in the Brown Suit.txt', 'The Secret Garden.txt', 'Heart of Darkness.txt'], 4: ['The Adventures of Tom Sawyer.txt', 'Treasure Island.txt', 'Beautiful Joe An Autobiography.txt']} 


{1: ['The Writings of Thomas Paine Volume 4 1794 to 1796 The Age of Reason.txt', 'Autobiography of Benjamin Franklin.txt', 'Up from Slavery An Autobiography.tx

In [43]:
vec1 = np.array([1,1])
vec2 = np.array([1,2])

calc_distance(normalize(vec1), normalize(vec2))

0.3203644860139344

In [28]:
cluster_to_book_dict

{1: [],
 2: [],
 3: ['Bushido the Soul of Japan.txt',
  'The Wonderful Wizard of Oz.txt',
  "Alice's Adventures in Wonderland.txt",
  'Theologico Political Treatise.txt',
  'The Wit and Humor of America Volume IX.txt',
  'Political Ideals.txt',
  'The Autobiography of Benjamin Franklin.txt',
  'The Adventures of Tom Sawyer.txt',
  "Pride and Prejudice a play founded on Jane Austen's novel.txt",
  'Autobiography of Makataimeshekiakiak or Black Hawk.txt',
  'The Call of the Wild.txt',
  'The Man in the Brown Suit.txt',
  'The Prince.txt',
  'The Secret Garden.txt',
  'Treasure Island.txt',
  'Readings on Fascism and National Socialism.txt',
  'Heart of Darkness.txt',
  'The Writings of Thomas Paine Volume 4 1794 to 1796 The Age of Reason.txt',
  'Autobiography of Benjamin Franklin.txt',
  'Up from Slavery An Autobiography.txt',
  'Beautiful Joe An Autobiography.txt'],
 4: []}

In [29]:
books_to_dict

{'Bushido the Soul of Japan.txt': array([0., 0., 0., ..., 0., 0., 0.]),
 'The Wonderful Wizard of Oz.txt': array([0., 0., 0., ..., 0., 0., 0.]),
 "Alice's Adventures in Wonderland.txt": array([0., 0., 0., ..., 0., 0., 0.]),
 'Theologico Political Treatise.txt': array([0.        , 0.00146048, 0.        , ..., 0.        , 0.        ,
        0.00146048]),
 'The Wit and Humor of America Volume IX.txt': array([0.        , 0.        , 0.        , ..., 0.        , 0.00136977,
        0.        ]),
 'Political Ideals.txt': array([0., 0., 0., ..., 0., 0., 0.]),
 'The Autobiography of Benjamin Franklin.txt': array([0., 0., 0., ..., 0., 0., 0.]),
 'The Adventures of Tom Sawyer.txt': array([0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.00142404]),
 "Pride and Prejudice a play founded on Jane Austen's novel.txt": array([0., 0., 0., ..., 0., 0., 0.]),
 'Autobiography of Makataimeshekiakiak or Black Hawk.txt': array([0., 0., 0., ..., 0., 0., 0.]),
 'The Call of the Wild.

In [24]:
len(all_words_list)

36156

In [33]:
centers_dict[3]

array([-0.01046357,  0.00044255, -0.00136047, ..., -0.00076656,
       -0.00493744, -0.01303576])

In [128]:
books_rdd.count()

10

In [129]:
local_data = (books_rdd.map(lambda x: x[1]).collect())

In [130]:
local_data

[array([0, 0, 0, ..., 0, 0, 0]),
 array([0, 0, 0, ..., 0, 0, 0]),
 array([0, 3, 0, ..., 0, 0, 0]),
 array([0, 4, 0, ..., 0, 4, 8]),
 array([0, 0, 0, ..., 0, 0, 0]),
 array([0, 0, 6, ..., 0, 0, 0]),
 array([ 0,  0,  0, ...,  0,  0, 14]),
 array([0, 0, 0, ..., 8, 0, 0]),
 array([ 9,  0, 18, ...,  0,  0,  9]),
 array([ 0,  0,  0, ...,  0,  0, 20])]