In [1]:
# retrieving company metadata and exploration
!pip install httpx tqdm pandas chardet



In [3]:
import httpx
import os
import pandas as pd

def download_file(client, url, file_path):
    """Download a file from a given URL to a specified path.

    The download is skipped when ``file_path`` already exists. Data is first
    streamed to ``<file_path>.part`` and only renamed to ``file_path`` once the
    whole body has been written, so an interrupted transfer never leaves a
    truncated file that a later call would mistake for a finished download.

    Args:
        client: HTTP client whose ``stream("GET", url)`` returns a context
            manager yielding a response with ``raise_for_status()`` and
            ``iter_bytes(chunk_size=...)`` (an ``httpx.Client`` in this notebook).
        url: Address of the file to fetch.
        file_path: Destination path; missing parent directories are created.

    Raises:
        Any exception raised by the client (transport error, bad HTTP status)
        or by the filesystem. The partial file is removed before re-raising.
    """
    if os.path.exists(file_path):
        return

    # A fresh checkout has no data/ directory yet; open() would fail without it.
    os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)

    partial_path = f"{file_path}.part"
    try:
        with client.stream("GET", url) as response:
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_bytes(chunk_size=8192):
                    f.write(chunk)
        # Publish the file only after the full body has been written.
        os.replace(partial_path, file_path)
    except BaseException:
        # Also covers KeyboardInterrupt: never leave a half-written file behind.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise

# Business-names register extract published on data.gov.au.
data_url = 'https://data.gov.au/data/dataset/bc515135-4bb6-4d50-957a-3713709a76d3/resource/55ad4b1c-5eeb-44ea-8b29-d410da431be3/download/business_names_202404.csv'
# One HTTP client, kept at notebook level so later download cells can reuse it.
client = httpx.Client()
# Save under data/ using the final path segment of the URL as the file name.
file_path = "data/" + data_url.rsplit('/', 1)[-1]
# Fetch the CSV (download_file does nothing when the file is already present).
download_file(client=client, url=data_url, file_path=file_path)

In [4]:
import chardet
# Rebuild the local path of the downloaded CSV from the URL's final segment.
file_path = "data/" + data_url.rsplit('/', 1)[-1]
# Let chardet guess the text encoding from the first 10000 bytes of the file.
with open(file_path, mode='rb') as f:
    head_bytes = f.read(10000)
result = chardet.detect(head_bytes)
encoding = result['encoding']
# sep=None asks pandas to sniff the delimiter, which only the python engine supports.
df = pd.read_csv(
    file_path,
    sep=None,
    engine='python',
    encoding=encoding,
)
df

Unnamed: 0,REGISTER_NAME,BN_NAME,BN_STATUS,BN_REG_DT,BN_CANCEL_DT,BN_RENEW_DT,BN_STATE_NUM,BN_STATE_OF_REG,BN_ABN
0,BUSINESS NAMES,SILENT SCISSORZ,Registered,07/11/2018,,07/11/2025,,,7.664328e+10
1,BUSINESS NAMES,LITTLE MIRACLES PRESCHOOL & LO...,Registered,27/07/2022,,27/07/2025,,,2.397982e+10
2,BUSINESS NAMES,A Cut Above Painting & Texture Coating,Registered,04/12/2019,,04/12/2022,,,8.663468e+10
3,BUSINESS NAMES,HOMSAFE,Registered,07/02/2019,,31/12/2024,,,5.609895e+10
4,BUSINESS NAMES,COASTAL EARTH WORKS,Registered,30/05/2019,,30/05/2026,,,8.857312e+10
...,...,...,...,...,...,...,...,...,...
3072030,BUSINESS NAMES,zzz tyres and auto services,Registered,12/01/2024,,12/01/2025,,,2.547044e+10
3072031,BUSINESS NAMES,ZZZINKED DIGITAL,Registered,08/04/2017,,08/04/2024,,,8.367605e+10
3072032,BUSINESS NAMES,ZZZY,Registered,27/08/2020,,27/08/2023,,,1.656213e+10
3072033,BUSINESS NAMES,ZZZZ BEST TEES,Registered,12/05/2023,,12/05/2026,,,7.866088e+10


In [36]:
!pip install dask tqdm requests openai dask-expr chardet tqdm pandas matplotlib

Collecting dask
  Downloading dask-2024.4.1-py3-none-any.whl.metadata (3.8 kB)
Collecting requests
  Using cached requests-2.31.0-py3-none-any.whl.metadata (4.6 kB)
Collecting dask-expr
  Downloading dask_expr-1.0.10-py3-none-any.whl.metadata (2.4 kB)
Collecting matplotlib
  Downloading matplotlib-3.8.4-cp311-cp311-macosx_11_0_arm64.whl.metadata (5.8 kB)
Collecting click>=8.1 (from dask)
  Using cached click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting cloudpickle>=1.5.0 (from dask)
  Using cached cloudpickle-3.0.0-py3-none-any.whl.metadata (7.0 kB)
Collecting fsspec>=2021.09.0 (from dask)
  Using cached fsspec-2024.3.1-py3-none-any.whl.metadata (6.8 kB)
Collecting partd>=1.2.0 (from dask)
  Using cached partd-1.4.1-py3-none-any.whl.metadata (4.6 kB)
Collecting pyyaml>=5.3.1 (from dask)
  Using cached PyYAML-6.0.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (2.1 kB)
Collecting toolz>=0.10.0 (from dask)
  Using cached toolz-0.12.1-py3-none-any.whl.metadata (5.1 kB)
Collecting impo

In [5]:
import json
import os

import dask
import dask.dataframe as dd
import httpx
import pandas as pd
from dask.diagnostics import ProgressBar
from openai import OpenAI
# Read the API key from the environment instead of embedding it in the notebook.
# SECURITY: the key that used to be hardcoded on this line has been exposed to
# everyone with access to this file and must be revoked and rotated.
openapi_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])


# System prompt shared by every request. It pins the JSON schema that
# validate_company_metadata checks, so the two must be kept in sync.
core_messages = [
    {
        "role": "system",
        "content": """
                You are a financial expert, who is able to provide detailed information about companies
                listed on the Australian Stock Exchange (ASX) in JSON format. You should provide your
                response with the following fields/schema:
                {
                // Name of the ASX company
                "company_name": string,
                // where the company is located/headquartered, the FULL location (i.e. street number, street name, city, state, postcode)
                "address": string,
                // short summary about company
                "summary": string,
                // detailed overview of company profile, include industry, history etc.
                "details": string,
                // company's website (e.g. https://1414degrees.com.au/)
                "website": string,
                // company about or investor page (e.g. https://1414degrees.com.au/investors/)
                "website_about": string,
                // link to company logo (e.g. https://1414degrees.com.au/wp-content/uploads/2023/04/1414degrees-GreyRed-RGB.png)
                "company_logo_link": string,
                // directors and/or senior leadership information
                "directors": [{"name": string, "title": string}],
                // links to additional sources of information
                "references": [{"url": string, "description": string}]
                }
                """,
    },
]

# HTTP status codes accepted as "the URL resolves". Redirect codes are included
# because the plain httpx.get calls below return the redirect response itself.
_ACCEPTED_STATUS_CODES = frozenset({200, 301, 302, 303, 304})

# Top-level fields that must be present and hold a string, in validation order.
_REQUIRED_STRING_FIELDS = (
    "company_name",
    "address",
    "summary",
    "details",
    "website",
    "website_about",
    "company_logo_link",
)

# Upper bound on how much of a failing response body is copied into an error
# message (these messages are later fed back to the model as retry context).
_MAX_ERROR_BODY_CHARS = 500


def _check_record_list(company_metadata_json, field, keys):
    """Require ``field`` to be a list of dicts in which every key of ``keys`` is a string."""
    prefix = f"Company metadata JSON object '{field}' field"
    if field not in company_metadata_json:
        raise ValueError(f"Company metadata JSON object should have a '{field}' field")
    if not isinstance(company_metadata_json[field], list):
        raise ValueError(f"{prefix} should be a list")
    for record in company_metadata_json[field]:
        if not isinstance(record, dict):
            raise ValueError(f"{prefix} should be a list of dictionaries")
        for key in keys:
            if key not in record:
                raise ValueError(f"{prefix} should have a '{key}' field")
            if not isinstance(record[key], str):
                raise ValueError(f"{prefix} '{key}' field should be a string")


def _is_reachable(url):
    """Return True when a GET on ``url`` succeeds with an accepted status code."""
    try:
        return httpx.get(url).status_code in _ACCEPTED_STATUS_CODES
    except Exception:
        # Best effort: malformed URLs and connection failures both count as unreachable.
        return False


def _fetch_checked_url(url):
    """GET ``url`` and return the response, raising ValueError when it cannot be used."""
    try:
        response = httpx.get(url)
    except Exception as e:
        raise ValueError(f"URL {url} is likely an invalid format or failed to connect, must contain `http://` or `https://`, got error: {str(e)}") from e
    if response.status_code not in _ACCEPTED_STATUS_CODES:
        raise ValueError(f"URL {url} is not valid, got status: {response.status_code}, response: {response.text[:_MAX_ERROR_BODY_CHARS]}")
    return response


def validate_company_metadata(company_metadata_json):
    """
    Validate the company metadata JSON object.

    Checks that every field of the schema below is present with the expected
    type, then performs network checks on the URLs:
     * references whose URL cannot be fetched, or answers with a status outside
       200/301/302/303/304, are removed from ``company_metadata_json["references"]``
       in place (this never raises);
     * ``website``, ``website_about`` and ``company_logo_link`` must each be
       fetchable with one of those status codes;
     * the logo response must declare an ``image/*`` content type ending in
       png, jpg or jpeg.

    Schema:
    {
        // Name of the ASX company
        "company_name": string,
        // where the company is located/headquartered (street number, street name, city, state, postcode)
        "address": string,
        // short summary about company
        "summary": string,
        // detailed overview of company profile, include industry, history etc.
        "details": string,
        // company's website (e.g. https://1414degrees.com.au/)
        "website": string,
        // company about or investor page (e.g. https://1414degrees.com.au/investors/)
        "website_about": string,
        // link to company logo
        "company_logo_link": string,
        // directors and/or senior leadership information
        "directors": [{"name": string, "title": string}],
        // links to additional sources of information
        "references": [{"url": string, "description": string}]
    }

    Args:
        company_metadata_json: Parsed JSON object to validate. Its
            ``references`` list may be shortened in place.

    Returns:
        None. Success is signalled by not raising.

    Raises:
        ValueError: On the first schema violation or unusable URL. Messages for
            a bad HTTP status contain "is not valid"; a non-image logo yields
            "is not an image". Callers use these phrases to build retry hints.
    """
    if not isinstance(company_metadata_json, dict):
        raise ValueError("Company metadata JSON object should be a dictionary")

    for field in _REQUIRED_STRING_FIELDS:
        if field not in company_metadata_json:
            raise ValueError(f"Company metadata JSON object should have a '{field}' field")
        if not isinstance(company_metadata_json[field], str):
            raise ValueError(f"Company metadata JSON object '{field}' field should be a string")

    _check_record_list(company_metadata_json, "directors", ("name", "title"))
    _check_record_list(company_metadata_json, "references", ("url", "description"))

    # Drop dead references. The filtered result is built first and then written
    # back with slice assignment: removing items while iterating would skip
    # neighbours, and callers keep using this same list object.
    references = company_metadata_json["references"]
    references[:] = [reference for reference in references if _is_reachable(reference["url"])]
    # TODO: do some error handling and try find alternative references

    _fetch_checked_url(company_metadata_json["website"])
    _fetch_checked_url(company_metadata_json["website_about"])
    logo_response = _fetch_checked_url(company_metadata_json["company_logo_link"])

    # Compare only the media type: the header may be absent or carry
    # parameters such as "; charset=...".
    content_type = logo_response.headers.get('Content-Type', '').split(';')[0].strip().lower()
    # check the logo is an image
    if not content_type.startswith('image'):
        raise ValueError(f"URL {company_metadata_json['company_logo_link']} is not an image")
    # check the logo is of a specific format (png, jpg, jpeg)
    if not content_type.endswith(('png', 'jpg', 'jpeg')):
        raise ValueError(f"URL {company_metadata_json['company_logo_link']} is not a valid image format")
def _metadata_to_frame(company_metadata_json, stock_code):
    """Flatten one metadata object into a single-row DataFrame indexed by ``stock_code``."""
    df = pd.json_normalize(company_metadata_json)
    df['stock_code'] = stock_code
    df.index = [stock_code]
    return df


def _url_feedback_messages(company_metadata_json, error, messages, hint_suffix=""):
    """Build retry feedback for a URL validation error, quoting the start of the company website.

    Returns the messages to add to the conversation: a hint containing the first
    400 space-separated tokens of the website, nothing when that page content is
    already part of ``messages``, or a note that the website could not be fetched.
    """
    try:
        page_content = ' '.join(httpx.get(company_metadata_json['website']).text.split(" ")[:400])
    except Exception as httpx_error:
        print(f"HTTPX error when fetching page content: {httpx_error}")
        return [{"role": "user", "content": f"couldn't use the company website link provided to get additional metadata, please ensure this link is correct. Here is the error message: {str(httpx_error)}"}]
    if any(page_content in m.get("content", "") for m in messages):
        return []
    return [{"role": "user", "content": f"an error was seen in one of the URLs: {str(error)} is and/or is not valid, please provide a valid URL. Here is the content of the company website: {page_content}, use this to potentially find a more valid URL{hint_suffix}"}]


def get_company_metadata(stock_code, max_attempts=5):
    """Ask the chat model for metadata about one ASX company and return it as a DataFrame.

    Up to ``max_attempts`` requests are sent to ``gpt-3.5-turbo-0125``. A reply
    is returned only if it parses as JSON and passes
    ``validate_company_metadata``; otherwise the error is added to this call's
    conversation as feedback and the request is retried. When every attempt
    fails, one final request is sent to ``gpt-4-0125-preview`` and its reply is
    returned without validation.

    The conversation is a per-call copy of ``core_messages``, so retry feedback
    for one stock code never leaks into requests for another, including when
    calls run in parallel.

    Args:
        stock_code: ASX ticker to look up, e.g. ``'14D'``.
        max_attempts: Number of validated attempts before the fallback request.

    Returns:
        A one-row ``pandas.DataFrame`` indexed by ``stock_code`` holding the
        flattened metadata plus a ``stock_code`` column.

    Raises:
        Exception: Whatever the fallback request, ``json.loads`` or the
            DataFrame conversion raises once all attempts are exhausted.
    """
    question = {"role": "user", "content": f"give me information on the ASX company with stock code {stock_code}"}
    # Work on a copy: appending to the module-level list would grow the prompt
    # for every later stock and race between parallel tasks.
    messages = list(core_messages)

    for attempt in range(1, max_attempts + 1):
        company_metadata_json = None
        try:
            response = openapi_client.chat.completions.create(
                model='gpt-3.5-turbo-0125',
                response_format={"type": "json_object"},
                messages=messages + [question]
            )
            company_metadata_json = json.loads(response.choices[0].message.content)
            validate_company_metadata(company_metadata_json)
        except json.JSONDecodeError as e:
            # Must come before ValueError: JSONDecodeError is a subclass of it.
            print(f"JSON decode error on attempt {attempt}: {str(e)}")
            messages.append({"role": "user", "content": f"the JSON object provided is not valid, please provide a valid JSON object. Here is the error message: {str(e)}"})
            continue
        except ValueError as e:
            # Validation failure: add targeted feedback, then retry instead of
            # returning data that is known to be invalid.
            error_text = str(e)
            if "URL" in error_text and "is not valid" in error_text:
                messages.extend(_url_feedback_messages(company_metadata_json, e, messages))
            elif "is not an image" in error_text:
                messages.extend(_url_feedback_messages(company_metadata_json, e, messages, hint_suffix=" for the image"))
            messages.append({"role": "user", "content": f"the JSON object provided is not valid according to the provided schema, please ensure the structure is correct as per the schema. Here is the error message: {error_text}"})
            print(f"Validation error on attempt {attempt}: {error_text}")
            continue
        except Exception as e:
            # Unexpected failure (e.g. API or network error): just try again.
            print(f"Unexpected error on attempt {attempt}: {str(e)}, {e}")
            continue

        # Reached only when the reply parsed and validated.
        try:
            return _metadata_to_frame(company_metadata_json, stock_code)
        except Exception as e:
            print(f"Error normalizing JSON to DataFrame: {str(e)}")
            messages.append({"role": "user", "content": "failed to normalize JSON object to DataFrame, likely malformed json, please ensure the structure is correct as per the schema"})

    # After all attempts: fall back to the stronger model, reusing the
    # accumulated feedback. This reply is not validated.
    response = openapi_client.chat.completions.create(
        model='gpt-4-0125-preview',
        response_format={"type": "json_object"},
        messages=messages + [question]
    )
    company_metadata_json = json.loads(response.choices[0].message.content)
    return _metadata_to_frame(company_metadata_json, stock_code)


ImportError: cannot import name 'OpenAI' from 'openai' (/opt/homebrew/lib/python3.11/site-packages/openai/__init__.py)

In [20]:

# Fetch the metadata for a single ASX stock code ('14D') and display the
# resulting one-row DataFrame.
res = get_company_metadata('14D')

res

Unnamed: 0,company_name,address,summary,details,website,website_about,company_logo_link,directors,references,stock_code
14D,1414 Degrees Limited,"25 North Terrace, Adelaide, South Australia, 5000","1414 Degrees is a developer of sustainable, cl...",1414 Degrees operates in the energy storage in...,https://1414degrees.com.au/,https://1414degrees.com.au/investors/,https://1414degrees.com.au/wp-content/uploads/...,"[{'name': 'Jim Caddy', 'title': 'Chairman'}, {...",[{'url': 'https://www.asx.com.au/asx/share-pri...,14D


In [54]:
# download asx directory from asx
# from: https://www.asx.com.au/markets/trade-our-cash-market/directory
import time
import chardet

# NOTE(review): the access token is embedded in the URL; confirm it is the
# public token used by the ASX directory page before sharing this notebook.
asx_company_url = 'https://asx.api.markitdigital.com/asx-research/1.0/companies/directory/file?access_token=83ff96335c2d45a094df02a206a39ff4'
company_list_path = "data/ASX_Listed_Companies_07-04-2024_11-03-45_AEST.csv"
# Download the ASX directory itself. Passing `data_url` here would save the
# business-names CSV under the company-list name; if that ever happened, delete
# the stale file at company_list_path, because existing files are not re-downloaded.
download_file(client, asx_company_url, company_list_path)

# Sniff the encoding from the first 10000 bytes, then let pandas detect the delimiter.
with open(company_list_path, 'rb') as f:
    result = chardet.detect(f.read(10000))
encoding = result['encoding']
df_company_list = pd.read_csv(company_list_path, encoding=encoding, engine='python', sep=None)

# iterate through the stocks and get the metadata from the openAI query. Parallelise this using dask
stocks = df_company_list['ASX code'].tolist()

delayed_results = [dask.delayed(get_company_metadata)(stock) for stock in stocks]

# Compute the delayed calls directly and concatenate the per-stock frames.
# dd.from_delayed without `meta` would run the first task an extra time just to
# infer the schema, i.e. one wasted paid model call.
with ProgressBar():
    company_frames = dask.compute(*delayed_results)

agg_df = pd.concat(company_frames)

agg_df.head()

[##                                      ] | 5% Completed | 17m 56sss


KeyboardInterrupt: 

In [55]:
# Show the logo URL stored in the row at position 10 of the aggregated results.
agg_df.iloc[10].company_logo_link

'https://www.1414degrees.com.au/wp-content/uploads/2023/04/1414degrees-GreyRed-RGB.png'

In [None]:
# TODO:
# refactor code into a resolver pattern, where each field of the schema has its own "resolver". 
# Each resolver can then have a more specific context provided and appropriate error handling, as well as fallback/default behaviour. 
# Some advantages this will provide:
# * after x attempts to resolve a field, we can provide a default value
# * we can provide more specific error handling for each field
# * we can provide more specific context for each field
# * we can provide more specific error messages for each field, covering both typing and validation
# * we could enhance the parallelisation of the data discovery as we could break up work across different resolvers, however would likely translate to even more queries and cost etc.
# * resolvers would be entirely domain/context specific workflows and hence can be customised and extended as needed