In [38]:
'''
Imports for testing purposes
'''

import requests
from bs4 import BeautifulSoup
import urllib
from tqdm.notebook import tqdm
import pandas as pd
import os
from ast import literal_eval
from datetime import datetime
from utils.base_templates import NewsArticle, ArticleCollection, Company, Insider

import database.database_creator as dc
import database.database_utils as du

In [None]:
# Input CSV and output CSV paths handed to the insider export call below.
input_file = "database/insiders_2024-10-22.csv"
output_file = "database/full_insiders_info_2024-10-22.csv"

# NOTE(review): presumably reads `input_file`, enriches each insider and writes
# the result to `output_file` - confirm against database.database_creator.
dc.save_insiders_to_csv(input_file, output_file)
# NOTE(review): the name suggests a full company scrape, which is likely slow and
# network-bound - confirm before re-running this cell.
dc.scrape_all_companies()

In [32]:
# Load the company snapshot CSV; the bare `df.columns` expression is the cell's
# displayed output (the list of column names).
df = pd.read_csv('database/companies_info_2024-11-01.csv')
df.columns

Index(['name', 'isin', 'ticker', 'industry', 'sector', 'profile', 'executives',
       'link', 'country'],
      dtype='object')

In [59]:
def str_to_dict_expansion(dict_repr: object) -> dict:
    """
    Converts the textual representation of a dictionary back into a dict.

    Args:
        dict_repr: A dict, or the string form of a dict literal
            (e.g. "{'CEO': '2019-01-01'}"). Any other value is converted
            with ``str()`` before parsing.

    Returns:
        dict: The parsed dictionary. An empty dict is returned when the value
            is empty, is a missing-value marker ('nan'), cannot be parsed, or
            does not evaluate to a dict.
    """
    if isinstance(dict_repr, dict):
        # Already a dict: hand back a shallow copy instead of round-tripping through str().
        return dict(dict_repr)

    text = str(dict_repr).strip()
    # 'nan' is what str() yields for a missing cell read back from a CSV.
    if text in ('', '{}', 'nan'):
        return {}

    try:
        # The notebook imports `literal_eval` by name; the module name `ast`
        # itself is never bound, so `ast.literal_eval` would raise NameError.
        parsed = literal_eval(text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print(e)
        return {}

    # Callers iterate with `.items()`, so anything that is not a dict is unusable.
    return parsed if isinstance(parsed, dict) else {}

def find_company_and_insiders_from_article(article_link: str):
    """
    Looks up an article together with its company and that company's insiders.

    Args:
        article_link (str): Value of the 'link' field of the article to look up
            in the 'Articles' collection.

    Returns:
        dict | None: ``{"article": ..., "company": ..., "insiders": [...]}``, or
            None when the article or its company is not found in the database.
    """
    # Step 1: Fetch the article from the Articles collection
    articles_collection = du.get_db_collection(du.DB_NAME, 'Articles')
    article = articles_collection.find_one({"link": article_link})
    if not article:
        print("Article not found in the database.")
        return None

    # Step 2: Fetch the company linked to this article.
    # The scraper stores None for 'company_link' when a row has no company tag,
    # and a CSV round-trip can turn that into NaN; neither can be concatenated
    # to a string, so anything that is not text is treated as "no link".
    company_path = article.get('company_link')
    if not isinstance(company_path, str):
        company_path = ''
    company_link = 'https://www.marketscreener.com' + company_path
    companies_collection = du.get_db_collection(du.DB_NAME, 'Companies')
    company = companies_collection.find_one({"link": company_link})
    if not company:
        print("Company not found in the database.")
        return None

    # Step 3: Get executive 'href' links to query the Insiders collection.
    # 'executives' is expected to be the string form of a nested dict, but it
    # may also already be a dict or be missing altogether.
    executives = company.get('executives', '{}')
    if isinstance(executives, str):
        try:
            executives = literal_eval(executives)
        except (ValueError, SyntaxError) as e:
            print(f"Could not parse executives for {company_link}: {e}")
            executives = {}
    if not isinstance(executives, dict):
        executives = {}

    # Skip malformed categories/entries instead of failing the whole lookup.
    insider_links = [
        exec_info['href']
        for exec_category in executives.values()
        if isinstance(exec_category, dict)
        for exec_info in exec_category.values()
        if isinstance(exec_info, dict) and exec_info.get('href')
    ]

    # Step 4: Fetch insiders based on the collected links
    insiders_collection = du.get_db_collection(du.DB_NAME, 'Insiders')
    insiders = list(insiders_collection.find({"link": {"$in": insider_links}}))

    # Return the collected data as a dictionary
    return {
        "article": article,
        "company": company,
        "insiders": insiders
    }

def _as_file_name_part(value: object) -> str:
    """Returns `value` as text with path separators replaced, so it stays a single file-name component."""
    return str(value).replace('/', '_').replace('\\', '_')


def _parse_str_list(raw: object) -> list:
    """Returns `raw` (a list, or the string form of a list) as a list of strings; anything else gives []."""
    if isinstance(raw, str):
        try:
            raw = literal_eval(raw)
        except (ValueError, SyntaxError):
            return []
    if isinstance(raw, (list, tuple, set)):
        return [str(item) for item in raw]
    return []


def generate_markdown_report(article_link: str, output_dir: str = "reports"):
    """
    Writes a markdown report for one article, its company and the linked insiders.

    The report is saved as ``<headline>_<publication_date>.md`` inside
    `output_dir` (spaces in the headline become underscores) and the path is
    printed. Nothing is written when the article or company cannot be found.

    Args:
        article_link (str): 'link' value of the article to report on.
        output_dir (str): Directory the report is written to; created if missing.

    Returns:
        None
    """
    # Fetch article, company, and insiders data
    data = find_company_and_insiders_from_article(article_link)
    if data is None:
        return

    article = data["article"]
    company = data["company"]
    insiders = data["insiders"]

    # Generate the markdown file name using the article headline and date.
    # Path separators are replaced so a headline such as "A/B merger" cannot
    # point into a sub-directory that does not exist.
    headline = _as_file_name_part(article.get('headline') or 'unknown_headline').replace(' ', '_')
    publication_date = _as_file_name_part(article.get('publication_date', 'unknown_date'))
    file_name = f"{headline}_{publication_date}.md"
    file_path = os.path.join(output_dir, file_name)

    # A missing profile (None / NaN) is rendered as empty text; hyphens are
    # escaped in the profile text.
    profile = company.get('profile')
    profile = profile.replace('-', '\\-') if isinstance(profile, str) else ''

    # Build the whole document in memory first so that a failure while
    # formatting never leaves a half-written report on disk.
    parts = [
        f"# {article.get('headline')}\n",
        f"**Publication Date:** {article.get('publication_date')}\n",
        f"**Category:** {article.get('category')}\n",
        f"**Source:** {article.get('source')}\n\n",
        "## Article Content\n",
        f"{article.get('content')}\n\n",
        "## Company Information\n",
        f"- **Name:** {company.get('name')}\n",
        f"- **ISIN:** {company.get('isin')}\n",
        f"- **Ticker:** {company.get('ticker')}\n",
        f"- **Industry:** {company.get('industry')}\n",
        f"- **Sector:** {company.get('sector')}\n",
        f"- **Country:** {company.get('country')}\n",
        f"- **Profile:** {profile}\n\n",
        "## Linked Insiders\n",
    ]

    for insider in insiders:
        parts.append(f"- **Name:** {insider.get('name')}\n")
        parts.append(f"  - **Current Position:** {insider.get('current_position')}\n")
        parts.append(f"  - **Current Company:** {insider.get('current_company')}\n")
        parts.append(f"  - **Company URL:** {insider.get('company_url')}\n")
        parts.append(f"  - **Net Worth:** {insider.get('net_worth', 'N/A')}\n")

        known_holdings = str_to_dict_expansion(insider.get('known_holdings', '{}'))
        parts.append("  - **Known Holdings:**\n")
        # `holding_company` (not `company`) so the company record above is not shadowed.
        for holding_company, details in known_holdings.items():
            if not isinstance(details, dict):
                details = {}
            parts.append(f"    - **{holding_company}:**\n")
            parts.append(f"      - **Link:** {details.get('link', 'N/A')}\n")
            parts.append(f"      - **Date:** {details.get('date', 'N/A')}\n")
            parts.append(f"      - **Number of Shares:** {details.get('number_of_shares', 'N/A')}\n")
            parts.append(f"      - **Valuation:** {details.get('valuation', 'N/A')}\n")
            parts.append(f"      - **Valuation Date:** {details.get('valuation_date', 'N/A')}\n")

        parts.append(f"  - **Age:** {insider.get('age', 'N/A')}\n")
        # 'industries' may be a list, the string form of a list, or missing.
        industries = _parse_str_list(insider.get('industries', '[]'))
        parts.append(f"  - **Industries:** {', '.join(industries)}\n")
        parts.append(f"  - **Summary:** {insider.get('summary', 'N/A')}\n")

        parts.append("  - **Active Positions:**\n")
        active_positions = str_to_dict_expansion(insider.get('active_positions', '{}'))
        for position, date in active_positions.items():
            parts.append(f"    - {position}: {date}\n")

        parts.append("  - **Former Positions:**\n")
        former_positions = str_to_dict_expansion(insider.get('former_positions', '{}'))
        for position, date in former_positions.items():
            parts.append(f"    - {position}: {date}\n")

        parts.append("  - **Education:**\n")
        trainings = str_to_dict_expansion(insider.get('trainings', '{}'))
        for training, details in trainings.items():
            parts.append(f"    - {training}: {details}\n")

        parts.append("\n---\n\n")

    # Create the output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Explicit UTF-8: article text regularly contains non-ASCII characters and
    # the platform default encoding may not be able to represent them.
    with open(file_path, 'w', encoding='utf-8') as md_file:
        md_file.writelines(parts)

    print(f"Markdown report saved to {file_path}")

# Example usage: build the report for the article stored at row position 103
df = pd.read_csv('database/marketscreener_articles_2024-11-01.csv')
link = df['link'].iloc[103]
generate_markdown_report(article_link=link)

Markdown report saved to reports/Dabur_India_to_Buy_Majority_Stake_in_Sesa_Care_2024-10-30 16:00:00.md


In [18]:
# Browser-like request headers sent with the HTTP requests made in this notebook.
HEADERS = {
    'User-Agent': ' '.join((
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5)',
        'AppleWebKit/537.36 (KHTML, like Gecko)',
        'Chrome/50.0.2661.102',
        'Safari/537.36',
    )),
}

def parse_time_string(time_str: str, now: "datetime | None" = None) -> datetime:
    """
    Parses a time string into a datetime object, filling in the missing date or year.

    Supported formats:
        * '02:55am'    -> that time on the reference date
        * 'Oct. 14'    -> midnight of the most recent such date that is not in
                          the future relative to the reference date (one day of
                          slack is allowed for time-zone differences)
        * '2024-10-20' -> midnight of that date

    Args:
        time_str (str): The time string to convert, e.g., '02:55am', 'Oct. 14', or '2024-10-20'.
        now (datetime, optional): Reference "current" moment. Defaults to
            ``datetime.now()``; pass a fixed value for reproducible results.

    Returns:
        datetime: A datetime object with the appropriate date and time.

    Raises:
        ValueError: If the string matches none of the supported formats.
    """
    reference = datetime.now() if now is None else now
    text = time_str.strip()

    # Time of day only -> combine with the reference date.
    try:
        clock = datetime.strptime(text, '%I:%M%p')
    except ValueError:
        pass
    else:
        return datetime.combine(reference.date(), clock.time())

    # Month and day only. Parse against a fixed leap year: without a year
    # strptime assumes 1900, which is not a leap year, so 'Feb. 29' would
    # always be rejected.
    try:
        month_day = datetime.strptime(f'{text} 2000', '%b. %d %Y')
    except ValueError:
        pass
    else:
        # Walk back from the reference year: skips years in which the date does
        # not exist (Feb 29) and years in which it would lie in the future
        # (e.g. 'Dec. 30' seen on January 2nd belongs to the previous year).
        # Leap years are at most 8 years apart, so 9 candidates always suffice.
        for year in range(reference.year, reference.year - 9, -1):
            try:
                candidate = datetime(year, month_day.month, month_day.day)
            except ValueError:
                continue
            if (candidate.date() - reference.date()).days <= 1:
                return candidate

    # Full ISO date.
    try:
        return datetime.strptime(text, '%Y-%m-%d')
    except ValueError:
        pass

    raise ValueError(f"Time string '{time_str}' does not match expected formats.")

def fetch_html_content(url: str, timeout: float = 30.0) -> str:
    """
    Fetches and returns the HTML content of a given URL.

    Args:
        url (str): The URL to fetch.
        timeout (float): Seconds to wait for the server before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        str: The response body with percent-escapes decoded.

    Raises:
        RuntimeError: If the request fails, times out, or returns an HTTP
            error status. The original requests exception is chained.
    """
    # Imported here because a bare `import urllib` does not guarantee that the
    # `urllib.parse` submodule has been loaded.
    from urllib.parse import unquote

    try:
        response = requests.get(url, headers=HEADERS, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to fetch URL {url}: {e}") from e

    # NOTE(review): percent-decoding the whole document is kept from the original
    # behavior - confirm it is intended rather than decoding individual hrefs.
    return unquote(response.text)

def extract_article_text(soup: BeautifulSoup) -> str:
    """
    Pulls the article body out of a parsed page.

    Args:
        soup (BeautifulSoup): Parsed HTML of the article page.

    Returns:
        str: The text of the 'txt-s4 article-text' div with one stripped
            fragment per line, or a fixed "not found" message when the page
            has no such div.
    """
    body = soup.find('div', class_='txt-s4 article-text')
    return (
        body.get_text(separator='\n', strip=True)
        if body
        else 'Article text not found in the provided URL'
    )

def extract_marketscreener_article(url: str) -> str:
    """
    Downloads a MarketScreener article page and returns its text.

    Args:
        url (str): The URL of the MarketScreener article.

    Returns:
        str: The extracted article text. Any failure is reported in the
            returned string instead of being raised.
    """
    try:
        page = BeautifulSoup(fetch_html_content(url), 'html.parser')
        return extract_article_text(page)
    except Exception as err:
        return f'Error with extracting article text: {err}'

def get_articles_from_marketscreener(url: str) -> ArticleCollection:
    """
    Retrieves articles from a MarketScreener page.

    Every table row that contains a headline link becomes one NewsArticle; the
    article body is downloaded for each row. Rows without a headline link or
    without a target URL are skipped.

    Args:
        url (str): The URL of the MarketScreener page.

    Returns:
        ArticleCollection: A collection of articles extracted from the page,
            or an empty ArticleCollection when the page cannot be fetched.
    """
    try:
        html_content = fetch_html_content(url)
        soup = BeautifulSoup(html_content, 'html.parser')
    except RuntimeError as e:
        print(e)
        return ArticleCollection()

    articles_list = []  # Initialize a list to store NewsArticle objects
    # Used when a row carries no source badge. Previously the fallback read the
    # not-yet-assigned `source` variable, which raised UnboundLocalError on the
    # first such row and otherwise reused the previous row's source.
    default_source = "MarketScreener"
    extraction_date = datetime.now()  # Current date and time for extraction

    for table in soup.find_all('table'):
        for row in table.find_all('tr'):
            headline_tag = row.find('a', class_='c txt-s1 txt-overflow-2 link link--no-underline my-5 my-m-0')
            if not headline_tag:
                continue

            href = headline_tag.get('href')
            if not href:
                # A headline without a target cannot be fetched or identified.
                continue

            headline = headline_tag.text.strip()
            link = 'https://www.marketscreener.com' + href

            # Find the company link
            company_tag = row.find('a', class_='link link--blue c-flex align-top')
            company_name = company_tag.get('title') if company_tag else None
            company_link = company_tag.get('href') if company_tag else None

            source_tag = row.find('span', class_='c-block p-5 badge badge--small txt-s1')
            source = source_tag.get('title') if source_tag else default_source

            time_tag = row.find('span', class_='js-date-relative txt-muted h-100')
            publication_date = None
            if time_tag:
                try:
                    publication_date = parse_time_string(time_tag.text.strip())
                except ValueError as e:
                    # One unexpected date format must not abort the whole scrape.
                    print(e)

            article = NewsArticle(
                headline=headline,
                content=extract_marketscreener_article(link),
                link=link,
                company_name=company_name,
                company_link=company_link,
                source=source,
                publication_date=publication_date
            )
            articles_list.append(article)  # Append each NewsArticle to the list

    # Create and return an ArticleCollection object with all fields filled
    return ArticleCollection(
        extraction_date=extraction_date,
        articles=articles_list
    )

def save_articles_to_csv(endpoint_list, base_url, csv_file_name):
    """
    Scrapes every endpoint and writes all retrieved articles to one CSV file.

    Args:
        endpoint_list (list): Endpoints (news categories) to retrieve articles from.
        base_url (str): The base URL for the MarketScreener news.
        csv_file_name (str): The name of the CSV file to save the articles.
    """
    rows = []

    for endpoint in endpoint_list:
        collection = get_articles_from_marketscreener(f'{base_url}/{endpoint}/')
        # One flat record per article; the endpoint doubles as the category.
        rows.extend(
            {
                'category': endpoint,
                'extraction_date': collection.extraction_date,
                'headline': item.headline,
                'content': item.content,
                'company_name': item.company_name,
                'company_link': item.company_link,
                'link': item.link,
                'source': item.source,
                'publication_date': item.publication_date
            }
            for item in collection.articles
        )

    # Build the frame in one go and write it without the index column
    pd.DataFrame(rows).to_csv(csv_file_name, index=False)
    print(f"Data saved to {csv_file_name}")

In [19]:
# Usage: scrape each news category and write today's snapshot to a dated CSV
base_url = 'https://www.marketscreener.com/news/companies'
endpoint_list = ['IPO', 'mergers-acquisitions', 'rumors']
csv_file_name = f"marketscreener_articles_{datetime.now().strftime('%Y-%m-%d')}.csv"
save_articles_to_csv(
    endpoint_list=endpoint_list,
    base_url=base_url,
    csv_file_name=csv_file_name,
)

Data saved to marketscreener_articles_2024-11-01.csv


In [None]:
'''
Test using archive.is to access paywalled news sites
'''

import requests
from bs4 import BeautifulSoup
import urllib.parse

def search_archive_is(original_url: str, timeout: float = 30.0) -> "list | None":
    """
    Looks up `original_url` on archive.is and returns the links on the result page.

    Args:
        original_url (str): The URL to look up; it is appended verbatim to
            'https://archive.is/'.
        timeout (float): Seconds to wait for the server before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        list | None: Every <a> element of the result page that has an href
            attribute (not only archived copies), or None when the response
            status is not 200 or the page contains no links.
    """
    # NOTE(review): the URL is not percent-encoded; a '?' in `original_url`
    # presumably ends up as archive.is's own query string - confirm.
    search_url = f"https://archive.is/{original_url}"
    response = requests.get(search_url, headers=HEADERS, timeout=timeout)
    if response.status_code != 200:
        return None

    soup = BeautifulSoup(response.content, "html.parser")
    # The search results usually have <a> tags pointing to the archived URL
    anchors = soup.find_all('a', href=True)
    return anchors or None

url = 'https://www.reuters.com/business/healthcare-pharmaceuticals/bicara-therapeutics-targets-265-mln-proceeds-upsized-us-ipo-2024-09-11/'
test = search_archive_is(url)
# search_archive_is returns None when nothing was found; iterating None would
# raise TypeError, so fall back to an empty list.
for item in test or []:
    print(item['href'])