In [None]:
#LOAD DATA AND FUNCTIONS

from pyspark.sql.functions import rand, col
from pyspark.sql.functions import lower
from pyspark.sql.types import IntegerType, StringType, MapType
from pyspark.sql.functions import lit
from pyspark.sql.functions import row_number, lit, dense_rank, concat_ws
from pyspark.sql import functions as F, Window
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import liwc

#300 RATES PER BOOK
#80 WORDS IN CONSIDERED COMMENTS

#############################################
#            read data                      # 
#############################################
print("Read Anobii data")

# Read table which contains for each item_id (id of each book), the id of the author (author_id) who wrote that book
file = "/data/SMARTDATA/books/anobii_2021/sql/author_item.csv"
DFauthorbooks = spark.read.csv(file,header=True) #MOSTRA I LIBRI RELATIVI AGLI AUTORI

# Read table which contains for each author_id (id of each auhtor), the info of that author
file = "/data/SMARTDATA/books/anobii_2021/sql/author_display.csv"
DFdisplay = spark.read.csv(file,header=True) #MOSTRA GLI AUTORI DEL DATABASE

# Read table which contains the mapping of the language (language=11 means italian)
file = "/data/SMARTDATA/books/anobii_2021/sql/language_mapping.csv"
DFlanguages = spark.read.csv(file,header=True)

# Read table which contains the info about items, i.e. books
file = "/data/SMARTDATA/books/anobii_2021/sql/item.csv"
DFitems_anobii = spark.read.csv(file,header=True) 
# DFitems_anobii.show(1, False)

# Read table which contains the rate given by a person to a book
file = "/data/SMARTDATA/books/anobii_2021/sql/link_person_item.csv" #item_review ha il voto dato dall'utente a quel libro
DFinfo = spark.read.csv(file,header=True)
# DFinfo.show(1, False)

# Read table which contains the genre of each book
file = "anobii_genres/new_genres5.csv"
DFgenres = spark.read.csv(file, header=True)

#Comments of the users related to books
file = "/data/SMARTDATA/books/anobii/link_person_item_comment.csv"
DFreviews = spark.read.csv(file, header=True)

#Remove noise data
DFinfo = DFinfo.filter((DFinfo.item_review == '0') | (DFinfo.item_review == '1') | (DFinfo.item_review == '2') | (DFinfo.item_review == '3') | (DFinfo.item_review == '4') | (DFinfo.item_review == '5'))

#FUNCTIONS
def title_and_subtitle(line): #To split title and subtitle
    if len(line.title.split(" : ")) > 1: #Book possesses a subtitle
        title = line.title.split(" : ")[0]
        sub_title = line.title.split(" : ")[1]
        return Row(line.manifestation_id_new, line.patron_id_md5, title, sub_title, line.author, 4, "None", 4, "None", line.ISBNISSN_new, "bct") #NE APPROFITTO ANCHE PER INSERIRE VOTO E CAMPI PER IL MERGE (PER ORA NULL)
    else: #Il libro non possiede un sottotitolo
        return Row(line.manifestation_id_new, line.patron_id_md5, line.title, "None", line.author, 4, "None", 4, "None", line.ISBNISSN_new, "bct")

def create_user_dictionary(rdd): #Used to assign an integer id to each user of the rdd (to get the rows of the CSR)
    rdd = rdd.map(lambda x: (str(x.person_id), list(x))).sortByKey()
    user_dictionary = rdd.countByKey()
    i = 0
    for key in user_dictionary.keys():
        user_dictionary[key] = i
        i += 1
    return user_dictionary

def create_book_dictionary(rdd): #Used to assign an integer id to each book of the rdd (to get the column of the CSR)
    rdd = rdd.map(lambda x: (x.book_id, list(x))).sortByKey()
    book_dictionary = rdd.countByKey()
    i = 0
    for key in book_dictionary.keys():
        book_dictionary[key] = i
        i += 1
    return book_dictionary

def addDataType1(line):
    return Row(line.item_id, line.person_id, line.title, str(line.sub_title), line.item_review, line.total_review, line.average_rating, str(line.total_votes), line.isbn, "anobii")


def addRowColumnId(line): #Used to add index to books and users (not used anymore)
    book_number = book_dictionary[line.book_id] 
    user_number = user_dictionary[line.person_id]
    return Row(line.book_id,
               line.title,
               line.sub_title,
               line.total_wishlist,
               line.no_of_page,
               line.publication_date,
               line.publisher,
               line.binding,
               line.edition,
               line.product_type,
               line.total_votes,
               line.data_type,
               line.person_id,
               line.item_review,
               line.author,
               line.genre,
               line.encrypt_item_id,
               line.isbn,
               line.total_count,
               line.average_rating,
               line.total_review,
               str(user_number), 
               str(book_number))
    return Row(line.person_id, line.title, str(line.sub_title), line.item_review, line.data_type, line.book_id, line.author, line.genre, line.encrypt_item_id, line.total_votes_or_loans, line.average_rating, line.total_review, line.isbn, str(user_number), str(book_number))

def title_and_subtitle_books(line): #FUNZIONE USATA PER DIVIDERE TITOLO E SOTTOTITOLO NEL DATASET DEI LIBRI DELLE BIBLIOTECHE E PER ELIMINARE EDITION_DATE ED EDITION_LANGUAGE
    if len(line.title.split(" : ")) > 1:
        title = line.title.split(" : ")[0]
        sub_title = line.title.split(" : ")[1]
    else:
        title = line.title
        sub_title = "None"
    #"title", "sub_title", "author", "publisher", "book_id"
    return Row(title, sub_title, line.author, line.publisher, line.manifestation_id_new, line.ISBNISSN_new)

def concatenateContentsDescriptions(line1, line2):
    #More than one description in the same book
    return Row(book_id=line1.book_id, title=line1.title, sub_title=line1.sub_title, 
               total_wishlist=line1.total_wishlist, no_of_page=line1.no_of_page, publication_date=line1.publication_date, 
               publisher=line1.publisher, binding=line1.binding, edition=line1.edition, product_type=line1.product_type, 
               total_votes=line1.total_votes, data_type=line1.data_type, author=line1.author, genre=line1.genre, 
               encrypt_item_id=line1.encrypt_item_id, isbn=line1.isbn, total_count=line1.total_count, average_rating=line1.average_rating, 
               total_review=line1.total_review, content=line1.content + " " + line2.content, content2=line1.content2) #book_index=line1.book_index,

def toSingleDescriptions(line1):
    return Row(book_id=line1[1].book_id, title=line1[1].title, sub_title=line1[1].sub_title, 
               total_wishlist=line1[1].total_wishlist, no_of_page=line1[1].no_of_page, publication_date=line1[1].publication_date, 
               publisher=line1[1].publisher, binding=line1[1].binding, edition=line1[1].edition, product_type=line1[1].product_type, 
               total_votes=line1[1].total_votes, data_type=line1[1].data_type, author=line1[1].author, genre=line1[1].genre, 
               encrypt_item_id=line1[1].encrypt_item_id, isbn=line1[1].isbn, total_count=line1[1].total_count, average_rating=line1[1].average_rating, 
               total_review=line1[1].total_review,  content=line1[1].content, content2=line1[1].content2)#book_index=line1[1].book_index,

def concatenateContentsComments(line1, line2):
    #Comments concatenation to description
    return Row(book_id=line1.book_id, title=line1.title, sub_title=line1.sub_title, 
               total_wishlist=line1.total_wishlist, no_of_page=line1.no_of_page, publication_date=line1.publication_date, 
               publisher=line1.publisher, binding=line1.binding, edition=line1.edition, product_type=line1.product_type, 
               total_votes=line1.total_votes, data_type=line1.data_type, author=line1.author, genre=line1.genre, 
               encrypt_item_id=line1.encrypt_item_id, isbn=line1.isbn, total_count=line1.total_count, average_rating=line1.average_rating, 
               total_review=line1.total_review,  content=line1.content, content2=line1.content2, comment_content = str(line1.comment_content) + " " + str(line2.comment_content))#book_index=line1.book_index,

def mergeDescriptionComments(line1):
    return Row(book_id=line1.book_id, title=line1.title, sub_title=line1.sub_title, 
               total_wishlist=line1.total_wishlist, no_of_page=line1.no_of_page, publication_date=line1.publication_date, 
               publisher=line1.publisher, binding=line1.binding, edition=line1.edition, product_type=line1.product_type, 
               total_votes=line1.total_votes, data_type=line1.data_type, author=line1.author, genre=line1.genre, 
               encrypt_item_id=line1.encrypt_item_id, isbn=line1.isbn, total_count=line1.total_count, average_rating=line1.average_rating, 
               total_review=line1.total_review,  content2=line1.content2, content=str(line1.content) + " " + str(line1.comment_content))#book_index=line1.book_index,

def toSingleComments(line1):
    #print(line1)
    return Row(book_id=line1[1].book_id, title=line1[1].title, sub_title=line1[1].sub_title, 
               total_wishlist=line1[1].total_wishlist, no_of_page=line1[1].no_of_page, publication_date=line1[1].publication_date, 
               publisher=line1[1].publisher, binding=line1[1].binding, edition=line1[1].edition, product_type=line1[1].product_type, 
               total_votes=line1[1].total_votes, data_type=line1[1].data_type, author=line1[1].author, genre=line1[1].genre, 
               encrypt_item_id=line1[1].encrypt_item_id, isbn=line1[1].isbn, total_count=line1[1].total_count, average_rating=line1[1].average_rating, 
               total_review=line1[1].total_review, content2=line1[1].content2, content=line1[1].content, comment_content = line1[1].comment_content)# book_index=line1[1].book_index,
print("Raws data have been read")

In [None]:
#FILTERING START CELL

#these are the columns which are needed for our work, for the item table. The others can be dropped
DFitems_anobii_cols_to_keep = [
                        "item_id",\
                        "isbn",\
#                         "family_id",\
                        "title",\
                        "sub_title",\
                        # "barcode",\
                        # "image_source",\
                        # "image_width",\
                        # "image_height",\
                        "no_of_page",\
                        "publication_date",\
                        "publisher",\
                        "binding",\
                        "edition",\
                        # "reading_level",\
                        # "height",\
                        # "height_unit",\
                        # "length",\
                        # "length_unit",\
                        # "width",\
                        # "width_unit",\
                        # "weight",\
                        # "weight_unit",\
                        # "salesrank",\
                        # "item_popularity",\
                        # "check_same_family",\
                        # "check_internal_family",\
                        # "last_update",\
                        "average_rating",\
                        "total_review",\
                        "product_type",\
                        # "title_foreign",\
                        # "image_process",\
                        # "amazon",\
                        # "google",\
                        "encrypt_item_id",\
                        # "pid",\
                        # "binding_id",\
                        # "problem",\
                        "language",\
                        # "language_correct",\
                        # "ean",\
                        # "family_head",\
                        # "google_time",\
                        # "total_world",\
                        # "lock",\
                        # "volumes",\
                        # "publication_country_id",\
                        # "added_date",\
                        # "publishing_status",\
                        # "total_libraries",\
                        # "unavailable_date",\
                        # "table_of_contents_html",\
                        # "product_form_detail",\
                        # "total_edition_ratings",\
                        # "epub_url",\
                        # "imprint_name",\
                        # "is_sellable",\
                        # "total_topics",\
                        # "publisher_id",\
                        "total_wishlist",\
                        # "embargo_date",\
                        # "data_source",\
                        # "ebook_type",\
                        # "has_ebook",\
                        # "fulfillment_book_id",\
                        # "ebook_filesize",\
                        # "sample_status",\
                        # "sample_url",\
                        # "sample_filesize",\
                      ]
DFitems_anobii = DFitems_anobii.select(DFitems_anobii_cols_to_keep)
print("Filter Anobii data (select books in italian, drop comics and books with a low number of ratings)")
#FILTERING 1: italian language
#Filtering tuples with language != '11'

DFfilteredlanguageitems = DFitems_anobii.filter(DFitems_anobii.language == "11")
DFitems_anobii = DFitems_anobii.select(DFitems_anobii_cols_to_keep)
print(DFfilteredlanguageitems.count())

#FILTERING 2: only "hardcover" and "paperback" (filtering on binding attribute)

DFfilteredbindingitems = DFfilteredlanguageitems.filter((DFfilteredlanguageitems.binding == "Paperback") | (DFfilteredlanguageitems.binding == "Hardcover"))
print(DFfilteredbindingitems.count())

#FILTERING 3: delete periodical books (as possible)
#Keywords:Paperino, Topolino, Bonelli comics
DFfilteredmagazineitems = DFfilteredbindingitems.filter(~(DFfilteredbindingitems.title.contains("Topolino")) | (DFfilteredbindingitems.title.contains("Paperino"))|(DFfilteredbindingitems.title.contains("Tex"))|(DFfilteredbindingitems.title.contains("Dylan Dog"))|(DFfilteredbindingitems.title.contains("Nathan Never"))|(DFfilteredbindingitems.title.contains("Zagor")))
print(DFfilteredmagazineitems.count())

In [None]:
#CELL PLOT

if False:
    import matplotlib.pyplot as plt
    from matplotlib import rcParams
    size = 30
    f = 1
    rcParams['figure.figsize'] = 12.6*f, 7*f
    rcParams['xtick.labelsize'] = size
    rcParams['ytick.labelsize'] = size
    rcParams['font.size'] = size
    rcParams["font.sans-serif"] = ["Liberation Sans"]
    rcParams["font.family"] = "sans-serif"
    rcParams.update({'figure.autolayout': False})
    rcParams['pdf.fonttype'] = 42
    rcParams['ps.fonttype'] = 42

    import numpy as np
    from itertools import accumulate

    if False:
        #PLOT CDF
        file = "/data/SMARTDATA/books/anobii_2021/sql/link_person_item.csv"
        DF = spark.read.csv(file,header=True)
        #print("There are "+str(DF.count())+" ratings")
        prova = DF.groupby(DF.person_id).count().toPandas()
        #print("the mean is "+str(prova['count'].mean()))
        #print("the median is "+str(prova['count'].median()))
        # fig,ax = plt.subplots()
        # USE numpy's HISTOGRAM FUNCTION TO COMPUTE BINS
        xh, xb = np.histogram(prova['count'], bins=6000, normed=True)
        # COMPUTE THE CUMULATIVE SUM WITH accumulate
        xh = list(accumulate(xh))

        # NORMALIZE THE RESULT
        xh = np.array(xh) / max(xh)
        fig,ax = plt.subplots()
        # PLOT WITH LABEL
        ax.plot(xb[1:], xh, label="Anobii")
        #ax.plot(xbBCT[1:], xhBCT, label="BCT")
        #ax.grid()
        #ax.legend()
        ax.set_xlim(-20, 3000)#10000)
        ax.set_ylim(0.0, 1.1)
        #ax.set_title("CDF of Loans per User")
        ax.set_xlabel("Ratings per User")
        ax.set_ylabel("CDF")
        #ax.legend(frameon=False)
        plt.savefig("006_cdfLoansPerUserAnobii.pdf",\
                            # bbox_extra_artists=(first_legend,second_legend), \
                            bbox_inches='tight')
        plt.close()

        #Number of users in top 10 countries
        file = "/data/SMARTDATA/books/anobii_2021/sql/person.csv" #item_review ha il voto dato dall'utente a quel libro
        DFusers = spark.read.csv(file,header=True)
        DFusers_2 = DFusers.filter(DFusers.person_active == '1')

        # Read table which contains the mapping of the language (language=11 means italian)
        file = "/data/SMARTDATA/books/anobii_2021/sql/language_mapping.csv"
        DFlanguages = spark.read.csv(file,header=True)

        DFjoined = DFusers.join(DFlanguages, DFusers.language == DFlanguages.language_id)
        DFjoined_2 = DFusers_2.join(DFlanguages, DFusers.language == DFlanguages.language_id)

        DFcounting = DFjoined.select('language_name').groupBy('language_name').count().orderBy('count', ascending=False)

        language_name = ['English', 'Italiano', '繁體中文', 'Español', 'Deutsch', 'Nederlands', 'РУССКИЙ', 'Français', '한국어']
        language_name_2 = ['English', 'Italian', 'Chinese', 'Spanish', 'German', 'Dutch', 'Russian', 'French', 'Korean']

        genre_users_language = {}
        genre_users_language_2 = {}

        for language, language2 in zip(language_name, language_name_2):
            count = DFjoined.filter(DFjoined.language_name.contains(language)).count()
            count_2 = DFjoined_2.filter(DFjoined_2.language_name.contains(language)).count()
            genre_users_language[language2] = count
            genre_users_language_2[language2] = count_2

        names = list(genre_users_language.keys())
        values = list(genre_users_language.values())

        names_2 = list(genre_users_language_2.keys())
        values_2 = list(genre_users_language_2.values())

        X_axis = np.arange(len(language_name))
        plt.figure()
        plotting = plt.bar(X_axis - 0.2, values, 0.4)
        plotting_2 = plt.bar(X_axis + 0.2, values_2, 0.4)
        plt.legend(['all users', 'active users'])
        plt.xticks(X_axis, language_name_2, rotation='vertical')
        plt.ylabel('Number of users')
        plt.savefig('007_number_anobii_users_countries.pdf', bbox_inches='tight')
        plt.close()
    if False:
        #Number of books per top 10 language
        file = "/data/SMARTDATA/books/anobii_2021/sql/item.csv" #item_review ha il voto dato dall'utente a quel libro
        DFbooks = spark.read.csv(file,header=True)

        file = "/data/SMARTDATA/books/anobii_2021/sql/language_mapping.csv"
        DFlanguages = spark.read.csv(file,header=True)
        DFjoined = DFbooks.join(DFlanguages, DFbooks.language == DFlanguages.language_id)

        DFcounting = DFjoined.select('language_name').groupBy('language_name').count().orderBy('count', ascending=False)

        language_name = ['English', 'Italiano', '繁體中文', 'Español', 'Deutsch', 'Nederlands', 'РУССКИЙ', 'Français', '日本語']
        language_name_2 = ['English', 'Italian', 'Chinese', 'Spanish', 'German', 'Dutch', 'Russian', 'French', 'Japanese']

        genre_users_language = {}

        for language, language2 in zip(language_name, language_name_2):
            count = DFjoined.filter(DFjoined.language_name.contains(language)).count()
            genre_users_language[language2] = count

        names = list(genre_users_language.keys())[0:5]
        values = list(genre_users_language.values())[0:5]

        plt.figure()
        plt.bar(names, values)
        plt.xticks(rotation='vertical')
        plt.ylabel('Number of books')
        plt.savefig('008_number_anobii_books_countries.pdf', bbox_inches='tight')
        plt.close()
    if False:
        #Occurrences of ratings in Anobii dataset
        DFinfo_grouped = DFinfo.groupBy('item_review').count()
        DFinfo_grouped_pandas = DFinfo_grouped.toPandas()
        list_ratings = DFinfo_grouped_pandas['item_review'].to_list()
        list_votes = DFinfo_grouped_pandas['count'].to_list()

        list_ratings_votes = list(zip(list_ratings, list_votes))
        list_ratings_votes.sort()

        list_ratings = [item[0] for item in list_ratings_votes]
        list_votes = [item[1] for item in list_ratings_votes]

        plt.bar(list_ratings, list_votes)
        plt.xlabel('Rate')
        plt.ylabel('Number of rated books')
        plt.savefig('009_rates_anobii_books.pdf', bbox_inches='tight')
        plt.close()
    if False:
        #GENRES OCCURRENCE IN GENRE DATASET
        DFgenres_grouped = DFgenres.groupBy('name').count()
        DFgenres_grouped_pandas = DFgenres_grouped.toPandas()
        list_genres = DFgenres_grouped_pandas['name']
        list_counts = DFgenres_grouped_pandas['count']

        plt.figure()
        plt.bar(list_genres, list_counts)
        plt.xticks(rotation='vertical')
        plt.ylabel('Genre occurrence')
        plt.savefig('010_genre_occurrences.pdf', bbox_inches='tight')
        plt.close()

    if False:
        #PLOT FOR MERGED DATASET (MOST FREQUENT KEYWORD in TOP 5)
        import pandas as pd
        DF_final = pd.read_csv('2_final_dataset.csv')
        DF_final_top5keywords = DF_final['top_keywords_5_with_comment'].to_list()

        list_keywords = [eval(item) for item in DF_final_top5keywords]
        list_flat_keywords = [item for sublist in list_keywords for item in sublist]

        keywords = {}

        for keyword in list_flat_keywords: 
            if keyword not in keywords.keys():
                keywords[keyword] = 1
            else:
                keywords[keyword] += 1

        keywords = dict(sorted(keywords.items(), key=lambda item: item[1], reverse=True))

        names = list(keywords.keys())[0:20]
        values = list(keywords.values())[0:20]

        plt.figure()
        plt.bar(names, values)
        plt.ylabel('Number of occurrences')
        plt.xticks(rotation='vertical')
        plt.savefig('016_keyword_occurrences_with_comm.pdf', bbox_inches='tight')
        plt.close()

    if False:
        #CALCULATE ENTROPY
        #minus Sum for each keyword(Occurrence_keyword/Total_occurrences * logbasekeywordnumber(Occurrence_keyword/Total_occurrences))
        #How many keywords?
        import math
        keyword_number = ['5', '10', '15', '20']
        entropy_with_values = []
        entropy_without_values = []
        DF_final = pd.read_csv('2_final_dataset.csv')
        for number in keyword_number:
            list_keywords = DF_final[f'top_keywords_{number}_with_comment'].to_list()
            flat_list = [item for sublist in list_keywords for item in sublist]
            flat_list_unique = list(set(flat_list))

            entropy_array = []
            #Calculate entropy as above
            for keyword in flat_list_unique:

                flat_list_keyword = [item for item in flat_list if item == keyword] #Occurrences
                #if len(flat_list_keyword) > 150:
                    #print(keyword)
                #print(len(flat_list_keyword))
                entropy_keyword = (len(flat_list_keyword) / len(flat_list)) * (math.log((len(flat_list_keyword) / len(flat_list)), len(flat_list_unique)))
                entropy_array.append(entropy_keyword)

            entropy_np = np.asarray(entropy_array)
            entropy_with = -(np.sum(entropy_np))
            entropy_with_values.append(entropy_with)
            print(entropy_with_values)
            list_keywords = DF_final[f'top_keywords_{number}_without_comm'].to_list()
            flat_list = [item for sublist in list_keywords for item in sublist]
            flat_list_unique = list(set(flat_list))

            entropy_array = []
            #Calculate entropy as above
            for keyword in flat_list_unique:
                flat_list_keyword = [item for item in flat_list if item == keyword] #Occurrences
                #if len(flat_list_keyword) > 150:
                    #print(keyword)
                #print(len(flat_list_keyword))
                entropy_keyword = (len(flat_list_keyword) / len(flat_list)) * (math.log((len(flat_list_keyword) / len(flat_list)), len(flat_list_unique)))
                entropy_array.append(entropy_keyword)

            entropy_np = np.asarray(entropy_array)
            entropy_without = -(np.sum(entropy_np))
            entropy_without_values.append(entropy_without)

        #GRAPHS ENTROPY
        plt.figure()
        plt.plot(keyword_number, entropy_with_values)
        plt.plot(keyword_number, entropy_without_values)
        plt.ylabel('Entropy')
        plt.xlabel('Number of keywords per book')
        plt.legend(['With comments','No comments'])
        plt.savefig('017_entropy_keywords.pdf', bbox_inches='tight')
        plt.close()

        #COVERAGE GENRE KEYWORDS (SAVED BEFOREHAND FOR COMFORT)
        possible_keywords = [5, 10, 15, 20, 30, 50]
        average_percentages = [49.086402663093516, 65.79894419074137, 75.69834836556531, 82.83928946909826, 91.31414622247611, 98.63530510583139]


        import matplotlib.pyplot as plt

        plt.figure()
        plt.plot(possible_keywords, average_percentages)
        plt.xlabel('Number of keywords per book')
        plt.ylabel('Coverage percentage')
        plt.savefig('018_coverage_keywords.pdf', bbox_inches='tight')
        plt.close()

        #feel-it mood plot
        DF_final = pd.read_csv('2_final_dataset.csv')
        DF_final_grouped = DF_final.groupby(['sentiment']).count()
        list_mood = ['negative','positive']
        list_values = [1878, 553]

        plt.figure()
        plt.bar(list_mood, list_values)
        plt.ylabel('Number of books')
        plt.savefig('019_mood_feelit.pdf', bbox_inches='tight')
        plt.close()

        #feel-it emotion plot
        DF_final = pd.read_csv('2_final_dataset.csv')
        DF_final_grouped = DF_final.groupby(['emotion']).count()
        #print(DF_final_grouped)
        list_mood = ['anger','joy', 'sadness', 'fear']
        list_values = [161, 733, 1179, 358]

        plt.figure()
        plt.bar(list_mood, list_values)
        plt.ylabel('Number of books')
        plt.savefig('020_emotion_feelit.pdf', bbox_inches='tight')
        plt.close()

        #VADER mood labels plot
        DF_final = pd.read_csv('2_final_dataset.csv')
        DF_final_grouped = DF_final.groupby(['vader_score']).count()
        print(DF_final_grouped)
        list_mood = ['negative','positive']
        list_values = [1076, 1355]

        plt.figure()
        plt.bar(list_mood, list_values)
        plt.ylabel('Number of books')
        plt.savefig('021_mood_vader.pdf', bbox_inches='tight')
        plt.close()

        #TEXTBLOB mood plot
        DF_final = pd.read_csv('2_final_dataset.csv')
        DF_final_grouped = DF_final.groupby(['textblob_english_sentiment']).count()
        print(DF_final_grouped)
        list_mood = ['negative','positive']
        list_values = [607, 1824]

        plt.figure()
        plt.bar(list_mood, list_values)
        plt.ylabel('Number of books')
        plt.savefig('022_mood_textblob.pdf', bbox_inches='tight')
        plt.close()

        #majority mood plot
        DF_final = pd.read_csv('2_final_dataset.csv')
        DF_final_grouped = DF_final.groupby(['majority']).count()
        #print(DF_final_grouped)
        list_mood = ['negative','positive']
        list_values = [1129, 1302]

        plt.figure()
        plt.bar(list_mood, list_values)
        plt.ylabel('Number of books')
        plt.savefig('023_mood_majority.pdf', bbox_inches='tight')
        plt.close()

In [None]:
#PROCESSING DATAFRAME GENRES#

#Phase 1: delete tuple selfhelp, referece, textbook as genre
DFgenres = DFgenres.filter((DFgenres.name != "SelfHelp") & (DFgenres.name != "Reference") & (DFgenres.name != "Textbook"))

#Phase 1.2: delete tuples with votes == null (noise data)
DFgenres = DFgenres.filter(~(DFgenres.votes.isNull()))

#Phase 2: map genres by aggregating them as described in the thesis
RDDgenres = DFgenres.rdd

def mapGenres(line):
    transformed_line = line.name
    if transformed_line == "Cooking-Food&Wine" or transformed_line == "Games" or transformed_line == "Crafts&Hobbies" or transformed_line == "Home&Gardening" or transformed_line == "Music" or transformed_line == "Art-Architecture&Photography" or transformed_line == "Sports-Outdoors&Adventure" or transformed_line == "Entertainment":
        transformed_line = "FreeTime"
    elif transformed_line == "Professional&Technical" or transformed_line == "Computer&Technology" or transformed_line == "Law" or transformed_line == "Medicine" or transformed_line == "Business&Economics" or transformed_line == "ForeignLanguageStudy" or transformed_line == "Education&Teaching":
        transformed_line = "Professional&Technical"
    elif transformed_line == "Health-Mind&Body" or transformed_line == "Religion&Spirituality":
        transformed_line = "Health-Mind&Body"
    elif transformed_line == "Family-Sex&Relationships" or transformed_line == "Gay&Lesbian":
        transformed_line = "Family-Sex&Relationships"
    elif transformed_line == "Science&Nature" or transformed_line == "Pets":
        transformed_line = "Science&Nature"
    elif transformed_line == "Children" or transformed_line == "Teens":
        transformed_line = "Children&Teens"
    #elif transformed_line == "Fiction&Literature":
    #    transformed_line = "None"
    return Row(id=line.id, familyid=line.familyid, itemid=line.itemid, categoryid=line.categoryid, slug=line.slug, name=transformed_line, languageid=line.languageid, votes = line.votes)

RDDgenres_mapped = RDDgenres.map(mapGenres)
DFgenres = RDDgenres_mapped.toDF([])

#delete fiction & literature
print(DFgenres.count())
DFgenres = DFgenres.filter(DFgenres.name != "Fiction&Literature")
print(DFgenres.count())

#Phase 2.2: Aggregation genres

DFgenres = DFgenres.select("*", F.sum(DFgenres.votes).over(Window.partitionBy(DFgenres.itemid, DFgenres.name)).alias("new_votes")).drop(DFgenres.votes)
DFgenres = DFgenres.withColumnRenamed("new_votes", "votes").dropDuplicates(["itemid", "name"])

#Phase 3: keeping only 4 genres max per book
DFgenres = DFgenres.withColumn("rank", row_number().over(Window.partitionBy(DFgenres.itemid).orderBy(col("votes").desc()))).filter(col("rank") <= 4)

#Phase 4: aggregation of book in one tuple (genres are now a list) 

DFgenres = DFgenres.select(concat_ws("/", DFgenres.name, DFgenres.votes).alias("genre_with_votes"), "id", "familyid", "itemid", "categoryid", "languageid", "slug")

RDDgenres = DFgenres.rdd
RDDgenres_pair = RDDgenres.map(lambda x: (x.itemid, x))

def aggregateGenres(line1, line2):
    return Row(id=line1.id, familyid=line1.familyid, itemid=line1.itemid, categoryid=line1.categoryid, slug=line1.slug, languageid=line1.languageid, genre_with_votes = line1.genre_with_votes + " " + line2.genre_with_votes)

RDDgenres_reduced = RDDgenres_pair.reduceByKey(aggregateGenres)

def toSingleGenres(line1):
    return Row(id=line1[1].id, familyid=line1[1].familyid, itemid=line1[1].itemid, categoryid=line1[1].categoryid, slug=line1[1].slug, languageid=line1[1].languageid, genre_with_votes = line1[1].genre_with_votes)

RDDgenres_single = RDDgenres_reduced.map(toSingleGenres)

def toDictGenre(line1):
    dict_genres = {}
    genre_vote_list = line1.genre_with_votes.split(" ")
    for genre_vote in genre_vote_list:
        genre = genre_vote.split("/")[0]
        vote = genre_vote.split("/")[1]
        dict_genres[genre] = vote

    return Row(id=line1.id, familyid=line1.familyid, itemid=line1.itemid, categoryid=line1.categoryid, slug=line1.slug, languageid=line1.languageid, genre_with_votes = dict_genres)

RDDgenres_single = RDDgenres_single.map(toDictGenre)
DFgenres = RDDgenres_single.toDF([])

In [None]:
#COMMENT HANDLING CELL
from pyspark.sql import functions as F
file = "/data/SMARTDATA/books/anobii/link_person_item_comment.csv"
DFreviews = spark.read.csv(file, header=True)

#Comment length filtering (80 words per comment min)
DFreviews = DFreviews.filter(F.size(F.split('comment_content', ' ')) >= 80)

#PLOT COMMENTS LENGTH
if False:
    import matplotlib.pyplot as plt
    import numpy as np

    file = "/data/SMARTDATA/books/anobii/link_person_item_comment.csv"
    DFreviews = spark.read.csv(file, header=True)
    DFreviews = DFreviews.withColumn("comment_length", F.size(F.split('comment_content', ' ')))
    DFreviews_count_words = DFreviews.groupBy('comment_length').count()
    #DFreviews_count_words.show(1, False)
    DFreviews_count_words_pandas = DFreviews_count_words.toPandas().sort_values(by=['comment_length'])
    #print(DFreviews_count_words_pandas)
    list_lengths = DFreviews_count_words_pandas['comment_length'].to_list()
    list_counts = DFreviews_count_words_pandas['count'].to_list()
    plt.figure()
    plt.plot(list_lengths[1:201], list_counts[1:201])
    plt.xlabel('Length of comment in words')
    plt.ylabel('Number of comments')
    plt.savefig('011_comment_length.pdf', bbox_inches='tight')
    plt.close()

In [None]:
#FILTERING CELL 2

#FILTERING 4: books with more than X (selectable) votes.

DFstarsfilteredno0 = DFinfo#.filter(DFinfo.item_review > 0) #KEEP THE 0 votes because they are still implicit interactions

#Count books with most ratings and sort them in descending order
DFgrouped = DFstarsfilteredno0.groupby("item_id").count().withColumnRenamed("count", "total_votes")\
                              .sort("total_votes", ascending=False)


#list_books = []
#for i in [50, 100, 150, 200, 250, 300]: #FOR RATING BOOKS PLOT
#29537 books with 50 threshold, 15273 books with 100 threshold
DFgroupedfiltered = DFgrouped.filter(DFgrouped['total_votes'] >= 300) #300 = RATING THRESHOLD
DFjoinbookstars = DFfilteredmagazineitems.join(DFgroupedfiltered, DFfilteredmagazineitems.item_id == DFgroupedfiltered.item_id).drop(DFgroupedfiltered.item_id)
print(DFjoinbookstars.count())
#list_books.append(DFjoinbookstars.count())

# DFjoinbookstars:
# +-------+----------+-----------------+---------+----------+----------------+---------+---------+-------+----------------+------------+------------+------------------+--------+--------------+-----------+
# |item_id|isbn      |title            |sub_title|no_of_page|publication_date|publisher|binding  |edition|average_rating  |total_review|product_type|encrypt_item_id   |language|total_wishlist|total_votes|
# +-------+----------+-----------------+---------+----------+----------------+---------+---------+-------+----------------+------------+------------+------------------+--------+--------------+-----------+
# |1981748|8845260372|Giulietta squeenz|null     |209       |2008-05-01      |Bompiani |Paperback|null   |3.18773234200743|129         |1           |014f24ec9629744c88|11      |110           |543        |
# +-------+----------+-----------------+---------+----------+----------------+---------+---------+-------+----------------+------------+------------+------------------+--------+--------------+-----------+

#FILTERING 5: Various cleaning
#Taking titles and trasforming them in lower case for merging 
DFmanifestations_definitive_anobii = DFjoinbookstars.select("item_id", \
                                                            lower(DFjoinbookstars.title), \
                                                            lower(DFjoinbookstars.sub_title), \
                                                            "isbn",\
                                                            "average_rating", \
                                                            "total_review", \
                                                            "total_wishlist", \
                                                            "no_of_page",
                                                            "publication_date",
                                                            "publisher",
                                                            "binding",
                                                            "edition",
                                                            "product_type",
                                                            "total_votes",
                                                            "encrypt_item_id",)\
                                                    .withColumnRenamed("lower(title)", "title")\
                                                    .withColumnRenamed("lower(sub_title)", "sub_title")
# DFmanifestations_definitive_anobii.show(1, False)
# DFmanifestations_definitive_anobii: (uguale a DFjoinbookstars ma senza language)
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+
# |item_id|title            |sub_title|isbn      |average_rating  |total_review|total_wishlist|no_of_page|publication_date|publisher|binding  |edition|product_type|total_votes|encrypt_item_id   |
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+
# |1981748|giulietta squeenz|null     |8845260372|3.18773234200743|129         |110           |209       |2008-05-01      |Bompiani |Paperback|null   |1           |543        |014f24ec9629744c88|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+

if False:
    #PLOT RATING THRESHOLD
    plt.figure()
    plt.plot([50,100,150,200,250,300], list_books)
    plt.xlabel('Rating threshold')
    plt.ylabel('Number of books')
    plt.savefig('012_rating_threshold.pdf', bbox_inches='tight')
    plt.close()

In [None]:
#GENRES INTEGRATION CELL

print("Anobii data have been filtered (select books in italian, drop comics and books with a low number of ratings)")
print("Integrate with genre and author attributes Anobii data")
#add genres to the item table: this is contained in DFgenres:
# +---+--------+-------+----------+------------------+------------------+----------+-----+
# |id |familyid|itemid |categoryid|slug              |name              |languageid|votes|
# +---+--------+-------+----------+------------------+------------------+----------+-----+
# |1  |32423317|2823074|3         |business-economics|Business&Economics|3         |2.0  |
# +---+--------+-------+----------+------------------+------------------+----------+-----+
# Notice that if a book has more than 1 genre, DFgenres contained more than one row with that item_id, one for genre.
# Aggregation of rows with same itemid is needed 
#DFgenres_aggregated = DFgenres.groupBy('itemid')\
#                              .agg(F.concat_ws(" / ", F.collect_list('name'))\
#                                    .alias('genres'))
# DFgenres_aggregated:
# +-------+----------------------------------------------------------------------------------------------+
# |itemid |genres                                                                                        |
# +-------+----------------------------------------------------------------------------------------------+
# |1981748|Humor / Fiction&Literature / Romance / Family-Sex&Relationships / Teens / Philosophy / History|
# +-------+----------------------------------------------------------------------------------------------+

# join between items and DFgenres_aggregated to have, for each item, its genres
DFmanifestations_definitive_anobii_genres = DFmanifestations_definitive_anobii.join(DFgenres,\
                                                                                    DFgenres.itemid == DFmanifestations_definitive_anobii.item_id)\
                                                                              .select(DFmanifestations_definitive_anobii["*"],\
                                                                                      DFgenres.genre_with_votes) 

In [None]:
#GENRE PLOTS CELL
if False:
    import matplotlib.pyplot as plt
    from matplotlib import rcParams
    size = 30
    f = 1
    rcParams['figure.figsize'] = 12.6*f, 7*f
    rcParams['xtick.labelsize'] = size
    rcParams['ytick.labelsize'] = size
    rcParams['font.size'] = size
    rcParams["font.sans-serif"] = ["Liberation Sans"]
    rcParams["font.family"] = "sans-serif"
    rcParams.update({'figure.autolayout': False})
    rcParams['pdf.fonttype'] = 42
    rcParams['ps.fonttype'] = 42

    if False:
        #PLOT OCCURRENCE GENRES IN FILTERED DATASET
        genre_counter = {}

        list_genres = DFmanifestations_definitive_anobii_genres.toPandas()['genre_with_votes'].to_list()
        #print(list_genres)

        for genre_dict in list_genres:
            #genre_dict = eval(genre_dict)
            for genre in genre_dict.keys():
                if genre in genre_counter.keys():
                    genre_counter[genre] += 1
                else:
                    genre_counter[genre] = 1
        genre_counter.pop('name', None)
        print(genre_counter)

        names = list(genre_counter.keys())
        values = list(genre_counter.values())

        histogram = plt.bar(names,values)
        plt.xticks(rotation='vertical')
        plt.ylabel('Genre occurrence')
        plt.savefig('013_genres_occurrence_filtering.pdf', bbox_inches='tight')
        plt.close()
    if False:
        #PLOT NUMBER OF BOOKS PER NUMBER OF GENRES

        file = "anobii_genres/new_genres5.csv"
        DFgenres = spark.read.csv(file, header=True)

        from pyspark.sql import functions as F, Window
        DFgenres_part = DFgenres.select("*", F.row_number().over(Window.partitionBy(DFgenres.itemid).orderBy(DFgenres.votes.cast("int").desc())).alias("id_inc"))

        _list_means = []
        _list_counts = []
        _list = [i+1 for i in range(21)]
        import numpy as np
        from pyspark.sql.functions import mean, sum, count
        for i in _list:
            DFfiltered = DFgenres_part.filter(DFgenres_part.id_inc == i)
            _count = DFfiltered.select(count(DFfiltered.votes.cast('int'))).collect()
            _mean = DFfiltered.select(mean(DFfiltered.votes.cast('int'))).collect()
            _list_means.append(_mean[0][0])
            _list_counts.append(_count[0][0])
            print(_mean[0][0])
            print(_count[0][0])

        plt.figure()
        plotting = plt.bar(_list, _list_counts)
        #plt.xticks(rotation='vertical')
        plt.ylabel('Number of books')
        plt.xlabel('Number of genres')
        plt.savefig('014_number_books_per_number_genres.pdf', bbox_inches='tight')
        plt.close()

    if False:
        #PLOT AVERAGE SCORE FOR NUMBER OF GENRES
        plt.figure()
        plotting = plt.plot(_list, _list_means)
        plt.ylabel('Average total score')
        plt.xlabel('Number of genres')
        #plt.xticks(rotation='vertical')
        #plt.xticks(range(len(_list))[1:], [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20])
        plt.savefig('015_average_score_for_genre_number.pdf', bbox_inches='tight')
        plt.close()

In [None]:
#AUTHOR AND GENRE INTEGRATION CELL

# DFmanifestations_definitive_anobii_genres:
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+----------------------------------------------------------------------------------------------+
# |item_id|title            |sub_title|isbn      |average_rating  |total_review|total_wishlist|no_of_page|publication_date|publisher|binding  |edition|product_type|total_votes|encrypt_item_id   |genres                                                                                        |
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+----------------------------------------------------------------------------------------------+
# |1981748|giulietta squeenz|null     |8845260372|3.18773234200743|129         |110           |209       |2008-05-01      |Bompiani |Paperback|null   |1           |543        |014f24ec9629744c88|History / Fiction&Literature / Family-Sex&Relationships / Teens / Humor / Romance / Philosophy|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+----------------------------------------------------------------------------------------------+
# add a column which explain the origin of the data (anobii or BCT)
DFmanifestations_definitive_anobii_genres = DFmanifestations_definitive_anobii_genres.withColumn('data_type', lit("anobii"))
# DFmanifestations_definitive_anobii_genres.show(1, False)
# DFmanifestations_definitive_anobii_genres:
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+----------------------------------------------------------------------------------------------+---------+
# |item_id|title            |sub_title|isbn      |average_rating  |total_review|total_wishlist|no_of_page|publication_date|publisher|binding  |edition|product_type|total_votes|encrypt_item_id   |genres                                                                                        |data_type|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+----------------------------------------------------------------------------------------------+---------+
# |1981748|giulietta squeenz|null     |8845260372|3.18773234200743|129         |110           |209       |2008-05-01      |Bompiani |Paperback|null   |1           |543        |014f24ec9629744c88|Family-Sex&Relationships / Teens / Humor / Romance / Philosophy / History / Fiction&Literature|anobii   |
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+----------------------------------------------------------------------------------------------+---------+
# add author in the DFmanifestations_definitive_anobii_genres table
# find author_id for each book
DFmanifestations_anobii_genres_author = DFmanifestations_definitive_anobii_genres.join(DFauthorbooks, \
                                                                                       DFauthorbooks.item_id == DFmanifestations_definitive_anobii_genres.item_id)\
                                                                                 .select(DFmanifestations_definitive_anobii_genres["*"], \
                                                                                         DFauthorbooks["author_id"])
# DFmanifestations_anobii_genres_author.show(1, False)
# DFmanifestations_anobii_genres_author:
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+---------+
# |item_id|            title|sub_title|      isbn|  average_rating|total_review|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|total_votes|   encrypt_item_id|              genres|data_type|author_id|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+---------+
# |1981748|giulietta squeenz|     null|8845260372|3.18773234200743|         129|           110|       209|      2008-05-01| Bompiani|Paperback|   null|           1|        543|014f24ec9629744c88|Romance / Humor /...|   anobii|   354644|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+---------+
# find author (name) for each book (and remove books which has same title but different auhtor?)
DFmanifestations_anobii_genres_author = DFmanifestations_anobii_genres_author.join(DFdisplay,\
                                                                                   DFmanifestations_anobii_genres_author.author_id == DFdisplay.author_id)\
                                                                             .select(DFmanifestations_anobii_genres_author["*"], \
                                                                                     DFdisplay["author_name"])\
                                                                             .drop("author_id")

# remove books which has same title but different auhtor
DFmanifestations_anobii_genres_author = DFmanifestations_anobii_genres_author.select("*", F.min(DFmanifestations_anobii_genres_author.author_name)\
                                                                                           .over(Window.partitionBy(DFmanifestations_anobii_genres_author.item_id))\
                                                                                           .alias("author"))\
                                                                             .drop("author_name")\
                                                                             .dropDuplicates(['item_id', 'author'])
# DFmanifestations_anobii_genres_author.show(1, False)
# DFmanifestations_anobii_genres_author:
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+
# |item_id|            title|sub_title|      isbn|  average_rating|total_review|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|total_votes|   encrypt_item_id|              genres|data_type|    author|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+
# |1981748|giulietta squeenz|     null|8845260372|3.18773234200743|         129|           110|       209|      2008-05-01| Bompiani|Paperback|   null|           1|        543|014f24ec9629744c88|Philosophy / Hist...|   anobii|Pulsatilla|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+
#join of item and rating tables to obtain person_id, item_id, rating and all attributes of the item in the same row
#df1.join(df2, df1.id == df2.id).select(df1["*"],df2["other"])
DFloans_definitive_anobii_genres = DFstarsfilteredno0.join(DFmanifestations_anobii_genres_author, \
                                                           DFmanifestations_anobii_genres_author.item_id == DFstarsfilteredno0.item_id)\
                                                     .select(DFmanifestations_anobii_genres_author["*"], \
                                                             DFstarsfilteredno0["person_id"], \
                                                             DFstarsfilteredno0["item_review"])
# DFloans_definitive_anobii_genres.show(1, False)
# DFloans_definitive_anobii_genres:
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+---------+-----------+
# |item_id|            title|sub_title|      isbn|  average_rating|total_review|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|total_votes|   encrypt_item_id|              genres|data_type|    author|person_id|item_review|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+---------+-----------+
# |1981748|giulietta squeenz|     null|8845260372|3.18773234200743|         129|           110|       209|      2008-05-01| Bompiani|Paperback|   null|           1|        543|014f24ec9629744c88|Philosophy / Fict...|   anobii|Pulsatilla|  1244593|          3|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+---------+-----------+

print("Anobii data have been integreted with genre and author attributes")
print("Read and filter BCT data")

In [None]:
#LOAD BCT FILTERED DATA

%run BCT_data_handling.ipynb #Così a quanto pare
DFmanifestations_definitive_bct = DFmanifestations_definitive
DFloans_definitive_bct = DFloans_definitive
# DFmanifestations_definitive_bct.show(1, False)
# DFmanifestations_definitive_bct:
# +----------------+------------+---------+-------------+---------+--------------------+------------+
# |edition_language|edition_date|    title|       author|publisher|manifestation_id_new|ISBNISSN_new|
# +----------------+------------+---------+-------------+---------+--------------------+------------+
# |             ita|        1996|Pinocchio|Carlo Collodi|   Nuages|              107930|  8807820714|
# +----------------+------------+---------+-------------+---------+--------------------+------------+
# DFloans_definitive_bct:
# +--------------------+--------------------+-------------------+-------------------+-------------------+------------+----------+-----------+
# |manifestation_id_new|       patron_id_md5|    loan_date_begin|      loan_date_end|           due_date|from_library|to_library|end_library|
# +--------------------+--------------------+-------------------+-------------------+-------------------+------------+----------+-----------+
# |              220771|fa242f1458100dccc...|2012-09-05 11:27:25|2012-09-27 18:18:00|2012-10-05 11:27:21|          18|        18|         18|
# +--------------------+--------------------+-------------------+-------------------+-------------------+------------+----------+-----------+
print("BCT data have been read and filtered")
print("Prepare data for merge")

In [None]:
# BCT data preparation: prepare the table of loans which contains also metadata of the book
DFloans_with_titles = DFloans_definitive_bct.join(DFmanifestations_definitive_bct, \
                                                  DFloans_definitive_bct.manifestation_id_new == DFmanifestations_definitive_bct.manifestation_id_new)\
                                             .select(DFloans_definitive_bct.manifestation_id_new, \
                                                     "patron_id_md5", \
                                                     "author", \
                                                     "ISBNISSN_new", \
                                                     lower(DFmanifestations_definitive_bct.title))\
                                             .withColumnRenamed("lower(title)", "title")
# DFloans_with_titles:
# +--------------------+--------------------+-------------+------------+---------+
# |manifestation_id_new|       patron_id_md5|       author|ISBNISSN_new|    title|
# +--------------------+--------------------+-------------+------------+---------+
# |              107930|b5c0986c79b1afafd...|Carlo Collodi|  8807820714|pinocchio|
# +--------------------+--------------------+-------------+------------+---------+

# BCT data preparation: split title and subtitle
RDDloans_with_titles = DFloans_with_titles.rdd    
RDDloans_for_merge = RDDloans_with_titles.map(title_and_subtitle)  
# DFloans_with_titles.show(1, False)
DFloans_to_merge = RDDloans_for_merge.toDF(["item_id", \
                                            "person_id", \
                                            "title", \
                                            "author", \
                                            "sub_title", \
                                            "item_review", \
                                            "total_review", \
                                            "average_rating", \
                                            "total_votes", \
                                            "isbn", \
                                            "data_type"])
# BCT data preparation: add missing columns, which are present in DFloans_definitive_anobii_genres
columns_to_add = ['total_wishlist',
                  'no_of_page',
                  'publication_date',
                  'publisher',
                  'binding',
                  'edition',
                  'product_type',
                  'encrypt_item_id',
                  'genre_with_votes',
                 ]
values_to_add = {'total_wishlist': None,
                  'no_of_page': None,
                  'publication_date': None, 
                  'publisher': None, 
                  'binding': "Paperback",
                  'edition': None,
                  'product_type': 1,
                  'encrypt_item_id': None,
                  'genre_with_votes': None,
    
}
for c in columns_to_add:
        DFloans_to_merge = DFloans_to_merge.withColumn(c, lit(values_to_add[c]))

DFloans_to_merge = DFloans_to_merge.select("item_id", "title", "sub_title", "isbn", "average_rating", "total_review", "total_wishlist", "no_of_page", "publication_date", "publisher", "binding", "edition", "product_type", "total_votes", "encrypt_item_id", "genre_with_votes", "data_type", "author", "person_id", "item_review")

#DFloans_to_merge = DFloans_to_merge.withColumn("genre_with_votes",DFloans_to_merge.genre_with_votes.cast(MapType(StringType(), StringType())))
#DFloans_definitive_anobii_genres.printSchema()
#DFloans_to_merge.printSchema()

# DFloans_to_merge:
# +-------+--------------------+---------+-----------+-------------+-----------+------------+--------------+-----------+----------+-----------+--------------+----------+----------------+---------+---------+-------+------------+---------------+-----+
# |item_id|           person_id|    title|author_name|    sub_title|item_review|total_review|average_rating|total_votes|      isbn|  data_type|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|encrypt_item_id|genre|
# +-------+--------------------+---------+-----------+-------------+-----------+------------+--------------+-----------+----------+-----------+--------------+----------+----------------+---------+---------+-------+------------+---------------+-----+
# | 107930|fcc17388ef20567fe...|pinocchio|       None|Carlo Collodi|          4|        None|          None|       None|8807820714|biblioteche|          null|      null|            null|     null|Paperback|   null|           1|           null| null|
# +-------+--------------------+---------+-----------+-------------+-----------+------------+--------------+-----------+----------+-----------+--------------+----------+----------------+---------+---------+-------+------------+---------------+-----+
print("Data have been prepared for merge")
print("Merge data")
# union of the loans of bct data and anobii data
DFloans_merged = DFloans_definitive_anobii_genres.union(DFloans_to_merge)
# DFloans_merged.show(1)
# DFloans_merged
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+---------+-----------+
# |item_id|            title|sub_title|      isbn|  average_rating|total_review|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|total_votes|   encrypt_item_id|              genres|data_type|    author|person_id|item_review|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+---------+-----------+
# |1981748|giulietta squeenz|     null|8845260372|3.18773234200743|         129|           110|       209|      2008-05-01| Bompiani|Paperback|   null|           1|        543|014f24ec9629744c88|Family-Sex&Relati...|   anobii|Pulsatilla|   734393|          5|
# +-------+-----------------+---------+----------+----------------+------------+--------------+----------+----------------+---------+---------+-------+------------+-----------+------------------+--------------------+---------+----------+---------+-----------+
# Remove books, whose title is one word only, i.e. keep books whose title contains at least a blank space
# DFloans_merged.filter(DFloans_merged.title == "pinocchio").show()
DFloans_merged_nosinglewordtitles = DFloans_merged.filter(DFloans_merged.title.contains(" ")) 
# DFloans_merged_nosinglewordtitles.filter(DFloans_merged_nosinglewordtitles.title == "pinocchio").show()

# Assign to books with the same title, the same book_id
DFloans_merged_aggregated = DFloans_merged_nosinglewordtitles.select("*", \
                                                                     F.first(DFloans_merged_nosinglewordtitles.item_id)\
                                                                      .over(Window.partitionBy(DFloans_merged_nosinglewordtitles.title).orderBy(DFloans_merged_nosinglewordtitles.data_type))\
                                                                      .alias("book_id"))\
                                                              .drop("item_id")
# DFloans_merged_aggregated.show(1)
# DFloans_merged_aggregated:
# +--------------------+-------------------+----------+----------------+------------+--------------+----------+----------------+-------------------+---------+-------+------------+-----------+------------------+--------------------+---------+----------------+---------+-----------+-------+
# |               title|          sub_title|      isbn|  average_rating|total_review|total_wishlist|no_of_page|publication_date|          publisher|  binding|edition|product_type|total_votes|   encrypt_item_id|              genres|data_type|          author|person_id|item_review|book_id|
# +--------------------+-------------------+----------+----------------+------------+--------------+----------+----------------+-------------------+---------+-------+------------+-----------+------------------+--------------------+---------+----------------+---------+-----------+-------+
# |come mi batte for...|storia di mio padre|8806198882|4.14122681883024|         206|           270|       302|      2009-11-03|Einaudi (Frontiere)|Hardcover|      1|           1|        705|01500978c278d06718|Crime / Teens / N...|   anobii|Benedetta Tobagi|   767565|          3|2806310|
# +--------------------+-------------------+----------+----------------+------------+--------------+----------+----------------+-------------------+---------+-------+------------+-----------+------------------+--------------------+---------+----------------+---------+-----------+-------+

# Assign to books with the same title, the same new_author
DFloans_merged_aggregated = DFloans_merged_aggregated.select("*", \
                                                             F.min(DFloans_merged_aggregated.author)\
                                                              .over(Window.partitionBy(DFloans_merged_aggregated.book_id))\
                                                              .alias("new_author"))

#DFinfo = DFloans_merged_aggregated
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)

# Assign to books with the same title, the same new_genre
#DFloans_merged_aggregated.show(1, False)
DFloans_merged_aggregated = DFloans_merged_aggregated.select("*", \
                                                             F.first(DFloans_merged_aggregated.genre_with_votes)\
                                                              .over(Window.partitionBy(DFloans_merged_aggregated.book_id).orderBy(DFloans_merged_aggregated.data_type))\
                                                              .alias("new_genre"))

#DFinfo = DFloans_merged_aggregated
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)

# Assign to books with the same title, the same new_encrypt
DFloans_merged_aggregated = DFloans_merged_aggregated.select("*", \
                                                             F.min(DFloans_merged_aggregated.encrypt_item_id)\
                                                              .over(Window.partitionBy(DFloans_merged_aggregated.book_id))\
                                                              .alias("new_encrypt_item_id"))
#DFinfo = DFloans_merged_aggregated
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)
# Assign to books with the same title, the same new_isbn
DFloans_merged_aggregated = DFloans_merged_aggregated.select("*", \
                                                             F.min(DFloans_merged_aggregated.isbn)\
                                                              .over(Window.partitionBy(DFloans_merged_aggregated.book_id))\
                                                              .alias("new_isbn"))
#DFinfo = DFloans_merged_aggregated
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)
# Update metrics of loans: new_total_count (to count the number of votes or loans) 
DFloans_merged_aggregated = DFloans_merged_aggregated.select("*", \
                                                             F.count("*")\
                                                              .over(Window.partitionBy(DFloans_merged_aggregated.book_id))\
                                                              .alias("new_total_count"))

#DFinfo = DFloans_merged_aggregated
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)
# Update metrics of loans: new_average_rating 
DFloans_merged_aggregated = DFloans_merged_aggregated.select("*", \
                                                             F.avg(DFloans_merged_aggregated.item_review)\
                                                              .over(Window.partitionBy(DFloans_merged_aggregated.book_id))\
                                                              .alias("new_average_rating"))

#DFinfo = DFloans_merged_aggregated
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)
# Update metrics of loans: new_total_review, to count the actual number of review for a book (only anobii provides real review!!) 
DFloans_merged_aggregated = DFloans_merged_aggregated.select("*", \
                                                             F.max(DFloans_merged_aggregated.total_review)\
                                                              .over(Window.partitionBy(DFloans_merged_aggregated.book_id))\
                                                              .alias("new_total_review"))

#DFinfo = DFloans_merged_aggregated
#DFinfo.filter((DFinfo.item_review != 0) & (DFinfo.item_review != 1) & (DFinfo.item_review != 2) & (DFinfo.item_review != 3) & (DFinfo.item_review != 4) & (DFinfo.item_review != 5)).show(20, False)
# Remove "old" attributes
columnsToDrop = ['author',
                 'genre_with_votes',
                 'encrypt_item_id',
                 'isbn',
                 'total_count',
                 'average_rating',
                 'total_review',]
DFloans_merged_aggregated = DFloans_merged_aggregated.drop(*columnsToDrop)
# DFloans_merged_aggregated.show(1)
# DFloans_merged_aggregated:
# +-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+---------+-----------+-------+----------+--------------------+-------------------+----------+---------------+------------------+----------------+
# |            title|sub_title|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|total_votes|data_type|person_id|item_review|book_id|new_author|           new_genre|new_encrypt_item_id|  new_isbn|new_total_count|new_average_rating|new_total_review|
# +-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+---------+-----------+-------+----------+--------------------+-------------------+----------+---------------+------------------+----------------+
# |giulietta squeenz|     null|           110|       209|      2008-05-01| Bompiani|Paperback|   null|           1|        543|   anobii|   959028|          3|1981748|Pulsatilla|Philosophy / Hist...| 014f24ec9629744c88|8845260372|            543|3.1860036832412524|             129|
# +-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+---------+-----------+-------+----------+--------------------+-------------------+----------+---------------+------------------+----------------+

# Rename "new" attributes
columnsToRenames = ['new_author',
                    'new_genre',
                    'new_encrypt_item_id',
                    'new_isbn',
                    'new_total_count',
                    'new_average_rating',
                    'new_total_review',]
for c in columnsToRenames:
    DFloans_merged_aggregated = DFloans_merged_aggregated.withColumnRenamed(c, c.replace("new_", ""))
# DFloans_merged_aggregated.show(1)
# DFloans_merged_aggregated:
# +-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+---------+-----------+-------+----------+--------------------+------------------+----------+-----------+------------------+------------+
# |            title|sub_title|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|total_votes|data_type|person_id|item_review|book_id|    author|               genre|   encrypt_item_id|      isbn|total_count|    average_rating|total_review|
# +-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+---------+-----------+-------+----------+--------------------+------------------+----------+-----------+------------------+------------+
# |giulietta squeenz|     null|           110|       209|      2008-05-01| Bompiani|Paperback|   null|           1|        543|   anobii|   273238|          4|1981748|Pulsatilla|Humor / Philosoph...|014f24ec9629744c88|8845260372|        543|3.1860036832412524|         129|
# +-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+---------+-----------+-------+----------+--------------------+------------------+----------+-----------+------------------+------------+

# Remove ['book_id', 'person_id'] duplicates
DFloans_merged_aggregated = DFloans_merged_aggregated.dropDuplicates(['book_id', 'person_id'])
print("Data has been merged")
# before write data on file, create index for both users and books of the sparce matrix
#RDDmerged_almost_definitive = DFloans_merged_aggregated.rdd

# functions which creates the dictionary where the key is the book_id/person_id and the value is the corresponding index
#book_dictionary = create_book_dictionary(RDDmerged_almost_definitive)
#user_dictionary = create_user_dictionary(RDDmerged_almost_definitive)

# add the user_index and the book_index in the dataframe
#RDDdefinitive = RDDmerged_almost_definitive.map(addRowColumnId)
if False:
    DFloans_merged_aggregated = RDDmerged_almost_definitive.toDF(['book_id',
                                                    'title',
                                                    'sub_title',
                                                    'total_wishlist',
                                                    'no_of_page',
                                                    'publication_date',
                                                    'publisher',
                                                    'binding',
                                                    'edition',
                                                    'product_type',
                                                    'total_votes',
                                                    'data_type',
                                                    'person_id',
                                                    'item_review',
                                                    'author',
                                                    'genre',
                                                    'encrypt_item_id',
                                                    'isbn',
                                                    'total_count',
                                                    'average_rating',
                                                    'total_review'] , sampleRatio=0.9)
                                                    #'user_index', 
                                                    #'book_index'], sampleRatio=0.9)

In [None]:
#CREATION RATINGS CSV FILES CELL

ratings = DFloans_merged_aggregated.filter(~(DFloans_merged_aggregated.genre.isNull()))

# in the rating table, each row is given by "person_id", "book_id", "rating" attributes
ratings_tocsv = ratings.select(["person_id", "book_id", "item_review"])\
                       .withColumnRenamed("item_review", "rating")
# ratings_tocsv.show(1)
# ratings_tocsv:
# +---------+-------+------+
# |person_id|book_id|rating|
# +---------+-------+------+
# |   100489|1981748|     3|
# +---------+-------+------+
print('Creating CSV files of ratings...')
print("There are "+str(ratings_tocsv.count())+" interactions")
ratings_tocsv.toPandas().to_csv("2_explicit_ratings.csv")
ratings_tocsv = ratings_tocsv.drop("rating")
print("There are "+str(ratings_tocsv.count())+" interactions")
ratings_tocsv.toPandas().to_csv("2_implicit_ratings.csv")
# in the book table, the metadata about the book are reported

In [None]:
#CREATION USERS CSV FILES CELL (+ DESCRIPTIONS HANDLING)

# +-------+-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+----------+--------------------+------------------+----------+-----------+------------------+------------+----------+
# |book_id|            title|sub_title|total_wishlist|no_of_page|publication_date|publisher|  binding|edition|product_type|total_votes|data_type|    author|               genre|   encrypt_item_id|      isbn|total_count|    average_rating|total_review|book_index|
# +-------+-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+----------+--------------------+------------------+----------+-----------+------------------+------------+----------+
# |1981748|giulietta squeenz|     null|           110|       209|      2008-05-01| Bompiani|Paperback|   null|           1|        543|   anobii|Pulsatilla|Family-Sex&Relati...|014f24ec9629744c88|8845260372|        543|3.1860036832412524|         129|       319|
# +-------+-----------------+---------+--------------+----------+----------------+---------+---------+-------+------------+-----------+---------+----------+--------------------+------------------+----------+-----------+------------------+------------+----------+
books_tocsv = ratings.dropDuplicates(['book_id'])\
                     .drop("person_id")\
                     #.drop("user_index")\
                     #.drop("item_review")

#print(books_tocsv.count())

file = "anobii_genres/content.csv"
DFdescriptions = spark.read.csv(file, header=True)
#DFdescriptions.show(1, False)
DFdescriptions = DFdescriptions.dropDuplicates(["item_id"])
#DFdescriptions.filter(F.size(F.split('content', ' ')) <= 5).show(20, False)
DFdescriptions = DFdescriptions.filter(F.size(F.split('content', ' ')) >= 50)


#DFdescriptions.filter(DFdescriptions.item_id=="2015441").show(2, False)
print("There are "+str(books_tocsv.count())+" books")
#books_tocsv = books_tocsv.join(DFdescriptions, DFdescriptions.item_id == books_tocsv.book_id).select("book_id", "title", "sub_title", "total_wishlist", "no_of_page", "publication_date", "publisher", "binding", "edition", "product_type", "total_votes", "data_type", "author", "genre", "encrypt_item_id", "isbn", "total_count", "average_rating", "total_review", "book_index", "content")
books_tocsv = books_tocsv.join(DFdescriptions, DFdescriptions.item_id == books_tocsv.book_id).select("book_id", "title", "sub_title", "total_wishlist", "no_of_page", "publication_date", "publisher", "binding", "edition", "product_type", "total_votes", "data_type", "author", "genre", "encrypt_item_id", "isbn", "total_count", "average_rating", "total_review", "content")

books_tocsv = books_tocsv.withColumn("content2", books_tocsv["content"])
print(books_tocsv.select("book_id").distinct().count())

RDDbooks_tocsv = books_tocsv.rdd
RDDbooks_tocsv = RDDbooks_tocsv.map(lambda x: (x.book_id, x))
RDDreducedbooks = RDDbooks_tocsv.reduceByKey(concatenateContentsDescriptions)
RDDreducedbooks = RDDreducedbooks.map(toSingleDescriptions)
books_tocsv = RDDreducedbooks.toDF([])
#print(books_tocsv.count())

#Joiniamo DFreviews a books_tocsv (la cardinalità aumenterà ma con una funzione di riduzione con gli RDD dovremmo riportarla alla normalità)
#books_tocsv = books_tocsv.join(DFreviews, books_tocsv.book_id == DFreviews.item_id, "left").select(books_tocsv.book_id, "title", "sub_title", "total_wishlist", "no_of_page", "publication_date", "publisher", "binding", "edition", "product_type", "total_votes", "data_type", "author", "genre", "encrypt_item_id", "isbn", "total_count", "average_rating", "total_review", "book_index", "content", "comment_content", "content2")

books_tocsv = books_tocsv.join(DFreviews, books_tocsv.book_id == DFreviews.item_id, "left").select(books_tocsv.book_id, "title", "sub_title", "total_wishlist", "no_of_page", "publication_date", "publisher", "binding", "edition", "product_type", "total_votes", "data_type", "author", "genre", "encrypt_item_id", "isbn", "total_count", "average_rating", "total_review", "content", "comment_content", "content2")

#Fatto ciò,  trasformiamo books_tocsv in un rdd per poter applicare la funzione di concatenazione
#print(books_tocsv.select("book_id").distinct().count())

RDDbooks = books_tocsv.rdd
RDDbooks = RDDbooks.map(lambda x: (x.book_id, x))
RDDreducedbooks = RDDbooks.reduceByKey(concatenateContentsComments)
RDDreducedbooks = RDDreducedbooks.map(toSingleComments)
RDDreducedbooks = RDDreducedbooks.map(mergeDescriptionComments)
books_tocsv = RDDreducedbooks.toDF([])

#print(books_tocsv.count())

# in the user table, the person_id is reported for each user (in each row)
users_tocsv = ratings.select("person_id").dropDuplicates()

# users_tocsv:
# +---------+----------+
# |person_id|user_index|
# +---------+----------+
# |  1224803|     10623|
# +---------+----------+
print("There are "+str(users_tocsv.count())+" users")
users_tocsv.toPandas().to_csv("2_users.csv")

print("Csv users files have been written")

In [1]:
#UNUSED: LEXICON LIWC BASED SENTIMENT ANALYSIS (MAYBE IN THE FUTURE..?)

#BLOCCO SENTIMENT ANALYSIS --- LIWC 
#Usando un gestore di file .dic cerco di estrarre tramite una funzione delle etichette per il testo
#import liwc

books_pandas = books_tocsv.toPandas()
books_pandas = books_pandas.dropna(subset=['content2'])

content_list = books_pandas['content2'].to_list()   
parse, category_names = liwc.load_token_parser('Italian_LIWC2007_Dictionary.dic')

import re
from collections import Counter
from nltk.tokenize import word_tokenize

def tokenize(text):
    # you may want to use a smarter tokenizer
    tokenizer = word_tokenize(text)
    return tokenizer

useful_keywords = ['Parolac', 'Sesso', 'Morte', 'Emo_Pos', 'Emo_Neg', 'Ansia', 'Rabbia', 'Tristez', 'Sentim', 'Svago', 'Sen_Pos', 'Sen_Neg']

def manage_liwc(counts):
    output_labels_sentim = {'ansia': 0, 'rabbia': 0, 'tristezza': 0, 'divertimento': 0}
    adult_themes_flag = False
    output_labels_mood = {'negativo': 0, 'positivo': 0}
    for key in counts:
        #BAD VIBES
        if key == 'Emo_Neg' or key == 'Sen_Neg':
            output_labels_mood['negativo'] += counts[key]
            if False:
                if 'Ansia' in gettysburg_counts.keys():
                    output_labels_sentim['ansia'] += counts[key]
                if 'Rabbia' in gettysburg_counts.keys():
                    output_labels_sentim['rabbia'] += counts[key]
                if 'Tristez' in gettysburg_counts.keys():
                    output_labels_sentim['tristezza'] += counts[key]
        if key == 'Morte':
            output_labels_mood['negativo'] += counts[key]
            adult_themes_flag = True
            #output_labels_sentim['strong_count'] += gettysburg_counts[key]
            if False:
                if 'Ansia' in counts.keys():
                    output_labels_sentim['ansia'] += counts[key]
                if 'Rabbia' in gettysburg_counts.keys():
                    output_labels_sentim['rabbia'] += counts[key]
                if 'Tristez' in gettysburg_counts.keys():
                    output_labels_sentim['tristezza'] += counts[key]

        #ADULT THEMES
        if key == 'Parolac':
            #output_labels_sentim['strong_count'] += gettysburg_counts[key]
            adult_themes_flag = True
        if key == 'Sesso':
            #output_labels_sentim['strong_count'] += gettysburg_counts[key]
            adult_themes_flag = True
            #output_labels_sentim['love_count'] += gettysburg_counts[key]
        if key == 'Ansia':
            output_labels_mood['negativo'] += counts[key]
            output_labels_sentim['ansia'] += counts[key]
        if key == 'Rabbia':
            output_labels_mood['negativo'] += counts[key]
            output_labels_sentim['rabbia'] += counts[key]
        if key == 'Tristez':
            output_labels_mood['negativo'] += counts[key]
            output_labels_sentim['tristezza'] += counts[key]

        #GOOD VIBES#
        if key == 'Emo_Pos' or key == 'Sen_Pos':
            output_labels_mood['positivo'] += counts[key]
            if False:
                if 'Svago' in counts.keys():
                    output_labels_sentim['divertimento'] += counts[key]
                if 'Affett' in counts.keys():
                    output_labels_sentim['amore'] += counts[key]
                if 'Social' in counts.keys():
                    output_labels_sentim['socialità'] += counts[key]
        if key == 'Svago':
            output_labels_mood['positivo'] += counts[key]
            output_labels_sentim['divertimento'] += counts[key]
        if key == 'Affett':
            output_labels_mood['positivo'] += counts[key]
            output_labels_sentim['amore'] += counts[key]
        if key == 'Social':
            output_labels_mood['positivo'] += counts[key]
            output_labels_sentim['socialità'] += counts[key]

        #SENTIMENTS
        if key == 'Sentim':
            if 'Ansia' in counts.keys():
                output_labels_sentim['ansia'] += counts[key]
            if 'Rabbia' in counts.keys():
                output_labels_sentim['rabbia'] += counts[key]
            if 'Tristez' in counts.keys():
                output_labels_sentim['tristezza'] += counts[key]
            if 'Svago' in counts.keys():
                output_labels_sentim['divertimento'] += counts[key]
            if 'Affett' in counts.keys():
                output_labels_sentim['amore'] += counts[key]
            if 'Social' in counts.keys():
                output_labels_sentim['socialità'] += counts[key]
    
    max_value_sentim = max(output_labels_sentim.values())
    max_value_mood = max(output_labels_mood.values())
    if adult_themes_flag == True:
        if max_value_sentim != 0 and max_value_mood != 0:
            return (max(output_labels_sentim, key=output_labels_sentim.get), max(output_labels_mood, key=output_labels_mood.get), True)
        elif max_value_sentim == 0 and max_value_mood != 0:
            return (None, max(output_labels_mood, key=output_labels_mood.get), True)
        elif max_value_sentim != 0 and max_value_mood == 0:
            return (max(output_labels_sentim, key=output_labels_sentim.get), None, True)
        else:
            return (None, None, True)
    if max_value_sentim != 0 and max_value_mood != 0:
            return (max(output_labels_sentim, key=output_labels_sentim.get), max(output_labels_mood, key=output_labels_mood.get), False)
    elif max_value_sentim == 0 and max_value_mood != 0:
        return (None, max(output_labels_mood, key=output_labels_mood.get), False)
    elif max_value_sentim != 0 and max_value_mood == 0:
        return (max(output_labels_sentim, key=output_labels_sentim.get), None, False)
    else:
        return (None, None, False)
        
output_list = []

for index, text in enumerate(content_list):
    #print(index)
    lexicon_dict = Counter(category for token in tokenize(text) for category in parse(token) if category in useful_keywords)
    output = manage_liwc(lexicon_dict)
    output_list.append(output)

#print(len(output_list))
books_pandas['liwc_labels'] = output_list

In [None]:
#SAVE BOOKS CSV FILE
books_pandas = books_tocsv.toPandas()
books_pandas.to_csv("drive_books_filtered.csv")
print('CSV books file was created')