In [1]:
from urllib.error import HTTPError
from urllib.request import urlopen
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.document_transformers import Html2TextTransformer
from langchain.text_splitter import TokenTextSplitter
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.tools.edenai import EdenAiTextToSpeechTool
from dotenv import load_dotenv
import tiktoken
import requests
import os
import re
import cProfile
from pympler import asizeof

In [2]:
# Load variables from a local .env file into the process environment.
# TextToSpeach.run() later reads EDENAI_API_KEY via os.getenv; presumably the
# OpenAI key used by ChatOpenAI is supplied the same way — TODO confirm.
load_dotenv()

True

In [3]:
# Page that will be fetched, summarised and converted to speech.
# The commented-out URLs are alternative targets (presumably used in earlier test runs).
# url = "https://www.fass.se/LIF/product?userType=2&nplId=20141129000039"
# url = "https://www.apoteket.se/halsotjanster/"
url = "https://www.aftonbladet.se/nyheter/a/Rr77qd/aftonbladet-direkt?pinnedEntry=1215281"

In [4]:
def get_cost(
    nr_input_tokens: int,
    nr_output_tokens: int,
    usd_per_1k_input_tokens: float = 0.0005,
    usd_per_1k_output_tokens: float = 0.0015,
    sek_per_usd: float = 10.48,
) -> str:
    """A utility function to check the price of a request.

    Args:
        nr_input_tokens: Number of tokens sent to the model.
        nr_output_tokens: Number of tokens returned by the model.
        usd_per_1k_input_tokens: Price in US dollars per 1000 input tokens.
        usd_per_1k_output_tokens: Price in US dollars per 1000 output tokens.
        sek_per_usd: Exchange rate used to convert the dollar price to kronor.

    Returns:
        The total price in kronor, rounded to two decimals, followed by "kr"
        (for example "0.13kr").

    Raises:
        ValueError: If either token count is negative.
    """
    if nr_input_tokens < 0 or nr_output_tokens < 0:
        raise ValueError("Token counts must be non-negative.")

    # Prices are quoted per 1000 tokens, so derive the price of one token first.
    price_one_input_token = usd_per_1k_input_tokens / 1000
    price_one_output_token = usd_per_1k_output_tokens / 1000

    total_price_us_dollars = (
        nr_input_tokens * price_one_input_token
        + nr_output_tokens * price_one_output_token
    )
    total_price_kr = total_price_us_dollars * sek_per_usd

    return f"{round(total_price_kr, 2)}kr"

In [5]:
# get_cost(12289, 4096)

In [6]:
class TextToSpeach:
    """Summarise the text of a web page and turn the summary into an mp3 file.

    Typical use is ``TextToSpeach(url).run()``, which collects the page text,
    summarises it with the chat model and writes the generated audio to
    ``sound_files/output.mp3``.
    """

    # url:        the page to process (only set when the page answered with 200)
    # model:      the ChatOpenAI instance used for summarising and for the agent
    # plain_text: page text as one string, or a list of chunks when it is too long
    # summary:    the final summary, set by set_text_summary()
    __slots__ = ("url", "model", "plain_text", "summary")

    def __init__(self, url: str):
        """Validate ``url`` and prepare the model.

        Args:
            url: Address of the page to summarise.

        Raises:
            ValueError: If requesting the url yields any status code other than 200.
        """
        status_code = self.check_url(url)
        if status_code == 200:
            self.url = url
        elif status_code == 404:
            raise ValueError(f"The status code was {status_code}, which indicates that while the server itself is reachable, the specific page is not.")
        else:
            raise ValueError(f"The status code was {status_code}.")

        self.model = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
        self.plain_text = None
        self.summary = None

    @staticmethod
    def check_url(url: str) -> int:
        """Return the HTTP status code obtained when requesting ``url``.

        When the server answers with an HTTP error, the code carried by that
        error is returned (so 403, 500, ... are reported as such, not as 404).
        Any other exception raised by ``urlopen`` propagates unchanged.
        """
        try:
            # The context manager closes the connection once the code is read.
            with urlopen(url) as connection:
                return connection.getcode()
        except HTTPError as error:
            return error.code

    @staticmethod
    def count_tokens(text: str) -> bool:
        """Check whether ``text`` fits the model once room for the output is reserved.

        Despite its name this method does not return the count: it returns True
        when the number of cl100k_base tokens in ``text`` is at most the combined
        input/output limit minus the maximum output size, and False otherwise.
        """
        max_nr_of_tokens_for_model_io_combined = 16385
        max_output = 4096

        encoder = tiktoken.get_encoding("cl100k_base")
        nr_of_tokens = len(encoder.encode(text))

        return nr_of_tokens <= max_nr_of_tokens_for_model_io_combined - max_output

    def set_text_content(self):
        """Collects the plain text from a website and stores it in self.plain_text.

        self.plain_text becomes a single string when count_tokens() accepts the
        text, otherwise a list of chunks produced by TokenTextSplitter.
        """
        # Load the html of the page
        loader = WebBaseLoader(self.url)
        data = loader.load()

        # Transform the html to plain text and extract the text of the first document
        transformer = Html2TextTransformer()
        docs_transformed = transformer.transform_documents(data)
        plain_text = docs_transformed[0].page_content

        # If nr of tokens is more than max nr of tokens - max nr of output tokens, then the text will be split
        if self.count_tokens(plain_text):
            self.plain_text = plain_text
        else:
            text_splitter = TokenTextSplitter(chunk_size=5000, chunk_overlap=10)
            self.plain_text = text_splitter.split_text(plain_text)

    def _summarize(self, text: str) -> str:
        """Ask the chat model for a summary of ``text`` and return the reply's content."""
        system_message = "You are a very powerful assistant"
        # Opening and closing fences must match so the model can tell where the text ends.
        human_message = "Summarize the text between backticks\n```{}```\nThe summary is max 100 words"

        messages = [SystemMessage(content=system_message), HumanMessage(content=human_message.format(text))]
        return self.model.invoke(messages).content

    def set_text_summary(self):
        """Summarizes the text, sets self.summary and returns it.

        A string in self.plain_text is summarised directly. A list of chunks is
        summarised chunk by chunk; the partial summaries are joined with ": " and
        summarised once more.

        Raises:
            ValueError: If no text has been collected yet, or if the joined
                partial summaries are still too long for the model.
        """
        if self.plain_text is None:
            raise ValueError("No text has been collected yet. Call set_text_content() first.")

        if isinstance(self.plain_text, list):
            # One summary per chunk of the original text
            summaries = [self._summarize(text) for text in self.plain_text]

            # Join the summaries and (if nr of tokens is acceptable) summarise the now much shorter text
            summaries_str = ": ".join(summaries)
            if not self.count_tokens(summaries_str):
                raise ValueError("For now, this program cannot handle that many tokens. Read about the gpt-3.5-turbo-0125 for info")
            self.summary = self._summarize(summaries_str)
        else:
            self.summary = self._summarize(self.plain_text)

        return self.summary

    @staticmethod
    def play(url: str):
        """Autoplay to be implemented"""
        pass

    def run(self):
        """Run the program: collect the text, summarise it and save the speech as mp3.

        The summary is handed to an OpenAI functions agent whose only tool is the
        Eden AI text-to-speech tool. The first https link in the agent's answer is
        downloaded and written to ``sound_files/output.mp3``.

        Raises:
            ValueError: If the agent's answer contains no https link.
            requests.HTTPError: If downloading the sound file fails.
        """
        # Run key methods
        self.set_text_content()
        self.set_text_summary()

        # Create messages
        system_message = ("system", "You are a very powerful assistant")
        user_message = ("user", "Turn this text into speach: {text}")

        # Create an instance of EdenAiTextToSpeechTool (the tool that the agent will use)
        tts = EdenAiTextToSpeechTool(providers=["amazon"], language="en", voice="MALE", edenai_api_key=os.getenv("EDENAI_API_KEY"))
        tools = [tts]

        # Create the prompt, the agent and its executor
        prompt = ChatPromptTemplate.from_messages([system_message, user_message, MessagesPlaceholder(variable_name="agent_scratchpad")])
        agent = create_openai_functions_agent(llm=self.model, tools=tools, prompt=prompt)
        agent_executor = AgentExecutor(agent=agent, tools=tools)

        # Run the agent with the summary
        result = agent_executor.invoke({"text": self.summary})

        # Extract the link to the sound file. The link may be wrapped in markdown
        # ("[text](https://...)") or end a sentence, so stop at whitespace or ")"
        # and drop trailing punctuation.
        match = re.search(r"https://[^\s)]+", result["output"])
        if match is None:
            raise ValueError("The agent's answer did not contain a link to the sound file.")
        sound_url = match.group(0).rstrip(".,;")

        # Fail loudly instead of hanging forever or saving an error page as mp3
        response = requests.get(sound_url, timeout=30)
        response.raise_for_status()

        os.makedirs("sound_files", exist_ok=True)
        with open("sound_files/output.mp3", mode="wb") as file:
            file.write(response.content)
            

In [7]:
# def main():
#     instance = TextToSpeach(url)
#     instance.run()

In [8]:
# def main():
#     instance = TextToSpeach(url)
#     initial_size = asizeof.asizeof(instance)
#     instance.run()
#     final_size = asizeof.asizeof(instance)
#     return initial_size, final_size

In [9]:
# if __name__ == "__main__":
#     cProfile.run('main()')

In [10]:
# if __name__ == "__main__":
#     initial, final = main()
#     print(f"Initial memory usage in bytes is: {initial} bytes\nFinal memory usage in bytes is: {final} bytes")
#     print(f"The additional usage when the program finished running is: {final - initial} bytes")

In [11]:
# Build the pipeline object for `url`. The constructor requests the page through
# check_url and raises ValueError unless the status code is 200.
instance = TextToSpeach(url)

In [12]:
# Run the whole pipeline: collect the page text, summarise it and write the
# generated speech to sound_files/output.mp3.
if __name__ == "__main__":
    instance.run()