In [3]:
import psycopg2
import dotenv
from psycopg2 import extras
import timescale_vector.client as tsv_client
import os
import json 
import pandas as pd
from openai import OpenAI

### Setup Timescale

In [4]:
# Load variables from a local .env file into the process environment.
dotenv.load_dotenv()

# -- Timescale

# Connection settings come from the environment. os.getenv returns None for an
# unset variable, which the f-string below would interpolate as the text "None".
TIMESCALE_USER = os.getenv("TIMESCALE_USER")
TIMESCALE_PASSWORD = os.getenv("TIMESCALE_PASSWORD")
TIMESCALE_PORT = os.getenv("TIMESCALE_PORT")
TIMESCALE_HOST = os.getenv("TIMESCALE_HOST")
TIMESCALE_DB = os.getenv("TIMESCALE_DB")
# NOTE(review): credentials are interpolated as-is; a password containing
# URL-reserved characters (e.g. '@' or '/') presumably needs percent-encoding — confirm.
CONNECTION = f"postgres://{TIMESCALE_USER}:{TIMESCALE_PASSWORD}@{TIMESCALE_HOST}:{TIMESCALE_PORT}/{TIMESCALE_DB}"

# Shared connection and cursor used by the plain-SQL cells below.
# DictCursor is used because later cells read rows by column name (row.get(...)).
conn = psycopg2.connect(CONNECTION, cursor_factory=extras.DictCursor)
cur = conn.cursor()


# Vector-store client bound to the "fortune_100" table. num_dimensions must
# match the length of the vectors returned by get_embeddings.
vec = tsv_client.Sync(
    service_url=CONNECTION, table_name="fortune_100", num_dimensions=1536
)
vec.create_tables()

# -- OpenAI

# No API key is passed explicitly; presumably it is read from the environment
# loaded above — confirm.
oai_client = OpenAI()

### Load company data to Timescale 

In [57]:
# cur.execute("DROP TABLE IF EXISTS fortune_100")
# conn.commit()

In [8]:
import uuid


def get_embeddings(text: str) -> list[float]:
    """Return the embedding vector for `text`.

    Calls the OpenAI embeddings endpoint with the `text-embedding-ada-002`
    model through the module-level `oai_client`.

    Args:
        text: The text to embed.

    Returns:
        The embedding of the first item in the response, as a list of floats.
        Only `.data[0]` is read, so any further items in the response are
        ignored.
    """
    response = oai_client.embeddings.create(
        model="text-embedding-ada-002", input=text
    )
    return response.data[0].embedding


def prepare_data_for_timescaledb(df: pd.DataFrame) -> list[tuple]:
    """
    Turn the company DataFrame into records for the timescale_vector client.

    Each record is a tuple `(id, metadata, contents, embedding)`:
    - id: a UUID derived from the company name.
    - metadata: a dict of the row's non-null columns, excluding "description".
    - contents: the company description, i.e. the text that gets vectorized.
    - embedding: the embedding of the description, from `get_embeddings`.
    """
    # uuid5 is name-based, so the same company name always yields the same id.
    ids = [uuid.uuid5(uuid.NAMESPACE_DNS, name) for name in df["company"]]

    metadata = []
    for _, row in df.iterrows():
        entry = {}
        for column, value in row.items():
            if column == "description" or pd.isnull(value):
                continue
            entry[column] = value
        metadata.append(entry)

    contents = df["description"].tolist()
    # One embedding request per description.
    embeddings = list(map(get_embeddings, contents))

    return list(zip(ids, metadata, contents, embeddings))




def read_csv(fpath: str) -> pd.DataFrame:
    """Read the company CSV and normalise it for loading.

    Column names are lower-cased with spaces replaced by underscores, and the
    `market_cap` column is converted to float, with the "-" placeholder
    mapped to 0.
    """
    frame = pd.read_csv(fpath)
    normalized_names = frame.columns.str.lower().str.replace(" ", "_")
    frame.columns = normalized_names
    market_cap = frame["market_cap"].replace("-", 0)
    frame["market_cap"] = market_cap.astype(float)
    return frame


In [None]:
# Load the company CSV, build (id, metadata, contents, embedding) records
# (this makes one embedding request per row) and upsert them into the
# "fortune_100" vector table.
df = read_csv("data/fortune_100_with_descriptions.csv")
records = prepare_data_for_timescaledb(df)
vec.upsert(records)

### Simple Vector Search

In [None]:
# Embed a free-text query and fetch the 4 nearest company descriptions.
query = "Robotics"
vec_query = get_embeddings(query)

results = vec.search(query_embedding=vec_query, limit=4)
for row in results:
    # Each hit exposes the stored metadata dict and the original description text.
    print(row.get("metadata").get("company"))
    print(row.get("contents") + '\n')

### Add news & sales data

In [32]:
from pgcopy import CopyManager
import eventregistry


def get_company_news(company_name: str, n_results: int, lang: str = "eng") -> list[dict]:
    """
    Get company news from Event Registry API

    Searches for articles that are tagged with the concept resolved from
    `company_name` and that contain `company_name` in the title.

    Args:
        company_name: Company to look up.
        n_results: Number of articles requested.
        lang: Language code of the articles to return (defaults to "eng").

    Returns:
        The list of article dicts found under `['articles']['results']` in the
        API response.
    """
    er = eventregistry.EventRegistry(apiKey=os.getenv("EVENT_REGISTRY_API_KEY"))
    q = eventregistry.QueryArticlesIter(
        conceptUri=er.getConceptUri(company_name),
        keywords=company_name,
        keywordsLoc="title",
        # Honour the caller's language instead of always querying English.
        lang=[lang],
    )
    q.setRequestedResult(eventregistry.RequestArticlesInfo(count=n_results))
    res = er.execQuery(q)
    return res['articles']['results']


def upload_news_to_timescaledb(news: list[dict], company_id: str):
    """Insert news articles for one company into the `news` table.

    Each article is inserted and committed individually, so articles stored
    before a failure are kept.

    Args:
        news: Article dicts with the keys "title", "body", "url" and
            "dateTimePub".
        company_id: Id of the company the articles belong to.

    Raises:
        Exception: Whatever the insert raised (e.g. KeyError for a missing
            field, or a database error), after the open transaction has been
            rolled back.
    """
    try:
        for article in news:
            cur.execute(
                "INSERT INTO news (company_id, title, body, url, published_at) VALUES (%s, %s, %s, %s, %s)",
                (
                    company_id,
                    article["title"],
                    article["body"],
                    article["url"],
                    article["dateTimePub"],
                ),
            )
            conn.commit()
    except Exception:
        # A failed statement leaves the shared connection in an aborted
        # transaction; roll back so later statements on `conn` still work.
        conn.rollback()
        raise


def upload_sales_to_timescaledb(sales: pd.DataFrame):
    """Bulk-load sales transactions into the `sales` table using pgcopy.

    Args:
        sales: Transactions to load. Rows are passed positionally, so the
            columns are assumed to be, in order: transaction_id, company_id,
            date, amount — TODO confirm against the CSV. The `date` column is
            parsed with `pd.to_datetime`; the caller's frame is not modified.
    """
    # Convert on a copy so the DataFrame passed in stays untouched.
    prepared = sales.assign(date=pd.to_datetime(sales["date"]))
    data = list(prepared.itertuples(index=False, name=None))
    copy_manager = CopyManager(conn, "sales", ["transaction_id", "company_id", "date", "amount"])
    try:
        copy_manager.copy(data)
    except ValueError as e:
        print(f"Error occurred: {e}")
        # Nothing usable was loaded; discard the transaction instead of committing it.
        conn.rollback()
        return
    conn.commit()

In [None]:
# -- Create news and sales tables
# WARNING: both tables are dropped first, so re-running this cell discards any
# previously loaded news and sales rows.
cur.execute("DROP TABLE IF EXISTS sales")
cur.execute("DROP TABLE IF EXISTS news")
# company_id is stored as TEXT in both tables; it holds the string form of the
# fortune_100 id (see str(row.get('id')) below).
cur.execute(
    "CREATE TABLE IF NOT EXISTS news (id SERIAL PRIMARY KEY, company_id TEXT, title TEXT, body TEXT, url TEXT, published_at TIMESTAMP)"
)
cur.execute(
    "CREATE TABLE IF NOT EXISTS sales (transaction_id TEXT, company_id TEXT, date TIMESTAMP, amount FLOAT)"
)
conn.commit()

# -- Fetch & upload news data
"""
You can also use the pre-loaded news in 'data/all_news.json', if you don't want to sign up for the event registry API. 
However, the news-json does not have a mapping to the companies. 
You could create this mapping by iterating over the companies, and matching them with news that contain the company name in the title.
"""
# Fetch up to 5 articles per company and store them keyed by the company's id.
cur.execute("SELECT * FROM fortune_100")
rows = cur.fetchall()
for i, row in enumerate(rows):
    print(i)  # progress indicator: index of the company being processed
    company = (row.get('metadata').get('company'))
    company_id = str(row.get('id'))
    try:
        news = get_company_news(company, n_results=5)
        upload_news_to_timescaledb(news, company_id)
    except Exception as e:
        # Best effort: a failure for one company (fetch or upload) is reported
        # and the loop continues with the next company.
        print(f"Error fetching news for {company}: {e}")

# -- Upload sales data from csv

sales_df = pd.read_csv("data/dummy_sales_transactions.csv")
upload_sales_to_timescaledb(sales_df)

In [41]:
# Roll back the current transaction on the shared connection, e.g. to recover
# after a failed statement left it in an aborted state.
conn.rollback()

In [114]:
# Relational sanity check: join the news rows to the Walmart company record.
# news.company_id is TEXT, hence the ::text cast on the fortune_100 id.
cur.execute("""
    SELECT * 
    FROM news 
    JOIN (
            SELECT * from fortune_100 
            WHERE metadata->>'company' = 'Walmart'
        ) AS walmart
    ON news.company_id = walmart.id::text
"""
)
news_data = cur.fetchall()
# Prints full rows, including article bodies, so the output can be long.
for row in news_data:
    print(row)

[1, '73d5d45c-2547-580c-8421-e4819278af3a', 'Walmart $4 find makes car interior look brand new thanks to deep spring clean', 'DRIVERS are in the mood for a vehicle re-vamp after a TikToker showed them how their car can look brand new in just a few hours.\n\nWhile many people start spring cleaning their homes, TikTok users have been reminded not to forget their cars.\n\nNatalie Steinman (@nataliesteinman) showcased how she gets her vehicle looking brand new with some elbow grease, easy changes, and a little-known children\'s toy that doubles as a cleaning product.\n\n"Let the spring cleaning comments," Steinman told viewers in the caption of the video.\n\nSteinman focuses entirely on the interior of her car and starts by filling a trash bag full of receipts, empty bottles, and food wrappers.\n\nShe then removes the mats which she later cleans with soap and water and scrubs with a toilet brush.\n\nUsing a car hoover she then removes any crumbs and dirt from her Ford\'s carpets and storag

### Combined vector + relational queries

In [115]:
# -- Get result for vector search

vec_query = get_embeddings("Smartphones, design")
# Vector search narrowed by a metadata filter (state) and a numeric predicate
# (head count). Drop `filter` and `predicates` for an unfiltered search.
vec_results = vec.search(
    query_embedding=vec_query,
    limit=3,
    filter={"state": "CA"},
    predicates=tsv_client.Predicates("num_of_employees", ">", 100000),
)


# -- Get related sales data
for result in vec_results:
    # sales.company_id is TEXT, so compare against the string form of the id.
    # The value is passed as a query parameter rather than formatted into the SQL.
    company_id = str(result.get('id'))
    cur.execute("SELECT SUM(amount) FROM sales WHERE company_id = %s", (company_id,))
    sales = cur.fetchall()[0][0]
    print(f"{result.get('metadata').get('company')}: {sales}$")

Apple: 273503.0$
Intel: 247760.0$
Alphabet: 248683.0$


### Construct queries from natural language

In [26]:
import instructor
import enum
from typing import List
from pydantic import Field, BaseModel
from datetime import date
import json 
from timescale_vector import client as tsv_client


# Re-create the vector and OpenAI clients so this section can run on its own;
# these rebind the `vec` and `oai_client` names defined in the setup cell.
vec = tsv_client.Sync(
    service_url=CONNECTION, table_name="fortune_100", num_dimensions=1536
)

oai_client = OpenAI()

# instructor-patched client: used below with `response_model=` to get the
# completion back as a pydantic model.
client = instructor.patch(oai_client)

# System prompt for the planner. It is an f-string: the doubled braces are
# literal braces, and date.today() is evaluated once, when this cell runs.
# The query type names listed here must match the QueryType values.
query_planner_prompt = f"""
You are a world class query planning algorithm capable of breaking apart questions into its dependency queries,
such that the answers can be used to inform the parent question. 
1. The queries should be of three types, depending on what is searched: "COMPANY", "SALES" or "NEWS".
2. Before running SALES or NEWS queries, we first need a company for which we are finding sales/news for. 
3. Determine parameters to be passed to each query. 
    3.1 for COMPANY-queries, determine an appropriate 'keywords'-parameter, which is list of keywords that can be used for finding the company. Where applicable, return also a 'filter'-parameter, in the format of [{{'field':'value'}},..]. Filters can be applied to the following fields: 'state', a two-letter state code, or 'city', the company's city. For example, {{'state':'NY'}}
    3.2 for NEWS- and SALES-queries, determine an optional daterange if specified, with parameters 'min_date' and 'max_date'. The current date is {date.today()}.
4. Do not answer the questions, simply provide a correct compute graph with good specific questions to ask and relevant dependencies. 
5. Before you call the function, think step-by-step to get a better understanding of the problem.
"""


class QueryType(str, enum.Enum):
    """Enumeration representing the types of queries that can be made to our database"""

    # The `str` mixin lets members be built from / compared with their plain
    # string values, which is how they appear in JSON query plans.
    # The values match the type names used in query_planner_prompt.
    # NOTE(review): the docstring above is presumably exported in the schema
    # given to the planner model — edit with care.

    COMPANY_SEARCH = "COMPANY"  # vector search over the company table
    NEWS_SEARCH = "NEWS"  # SQL lookup in the `news` table
    SALES_SEARCH = "SALES"  # SQL aggregate over the `sales` table


class Query(BaseModel):
    """Class representing a single query in a query plan."""

    # NOTE(review): the class docstring and the Field descriptions are
    # presumably part of the schema shown to the planner model (via
    # `response_model`), so they are runtime text — change them deliberately.

    id: int = Field(..., description="Unique ID of the query")

    parameters: dict = Field(
        ..., description="The parameters to be passed to the query"
    )

    dependencies: List[int] = Field(
        description="List of IDs of queries that need to be answered before this query"
    )

    query_type: QueryType = Field(
        description="Type of query, either a vector query, news query or sales query"
    )

    def run(self) -> list[tuple]:
        """Run this query against the backend selected by `query_type`.

        Returns:
            - COMPANY: up to 3 `(company_name, company_id)` tuples from a
              vector search on the `keywords` parameter, optionally narrowed
              by the `filter` parameter.
            - SALES: the rows of a `SUM(amount)` query for
              `parameters['company_id']`, optionally bounded by
              `min_date` / `max_date`.
            - NEWS: the titles of the news rows for
              `parameters['company_id']`, with the same optional date bounds.

        Raises:
            ValueError: If `query_type` is not one of the supported types.
        """
        if self.query_type == QueryType.COMPANY_SEARCH:
            keywords = self.parameters.get("keywords")
            # The planner returns keywords as a list. get_embeddings keeps only
            # the first embedding of the response, so a list input would
            # presumably be reduced to its first keyword; embed all keywords
            # as one string instead.
            if isinstance(keywords, (list, tuple)):
                keywords = ", ".join(str(keyword) for keyword in keywords)
            vector_query = get_embeddings(keywords)
            filters = self.parameters.get("filter")
            results = vec.search(vector_query, limit=3, filter=filters)
            return [(r.get('metadata').get('company'), str(r.get('id'))) for r in results]

        if self.query_type == QueryType.SALES_SEARCH:
            return self._fetch_for_company("SELECT SUM(amount) FROM sales", "date")

        if self.query_type == QueryType.NEWS_SEARCH:
            rows = self._fetch_for_company("SELECT title FROM news", "published_at")
            return [r.get('title') for r in rows]

        raise ValueError(f"Unsupported query type: {self.query_type!r}")

    def _fetch_for_company(self, select_clause: str, date_column: str) -> list:
        """Run `select_clause` for the `company_id` parameter with optional date bounds.

        `select_clause` and `date_column` are fixed strings chosen by `run`.
        Every value that originates from the query plan (company id, dates) is
        passed as a bound parameter, never formatted into the SQL text.
        """
        company_id = self.parameters.get("company_id")
        # company_id columns are TEXT, so always compare against a string.
        args = [None if company_id is None else str(company_id)]
        sql = f"{select_clause} WHERE company_id = %s"

        # Missing, None and empty date bounds are all treated as "no bound".
        min_date = self.parameters.get("min_date")
        max_date = self.parameters.get("max_date")
        if min_date and max_date:
            sql += f" AND {date_column} BETWEEN %s AND %s"
            args += [min_date, max_date]
        elif min_date:
            sql += f" AND {date_column} >= %s"
            args.append(min_date)
        elif max_date:
            sql += f" AND {date_column} <= %s"
            args.append(max_date)

        cur.execute(sql, args)
        return cur.fetchall()


class QueryPlan(BaseModel):
    """Container class representing a tree of queries to run against a database, to answer a user's question"""

    # NOTE(review): the class docstring and Field description are presumably
    # part of the schema shown to the planner model — change them deliberately.

    query_plan: List[Query] = Field(
        ..., description="The query plan representing the queries to run"
    )

    def execute(self):
        """Run the plan's queries in list order and collect their results.

        Queries without dependencies are run once. A COMPANY query without
        dependencies also sets the company ids that later dependent queries
        are run for. Queries with dependencies are run once per company id,
        on a copy of the query so the plan itself is left unchanged.

        Limitations: `dependencies` is only checked for emptiness (ids are not
        resolved), and results are keyed by query type, so a second query of
        the same type replaces the first one's results.

        Returns:
            Dict mapping each `QueryType` to its results: the plain result of
            `Query.run()` for root queries, or a list with one `run()` result
            per company for dependent queries (empty if no company search ran
            before them).
        """
        # Dict to store results from all queries
        results = {}
        # Ids found by the most recent root company search.
        company_ids = []

        # Execute the queries in the order of their dependencies
        for query in self.query_plan:
            print(f"Running query {query.id}: {query.query_type.name}")
            if not query.dependencies:
                results[query.query_type] = query.run()
                # Only company searches return (name, id) tuples.
                if query.query_type == QueryType.COMPANY_SEARCH:
                    company_ids = [
                        company_id for _, company_id in results[query.query_type]
                    ]
            else:
                # Then execute the query for all companies returned by dependency
                all_results = []
                for company_id in company_ids:
                    scoped_query = query.model_copy(
                        update={
                            "parameters": {**query.parameters, "company_id": company_id}
                        }
                    )
                    all_results.append(scoped_query.run())
                results[query.query_type] = all_results

        return results
        

def query_planner(question: str) -> QueryPlan:
    """Ask the planning model to turn `question` into a `QueryPlan`.

    Sends `query_planner_prompt` as the system message and the question as the
    user message through the instructor-patched `client`, requesting the
    response as a `QueryPlan`.
    """
    planning_model = "gpt-4-0613"
    system_message = {"role": "system", "content": query_planner_prompt}
    user_message = {
        "role": "user",
        "content": f"Consider: {question}\n Generate the correct query plan.",
    }
    return client.chat.completions.create(
        model=planning_model,
        response_model=QueryPlan,
        messages=[system_message, user_message],
        temperature=0,
        max_tokens=1000,
    )

In [43]:
# Ask the planner for a query plan and display it as a plain dict.
# Relative dates ("last week", "last month") are resolved by the model from the
# current date embedded in query_planner_prompt.
question = "Get me AI companies in California, and last weeks news related to them, and last months sales for each"
plan = query_planner(question)
plan.model_dump()

{'query_plan': [{'id': 1,
   'parameters': {'keywords': ['AI'], 'filter': [{'state': 'CA'}]},
   'dependencies': [],
   'query_type': <QueryType.COMPANY_SEARCH: 'COMPANY'>},
  {'id': 2,
   'parameters': {'min_date': '2024-03-15', 'max_date': '2024-03-22'},
   'dependencies': [1],
   'query_type': <QueryType.NEWS_SEARCH: 'NEWS'>},
  {'id': 3,
   'parameters': {'min_date': '2024-02-22', 'max_date': '2024-03-22'},
   'dependencies': [1],
   'query_type': <QueryType.SALES_SEARCH: 'SALES'>}]}

In [44]:
results = plan.execute()

# Heading printed before each group of results, checked in this order.
headings = [
    (QueryType.COMPANY_SEARCH, "Companies:"),
    (QueryType.SALES_SEARCH, "Sales:"),
    (QueryType.NEWS_SEARCH, "News:"),
]

for k, v in results.items():
    for query_type, heading in headings:
        if k == query_type:
            print(heading)
    # One line per result entry (per company for news and sales).
    for val in v:
        print(val)

Running query 1: COMPANY_SEARCH
Running query 2: NEWS_SEARCH
Running query 3: SALES_SEARCH
Companies:
('Apple', '33a3f638-20dc-5797-a9e2-660ae1bad782')
('Intel', '390293f2-6297-554c-8a51-93d478191795')
('Alphabet', '73b37e59-1e24-5e0b-afbf-e84263089fbe')
News:
["Tech giants show Epic support against Apple's payment rules", "The 30 Best Tech Deals in Amazon's Big Spring Sale  --  Including Apple, Beats, Dyson, and More Up to 77% Off", 'Meta, Microsoft slam Apple over app store policy']
['Intel Gets $20Bn From CHIPS Fund, Plans To Invest $100Bn In US', 'Intel To Receive Up To $8.5 Billion In CHIPS Act Grants As US Speeds Up Semiconductor Manufacturing Drive', 'Intel To Receive Up To $8.5 Billion In CHIPS Act Grants As US Speeds Up Semiconductor Manufacturing Drive']
['Is Alphabet (GOOG) Stock Trading at a Reasonable Valuation?', 'Alphabet Inc CEO Sundar Pichai Sells 22,500 Shares', "Gene Munster Agrees With Dan Nathan's Views That These 2 Magnificent 7 Companies Are AI Underdogs: 'I Beli

### Generate answer

In [46]:
answer_prompt = """
You will respond to a user's question, based on the results of several queries.
"""


def generate_answer(question, results, model: str = "gpt-4-0613"):
    """Generate a natural-language answer to `question` from query results.

    Args:
        question: The user's original question.
        results: The query results to answer from; they are interpolated into
            the prompt via their string representation.
        model: Chat model to use (defaults to the previously hard-coded
            "gpt-4-0613").

    Returns:
        The message content of the first completion choice.
    """
    completion = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": answer_prompt},
            {"role": "user", "content": f"User question: {question}\n Query results: {results}"},
        ],
        temperature=0,
        max_tokens=1000,
    )
    return completion.choices[0].message.content

# Answer from the results of the executed plan. Note that the question text is
# passed here as a literal and is worded slightly differently from the
# `question` variable that was used to build the plan.
a = generate_answer("Get me AI companies in California, and last weeks news related to them, and also get sales for last month for each", results)
print(a)



Here are some AI companies in California, recent news related to them, and their sales for the last month:

1. Apple:
   - News: 
     - "Tech giants show Epic support against Apple's payment rules"
     - "The 30 Best Tech Deals in Amazon's Big Spring Sale  --  Including Apple, Beats, Dyson, and More Up to 77% Off"
     - "Meta, Microsoft slam Apple over app store policy"
   - Sales for last month: $88,268.0 million

2. Intel:
   - News: 
     - "Intel Gets $20Bn From CHIPS Fund, Plans To Invest $100Bn In US"
     - "Intel To Receive Up To $8.5 Billion In CHIPS Act Grants As US Speeds Up Semiconductor Manufacturing Drive"
   - Sales for last month: $90,357.0 million

3. Alphabet:
   - News: 
     - "Is Alphabet (GOOG) Stock Trading at a Reasonable Valuation?"
     - "Alphabet Inc CEO Sundar Pichai Sells 22,500 Shares"
     - "Gene Munster Agrees With Dan Nathan's Views That These 2 Magnificent 7 Companies Are AI Underdogs: 'I Believe That Will Change' - Alphabet (NASDAQ:GOOG), Apple (

In [104]:
# Hand-written query plan for testing QueryPlan without calling the planner
# model. To test a company-only plan, keep just the first (id 1) entry.
plan_json = """{
    "query_plan": [
        {
            "id": 1,
            "parameters": {
                "keywords": [
                    "insurance",
                    "clients"
                ]
            },
            "dependencies": [],
            "query_type": "COMPANY"
        },
        {
            "id": 2,
            "parameters": {
                "min_date": "2024-02-18",
                "max_date": "2024-03-20"
            },
            "dependencies": [
                1
            ],
            "query_type": "SALES"
        },
        {
            "id": 3,
            "parameters": {
                "min_date": "2024-03-13",
                "max_date": "2024-03-20"
            },
            "dependencies": [
                1
            ],
            "query_type": "NEWS"
        }
    ]
}
"""
# model_validate_json is the pydantic v2 replacement for the deprecated parse_raw.
plan = QueryPlan.model_validate_json(plan_json)

In [None]:
"""Stashed code, using sql instead of the tsv_client"""
# import uuid

# cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
# conn.commit()


# def create_company_table():
#     create_table_query = """
#     DROP TABLE IF EXISTS fortune_100;
#     CREATE TABLE fortune_100 (
#         company TEXT,
#         rank FLOAT,
#         rank_change FLOAT,
#         revenue FLOAT,
#         profit FLOAT,
#         num_of_employees FLOAT,
#         sector TEXT,
#         city TEXT,
#         state TEXT,
#         newcomer BOOLEAN,
#         ceo_founder BOOLEAN,
#         ceo_woman BOOLEAN,
#         profitable BOOLEAN,
#         prev_rank FLOAT,
#         ceo TEXT,
#         website TEXT,
#         ticker TEXT,
#         market_cap FLOAT,
#         description TEXT,
#         embeddings vector(1536)
#     );
#     """
#     cur.execute(create_table_query)
#     conn.commit()


# def prepare_data_for_timescaledb(df: pd.DataFrame) -> list[tuple]:
#     uuid_list = [uuid.uuid4() for _ in range(len(df))]
#     metadata_list = [
#         {k: v for k, v in row.items() if not pd.isnull(v) and k != "description"}
#         for i, row in df.iterrows()
#     ]
#     content_list = df["description"].tolist()
#     embedding_list = [get_embeddings(description) for description in content_list]

#     # Create list of tuples/records
#     records = list(zip(uuid_list, metadata_list, content_list, embedding_list))
#     return records


# def load_csv_to_timescaledb(csv_file_path, table_name):
#     df = pd.read_csv(csv_file_path)
#     cols = df.columns
#     cols = [col.replace(" ", "_").lower() for col in cols]
#     # cur.execute(f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS embedding vector(1536)")
#     df.columns = df.columns.str.replace(" ", "_").str.lower()
#     df["market_cap"] = df["market_cap"].replace("-", 0).astype(float)
#     # Use iterative inserts to efficiently copy data from the CSV to the TimescaleDB table
#     for i, row in df.iterrows():
#         filtered_values = tuple(row[col] for col in cols)
#         try:
#             cur.execute(
#                 f"INSERT INTO {table_name} ({', '.join(cols)}) VALUES ({', '.join(['%s' for _ in cols])})",
#                 filtered_values,
#             )
#         except Exception as e:
#             print(f"Exception: {e}")
#             print(filtered_values)
#             print(
#                 f"INSERT INTO {table_name} ({', '.join(cols)}) VALUES ({', '.join(['%s' for _ in cols])})",
#                 filtered_values,
#             )

#     conn.commit()


# # create_company_table()
# # load_csv_to_timescaledb("fortune_100_with_embeddings.csv", "fortune_100")

In [None]:
results = plan.execute()


def parse_results(results: dict):
    """Print the plan results and regroup them per company.

    Args:
        results: Mapping of `QueryType` to results, as returned by
            `QueryPlan.execute()`.

    Returns:
        A list of `(company, sales, news)` tuples, one per position in the
        result lists. A query type that is missing from `results` contributes
        `None` in its slot instead of raising.
    """
    from itertools import zip_longest

    # Defaults cover plans that did not include every query type.
    company_results, sales_results, news_results = [], [], []
    for k, v in results.items():
        if k == QueryType.COMPANY_SEARCH:
            print("Companies:")
            company_results = v
        elif k == QueryType.SALES_SEARCH:
            print("Sales:")
            sales_results = v
        elif k == QueryType.NEWS_SEARCH:
            print("News:")
            news_results = v
        for val in v:
            print(val)
    # Identical to zip() when all three lists have the same length.
    return list(zip_longest(company_results, sales_results, news_results))


parsed_results = parse_results(results)