In [18]:
import requests
import pysqlite3
import os
from datetime import datetime
from dotenv import load_dotenv

import libsql_experimental as libsql

load_dotenv()


True

In [19]:
""" Connect to Turso - embedded replicas """

DB_URL = os.getenv("TURSO_DATABASE_URL")
AUTH_TOKEN = os.getenv("TURSO_AUTH_TOKEN")


conn = libsql.connect("../data/primary.db", sync_url=DB_URL, auth_token=AUTH_TOKEN)

conn.sync()

[2m2024-12-03T05:42:15.450943Z[0m [32m INFO[0m [2mlibsql::replication::remote_client[0m[2m:[0m Attempting to perform handshake with primary.
[2m2024-12-03T05:42:15.732086Z[0m [33m WARN[0m [2mlibsql::replication::remote_client[0m[2m:[0m Frames prefetching failed because of new session token returned by handshake


In [13]:
""" load env vars and define paths """

ALPHA_VANTAGE_SECRET = os.getenv("ALPHA_VANTAGE_SECRET")
OPENAI_SECRET = os.getenv("OPEN_AI_SECRET")

ABS_BASE_API_PATH = "https://api.data.abs.gov.au/sdmx-json/data"
ALPHA_VANTAGE_BASE_API_PATH = "https://www.alphavantage.co/query"

# DB_PATH = "../data/macro.db"

# You can specify the response in the API URL using the "format" query parameter. E.g: https://api.data.abs.gov.au/data/jv/all?startPeriod=2020&format=jsondata

# XML is returned by default
# Structure specific XML (good for time series): "format=structurespecificdata"
# JSON: "format=jsondata"
# CSV: "format=csv"

# You can also use the "accept" header to specify the response format as a header when you make an API call. E.g: "accept: application/xml"

# XML: application/xml
# Structure specific XML: application/vnd.sdmx.structurespecificdata+xml
# JSON: application/vnd.sdmx.data+json
# CSV: text/csv
# CSV: application/vnd.sdmx.data+csv
# CSV with labels for codelists: application/vnd.sdmx.data+csv;labels=both
# CSV file with labels for codelists: application/vnd.sdmx.data+csv;file=true;labels=bot


ALPHA_VANTAGE_SECRET

'GM4049I2JTJHM5J6'

In [14]:
""" Test API keys """

# Test with Real GDP endpoint
def test_api_key_av(key):
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "REAL_GDP",
        "apikey": key
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        print("API key is working!")
        print(response.json())
    else:
        print(f"Error: {response.status_code} - {response.text}")

test_api_key_av(ALPHA_VANTAGE_SECRET)

API key is working!
{'name': 'Real Gross Domestic Product', 'interval': 'annual', 'unit': 'billions of dollars', 'data': [{'date': '2023-01-01', 'value': '22671.096'}, {'date': '2022-01-01', 'value': '22034.828'}, {'date': '2021-01-01', 'value': '21494.798'}, {'date': '2020-01-01', 'value': '20267.585'}, {'date': '2019-01-01', 'value': '20715.671'}, {'date': '2018-01-01', 'value': '20193.896'}, {'date': '2017-01-01', 'value': '19612.102'}, {'date': '2016-01-01', 'value': '19141.672'}, {'date': '2015-01-01', 'value': '18799.622'}, {'date': '2014-01-01', 'value': '18261.714'}, {'date': '2013-01-01', 'value': '17812.167'}, {'date': '2012-01-01', 'value': '17442.759'}, {'date': '2011-01-01', 'value': '17052.41'}, {'date': '2010-01-01', 'value': '16789.75'}, {'date': '2009-01-01', 'value': '16349.11'}, {'date': '2008-01-01', 'value': '16781.485'}, {'date': '2007-01-01', 'value': '16762.445'}, {'date': '2006-01-01', 'value': '16433.148'}, {'date': '2005-01-01', 'value': '15987.957'}, {'date'

In [None]:
# """ Define data fetchers for AUSTRALIA """

# def fetch_cpi_data():
#     # CPI request params
#     dataflow_identifier = "ABS,CPI,1.0.0"
#     data_key = "all"
#     params = {
#         "startPeriod": "2023-Q1",
#         "endPeriod": "2024-Q1",
#         "detail": "dataonly",
#         "dimensionAtObservation": "TIME_PERIOD"
#     }
    
#     url = f"{ABS_BASE_API_PATH}/{dataflow_identifier}/{data_key}"
    
#     res = requests.get(url, params=params)
    
#     if res.status_code == 200:
#         data = res.json()
#         return data
#     else:
#         print(f"Error: {res.status_code} - {res.text}")
#         return None
    



In [None]:
# """ Fetch data for AUSTRALIA """

# cpi_data = fetch_cpi_data()
# if cpi_data:
#     print("Data fetched successfully")
# else:
#     print("Failed to fetch data.")

Error: 403 - {"message":"Missing Authentication Token"}
Failed to fetch data.


In [16]:
""" Define data fetchers and savers for US """

def fetch_economic_indicators_data_av(function, params):
    query_params = {**params, "function": function, "apikey": ALPHA_VANTAGE_SECRET} if params else {"function": function, "apikey": ALPHA_VANTAGE_SECRET}
    
    response = requests.get(ALPHA_VANTAGE_BASE_API_PATH, params=query_params)
    
    if response.status_code == 200:
        data = response.json()
        print(f"Response for {function}: {data}")
        return data
    
    else:
        print(f"Error fetching {function}: {response.status_code}")
        return None
    
def fetch_forex_data(function, from_currency, to_currency, **kwargs):
    params = {
        "function": function,
        "from_currency" if function == "CURRENCY_EXCHANGE_RATE" else "from_symbol": from_currency,
        "to_currency" if function == "CURRENCY_EXCHANGE_RATE" else "to_symbol": to_currency,
        "apikey": ALPHA_VANTAGE_SECRET,
        **kwargs
    }
        
    response = requests.get(ALPHA_VANTAGE_BASE_API_PATH, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Error {response.status_code}: {response.text}")
    
def fetch_commodity_data(function, **kwargs):
    params = {
        "function": function,
        "apikey": ALPHA_VANTAGE_SECRET,
        **kwargs
    }
    
    response = requests.get(ALPHA_VANTAGE_BASE_API_PATH, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Error {response.status_code}: {response.text}")
    
    
def filter_by_date(data, start_date=None, end_date=None):
    # Convert start_date and end_date to datetime objects
    if start_date:
        start_date = datetime.strptime(start_date, "%Y-%m-%d")
    if end_date:
        end_date = datetime.strptime(end_date, "%Y-%m-%d")

    # Filter the time series data
    if "data" in data:
        filtered_data = []
        for item in data["data"]:
            item_date = datetime.strptime(item["date"], "%Y-%m-%d")
            if (not start_date or item_date >= start_date) and (not end_date or item_date <= end_date):
                filtered_data.append(item)
        data["data"] = filtered_data

    return data
    
def save_economic_indicator_data(category, data, time_series_key="data"):
    # conn = pysqlite3.connect(DB_PATH)
    # cursor = conn.cursor()
    
    conn.execute(f"""
    CREATE TABLE IF NOT EXISTS {category} (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        date TEXT,
        value REAL
    )           
    """)
    
    for item in data[time_series_key]:
        date = item.get("date")
        value = item.get("value")
        conn.execute(f"""
        INSERT INTO {category} (date, value) VALUES (?, ?)                        
        """, (date, value))
        
    conn.commit()
    # conn.close()
    conn.sync() # embedded replicas only
    
def save_forex_to_db(category, data, time_series_key="Time Series FX (Daily)"):
    # conn = pysqlite3.connect(DB_PATH)
    # cursor = conn.cursor()

    # Create table
    conn.execute(f"""
    CREATE TABLE IF NOT EXISTS {category} (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        date TEXT,
        open REAL,
        high REAL,
        low REAL,
        close REAL
    )
    """)

    # Insert data
    if time_series_key in data:
        for date, values in data[time_series_key].items():
            conn.execute(f"""
            INSERT INTO {category} (date, open, high, low, close) 
            VALUES (?, ?, ?, ?, ?)
            """, (date, float(values["1. open"]), float(values["2. high"]), float(values["3. low"]), float(values["4. close"])))

    conn.commit()
    conn.sync()
    
def save_commodity_to_db(category, data, time_series_key="data"):
    # conn = pysqlite3.connect(DB_PATH)
    # cursor = conn.cursor()

    # Create table
    conn.execute(f"""
    CREATE TABLE IF NOT EXISTS {category} (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        date TEXT,
        value REAL
    )
    """)

    # Insert data
    for item in data[time_series_key]:
        conn.execute(f"""
        INSERT INTO {category} (date, value) VALUES (?, ?)
        """, (item["date"], item["value"]))

    conn.commit()
    # conn.close()
    conn.sync()
    
# def update_raw_data(indicators):
#     results = {"success": [], "errors": []}
#     for category, details in indicators.items():
#         try:
#             print(f"Fetching {category}...")
#             data = fetch_economic_indicators_data_av(details["function"], details["params"])
#             save_economic_indicator_data(category, data)
#             results["success"].append(category)
#             print(f"{category} updated successfully.")
#         except Exception as e:
#             results["errors"].append({"category": category, "error": str(e)})
#             print(f"Error fetching {category}: {e}")

#     return results
    
    
def update_all_data(indicators):
    results = {"success": [], "errors": []}

    for category, details in indicators.items():
        try:
            print(f"Fetching {category}...")
            if "from_currency" in details:
                data = fetch_forex_data(**details)
                save_forex_to_db(category, data)
            elif "commodity" in category:
                data = fetch_commodity_data(**details)
                save_commodity_to_db(category, data)
            else:
                data = fetch_economic_indicators_data_av(details["function"], details["params"])
                save_economic_indicator_data(category, data)
            results["success"].append(category)
            print(f"{category} updated successfully.")
        except Exception as e:
            results["errors"].append({"category": category, "error": str(e)})
            print(f"Error fetching {category}: {e}")

    return results
    
    

In [17]:
""" Define main US fetching loop """

indicators = {
    # Economic indicators
    "real_gdp": {"function": "REAL_GDP", "params": {"interval": "quarterly"}},
    "cpi": {"function": "CPI", "params": {"interval": "monthly"}},
    "treasury_yields": {"function": "TREASURY_YIELD", "params": {"interval": "monthly", "maturity": "10year"}},
    "federal_funds_rate": {"function": "FEDERAL_FUNDS_RATE", "params": {"interval": "monthly"}},
    "unemployment_rate": {"function": "UNEMPLOYMENT", "params": {}},
    "inflation_rate": {"function": "INFLATION", "params": {}},
    
    # Forex
    "forex_usd_aud": {"function": "FX_DAILY", "from_currency": "USD", "to_currency": "AUD"},
    "forex_aud_jpy": {"function": "FX_DAILY", "from_currency": "AUD", "to_currency": "JPY"},

    # Commodities
    "commodity_wti": {"function": "WTI", "interval": "monthly"},
    "commodity_brent": {"function": "BRENT", "interval": "monthly"},
    "commodity_copper": {"function": "COPPER", "interval": "monthly"},
    "commodity_aluminum": {"function": "ALUMINUM", "interval": "monthly"}
}

results = update_all_data(indicators)

# Print Results
print("\nSummary:")
print(f"Success: {results['success']}")
if results["errors"]:
    print("Errors:")
    for error in results["errors"]:
        print(f"- {error['category']}: {error['error']}")
        
        

Fetching real_gdp...
Response for REAL_GDP: {'name': 'Real Gross Domestic Product', 'interval': 'quarterly', 'unit': 'billions of dollars', 'data': [{'date': '2024-07-01', 'value': '5866.67'}, {'date': '2024-04-01', 'value': '5817.169'}, {'date': '2024-01-01', 'value': '5638.455'}, {'date': '2023-10-01', 'value': '5831.583'}, {'date': '2023-07-01', 'value': '5724.051'}, {'date': '2023-04-01', 'value': '5646.288'}, {'date': '2023-01-01', 'value': '5469.175'}, {'date': '2022-10-01', 'value': '5668.877'}, {'date': '2022-07-01', 'value': '5544.386'}, {'date': '2022-04-01', 'value': '5484.289'}, {'date': '2022-01-01', 'value': '5337.277'}, {'date': '2021-10-01', 'value': '5591.973'}, {'date': '2021-07-01', 'value': '5408.478'}, {'date': '2021-04-01', 'value': '5360.97'}, {'date': '2021-01-01', 'value': '5133.377'}, {'date': '2020-10-01', 'value': '5298.828'}, {'date': '2020-07-01', 'value': '5134.93'}, {'date': '2020-04-01', 'value': '4783.855'}, {'date': '2020-01-01', 'value': '5049.973'},

In [76]:
""" Define fetcher for news sentiment individually """


def fetch_news_sentiment(tickers=None, topics=None, time_from=None, time_to=None, sort="LATEST", limit=50):
    """
    Fetch market news and sentiment from Alpha Vantage.
    
    :param tickers: Comma-separated string of tickers (e.g., 'AAPL,TSLA')
    :param topics: Comma-separated string of topics (e.g., 'economy_macro,technology')
    :param time_from: Start time in YYYYMMDDTHHMM format (e.g., '20231101T0000')
    :param time_to: End time in YYYYMMDDTHHMM format (optional)
    :param sort: Sorting order (LATEST, EARLIEST, RELEVANCE)
    :param limit: Number of articles to fetch (default is 50)
    :return: JSON response from the API
    """
    base_url = "https://www.alphavantage.co/query"
    params = {
        "function": "NEWS_SENTIMENT",
        "apikey": ALPHA_VANTAGE_SECRET,
        "tickers": tickers,
        "topics": topics,
        "time_from": time_from,
        "time_to": time_to,
        "sort": sort,
        "limit": limit
    }

    response = requests.get(base_url, params={k: v for k, v in params.items() if v})
    if response.status_code == 200:
        data = response.json()
        print(f"Response for NEWS_SENTIMENT: {data}")
        return data
    
    else:
        print(f"Error fetching NEWS_SENTIMENT: {response.status_code}")
        raise Exception(f"Error {response.status_code}: {response.text}")
     

In [None]:
def save_news_to_db(data):
    # conn = pysqlite3.connect(DB_PATH)
    # cursor = conn.cursor()
    
    # Create table
    conn.execute(f"""
    CREATE TABLE IF NOT EXISTS news_sentiment (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        published_at TEXT,
        headline TEXT,
        summary TEXT,
        sentiment_score REAL,
        ticker TEXT,
        topic TEXT
    )
    """)

    # Insert data into the table
    for article in data.get("feed", []):
        conn.execute("""
        INSERT INTO news_sentiment (published_at, headline, summary, sentiment_score, ticker, topic)
        VALUES (?, ?, ?, ?, ?, ?)
        """, (
            article["time_published"],
            article["title"],
            article["summary"],
            article.get("overall_sentiment_score", None),
            article.get("tickers", None),
            article.get("topics", None)
        ))

    conn.commit()
    conn.sync()

In [None]:
# broad
news_data = fetch_news_sentiment(
    topics="economy_macro,economy_fiscal,economy_monetary",
    time_from="20231101T0000",
    sort="LATEST",
    limit=100
)

# # forex
# news_data = fetch_news_sentiment(
#     tickers="FOREX:USD",
#     topics="economy_macro",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=50
# )

# # financial markets
# news_data = fetch_news_sentiment(
#     topics="financial_markets",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )

# # company earnings
# news_data = fetch_news_sentiment(
#     topics="earnings",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )

# # energy and transportation
# news_data = fetch_news_sentiment(
#     topics="energy_transportation",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )


# # real estate and construction
# news_data = fetch_news_sentiment(
#     topics="real_estate",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )

# # Technology
# news_data = fetch_news_sentiment(
#     topics="technology",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )


# # Geopolitical
# news_data = fetch_news_sentiment(
#     topics="economy_macro",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )


# # central bank and policy
# news_data = fetch_news_sentiment(
#     topics="economy_monetary",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )

# # global trade and commodities
# news_data = fetch_news_sentiment(
#     topics="economy_fiscal,economy_macro",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )


# # behavioural indicators
# news_data = fetch_news_sentiment(
#     topics="retail_wholesale",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )

# # Climate and env 
# news_data = fetch_news_sentiment(
#     topics="energy_transportation",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )

# # comprehensive multi-topic
# news_data = fetch_news_sentiment(
#     topics="economy_macro,financial_markets,economy_fiscal,economy_monetary,energy_transportation",
#     time_from="20231101T0000",
#     sort="LATEST",
#     limit=100
# )


save_news_to_db(news_data)


Response for NEWS_SENTIMENT: {'Information': 'Thank you for using Alpha Vantage! Our standard API rate limit is 25 requests per day. Please subscribe to any of the premium plans at https://www.alphavantage.co/premium/ to instantly remove all daily rate limits.'}
Response for NEWS_SENTIMENT: {'Information': 'Thank you for using Alpha Vantage! Our standard API rate limit is 25 requests per day. Please subscribe to any of the premium plans at https://www.alphavantage.co/premium/ to instantly remove all daily rate limits.'}
Response for NEWS_SENTIMENT: {'Information': 'Thank you for using Alpha Vantage! Our standard API rate limit is 25 requests per day. Please subscribe to any of the premium plans at https://www.alphavantage.co/premium/ to instantly remove all daily rate limits.'}
Response for NEWS_SENTIMENT: {'Information': 'Thank you for using Alpha Vantage! Our standard API rate limit is 25 requests per day. Please subscribe to any of the premium plans at https://www.alphavantage.co/pr