In [50]:
"""
!pip install nltk
!pip install pandas
!pip install reportlab
!pip install openai
!pip install matplotlib
"""


'\n!pip install nltk\n!pip install pandas\n!pip install reportlab\n!pip install openai\n!pip install matplotlib\n'

# Imports

In [1]:
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfgen import canvas
from reportlab.platypus import Paragraph
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.styles import ParagraphStyle

import nltk
from nltk.stem import WordNetLemmatizer

# General
from string import digits
import re
from typing import List
import logging
import os
import time

import pandas as pd
import matplotlib.pyplot as plt
from pandas.core.series import Series

# Replace YOUR_API_KEY with your OpenAI API key
import openai
openai.api_key = "YOUR_API_KEY"

In [2]:
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\jojoshulk\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\jojoshulk\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# Generate pdf function

In [3]:
def generate_pdf(product: str,
                 review: str,
                 review_positive: str,
                 review_negative: str,
                 product_image: str,
                 output_path: str,
                 average_rating: float,
                 rating_frequencies: str,
                 rating_history: str):
    """
    Function that generates a product review pdf
    Args:
        product:
        review:
        product_image:
        output_path:
        average_rating:
        rating_frequencies:
        rating_history:

    Returns:

    """
    # Register the font with reportlab
    pdfmetrics.registerFont(TTFont('Arial', 'Arial.ttf'))
    pdfmetrics.registerFont(TTFont('VeraBd', 'VeraBd.ttf'))

    # Create a PDF canvas
    c = canvas.Canvas(f"{output_path}", pagesize=A4)
    styles = getSampleStyleSheet()

    # Draw the product image
    if product_image:
        c.drawImage(product_image, 50, 410, height=350, width=200, preserveAspectRatio=True)
    else:
        c.setFont("VeraBd", 20)
        c.drawCentredString(150, 600, f"Product Image")
        c.drawCentredString(150, 570, f"not provided.")

    # Set the font for the product name
    c.setFont("VeraBd", 30)
    x = c._pagesize[0] / 2
    c.drawCentredString(x, 790, f"{product}")


    # --- Review text box --- #
    # Set the font for the review
    c.setFont("Arial", 30)

    # Draw a rectangle for the review text box
    style = ParagraphStyle(name='Normal_1', fontSize=13)
    c.rect(290, 590, 270, 160, stroke=0, fill=0)
    p = Paragraph(review, style=style)
    w, h = p.wrap(250, 0)
    p.drawOn(c, 300, 750 - h)


    # --- Average Rating --- #
    # Set the font for the product name
    c.setFont("VeraBd", 20)
    x = 300 + 250 /2
    c.drawCentredString(x, 550, f"Rating: {average_rating:.2f}/5")

    # Draw the average rating image
    c.drawImage(rating_frequencies, 290, 360, width=260,preserveAspectRatio=True)

    # Draw the rating history image
    c.drawImage(rating_history, 100, 10, width=400, height=200, preserveAspectRatio=True)

    # Draw a rectangle for the review text box
    style = ParagraphStyle(name='Normal_1', fontSize=12)
    # c.rect(20, 200, 250, 160, stroke=1, fill=0)
    p = Paragraph(review_negative.replace("\n", "<br/>"), style=style)
    w, h = p.wrap(230, 0)
    p.drawOn(c, 30, 370 - h)

    # Draw a rectangle for the review text box
    style = ParagraphStyle(name='Normal_1', fontSize=12)
    # c.rect(310, 200, 250, 160, stroke=1, fill=0)
    p = Paragraph(review_positive.replace("\n", "<br/>"), style=style)
    w, h = p.wrap(230, 0)
    p.drawOn(c, 320, 370 - h)



    # Set the font for the Negatives title
    c.setFillColorRGB(0.91,0.15,0.16)
    c.setFont("VeraBd", 20)
    c.drawCentredString(150, 370, f"Negatives")

    # Set the font for the positives title
    c.setFillColorRGB(0.05,0.56,0.06)
    c.setFont("VeraBd", 20)
    c.drawCentredString(430, 370, f"Positives")



    # Save the PDF
    c.save()



# Plot generator

In [4]:
class PlotGenerator:
    """class that generates and saves plots given a csv containing reviews"""

    RATING_FREQUENCY: str = "rating_frequencies.png"
    RATING_HISTORY: str= "rating_history.png"


    def __init__(self, filename: str):
        self.filename = filename
        self.df = self.read_csv(filename)
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Setup up logger"""

        # Create logger
        logger = logging.getLogger(self.__class__.__name__)
        logger.setLevel(logging.INFO)

        if not logger.handlers:
            # Create console handler and set level to debug
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)

            # Create formatter
            formatter = logging.Formatter('[%(asctime)s] %(levelname)s [%(name)s] - %(message)s')

            # Add formatter to ch
            ch.setFormatter(formatter)

            # Add ch to logger
            logger.addHandler(ch)

        return logger

    @staticmethod
    def read_csv(filename: str) -> pd.DataFrame:
        """Read a csv file and returns the output in a Dataframe"""

        df = pd.read_csv(filename)
        df.sort_values(by='date', inplace=True)
        df.reset_index(inplace=True, drop=True)
        return df

    def generate_plots(self) -> None:
        self._plot_rating_frequencies(output_path=self.RATING_FREQUENCY)
        self._plot_mean_ratings(output_path=self.RATING_HISTORY)
        self.logger.info("Plots generated.")


    def _plot_rating_frequencies(self, output_path: str) -> None:
        """Plot a bar chart of the frequencies of the ratings in a pandas DataFrame."""

        self.logger.info("Generating rating frequencies png file.")
        # Count the frequencies of the ratings
        ratings = self.df['rating'].value_counts()

        # Get the labels (ratings) and values (frequencies)
        labels = ratings.index
        values = ratings.values

        # Convert the frequencies to percentages
        total_ratings = sum(values)
        values = [value / total_ratings * 100 for value in values]

        # Create the bar plot
        plt.barh(labels, values, height=0.6, color='#FF9900', edgecolor='none')

        # Add labels to the bars
        for index, value in enumerate(values):
            plt.text(value+3, labels[index], f"{value:.0f}" + '%', va='center',  fontsize=20)

        # Set the y-axis labels
        plt.yticks(labels, [str(label) + ' stars'
                            if str(label) != "1" else str(label) + ' star'
                            for label in labels],  fontsize=20)
        plt.box(False)
        plt.xticks([])

        # Save the plot as a PDF file
        plt.savefig(output_path, format='png', bbox_inches = 'tight')
        plt.close()

    def _plot_mean_ratings(self, output_path: str) -> None:
        """
        Plot a line chart of the mean ratings over the years in a pandas DataFrame.
        """

        self.logger.info("Generating rating history png file.")
        df = self.df.copy()
        # Convert the date column to datetime
        df['date'] = pd.to_datetime(df['date'])

        # Convert the date column to a period with the frequency "Q" (quarters)
        df['period'] = df['date'].dt.to_period('Q')

        # Group the ratings by year and quarter and calculate the mean rating for each year and quarter
        mean_ratings = df.groupby('period')['rating'].mean()

        # Group the reviews by year and quarter and count the number of reviews with ratings between 1 and 2
        low_ratings = df[(df['rating'] >= 1) & (df['rating'] <= 2)].groupby('period')['rating'].count()/df.groupby('period')['rating'].count()

        # Get the years and quarters and mean ratings and number of reviews with ratings between 1 and 2
        periods = mean_ratings.index
        mean_ratings = mean_ratings.values
        low_ratings = low_ratings.values

        # Convert the periods to strings in the format "YYYY-QQ"
        labels = [period.strftime('%Y-%m') for period in periods]

        # Create the figure and axis with twin y-axis
        fig, ax = plt.subplots()
        ax2 = ax.twinx()

        # Create the line plots
        ax.plot(labels, mean_ratings, color='#FF9900', linewidth=4)
        ax2.bar(labels, low_ratings, color='#1f77b4', linewidth=2, alpha=0.8)

        # Rotate the x-axis labels and start the y-axis from 0
        ax.set_xticklabels(labels, rotation=45)
        ax.set_ylim(0, 5,2)

        # Add axis labels and title
        ax.set_xlabel('Year and quarter',  fontsize=20)
        ax.set_ylabel('Mean rating',  fontsize=20, color='#FF9900')
        ax2.set_ylabel('Negative vs Total \nRatings',  fontsize=20, color='#1f77b4')

        # Save the plot as a PDF file
        _ = plt.savefig(output_path, format='png', bbox_inches = 'tight')
        plt.close()

# pre-processing function

In [5]:
"""
Pre-processes the text, using nltk, in order to use in language models
"""
def preprocess(sentence):
    """
    Function to process a given sentence.
    INPUT: raw string (tweet)
    OUTPUT: processed string
    """
    # initialize lemmatizer
    stemmer = WordNetLemmatizer()

    # define trans for digits
    remove_digits = str.maketrans('', '', digits)

    #convert the sentence to lower
    sentence = sentence.lower()

    # Remove underscores
    sentence = re.sub(r'[-_]', '', sentence)
    sentence = re.sub(r'_[A-Za-z0-9]+', ' ', sentence)
    sentence = re.sub(r'^ _\s+', ' ', sentence)

    # Remove websites
    sentence = re.sub('https?://[A-Za-z0-9./]+', ' ', sentence)

    #remove all non words
    sentence = re.sub(r'\W', ' ', sentence)

    #remove all single characters
    sentence = re.sub(r'\b\w\b', ' ', sentence)

    # Remove numbers
    sentence = re.sub(r'[0-9]', ' ', sentence)
    sentence = sentence.translate(remove_digits)

    #remove multiple whitespaces
    sentence = re.sub(' +', ' ', sentence)

    # Split the sentence based on whitespaces (--> List of words)
    sentence = sentence.split()

    # # Lemmatization
    # sentence = [stemmer.lemmatize(word) for word in sentence]

    # Reconstruct the sentence by joining the words on each whitespace
    sentence = ' '.join(sentence)
    return sentence

# OpenAI summary class

In [46]:
class OpenAISummary:
    """Class that uses openAI API to generate product summaries, using all, negative and positive reviews"""


    def __init__(self, all_reviews: Series,
                 negative_reviews: Series,
                 positive_reviews: Series):
        self.all_reviews = all_reviews
        self.negative_reviews = negative_reviews
        self.positive_reviews = positive_reviews
        self.logger = self._setup_logger()
        self.max_length: int = 3400


    def _setup_logger(self):
        """Setup up logger"""

        # Create logger
        logger = logging.getLogger(self.__class__.__name__)
        logger.setLevel(logging.INFO)

        if not logger.handlers:
            # Create console handler and set level to debug
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)

            # Create formatter
            formatter = logging.Formatter('[%(asctime)s] %(levelname)s [%(name)s] - %(message)s')

            # Add formatter to ch
            ch.setFormatter(formatter)

            # Add ch to logger
            logger.addHandler(ch)

        return logger


    def make_tokens_list(self, reviews: Series,
                         shuffling: bool = True) ->  List[str]:
        """Splits reviews (Series) into strings with number of tokens (words) equals max_length.
        """

        self.logger.info("Making token list.")
        token_list: List[str] = []
        token = ""

        # shuffle reviews
        if shuffling:
            self.logger.info("shuffling Reviews.")
            reviews = reviews.sample(frac=1)

        # find review length
        for review in reviews:
            review_length: int = len(re.findall(r'\w+', review))
            token_length: int = len(re.findall(r'\w+', token))


            # add string
            if review_length + token_length <= self.max_length and review_length >= 2:
                token += review

            # append review
            elif review_length + token_length > self.max_length:
                token_list.append(token)
                token = review

        if not token_list:
            token_list.append(token)
        self.logger.info(f"Made token list. Number of elements in the list is: {len(token_list)+1}")

        return token_list


    def summarize_text(self, text: str, what: str, max_length: int) -> str:
        """
        Function that uses open AI completion API and returns the text response

        Args:
            text: text to feed the API
            what: that to do with the text (eg. summarize these reviews)
            max_length: maximum response length
        """
        if openai.api_key == "YOUR_API_KEY":
            self.logger.error(f"Invalid API KEY!!!")
            raise ValueError("Please use your OpenAI API key found in the first cell. The current value is 'YOUR_API_KEY'.")

        # Use the OpenAI API to summarize the text
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=f"{what} {text}",
            temperature=0.5,
            max_tokens=100,
            top_p=1,
            frequency_penalty=1,
            presence_penalty=1
        )
        api_response = response["choices"][0]["text"]

        return api_response


    def _generate_interim_summary(self, reviews: Series,
                                  max_length: int = 100,
                                  shuffling: bool = False) -> str:
        """Generate a max_length word, all reviews summary"""

        summary_interim: str = ""

        # only keep first 10 packs of tokens
        for i, token in enumerate(self.make_tokens_list(reviews, shuffling=shuffling)[:40]):
            self.logger.info(f"Sending API request #{i}")

            summary_interim += self.summarize_text(what=f"summarize these reviews in less than {max_length} words", text=token, max_length=max_length)
            time.sleep(1)

        self.logger.info(f"Generated Interim summary")
        return summary_interim

    def generate_summary(self, max_length: int = 100):
        """Generates a summary of all reviews"""

        self.logger.info(f"Generating all reviews summary.")
        summary_interim: str = self._generate_interim_summary(self.all_reviews)

        summary: str = self.summarize_text(what=f"Make a product review in less than {max_length} words about this product:", text=summary_interim, max_length=max_length)

        self.logger.info(f"Generated Summary")
        return summary


    def generate_positive(self, max_length: int = 100):
        """Extracts positive information"""
        self.logger.info(f"Generating positive reviews bullets")

        summary_interim: str = self._generate_interim_summary(self.positive_reviews)
        summary: str = self.summarize_text(what=f"make 3 bullets of good things about this product:", text=summary_interim, max_length=max_length)

        self.logger.info(f"Generated Positive Summary")
        return summary


    def generate_negative(self, max_length: int = 100):
        """Extracts negative information"""
        self.logger.info(f"Generating negative reviews bullets.")
        summary_interim: str = self._generate_interim_summary(self.negative_reviews, shuffling=True)

        summary: str = self.summarize_text(what=f"make 3 bullets of bad things about this product:", text=summary_interim, max_length=max_length)

        self.logger.info(f"Generated Negative Summary")
        return summary


# Summarize Function

In [51]:
def summarize(csv_path: str,
              png_path: str = None
              ) -> None:
    """
    Creates a one page pdf containing a summary review of the product

    Args:
        csv_path: path of the csv file. Should have the format: amazon_reviews_{product_name}.csv
        png_path: path of the product image from amazon
    """

    # save plots
    plot_generator = PlotGenerator(csv_path)
    plot_generator.generate_plots()

    # pre-process text
    plot_generator.df["text"]: pd.DataFrame = plot_generator.df["text"].apply(lambda row: preprocess(row) if isinstance(row, str) else '')

    # split into all reviews, positive reviews, negative reviews
    df = plot_generator.df
    all_reviews: Series = df["text"]
    negative_reviews: Series = df.loc[df.rating.apply(lambda x: str(x)).isin(["1","2"])]["text"]
    positive_reviews: Series = df.loc[df.rating.apply(lambda x: str(x)).isin(["4","5"])]["text"]


    # Get openAI responses
    summarizer = OpenAISummary(all_reviews=all_reviews,
                               negative_reviews=negative_reviews,
                               positive_reviews=positive_reviews)

    review_text = summarizer.generate_summary()
    review_positive = summarizer.generate_positive()
    review_negative = summarizer.generate_negative()



    product: str = csv_path.replace(".csv", "")\
        .split("amazon_reviews_")[-1]\
        .replace("_", " ")\
        .capitalize()
    output: str = f"./{product.replace(' ', '_')}_summary.pdf"

    print("Generating pdf report.")
    generate_pdf(product=product,
                review=review_text.strip().capitalize(),
                 review_positive=review_positive,
                 review_negative=review_negative,
                 product_image=png_path,
                 output_path=output,
                 average_rating=df['rating'].mean(),
                 rating_frequencies=r"./rating_frequencies.png",
                 rating_history=r"./rating_history.png")


    # delete plot files
    print("Deleting png files.")
    os.remove(r"./rating_frequencies.png")
    os.remove(r"./rating_history.png")



# Execution cell

In [49]:
csv_path: str = r"./backup/amazon_reviews_Vans_Ward_Sneaker.csv"
png_path : str = r"./Vans_Ward_Sneaker.png"

summarize(csv_path=csv_path,
          png_path=png_path)


[2022-12-31 19:08:22,182] INFO [PlotGenerator] - Generating rating frequencies png file.
[2022-12-31 19:08:22,298] INFO [PlotGenerator] - Generating rating history png file.
[2022-12-31 19:08:22,799] INFO [PlotGenerator] - Plots generated.
[2022-12-31 19:08:22,835] INFO [OpenAISummary] - Generating all reviews summary.
[2022-12-31 19:08:22,836] INFO [OpenAISummary] - Making token list.
[2022-12-31 19:08:23,469] INFO [OpenAISummary] - Made token list. Number of elements in the list is: 4
[2022-12-31 19:08:23,471] INFO [OpenAISummary] - Sending API request #0
[2022-12-31 19:08:28,523] INFO [OpenAISummary] - Sending API request #1
[2022-12-31 19:08:33,720] INFO [OpenAISummary] - Sending API request #2
[2022-12-31 19:08:38,690] INFO [OpenAISummary] - Generated Interim summary
[2022-12-31 19:08:40,267] INFO [OpenAISummary] - Generated Summary
[2022-12-31 19:08:40,268] INFO [OpenAISummary] - Generating positive reviews bullets
[2022-12-31 19:08:40,269] INFO [OpenAISummary] - Making token lis

Generating pdf report.
Deleting png files.
