In [1]:
import pyspark
import os

my_dir = "./BooksDataset"

# False if you don't have the rdd saved, True otherwise
readFromSavedFile = False

#List of Authors
authors = ['Arthur Conan Doyle', 'Charles Dickens', 'Daniel Defoe', 'Edith Wharton', 'Jane Austen', 'Joseph Conrad', 'Lewis Carroll', 'Louisa May Alcott', 'Voltaire', 'William Shakespeare']

#Number of Authors
n_authors = len(authors)


In [2]:
sc = pyspark.SparkContext(appName="BigDataProject")

sc

#sc.stop()

In [5]:
#Save/read all books' name of every author

if not readFromSavedFile:
    for i in range(n_authors):
        book = []
        mydiraut = my_dir+"/"+authors[i]
        print(f"Getting books' name for author: {authors[i]}")
        
        for file in os.listdir(mydiraut):
            d = os.path.join(mydiraut, file)
            book.append(file)
            
        print(f"Start - Saving RDD of all books' name of author: {authors[i]}")
        sc.parallelize(book).saveAsTextFile("./RDD/BooksName/"+authors[i])
        print(f"End - Saving RDD of all books' name of author: {authors[i]}\n")
    

print("Reading all books' name of every author from saved RDD")
books = []

for i in range(n_authors):
    books.append(sc.textFile("./RDD/BooksName/"+authors[i]+"/*").collect()) 
    
print(books)



Getting books' name for author: Arthur Conan Doyle
Start - Saving RDD of all books' name of author: Arthur Conan Doyle
End - Saving RDD of all books' name of author: Arthur Conan Doyle

Getting books' name for author: Charles Dickens
Start - Saving RDD of all books' name of author: Charles Dickens
End - Saving RDD of all books' name of author: Charles Dickens

Getting books' name for author: Daniel Defoe
Start - Saving RDD of all books' name of author: Daniel Defoe
End - Saving RDD of all books' name of author: Daniel Defoe

Getting books' name for author: Edith Wharton
Start - Saving RDD of all books' name of author: Edith Wharton
End - Saving RDD of all books' name of author: Edith Wharton

Getting books' name for author: Jane Austen
Start - Saving RDD of all books' name of author: Jane Austen
End - Saving RDD of all books' name of author: Jane Austen

Getting books' name for author: Joseph Conrad
Start - Saving RDD of all books' name of author: Joseph Conrad
End - Saving RDD of all 

In [14]:
#Function to clear the text given in input

def clearText(file_path):
    return sc.textFile(file_path).filter(bool)  \
                .map(lambda w: w.replace("."," . ")) \
                .map(lambda w: w.replace(","," , ")) \
                .map(lambda w: w.replace(";"," ; ")) \
                .map(lambda w: w.replace("!"," ! ")) \
                .map(lambda w: w.replace("?"," ? ")) \
                .map(lambda w: w.replace('"',' " ')) \
                .map(lambda w: w.replace("-"," - ")) \
                .map(lambda w: w.replace("_"," _ ")) \
                .map(lambda w: w.replace("{"," { ")) \
                .map(lambda w: w.replace("}"," } ")) \
                .map(lambda w: w.replace("["," [ ")) \
                .map(lambda w: w.replace("]"," ] ")) \
                .map(lambda w: w.replace("("," ( ")) \
                .map(lambda w: w.replace(")"," ) "))
    

In [11]:
#WordCount returning a list of words
#.filter(lambda w: w not in common_words) \

def wordCount(file_path):
    words = clearText(file_path) \
                .flatMap(lambda line: line.split(" ")) \
                .map(lambda w: w.lower()) \
                .map(lambda w: (w, 1)) \
                .reduceByKey(lambda v1, v2: v1 + v2) \
                .map(lambda x: (x[1], x[0])) \
                .sortByKey(False)
    
    result = words.collect()

    # Calculate total words of the book given in input
    total_words = 0

    for t in result:
        total_words += t[0]
    
    # Add the total word to the list of words
    result.append((total_words, 'total_words'))
    
    return result

In [15]:
#Saving RDD all words for an authors

if not readFromSavedFile:

    for i in range(1): #n_authors
        # Obtain the path of author directory
        tmp_filename = os.path.join(my_dir, authors[i])
        print(f"Word count of {authors[i]}'s books.")
        
        for j in range(2): #len(books[i])
            # Obtain the path of books given the path of an author
            input_filename = os.path.join(tmp_filename, books[i][j])
            
            print(f"Start - Counting words of book: {books[i][j]}")
            wordsOfABook = wordCount(input_filename)
            print(f"End - Counting words of book: {books[i][j]}\n")
            
            print(f"Start - Saving words count of book: {books[i][j]}")
            sc.parallelize(wordsOfABook).saveAsTextFile("./RDD/Books/"+authors[i]+"/SingleBooks/"+books[i][j])
            print(f"End - Saving words count of book: {books[i][j]}\n")
            
        


Word count of Arthur Conan Doyle's books.
Start - Counting words of book: Arthur Conan Doyle_Beyond the City.txt
End - Counting words of book: Arthur Conan Doyle_Beyond the City.txt

Start - Saving words count of book: Arthur Conan Doyle_Beyond the City.txt
End - Saving words count of book: Arthur Conan Doyle_Beyond the City.txt

Start - Counting words of book: Arthur Conan Doyle_The Adventure of the Cardboard Box.txt
End - Counting words of book: Arthur Conan Doyle_The Adventure of the Cardboard Box.txt

Start - Saving words count of book: Arthur Conan Doyle_The Adventure of the Cardboard Box.txt
End - Saving words count of book: Arthur Conan Doyle_The Adventure of the Cardboard Box.txt



In [45]:
#Given a list of string convert it to a list of tuple
def stringToTuple(wordCount):
    result = []
    for s in wordCount:
        #Remove useless characters in the string
        res = s.strip("(").strip(")").split(", ")   
        tmp = len(res[1])
        #Take only the string inside the '' 
        result.append((int(res[0]), str(res[1][1:tmp-1])))
        
    return result

In [46]:
#Read the rdd of all books of all authors
wordsCountAuthors = [[] for _ in range(n_authors)]

for i in range(1):
    print(f"Reading books for: {authors[i]}")
    for j in range(2):
        # Getting path of book's RDD
        tmp = "./RDD/Books/"+authors[i]+"/SingleBooks/"+books[i][j]+"/*"

        print(f"Start - Reading words count of book: {books[i][j]}")
        result = sc.textFile(tmp).collect()
        # Converting list of string into list of tuple
        result = stringToTuple(result)
        print(f"End - Reading words count of book: {books[i][j]}")

        wordsCountAuthors[i].append(result)
        

Reading books for: Arthur Conan Doyle
[(1753, ''), (595, ','), (465, '.'), (391, 'the'), (304, '"'), (254, 'and'), (250, 'i'), (231, 'a'), (216, 'to'), (208, 'of'), (154, 'that'), (142, 'was'), (140, 'in'), (112, 'it'), (110, 'you'), (101, 'had'), (96, 'my'), (95, 'she'), (93, 'have'), (93, 'he'), (88, 'for'), (86, 'as'), (84, 'at'), (78, 'is'), (72, '-'), (71, 'her'), (70, 'with'), (69, 'but'), (64, 'we'), (63, 'which'), (63, 'me'), (54, 'his'), (46, 'been'), (45, 'on'), (44, 'this'), (44, 'be'), (42, '?'), (40, 'your'), (38, 'would'), (38, 'from'), (37, 'when'), (36, 'were'), (35, 'there'), (35, 'so'), (34, 'not'), (34, 'one'), (33, 'then'), (32, 'said'), (32, 'by'), (29, 'upon'), (29, 'very'), (29, 'him'), (28, 'no'), (28, 'miss'), (27, 'holmes'), (27, 'are'), (27, 'cushing'), (27, 'what'), (27, 'if'), (27, 'all'), (25, 'sarah'), (25, 'our'), (24, 'an'), (24, 'them'), (24, 'up'), (24, 'could'), (23, 'or'), (23, 'lestrade'), (23, 'see'), (22, 'out'), (22, 'has'), (22, 'day'), (21, '!

# Method without score NO ML

In [None]:
#To find same words in top100 words of all authors
same_words = []

#take first author to compare the same words of the other authors 
test = wordsCountAuthors[0][0]


#Compare same words of the first author to others
for _, word in test:
    count = 0
    for i in range(1,10):
        for _, word2 in wordsCountAuthors[i]:
            if word == word2:
                count += 1
                continue
    if count == 9:
        same_words.append(word)


print(same_words)

In [None]:
# Words in common in top100 words of all authors
common_words = ['the', ',', '.', 'of', 'and', '', 'to', 'a', 'i', 'in', 'was', 'that', 'it', 'he', 'had', 'you', 'his', 'with', 'is', 'my', 'which', 'as', 'for', 'at', 'have', 'were', 'but', 'be', 'not', 'me', 'from', 'this', 'by', 'on', 'there', 'her', 'they', 'so', 'we', 'one', 'all', 'him', 'their', 'would', 'an', 'no', '?', 'when', 'them', 'what', 'if', 'or', 'do', 'more']

In [None]:
#rdd.take(1)[-1]

In [48]:
training_index = [0, 1, 2, 3, 4, 5, 6]
testing_index = [7, 8, 9]

#n_authors
if not(readFromSavedFile):
    for a in range(1):
        empty_rdd = sc.emptyRDD()
        #Take only the books in training_index
        for b in range(2): #training_index:
            empty_rdd = empty_rdd.union(sc.parallelize(wordsCountAuthors[a][b]))
        
        #Word Count of all books in training_index of the author
        print(f"Word Count of all books in training_index: {authors[a]}")
        result = empty_rdd.map(lambda x: (x[1], x[0])).reduceByKey(lambda v1, v2: v1 + v2) \
                .map(lambda x: (x[1], x[0])).sortByKey(False)
        
        print(result)

        total_words = result.take(1)[0]

        #Opz 1 without total words
        #result = result[1:-1]
        #result = result.map(lambda n, w: (n/total_words, w))
        result = result.map(lambda x: (x[0]/total_words, x[1]))


        #Opz 2 with total_words 
        #result = result.map(lambda n, w: (n/total_words, w) if w != "total_words" else (n, w))
        
        
        print(type(result))
        print(result)
        
        # Only save top 100 words 
        result = sc.parallelize(result.take(200))
        result.saveAsTextFile(f"./RDD/Books/{authors[a]}/TrainingResult")    
else:
    
    
    AllWordsCountAuthors = []

    for i in range(n_authors):
        print(authors[i])
        tmp = f"./RDD/Books/{authors[i]}/TrainingResult/*"   
        print(tmp)
        result = sc.textFile(tmp).collect()
        AllWordsCountAuthors.append(result)



Word Count of all books in training_index: Arthur Conan Doyle


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\giaco\anaconda3\envs\BigData\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\giaco\anaconda3\envs\BigData\lib\site-packages\py4j\clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\giaco\anaconda3\envs\BigData\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [11]:
AllWordCountByAuthors_RDD = [[] for _ in range(n_authors)]
for i in range(n_authors):

    for w in AllWordsCountAuthors[i]:
        print(w)
        cleaned_string = w.replace('[', '').replace(']', '').replace('(', '').replace(')', '').replace("'", '')

        word_list = cleaned_string.split(', ')

        
        for k in range(0, len(word_list), 2):
            tuple_list = (int(word_list[k]), word_list[k + 1]) 

      
        
        AllWordCountByAuthors_RDD[i].append(tuple_list)

(22086, 'the')
(21939, ',')
(16285, '.')
(10278, 'of')
(9630, 'and')
(9243, '')
(7811, 'to')
(7490, 'a')
(6378, 'i')
(5397, 'in')
(5232, 'was')
(4601, 'that')
(4417, 'it')
(3201, 'he')
(3003, 'had')
(2952, 'you')
(2764, 'his')
(2614, 'with')
(2517, '-')
(2496, 'is')
(2495, 'my')
(2410, 'which')
(2364, 'as')
(2323, 'for')
(2211, 'at')
(2107, 'have')
(2014, 'were')
(1919, 'but')
(1767, '"')
(1738, 'be')
(1676, 'not')
(1669, 'me')
(1634, 'from')
(1624, 'upon')
(1525, 'this')
(1491, 'by')
(1469, '.”')
(1419, 'on')
(1286, 'there')
(1247, 'her')
(1220, 'they')
(1216, 'been')
(1213, 'so')
(1204, 'we')
(1156, 'one')
(1154, 'all')
(1128, 'him')
(1110, 'their')
(1073, 'said')
(1012, 'would')
(1003, 'she')
(989, 'an')
(949, 'no')
(937, 'could')
(894, '"s"')
(889, 'who')
(887, 'up')
(886, 'very')
(844, 'out')
(834, '?')
(791, 'when')
(784, 'them')
(759, 'are')
(757, 'will')
(756, 'what')
(735, 'your')
(733, '!')
(729, 'some')
(728, ',”')
(717, 'into')
(707, 'man')
(704, 'then')
(668, 'if')
(640, '

In [40]:

#Change to select the rigth author
test_author = 0

"""
tmp_filename = os.path.join("./RDD/Books", authors[test_author])
input_filename = os.path.join(tmp_filename, books[test_author][testing_index[0]])
input_filename = input_filename
lista2 = sc.textFile(input_filename).collect()
"""

lista2 = wordCountByAuthors[test_author][testing_index[0]].take(100)
print(lista2)

def count_common_words(lista1, lista2):
    count_words = 0

    for parola in lista1:
        #print(parola[1])
        if parola in lista2:
            count_words += 1

    return count_words


#print(count_common_words(AllWordCountByAuthors_RDD[0], lista2))

max_count = -1
author_index = []

for a in range(n_authors):
    c = []
    for p in AllWordCountByAuthors_RDD[a]:
        c.append(p[1])

    if count_common_words(c, lista2) == max_count:
        author_index.append(a)

    if count_common_words(c, lista2) > max_count:
        author_index = []
        author_index.append(a)
        max_count = count_common_words(c, lista2) 




for k in author_index:
    print(authors[k])

[(5449, ','), (5205, 'the'), (3629, '.'), (2644, 'and'), (2621, 'of'), (2103, 'a'), (2073, 'to'), (1861, 'i'), (1340, 'in'), (1293, 'was'), (1105, 'he'), (1073, 'that'), (1049, 'his'), (1033, 'it'), (819, 'as'), (803, 'my'), (720, 'had'), (702, 'with'), (651, 'which'), (597, 'for'), (570, 'at'), (553, 'is'), (516, 'me'), (491, 'you'), (481, 'have'), (468, 'upon'), (427, 'from'), (422, 'him'), (404, 'but'), (401, 'be'), (400, 'her'), (388, 'on'), (377, 'by'), (371, 'this'), (354, 'one'), (349, ''), (347, 'there'), (339, ',”'), (336, '-'), (324, 'were'), (309, '.”'), (296, 'not'), (285, 'all'), (281, 'an'), (273, 'so'), (269, 'out'), (264, 'been'), (263, 'when'), (262, 'said'), (256, 'we'), (255, 'she'), (254, 'up'), (252, 'some'), (244, 'man'), (235, 'into'), (235, 'who'), (230, 'would'), (217, 'their'), (216, 'they'), (215, 'no'), (210, 'or'), (204, 'could'), (192, 'down'), (187, 'are'), (180, 'what'), (178, 'over'), (176, ';'), (176, 'more'), (172, 'them'), (168, 'then'), (163, 'very'

['the', ',', '.', 'of', 'and', '', 'to', 'a', 'i', 'in', 'was', 'that', 'it', 'he', 'had', 'you', 'his', 'with', 'is', 'my', 'which', 'as', 'for', 'at', 'have', 'were', 'but', 'be', 'not', 'me', 'from', 'this', 'by', 'on', 'there', 'her', 'they', 'so', 'we', 'one', 'all', 'him', 'their', 'would', 'an', 'no', '?', 'when', 'them', 'what', 'if', 'or', 'do', 'more']


# Celle di Test

In [8]:
n_books = []

for i in range(len(books)):
    data_str = str(books[i])
    data_str = data_str.replace("[", "").replace("]", "").replace("'", "")
    data_list = data_str.split(", ")
    n_books.append(len(data_list))

print(n_books)


[10, 10, 10, 10, 10, 10, 10, 10, 10, 10]


In [45]:
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd

In [61]:
#Creo il dataframe
training_data = []

for i in range(n_authors):
    training_data.append((authors[i], AllWordCountByAuthors_RDD[i]))


df = pd.DataFrame(training_data)
df.columns = ["Author", "Top100Words"]
print(df)




                Author                                        Top100Words
0   Arthur Conan Doyle  [(28688, the), (13417, of), (12559, and), (101...
1      Charles Dickens  [(69026, the), (50940, and), (39832, to), (379...
2         Daniel Defoe  [(24009, the), (16405, and), (14316, to), (134...
3        Edith Wharton  [(29289, the), (18053, ), (16213, of), (14380,...
4          Jane Austen  [(42721, ), (32293, the), (29439, to), (26255,...
5        Joseph Conrad  [(38777, the), (24566, of), (17697, a), (17400...
6        Lewis Carroll  [(39778, ), (11110, the), (5736, and), (5106, ...
7    Louisa May Alcott  [(28537, the), (24400, and), (16083, to), (145...
8             Voltaire  [(26520, the), (16079, ), (16067, of), (11568,...
9  William Shakespeare  [(46471, ), (7975, the), (7023, and), (5525, i...


In [62]:
text_to_predict = [(3, "Ciao"), (4,"hello"), (2,"first"), (5,"british")]

In [11]:
#[Authors][#Books]
#wordCountByAuthors[0][0]

empty_rdd = sc.emptyRDD()

for i in range(10):
    empty_rdd = empty_rdd.union(wordCountByAuthors[0][i])

#print(empty_rdd.collect())




In [12]:
#tmp = empty_rdd.flatMap().map(lambda x: (x[1], x[0])).reduceByKey(lambda v1, v2: v1 + v2) \
#                .map(lambda x: (x[1], x[0])) \
                

tmp = empty_rdd.map(lambda x: (x[1], x[0])).reduceByKey(lambda x,y: x+y)
print(tmp.collect())



In [14]:
tmp2 = tmp.map(lambda x:(x[1], x[0]))

print(tmp2.collect())



In [15]:
tmp3 = tmp2.sortByKey(False)
print(tmp3)

PythonRDD[170] at RDD at PythonRDD.scala:53


In [16]:
print(tmp3.collect())



In [17]:
tmp1000 = empty_rdd.map(lambda x: (x[1], x[0])).reduceByKey(lambda v1, v2: v1 + v2) \
               .map(lambda x: (x[1], x[0])).sortByKey(False)
print(tmp1000.collect())



In [91]:
sc.stop()