In [None]:
import ast
import json
import os
import posixpath
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

import awswrangler as wr
import boto3
import numpy as np
import pandas as pd
from botocore.exceptions import ClientError

pd.options.display.max_columns = None

## Util Functions

In [None]:
def save_txt_file(txt_obj, path):
    """Write ``txt_obj`` to the local file ``path`` as UTF-8 text.

    Missing parent directories are created first, so callers can write
    straight into e.g. ``outputs/<dataset>/`` on a fresh checkout.

    :param txt_obj: Text to write
    :param path: Destination file path (overwritten if it exists)
    """
    directory = os.path.dirname(path)
    if directory:  # dirname is "" for a bare file name; makedirs("") would raise
        os.makedirs(directory, exist_ok=True)
    # Explicit encoding: the platform default may not be able to encode
    # non-ASCII characters in company descriptions.
    with open(path, "w", encoding="utf-8") as text_file:
        text_file.write(txt_obj)

In [None]:
def save_json_metadata_file(data, path):
    """Serialize ``data`` as indented JSON into the local file ``path``.

    Missing parent directories are created first.

    :param data: JSON-serializable object (here: the ``metadataAttributes`` dicts)
    :param path: Destination file path (overwritten if it exists)
    """
    directory = os.path.dirname(path)
    if directory:  # dirname is "" for a bare file name; makedirs("") would raise
        os.makedirs(directory, exist_ok=True)
    with open(path, 'w', encoding="utf-8") as json_file:
        json.dump(data, json_file, indent=4)

In [None]:

def upload_file_to_s3(file_path, bucket_name, s3_path, max_retries=3, s3_client=None):
    """
    Uploads a single file to an S3 bucket, retrying with exponential back-off.

    :param file_path: Path to the local file to upload
    :param bucket_name: S3 bucket name
    :param s3_path: Destination key in the bucket
    :param max_retries: Total number of attempts (the first try counts as one)
    :param s3_client: Optional S3 client to reuse. When omitted, a client is
        built from a fresh ``boto3.session.Session``: this function is run from
        worker threads, and boto3 sessions (including the implicit default one
        behind ``boto3.client``) are not safe to share across threads.
    :return: True if the upload succeeded, False once every attempt has failed.
    """
    if s3_client is None:
        s3_client = boto3.session.Session().client('s3')

    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            s3_client.upload_file(file_path, bucket_name, s3_path)
            return True
        except Exception as e:  # retry on any error; the last one is reported below
            last_error = e
            if attempt < max_retries:
                sleep(2 ** attempt)  # exponential back-off: 2s, 4s, ...

    print(f"Failed to upload {file_path} after {max_retries} attempts: {last_error}")
    return False

In [None]:
def upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder, max_workers=100):
    """
    Uploads every file under a local folder to S3 using a thread pool.

    Each file goes through ``upload_file_to_s3``. Its S3 key is ``s3_folder``
    joined with the file's path relative to ``folder_path``, always using "/"
    (S3 keys are not OS paths, so ``os.path.join`` would produce "\\" on Windows).

    :param folder_path: Local path to the folder (walked recursively)
    :param bucket_name: S3 bucket name
    :param s3_folder: Key prefix in the S3 bucket
    :param max_workers: Maximum number of threads; also the progress-print interval
    :return: Number of files whose upload raised or returned False
    """
    failures = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_path = {}
        for subdir, _dirs, files in os.walk(folder_path):
            for file_name in files:
                full_path = os.path.join(subdir, file_name)
                relative_key = os.path.relpath(full_path, folder_path).replace(os.sep, "/")
                s3_path = posixpath.join(s3_folder, relative_key)
                future = executor.submit(upload_file_to_s3, full_path, bucket_name, s3_path)
                future_to_path[future] = full_path

        for idx, future in enumerate(as_completed(future_to_path)):
            if idx % max_workers == 0:
                print(idx)
            try:
                # A False return means the upload gave up; None means no status was reported.
                if future.result() is False:
                    failures += 1
            except Exception as exc:  # surface errors instead of silently dropping them
                failures += 1
                print(f"Upload of {future_to_path[future]} raised: {exc}")

    print(f"Done: {len(future_to_path)} file(s) submitted to s3://{bucket_name}/{s3_folder}, "
          f"{failures} failure(s) detected")
    return failures

## ICMS Issuer

In [None]:
 
# Issuer master data: every icms_issuer row not flagged exclude_from_data_products.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT * 
FROM icms_issuer 
where 1=1
and exclude_from_data_products = False
"""
df = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def get_issuer_paragraph(row)->str:
    """Build a one-paragraph, natural-language summary of a single issuer.

    A sentence is included only when its field is present. Missing values
    (NaN/None) are skipped instead of being rendered as "nan".

    :param row: Mapping-like record (e.g. a ``pandas.Series`` from
        ``df.iterrows()``) holding the ``icms_issuer`` columns read below.
    :return: Paragraph starting with "<name> (<slug>):", with sentences
        separated by single spaces.
    """
    name = row['name']
    slug = row['slug']
    description = row['description']
    sector = row['sector']
    sub_sector = row['sub_sector']
    domicile_country_code = row['domicile_country_code']
    domicile_state_code = row['domicile_state_code']
    search_aliases = row['search_aliases']
    banner_message = row['banner_message']
    stock_split_message = row['stock_split_message']

    def _parse_aliases(raw) -> list:
        """Return non-empty, stripped alias strings from a list-like value or its string literal."""
        # NOTE(review): the column's exact representation (string literal vs.
        # array) is not visible here; both forms are accepted.
        if isinstance(raw, str):
            try:
                # literal_eval only accepts Python literals; eval() would execute
                # any expression stored in the column.
                raw = ast.literal_eval(raw)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return []
        if not isinstance(raw, (list, tuple, np.ndarray)):
            return []  # None / NaN / unexpected scalar
        return [alias.strip() for alias in raw if isinstance(alias, str) and alias.strip()]

    parts = [f"{name} ({slug}):"]
    if not pd.isna(description):
        parts.append(f"{description}")

    has_sector = not pd.isna(sector)
    has_sub_sector = not pd.isna(sub_sector)
    if has_sector and has_sub_sector:
        parts.append(f"It operates in the {sector} sector and more specifically the {sub_sector} sub-sector.")
    elif has_sector:
        parts.append(f"It operates in the {sector} sector.")
    elif has_sub_sector:
        parts.append(f"It operates in the {sub_sector} sub-sector.")

    if not pd.isna(domicile_country_code):
        parts.append(f"It is domiciled in the {domicile_country_code}.")
    if not pd.isna(domicile_state_code):
        parts.append(f"The state it is domiciled in is {domicile_state_code}.")

    aliases = _parse_aliases(search_aliases)
    if aliases:
        parts.append(f"The company can also be known as: {', '.join(aliases)}.")

    if not pd.isna(banner_message):
        parts.append(f"{banner_message}")
    if not pd.isna(stock_split_message):
        parts.append(f"{stock_split_message}")

    return " ".join(parts)

In [None]:
# Generate one paragraph per issuer and save it locally under outputs/issuer_info/,
# together with a "<file>.metadata.json" sidecar of filterable attributes.
# NOTE(review): the {"metadataAttributes": {...}} sidecar layout looks like the
# Amazon Bedrock Knowledge Base metadata format - confirm.
ISSUER_OUTPUT_DIR = "outputs/issuer_info"
metadata_ext = ".metadata.json"
os.makedirs(ISSUER_OUTPUT_DIR, exist_ok=True)  # open() fails if the folder is missing

for idx, row in df.iterrows():
    issuer_para = get_issuer_paragraph(row)
    name = row['name']
    slug = row["slug"]
    country = row["domicile_country_code"]
    year_founded = row["year_founded"]
    sector = row["sector"]
    sub_sector = row["sub_sector"]

    # Missing values become "N/A" so every sidecar carries the same keys.
    metadata = {
        "metadataAttributes": {
            "company_name": str(name) if not pd.isna(name) else "N/A",
            "company_id": str(slug) if not pd.isna(slug) else "N/A",
            "country": str(country) if not pd.isna(country) else "N/A",
            "year_founded": str(year_founded) if not pd.isna(year_founded) else "N/A",
            "sector": str(sector) if not pd.isna(sector) else "N/A",
            "sub_sector": str(sub_sector) if not pd.isna(sub_sector) else "N/A",
        }
    }

    filepath = f"{ISSUER_OUTPUT_DIR}/{slug}.txt"
    save_txt_file(issuer_para, filepath)
    save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

In [None]:
# Upload the issuer files to S3 from parallel threads to cut total upload time.
folder_path = 'outputs/issuer_info'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'issuer_info'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder)

## Key People

In [None]:
# Key-person rows joined to their issuer's display name (as issuer_name);
# issuers flagged exclude_from_data_products are dropped.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT 
    ii.name as issuer_name
    , kp.* 
FROM icms_issuer_key_person kp
join icms_issuer ii
on ii.slug=kp.issuer_slug
where 1=1
and ii.exclude_from_data_products = false
"""
df_kp = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def _format_people_by_role(df, issuer_slug, issuer_name, role_type, heading) -> str:
    """Shared builder: '<heading> <issuer_name> (<issuer_slug>) include: A (ctx), B', or '' if nobody matches."""
    people = df[(df['issuer_slug'] == issuer_slug) & (df['role_type'] == role_type)]
    entries = [
        # Omit the parenthetical when no context is recorded, instead of printing "(nan)".
        f"{person} ({context})" if not pd.isna(context) else f"{person}"
        for person, context in zip(people['name'], people['context'])
    ]
    if not entries:
        return ""
    return f"{heading} {issuer_name} ({issuer_slug}) include: " + ", ".join(entries)


def get_key_employees(df, issuer_slug, issuer_name)->str:
    """Return the sentence listing an issuer's key employees (role_type 'KeyEmployee'), or '' if none.

    :param df: Key-people frame with issuer_slug, role_type, name and context columns
    :param issuer_slug: Issuer identifier to filter on
    :param issuer_name: Display name used in the sentence
    """
    return _format_people_by_role(df, issuer_slug, issuer_name, 'KeyEmployee', "The key employees at")


def get_board_members(df, issuer_slug, issuer_name)->str:
    """Return the sentence listing an issuer's board members (role_type 'BoardMember'), or '' if none.

    :param df: Key-people frame with issuer_slug, role_type, name and context columns
    :param issuer_slug: Issuer identifier to filter on
    :param issuer_name: Display name used in the sentence
    """
    return _format_people_by_role(df, issuer_slug, issuer_name, 'BoardMember', "The Board Members at")
            
        

In [None]:
# Generate the key-employee and board-member paragraphs for each issuer and save
# them, each with a metadata sidecar, under outputs/key_employees/ and outputs/board_members/.
KEY_EMPLOYEES_DIR = "outputs/key_employees"
BOARD_MEMBERS_DIR = "outputs/board_members"
metadata_ext = ".metadata.json"  # defined here so the cell also runs on a fresh kernel
os.makedirs(KEY_EMPLOYEES_DIR, exist_ok=True)
os.makedirs(BOARD_MEMBERS_DIR, exist_ok=True)

# groupby hands each issuer's own rows to the helpers, rather than having them
# re-filter all of df_kp once per issuer.
for (issuer_name, issuer_slug), issuer_people in df_kp.groupby(["issuer_name", "issuer_slug"], dropna=False):
    metadata = {
        "metadataAttributes": {
            "company_name": str(issuer_name) if not pd.isna(issuer_name) else "N/A",
            "company_id": str(issuer_slug) if not pd.isna(issuer_slug) else "N/A",
        }
    }

    kp_str = get_key_employees(issuer_people, issuer_slug, issuer_name)
    if kp_str != "":
        filepath = f"{KEY_EMPLOYEES_DIR}/{issuer_slug}.txt"
        save_txt_file(kp_str, path=filepath)
        save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

    bm_str = get_board_members(issuer_people, issuer_slug, issuer_name)
    if bm_str != "":
        filepath = f"{BOARD_MEMBERS_DIR}/{issuer_slug}.txt"
        save_txt_file(bm_str, path=filepath)
        save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")
        

In [None]:
# Upload the key-employee files to S3 from parallel threads to cut total upload time.

folder_path = 'outputs/key_employees'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'key_employees'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder)

In [None]:
# Upload the board-member files to S3 from parallel threads to cut total upload time.

folder_path = 'outputs/board_members'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'board_members'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder)

## Funding Rounds

In [None]:
# Funding-round rows joined to their issuer's display name (as issuer_name);
# issuers flagged exclude_from_data_products are dropped.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT 
    ii.name as issuer_name
    , fr.* 
FROM funding_rounds fr
join icms_issuer ii
on fr.issuer_id_name=ii.slug
where 1=1
    and ii.exclude_from_data_products = false
"""
df_fr = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def get_funding_round_paragraph(row) -> str:
    """Generate a funding-round paragraph from one structured funding_rounds record.

    Rows whose share type contains "Common" are phrased as a share count "as of"
    the funding date. All others are phrased as a priced round. After the headline,
    each term that is present is appended on its own line. Numeric terms must
    also be non-zero to be included.

    :param row: Mapping-like record (e.g. a ``pandas.Series`` from
        ``df_fr.iterrows()``) with the funding_rounds columns read below plus
        ``issuer_name``.
    :return: Multi-line paragraph; never raises on missing (NaN/None) fields.
    """
    name = row["issuer_name"]
    slug = row["issuer_id_name"]
    funding_date = row["funding_date"]
    implied_valuation_dollars = row["implied_valuation_dollars"]
    money_raised_dollars = row["money_raised_dollars"]
    number_of_shares = row["number_of_shares"]
    share_price_cents = row["share_price_cents"]
    share_type = row["share_type"]

    conversion_ratio = row["conversion_ratio"]
    liquidation_preference = row["liquidation_preference"]
    liquidation_preference_order = row["liquidation_preference_order"]
    participation = row["participation"]
    participation_cap = row["participation_cap"]

    dividend_percent = row["dividend_percent"]
    dividend_rate = row["dividend_rate"]
    dividend_authorized = row["dividend_authorized"]
    dividend_cumulative = row["dividend_cumulative"]
    dividend_type = row["dividend_type"]
    blocking_right = row["blocking_right"]

    investors = row["investors"]

    def _number(value, template):
        """Render ``template`` with ``value`` unless the value is missing or zero."""
        return "" if (pd.isna(value) or value == 0) else template.format(value)

    def _text(value, template):
        """Render ``template`` with ``value`` unless the value is missing."""
        return "" if pd.isna(value) else template.format(value)

    def _parse_investors(raw) -> list:
        """Return investor names from a list-like value or its string literal."""
        if isinstance(raw, str):
            try:
                # literal_eval only accepts literals; eval() would execute any
                # expression stored in the column.
                raw = ast.literal_eval(raw)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return []
        if not isinstance(raw, (list, tuple, np.ndarray)):
            return []  # None / NaN / unexpected scalar
        return [investor for investor in raw if isinstance(investor, str)]

    name_str = f"{name} ({slug})"
    funding_date_str = "" if pd.isna(funding_date) else f"{funding_date.date()}"
    implied_valuation_dollars_str = _number(implied_valuation_dollars, "${:,.0f}")
    money_raised_dollars_str = _number(money_raised_dollars, "${:,.0f}")
    number_of_shares_common_str = _number(number_of_shares, "{:,.0f}")
    number_of_shares_str = _number(number_of_shares, "Number of shares: {:,.0f}")
    share_price_str = (
        "" if (pd.isna(share_price_cents) or share_price_cents == 0)
        else f"Share price: ${share_price_cents / 100:,.2f}"
    )
    share_type_str = _text(share_type, "{}")
    conversion_ratio_str = _number(conversion_ratio, "Conversion Ratio: {:.2f}")
    liquidation_preference_str = _number(liquidation_preference, "Liquidation Preference: {:.2f}")
    liquidation_preference_order_str = _number(liquidation_preference_order, "Liquidation Preference Order: {:.0f}")
    participation_str = _text(participation, "Participation Rights: {}")
    participation_cap_str = _number(participation_cap, "Participation Cap: {:.2f}")
    dividend_percent_str = _number(dividend_percent, "Dividend percentage: {:.2f}")
    dividend_rate_str = _number(dividend_rate, "Dividend Rate: {:.2f}")
    dividend_authorized_str = _text(dividend_authorized, "Dividend Authorized: {}")
    dividend_cumulative_str = _text(dividend_cumulative, "Dividend cumulative: {}")
    dividend_type_str = _text(dividend_type, "Dividend type: {}")
    blocking_right_str = _text(blocking_right, "Blocking rights: {}")

    investor_names = _parse_investors(investors)
    investors_str = f"Investors: {', '.join(investor_names)}" if investor_names else ""

    # Test the formatted string: a NaN share_type would make re.search raise TypeError.
    if "Common" in share_type_str:
        output_str = f"As of {funding_date_str}, {name_str} has {number_of_shares_common_str} {share_type_str} shares. Below are the other details related to the shares:"
    else:
        output_str = f"{name_str} raised {money_raised_dollars_str} in a {share_type_str} funding round on {funding_date_str}, implying a valuation of {implied_valuation_dollars_str}. Below are the details from the funding round:"

    details = [
        investors_str,
        number_of_shares_str,
        share_price_str,
        conversion_ratio_str,
        liquidation_preference_str,
        liquidation_preference_order_str,
        participation_str,
        participation_cap_str,
        dividend_percent_str,
        dividend_rate_str,
        dividend_authorized_str,
        dividend_cumulative_str,
        dividend_type_str,
        blocking_right_str,
    ]
    # One detail per line, matching the other paragraph builders in this notebook.
    return "\n".join([output_str] + [detail for detail in details if detail])

In [None]:
# Generate one paragraph per funding-round row and save it locally, with a
# metadata sidecar, under outputs/funding_rounds/.
FUNDING_ROUNDS_DIR = "outputs/funding_rounds"
metadata_ext = ".metadata.json"  # defined here so the cell also runs on a fresh kernel
os.makedirs(FUNDING_ROUNDS_DIR, exist_ok=True)

for idx, row in df_fr.iterrows():
    funding_round_para = get_funding_round_paragraph(row)

    name = row['issuer_name']
    slug = row['issuer_id_name']
    share_type = row['share_type']
    funding_date = row['funding_date']
    funding_day = str(funding_date.date()) if not pd.isna(funding_date) else None

    metadata = {
        "metadataAttributes": {
            "company_name": str(name) if not pd.isna(name) else "N/A",
            "company_id": str(slug) if not pd.isna(slug) else "N/A",
            "share_type": str(share_type) if not pd.isna(share_type) else "N/A",
            "funding_date": funding_day or "N/A",
        }
    }

    # A "/" inside the share-type value would otherwise be read as a sub-directory.
    safe_share_type = str(share_type).replace("/", "-")
    # NOTE(review): rows sharing slug, share type and date overwrite each other's file.
    filepath = f"{FUNDING_ROUNDS_DIR}/{slug}_{safe_share_type}_{funding_day or 'unknown-date'}.txt"
    save_txt_file(funding_round_para, path=filepath)
    save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

In [None]:
# Upload the funding-round files to S3 from parallel threads to cut total upload time.
folder_path = 'outputs/funding_rounds'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'funding_rounds'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder)

## Public Marks

In [None]:
# Public-mark rows joined to their issuer's display name;
# issuers flagged exclude_from_data_products are dropped.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT 
    ii.name,
    pm.*
FROM public_marks pm
join icms_issuer ii
on pm.issuer_id_name=ii.slug
where 1=1
 and ii.exclude_from_data_products = false
;
"""
df_pm = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def get_public_marks_paragraph(row)->str:
    """Generate a public-mark paragraph from one structured public_marks record.

    :param row: Mapping-like record (e.g. a ``pandas.Series`` from
        ``df_pm.iterrows()``) with the public_marks columns read below plus ``name``.
    :return: A headline sentence, followed by the implied enterprise value,
        the after-exit note and the source URL when each is present.
    """
    name = row["name"]
    slug = row["issuer_id_name"]
    reported_on = row['reported_on']
    share_price_cents = row['price_cents']
    implied_enterprise_value_cents = row['implied_enterprise_value_cents']
    share_type = row['share_type']
    fund_name = row['fund_name']
    fund_family = row['fund_family']
    source_url = row['url']
    is_after_exit = row['is_after_exit']

    name_str = f"{name} ({slug})"
    reported_on_str = f"{reported_on.date()}" if not pd.isna(reported_on) else ""
    share_price_str = "" if (pd.isna(share_price_cents) or share_price_cents == 0) else f"${share_price_cents/100:,.2f}"
    implied_enterprise_value_str = (
        "" if (pd.isna(implied_enterprise_value_cents) or implied_enterprise_value_cents == 0)
        else f"The implied enterprise value based on the share price is ${implied_enterprise_value_cents/100:,.0f}."
    )
    share_type_str = f"{share_type}" if not pd.isna(share_type) else ""
    fund_name_str = f"{fund_name}" if not pd.isna(fund_name) else ""
    fund_family_str = f"{fund_family}" if not pd.isna(fund_family) else ""
    source_url_str = f"Source: {source_url}" if not pd.isna(source_url) else ""
    # NaN is truthy and pd.NA raises in a boolean context, so rule out missing first.
    after_exit = (not pd.isna(is_after_exit)) and bool(is_after_exit)
    is_after_exit_str = "Note this was after the company exited." if after_exit else ""

    public_marks_str = (
        f"On {reported_on_str}, {fund_family_str} ({fund_name_str}) reported a mutual fund public mark "
        f"on its investment in {name_str}, pricing its {share_type_str} shares at {share_price_str} per share."
    )

    for extra in (implied_enterprise_value_str, is_after_exit_str, source_url_str):
        if extra != "":
            public_marks_str += f" {extra}"
    return public_marks_str

In [None]:
# Generate one paragraph per public mark and save it locally, with a metadata
# sidecar, under outputs/public_marks/.
PUBLIC_MARKS_DIR = "outputs/public_marks"
metadata_ext = ".metadata.json"
os.makedirs(PUBLIC_MARKS_DIR, exist_ok=True)

for idx, row in df_pm.iterrows():
    public_marks_para = get_public_marks_paragraph(row)
    name = row["name"]
    slug = row["issuer_id_name"]
    # Not named `id`: a global `id` would shadow the builtin for the rest of the notebook.
    mark_id = row["id"]
    reported_on = row['reported_on']
    fund_family = row['fund_family']
    is_after_exit = row['is_after_exit']
    reported_day = str(reported_on.date()) if not pd.isna(reported_on) else None

    metadata = {
        "metadataAttributes": {
            "company_name": str(name) if not pd.isna(name) else "N/A",
            "company_id": str(slug) if not pd.isna(slug) else "N/A",
            "reported_on": reported_day or "N/A",
            "fund_family": str(fund_family) if not pd.isna(fund_family) else "N/A",
            "is_after_exit": str(is_after_exit) if not pd.isna(is_after_exit) else "N/A",
        }
    }

    filepath = f"{PUBLIC_MARKS_DIR}/{slug}_{mark_id}_{reported_day or 'unknown-date'}.txt"
    save_txt_file(public_marks_para, path=filepath)
    save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

In [None]:
# Upload the public-mark files to S3 from parallel threads to cut total upload time.
folder_path = 'outputs/public_marks'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'public_marks'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder, max_workers=100)

## Secondary Transactions

In [None]:
# Secondary-transaction rows joined to their issuer's display name;
# issuers flagged exclude_from_data_products are dropped.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT 
    ii.name
    , st.*
FROM secondary_transactions st
join icms_issuer ii
on st.issuer_id_name=ii.slug
where 1=1
 and ii.exclude_from_data_products = false
;
"""
df_st = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def get_secondary_transactions_paragraph(row)->str:
    """Generate a secondary-transaction paragraph from one structured record.

    :param row: Mapping-like record (e.g. a ``pandas.Series`` from
        ``df_st.iterrows()``) with the secondary_transactions columns read below
        plus ``name``.
    :return: A headline sentence, followed by one line per present detail
        (transaction size, ROFR, buyer, seller, structure, execution venue).
    """
    # source_created_at / source_updated_at are not read: they never appeared in the
    # text, and calling .date() on them could raise for non-timestamp values.
    name = row["name"]
    slug = row["issuer_id_name"]
    closed_date = row['closed_date']
    share_class = row['class']
    number_of_shares = row['number_of_shares']
    price_per_share_cents = row['price_per_share_cents']
    transaction_size_cents = row['transaction_size_cents']
    structure = row['structure']
    rofr = row['rofr']
    execution_venue = row['execution_venue']
    buyer_type = row['buyer_type']
    seller_type = row['seller_type']

    name_str = f"{name} ({slug})"
    share_class_str = f"{share_class}" if not pd.isna(share_class) else ""
    structure_str = f"Structure: {structure}" if not pd.isna(structure) else ""
    rofr_str = f"Right of first refusal: {rofr}" if not pd.isna(rofr) else ""
    closed_date_str = f"{closed_date.date()}" if not pd.isna(closed_date) else ""
    price_per_share_str = "" if (pd.isna(price_per_share_cents) or price_per_share_cents == 0) else f"${price_per_share_cents/100:,.2f}"
    transaction_size_str = "" if (pd.isna(transaction_size_cents) or transaction_size_cents == 0) else f"Total transaction size: ${transaction_size_cents/100:,.2f}"
    number_of_shares_str = "" if (pd.isna(number_of_shares) or number_of_shares == 0) else f"{number_of_shares:,.0f}"
    execution_venue_str = f"Execution venue: {execution_venue}" if not pd.isna(execution_venue) else ""
    buyer_type_str = f"Buyer: {buyer_type}" if not pd.isna(buyer_type) else ""
    seller_type_str = f"Seller: {seller_type}" if not pd.isna(seller_type) else ""

    secondary_transactions_str = f"On {closed_date_str}, {number_of_shares_str} {share_class_str} shares of {name_str} were purchased for {price_per_share_str} per share. Transaction details from the trade below:"

    for detail in (transaction_size_str, rofr_str, buyer_type_str, seller_type_str, structure_str, execution_venue_str):
        if detail != "":
            secondary_transactions_str += f"\n{detail}"
    return secondary_transactions_str

In [None]:
# Number of secondary-transaction rows loaded (displayed as the cell output).
df_st.shape[0]

In [None]:
# Generate one paragraph per secondary transaction and save it locally, with a
# metadata sidecar, under outputs/secondary_transactions/.
SECONDARY_TRANSACTIONS_DIR = "outputs/secondary_transactions"
metadata_ext = ".metadata.json"
os.makedirs(SECONDARY_TRANSACTIONS_DIR, exist_ok=True)

for idx, row in df_st.iterrows():
    secondary_transactions_para = get_secondary_transactions_paragraph(row)

    # Only the part of the id before the first "/" goes into the file name
    # (presumably because a "/" would create a sub-directory - confirm).
    transaction_id = row["id"].split('/')[0]
    name = row["name"]
    slug = row["issuer_id_name"]
    closed_date = row['closed_date']
    transaction_size_cents = row['transaction_size_cents']
    rofr = row['rofr']
    buyer_type = row['buyer_type']
    seller_type = row['seller_type']
    closed_day = str(closed_date.date()) if not pd.isna(closed_date) else None

    metadata = {
        "metadataAttributes": {
            "company_name": str(name) if not pd.isna(name) else "N/A",
            "company_id": str(slug) if not pd.isna(slug) else "N/A",
            # NOTE(review): unlike the other keys this one falls back to "" (not "N/A").
            "transaction_size_str": "" if (pd.isna(transaction_size_cents) or transaction_size_cents == 0) else f"{transaction_size_cents/100:,.2f}",
            "closed_date": closed_day or "N/A",
            "rofr": str(rofr) if not pd.isna(rofr) else "N/A",
            "buyer_type": str(buyer_type) if not pd.isna(buyer_type) else "N/A",
            "seller_type": str(seller_type) if not pd.isna(seller_type) else "N/A",
        }
    }

    filepath = f"{SECONDARY_TRANSACTIONS_DIR}/{slug}_{transaction_id}_{closed_day or 'unknown-date'}.txt"
    save_txt_file(secondary_transactions_para, path=filepath)
    save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

In [None]:
# Upload the secondary-transaction files to S3 from parallel threads to cut total upload time.
folder_path = 'outputs/secondary_transactions'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'secondary_transactions'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder, max_workers=100)

## VWAP

In [None]:
# Current (non-archived) VWAP rows computed over data_source 'All', joined to the
# issuer's display name; issuers flagged exclude_from_data_products are dropped.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT 
    ii.name
    , vw.*
FROM vwap vw
join icms_issuer ii
on vw.issuer_slug=ii.slug
where 1=1
 and ii.exclude_from_data_products = false
 and vw.data_source = 'All'
 and vw.archived_at is null
;
"""
df_vw = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def get_vwap_paragraph(row)->str:
    """Generate a VWAP paragraph from one structured vwap record.

    :param row: Mapping-like record (e.g. a ``pandas.Series`` from
        ``df_vw.iterrows()``) with ``name``, ``issuer_slug``, ``calc_date``,
        ``share_type`` and ``vwap_7``/``vwap_30``/``vwap_45``/``vwap_60``/``vwap_90``.
    :return: A headline, followed by one "<N>-day vwap: x" line per window whose
        value is present and non-zero. A share type of 'All' is omitted from the headline.
    """
    name = row["name"]
    slug = row["issuer_slug"]
    calc_date = row['calc_date']
    share_type = row['share_type']
    vwap_by_window = {days: row[f'vwap_{days}'] for days in (7, 30, 45, 60, 90)}

    name_str = f"{name} ({slug})"
    share_type_str = "" if pd.isna(share_type) or share_type == 'All' else f"{share_type} "
    calc_date_str = f"{calc_date.date()}" if not pd.isna(calc_date) else ""

    lines = [f"On {calc_date_str}, the volume-weighted average price (VWAP) for {name_str} {share_type_str}shares was:"]
    for days, value in vwap_by_window.items():
        if not (pd.isna(value) or value == 0):
            lines.append(f"{days}-day vwap: {value:,.2f}")
    return "\n".join(lines)

In [None]:
# Generate one paragraph per VWAP row and save it locally, with a metadata sidecar,
# under outputs/vwap/ - the folder the VWAP upload cell reads from.
VWAP_DIR = "outputs/vwap"
metadata_ext = ".metadata.json"
os.makedirs(VWAP_DIR, exist_ok=True)

for idx, row in df_vw.iterrows():
    vwap_para = get_vwap_paragraph(row)

    slug = row["issuer_slug"]
    calc_date = row['calc_date']
    share_type = row['share_type']
    calc_day = str(calc_date.date()) if not pd.isna(calc_date) else None

    metadata = {
        "metadataAttributes": {
            "slug": str(slug) if not pd.isna(slug) else "N/A",
            "calc_date": calc_day or "N/A",
            "share_type": str(share_type) if not pd.isna(share_type) else "N/A",
        }
    }

    safe_share_type = str(share_type).replace("/", "-")  # "/" would create a sub-directory
    filepath = f"{VWAP_DIR}/{slug}_{safe_share_type}_{calc_day or 'unknown-date'}.txt"
    save_txt_file(vwap_para, path=filepath)
    save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

In [None]:
# Upload the VWAP files to S3 from parallel threads to cut total upload time.

folder_path = 'outputs/vwap'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'vwap'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder, max_workers=100)

## Forge Prices

In [None]:
# Forge Price rows joined to the issuer's display name;
# issuers flagged exclude_from_data_products are dropped.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT 
    ii.name
    , fp.*
FROM forge_prices fp
join icms_issuer ii
on fp.issuer_slug=ii.slug
where 1=1
 and ii.exclude_from_data_products = false
--and date >= date('2023-07-01')
;
"""
df_fp = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def get_forge_prices_paragraph(row)->str:
    """Generate a one-sentence Forge Price description from one forge_prices record.

    :param row: Mapping-like record (e.g. a ``pandas.Series`` from
        ``df_fp.iterrows()``) with ``name``, ``issuer_slug``, ``date`` and ``price``.
    :return: "The Forge Price for <name> (<slug>) on <YYYY-MM-DD> was $<price> per share.".
        A missing date, or a missing/zero price, renders as an empty string.
    """
    # The row carries no share type; the previous share_type_str line read an
    # undefined (or leftover global) name and its result was never used.
    name = row["name"]
    slug = row["issuer_slug"]
    date = row['date']
    price = row['price']

    name_str = f"{name} ({slug})"
    date_str = f"{date.date()}" if not pd.isna(date) else ""
    price_str = "" if (pd.isna(price) or price == 0) else f"${price:,.2f}"

    return f"The Forge Price for {name_str} on {date_str} was {price_str} per share."

In [None]:
# Generate one paragraph per Forge Price row and save it locally, with a metadata
# sidecar, under outputs/forge_prices/ (alongside the other datasets).
# TODO: there is no upload cell for outputs/forge_prices yet.
FORGE_PRICES_DIR = "outputs/forge_prices"
metadata_ext = ".metadata.json"
os.makedirs(FORGE_PRICES_DIR, exist_ok=True)

for idx, row in df_fp.iterrows():
    forge_prices_para = get_forge_prices_paragraph(row)

    slug = row["issuer_slug"]
    date = row['date']
    price_day = str(date.date()) if not pd.isna(date) else None

    metadata = {
        "metadataAttributes": {
            "slug": str(slug) if not pd.isna(slug) else "N/A",
            "date": price_day or "N/A",
        }
    }

    filepath = f"{FORGE_PRICES_DIR}/{slug}_{price_day or 'unknown-date'}.txt"
    save_txt_file(forge_prices_para, path=filepath)
    save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

## Indications of Interest (IOI)

In [None]:
# Indication-of-interest history from 2023-07-01 onward, joined to the issuer's
# display name; issuers flagged exclude_from_data_products are dropped.
SCHEMA_NAME = "datalake-curated-production"
query_stry = """
SELECT 
    ii.name
    , ih.*
FROM ioi_history ih
join icms_issuer ii
on ih.issuer_id_name=ii.slug
where 1=1
 and ii.exclude_from_data_products = false
 and record_date >= date('2023-07-01')
;
"""
df_ih = wr.athena.read_sql_query(
    sql=query_stry,
    database=SCHEMA_NAME,
)

In [None]:
def get_ioi_paragraph(row)->str:
    """Generate an indication-of-interest (IOI) paragraph from one ioi_history record.

    Detail lines are emitted only for fields that are present (and, for numeric
    fields, non-zero). A missing type, share type or client type is left out of
    the headline instead of crashing (``.upper()`` on NaN) or printing "nan".

    :param row: Mapping-like record (e.g. a ``pandas.Series`` from
        ``df_ih.iterrows()``) with the ioi_history columns read below plus ``name``.
    :return: Multi-line paragraph.
    """
    name = row["name"]
    slug = row["issuer_id_name"]
    record_date = row['record_date']
    ioi_type = row['type']
    interests_share_type = row['interests_share_type']
    client_type = row['client_type']
    user_initiated = row['user_initiated']
    min_volume = row['min_volume']
    max_volume = row['max_volume']
    min_price = row['min_price']
    max_price = row['max_price']
    deal_amount_min = row['deal_amount_min']
    deal_amount_max = row['deal_amount_max']
    implied_valuation_min = row['implied_valuation_min']
    implied_valuation_max = row['implied_valuation_max']

    def _scaled(value, label, prefix=""):
        """'<label>: <prefix><value / 100>' unless the value is missing or zero."""
        if pd.isna(value) or value == 0:
            return ""
        return f"{label}: {prefix}{value / 100:,.2f}"

    name_str = f"{name} ({slug})"
    record_date_str = f"{record_date.date()}" if not pd.isna(record_date) else ""
    ioi_type_str = "" if pd.isna(ioi_type) else f"{ioi_type}"
    interests_share_type_str = "" if pd.isna(interests_share_type) else f"{interests_share_type}"
    client_type_str = "" if pd.isna(client_type) else f"{client_type}"
    user_initiated_str = "" if pd.isna(user_initiated) else f"User initiated: {user_initiated}"

    action_str = f"to {ioi_type_str.upper()}" if ioi_type_str else "for"
    share_type_part = f"{interests_share_type_str} " if interests_share_type_str else ""
    if client_type_str:
        article = "an" if client_type_str[0].lower() in "aeiou" else "a"
        submitter_str = f" by {article} {client_type_str} client"
    else:
        submitter_str = ""

    ioi_str = (
        f"On {record_date_str}, an indication of interest (IOI) {action_str} {name_str} "
        f"{share_type_part}shares was submitted{submitter_str}. Below are further details on the IOI:"
    )

    # NOTE(review): volumes are divided by 100 like the cent-denominated fields;
    # confirm min_volume/max_volume are really stored in hundredths.
    details = [
        user_initiated_str,
        _scaled(min_volume, "Min volume"),
        _scaled(max_volume, "Max volume"),
        _scaled(min_price, "Min price", "$"),
        _scaled(max_price, "Max price", "$"),
        _scaled(deal_amount_min, "Min deal amount", "$"),
        _scaled(deal_amount_max, "Max deal amount", "$"),
        _scaled(implied_valuation_min, "Min implied valuation", "$"),
        _scaled(implied_valuation_max, "Max implied valuation", "$"),
    ]
    for detail in details:
        if detail != "":
            ioi_str += f"\n{detail}"
    return ioi_str

In [None]:
# Generate one paragraph per IOI and save it locally, with a metadata sidecar,
# under outputs/ioi/.
IOI_DIR = "outputs/ioi"
metadata_ext = ".metadata.json"
os.makedirs(IOI_DIR, exist_ok=True)

for idx, row in df_ih.iterrows():
    ioi_para = get_ioi_paragraph(row)

    slug = row["issuer_id_name"]
    record_date = row['record_date']
    ioi_id = row['ioi_id']
    ioi_type = row['type']
    record_day = str(record_date.date()) if not pd.isna(record_date) else None

    metadata = {
        "metadataAttributes": {
            "slug": str(slug) if not pd.isna(slug) else "N/A",
            "ioi_type": str(ioi_type) if not pd.isna(ioi_type) else "N/A",
            "record_date": record_day or "N/A",
        }
    }

    filepath = f"{IOI_DIR}/{slug}_{ioi_id}_{record_day or 'unknown-date'}.txt"
    save_txt_file(ioi_para, path=filepath)
    save_json_metadata_file(metadata, f"{filepath}{metadata_ext}")

In [None]:
# Upload the IOI files to S3 from parallel threads to cut total upload time.

folder_path = 'outputs/ioi'
bucket_name = 'knowledge-base-aiml-test'
s3_folder = 'ioi'

upload_folder_to_s3_concurrently(folder_path, bucket_name, s3_folder, max_workers=100)