In [6]:
import requests
from bs4 import BeautifulSoup
from config import Config
from urllib.parse import urlparse, urljoin
from urllib.request import urlopen
import certifi
import json
import openai
import os

cfg = Config()

In [7]:
# Function to check if the URL is valid
def is_valid_url(url):
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except ValueError:
        return False


# Function to sanitize the URL
def sanitize_url(url):
    return urljoin(url, urlparse(url).path)


# Define and check for local file address prefixes
def check_local_file_access(url):
    local_prefixes = ['file:///', 'file://localhost', 'http://localhost', 'https://localhost']
    return any(url.startswith(prefix) for prefix in local_prefixes)


def get_response(url, headers=cfg.user_agent_header, timeout=10):
    try:
        # Restrict access to local files
        if check_local_file_access(url):
            raise ValueError('Access to local files is restricted')

        # Most basic check if the URL is valid:
        if not url.startswith('http://') and not url.startswith('https://'):
            raise ValueError('Invalid URL format')

        sanitized_url = sanitize_url(url)

        response = requests.get(sanitized_url, headers=headers, timeout=timeout)

        # Check if the response contains an HTTP error
        if response.status_code >= 400:
            return None, "Error: HTTP " + str(response.status_code) + " error"

        return response, None
    except ValueError as ve:
        # Handle invalid URL format
        return None, "Error: " + str(ve)

    except requests.exceptions.RequestException as re:
        # Handle exceptions related to the HTTP request (e.g., connection errors, timeouts, etc.)
        return None, "Error: " + str(re)


def scrape_text(url):
    """Scrape text from a webpage"""
    response, error_message = get_response(url)
    if error_message:
        return error_message

    soup = BeautifulSoup(response.text, "html.parser")

    for script in soup(["script", "style"]):
        script.extract()

    text = soup.get_text()
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = '\n'.join(chunk for chunk in chunks if chunk)

    return text


def extract_hyperlinks(soup):
    """Extract hyperlinks from a BeautifulSoup object"""
    hyperlinks = []
    for link in soup.find_all('a', href=True):
        hyperlinks.append((link.text, link['href']))
    return hyperlinks


def format_hyperlinks(hyperlinks):
    """Format hyperlinks into a list of strings"""
    formatted_links = []
    for link_text, link_url in hyperlinks:
        formatted_links.append(f"{link_text} ({link_url})")
    return formatted_links


def scrape_links(url):
    """Scrape links from a webpage"""
    response, error_message = get_response(url)
    if error_message:
        return error_message

    soup = BeautifulSoup(response.text, "html.parser")

    for script in soup(["script", "style"]):
        script.extract()

    hyperlinks = extract_hyperlinks(soup)

    return format_hyperlinks(hyperlinks)



def split_text(text, max_length=3000):
    """Split text into chunks of a maximum length"""
    paragraphs = text.split("\n")
    current_length = 0
    current_chunk = []

    for paragraph in paragraphs:
        # If the paragraph itself is longer than max_length, split it into smaller substrings
        while len(paragraph) > max_length:
            substring = paragraph[:max_length]
            yield substring
            paragraph = paragraph[max_length:]
        
        # Check if adding the paragraph to the current chunk exceeds max_length
        if current_length + len(paragraph) + 1 <= max_length:
            current_chunk.append(paragraph)
            current_length += len(paragraph) + 1
        else:
            yield "\n".join(current_chunk)
            current_chunk = [paragraph]
            current_length = len(paragraph) + 1

    if current_chunk:
        yield "\n".join(current_chunk)


def create_message(chunk, question):
    """Create a message for the user to summarize a chunk of text"""
    return {
        "role": "user",
        "content": f"\"\"\"{chunk}\"\"\" Using the above text, please answer the following question: \"{question}\" -- if the question cannot be answered using the text, please summarize the text. " \
            " Respond in bullet points starting with a short and creative title that summarizes the text.  Do not include the word Title in your title"
    }


def summarize_text(text, question):
    """Summarize text using the LLM model"""
    if not text:
        return "Error: No text to summarize"

    text_length = len(text)
    print(f"Text length: {text_length} characters")

    summaries = []
    chunks = list(split_text(text))

    for i, chunk in enumerate(chunks):
        #print(f"Summarizing chunk {i + 1} / {len(chunks)}")
        messages = [create_message(chunk, question)]

        response = openai.ChatCompletion.create(
                    model=cfg.fast_llm_model,
                    messages=messages,
                    temperature=0,
                    max_tokens=900
                )
            
        summaries.append(response.choices[0].message["content"])
        print("\n" + str(i) + " " + response.choices[0].message["content"])

    #print(f"Summarized {len(chunks)} chunks.")

    combined_summary = "\n".join(summaries)
    if len(chunks) > 1 :
        messages = [create_message(combined_summary, question)]

        response = openai.ChatCompletion.create(
                    model=cfg.fast_llm_model,
                    messages=messages,
                    temperature=0,
                    max_tokens=1800
                )
        final_summary =response.choices[0].message["content"]

    else:
        final_summary = combined_summary

    return final_summary


In [8]:
def get_text_summary(url, question):
    """Return the results of a google search"""
    text = scrape_text(url)
    summary = summarize_text(text, question)
    return """ "Result" : """ + summary

In [9]:
#get_text_summary("https://wtirealist.substack.com/p/frontera-lets-be-realistic?utm_source=post-email-title&publication_id=1114715&post_id=114436945&isFreemail=true&utm_medium=email","what are the main points")

In [10]:
FMP_KEY=os.getenv("FMP_KEY")
url = "https://financialmodelingprep.com/api/v3/earning_call_transcript/"+"TRTN"+"?quarter=4&year=2022&apikey="+FMP_KEY
response = urlopen(url, cafile=certifi.where())
data = response.read().decode("utf-8")
js = json.loads(data)
fa=js[0]
text=fa["content"]
summarize_text(text, "what are the main points")


  response = urlopen(url, cafile=certifi.where())


Text length: 39140 characters

0 Triton International's Strong Financial Performance in 2022

- Outstanding year for Triton International with strong Q4 results
- Generated $2.76 of adjusted net income per share in Q4 and $11.32 of adjusted earnings per share for the full year 2022
- Achieved an annualized return on equity of 25.4% in Q4 and 28.4% for the full year 2022
- Market environments slowed in 2022 following exceptional container demand in 2021
- Consumers are shifting spending back from goods to services and global economic headwinds are hitting trades
- Logistical bottlenecks have eased, freeing container capacity and decreasing utilization from last year's record level
- Triton has significant operational and financial advantages in the market
- Majority of containers are on multi-year long term leases with low cost financing and long term fixed rate debt
- Shifted investment focus from fleet growth in 2021 to share repurchases in 2022, purchasing over 9.1 million shares
- E

KeyboardInterrupt: 