# Financial Modeling Prep

This notebook queries the Financial Modeling Prep API to build a dataset of earnings-call transcripts. The dataset is saved as a SQLite database.

## Dependencies

In [None]:
import urllib.request
import json
from typing import Dict, List, Tuple, Callable
import pandas
from google.colab import drive
import os
import sqlite3 as sq
from dataclasses import dataclass
import time
import datetime
import numpy as np

In [None]:
# Mount Google Drive at /content/drive; the project paths used by this
# notebook live under that mount point.
drive.mount('/content/drive')

## Paths

An API key is required in order to access the financial model prep API. To run this notebook please contact Nate (gess0043@umn.edu) before May 10th 2023 in order to receive the API key.

In [None]:
# Root folder of the project on the mounted Google Drive.
project_path = "/content/drive/MyDrive/CSCI-5541/Project/"
# Sub-folder of the project that holds the generated data files.
data_path = os.path.join(project_path, "data")

In [None]:
# Read the API key from the project folder. Surrounding whitespace (typically
# the trailing newline most editors append) is stripped because the key is
# interpolated verbatim into the request URLs.
with open(os.path.join(project_path, "api_key.txt"), "r") as key_file:
    api_key = key_file.read().strip()

In [None]:
# Create the data folder on first run. `os.makedirs` is used because
# `os.makedir` does not exist; `exist_ok=True` makes the cell safe to re-run.
os.makedirs(data_path, exist_ok=True)

In [None]:
# Location of the SQLite database file inside the data folder.
database_path = os.path.join(data_path, "earnings_transcripts_data_test_2.db")

## Functions to pull from API

### Transcript [Endpoint](https://site.financialmodelingprep.com/developer/docs/earning-call-transcript-api/#Python)

This endpoint is used to pull earnings-call transcript data from the Financial Modeling Prep API.

In [None]:
def generate_transcript_url(company_symbol: str, quarter: int, year: int, api_key: str) -> str:
    """Build the earnings-call-transcript request URL for one company quarter.

    Args:
        company_symbol: Ticker symbol placed in the URL path.
        quarter: Fiscal quarter sent as the ``quarter`` query parameter.
        year: Fiscal year sent as the ``year`` query parameter.
        api_key: Key sent as the ``apikey`` query parameter.

    Returns:
        The complete request URL. Values are interpolated as-is, without
        URL-encoding.
    """
    transcript_url = f"https://financialmodelingprep.com/api/v3/earning_call_transcript/{company_symbol}?quarter={quarter}&year={year}&apikey={api_key}"
    return transcript_url

def get_jsonparsed_data(url:str, data: dict = None) -> Dict:
    """Fetch *url* and return its body parsed as JSON.

    Args:
        url: Address to request.
        data: Unused; accepted only so existing callers keep working.

    Returns:
        The decoded JSON document. Callers in this notebook receive either a
        dict (price endpoint) or a list (transcript endpoint).

    Raises:
        urllib.error.URLError: If the request fails.
        json.JSONDecodeError: If the body is not valid JSON.
    """
    # The context manager closes the response even when reading or decoding
    # raises, so repeated calls do not leak connections.
    with urllib.request.urlopen(url) as response:
        body = response.read().decode("utf-8")
    return json.loads(body)


In [None]:
# Smoke test of the transcript endpoint: request AAPL's Q1 2011 call.
sample_url = generate_transcript_url(company_symbol="AAPL", quarter=1, year=2011, api_key=api_key)
sample_transcript = get_jsonparsed_data(sample_url)

In [None]:
# Display the 'date' field of the first entry in the sample response.
sample_transcript[0]['date']

### Price [Endpoint](https://site.financialmodelingprep.com/developer/docs/historical-stock-data-free-api/)

This endpoint is used to pull historical price data for a given company on a given date.

In [None]:
def generate_price_url(company_symbol: str, from_date: str, to_date: str, api_key: str) -> str:
    """Build the historical-price request URL for a bounded date range.

    Args:
        company_symbol: Ticker symbol placed in the URL path.
        from_date: Value of the ``from`` query parameter.
        to_date: Value of the ``to`` query parameter.
        api_key: Key sent as the ``apikey`` query parameter.

    Returns:
        The complete request URL. Values are interpolated as-is, without
        URL-encoding.
    """
    price_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/{company_symbol}?from={from_date}&to={to_date}&apikey={api_key}"
    return price_url

def generate_price_url_from(company_symbol: str, from_date: str, api_key: str) -> str:
    """Build the historical-price request URL with only a start date.

    Args:
        company_symbol: Ticker symbol placed in the URL path.
        from_date: Value of the ``from`` query parameter.
        api_key: Key sent as the ``apikey`` query parameter.

    Returns:
        The complete request URL; no ``to`` parameter is included.
    """
    open_ended_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/{company_symbol}?from={from_date}&apikey={api_key}"
    return open_ended_url


In [None]:
# Smoke test of the price endpoint: request AAPL prices for the single day
# 2011-01-20 (from and to dates are identical).
sample_price_url = generate_price_url(company_symbol="AAPL", from_date="2011-01-20", to_date="2011-01-20", api_key=api_key)
sample_price = get_jsonparsed_data(sample_price_url)

In [None]:
# Display the parsed sample price response.
sample_price

## Create Database

### Database Utility
Utility for interacting with the database

In [None]:
class DB_Util:
    """Small convenience wrapper around a single SQLite connection.

    Each helper opens a short-lived cursor, prints any exception instead of
    raising it, and reports the outcome through its return value.
    """

    def __init__(self, database_path: str):
        """Connect to the SQLite database at *database_path*."""
        self.con = sq.connect(database_path)

    def _run_and_commit(self, sql: str, *bind_values) -> bool:
        """Execute one statement, then commit and close the cursor.

        The commit and cursor clean-up happen whether or not the statement
        succeeded. Returns True on success, False if execution raised.
        """
        cur = self.con.cursor()
        try:
            cur.execute(sql, *bind_values)
        except Exception as err:
            print(err)
            succeeded = False
        else:
            succeeded = True
        finally:
            self.con.commit()
            cur.close()
        return succeeded

    def create_or_drop_table(self, sql: str) -> bool:
        """Run a parameterless DDL statement; return True if it succeeded."""
        return self._run_and_commit(sql)

    def execute_query(self, sql: str, args: List) -> List:
        """Run a query with bound *args* and return all fetched rows.

        Returns an empty list (after printing the error) if the query fails.
        """
        cur = self.con.cursor()
        try:
            rows = cur.execute(sql, args).fetchall()
        except Exception as err:
            print(err)
            rows = []
        finally:
            cur.close()
        return rows

    def insert_data(self, sql: str, data: List) -> bool:
        """Run a statement with bound *data*; return True if it succeeded."""
        return self._run_and_commit(sql, data)

    def close_connection(self):
        """Close the connection and remove the ``con`` attribute."""
        self.con.close()
        del self.con
            

### Data Classes

Objects to hold data from tables

In [None]:
@dataclass
class Company:
    """A tracked company, identified solely by its ticker symbol."""

    symbol: str

    def to_dict(self):
        """Return the company as a plain dictionary keyed by field name."""
        field_names = ("symbol",)
        return {name: getattr(self, name) for name in field_names}

@dataclass
class Transcript:
    """One earnings-call transcript for a company's fiscal quarter."""

    symbol: str
    # Call timestamp as a string; elsewhere in this notebook it is parsed
    # with the format '%Y-%m-%d %H:%M:%S'.
    date: str
    year: int
    quarter: int
    # Full text of the call.
    transcript: str

    def to_dict(self):
        """Return the transcript as a plain dictionary keyed by field name."""
        field_names = ("symbol", "date", "year", "quarter", "transcript")
        return {name: getattr(self, name) for name in field_names}


@dataclass
class Price:
    """Opening and closing price of one symbol on one date."""

    symbol: str
    date: str
    opening_price: float
    closing_price: float

    def to_dict(self):
        """Return the price record as a plain dictionary keyed by field name."""
        field_names = ("symbol", "date", "opening_price", "closing_price")
        return {name: getattr(self, name) for name in field_names}

@dataclass
class Label:
    # Ground-truth label derived from the prices around one transcript.

    # Row id; the label generators in this notebook pass None and the table
    # declares the column AUTOINCREMENT.
    id: int
    symbol: str
    # The `date` string of the transcript this label belongs to.
    transcript_date: str
    # Closing price of the first price row in the labelling window.
    price_day_of_meeting: float
    # Statistic the label is based on (a mean price or a standard deviation,
    # depending on the generator used).
    avg_value: float
    # Length of the labelling window in calendar days.
    delta_days: int
    # Number of price rows found inside the window.
    total_days: int
    # Free-form name of the labelling method.
    avg_type: str
    # The resulting class name.
    label: str    

### Create Database
Running the cell below opens the database at the configured path if it already exists; otherwise it creates a new one.

In [None]:
# Shared database helper used by every cell below.
db_util = DB_Util(database_path=database_path)

### Creating Tables
The cell below creates the tables in the database. To drop tables (for example, to alter their definitions), uncomment the second cell in this section and run it.

In [None]:
# SQL definitions of the four tables used by this notebook. Every statement
# uses IF NOT EXISTS, so re-running the cell does not recreate existing tables.
table_definitions = [
    # company: one row per ticker symbol.
    '''
        CREATE TABLE IF NOT EXISTS company(
            symbol CHAR[10] NOT NULL PRIMARY KEY
        )
    ''',
    # price: opening/closing value per (symbol, date).
    # NOTE(review): there is no comma between the PRIMARY KEY and FOREIGN KEY
    # clauses below — confirm SQLite parses this as intended.
    '''           
    CREATE TABLE IF NOT EXISTS price(
        symbol CHAR[10] NOT NULL,
        date CHAR[10],
        opening_value FLOAT,
        closing_value FLOAT,
        PRIMARY KEY (symbol, date)
        FOREIGN KEY (symbol) REFERENCES company(symbol)
    )
    ''',
    # transcript: one row per (symbol, year, quarter, date) with the call text.
    '''
    CREATE TABLE IF NOT EXISTS transcript(
        symbol CHAR[10] NOT NULL,
        year INTEGER,
        quarter INTEGER,
        date CHAR[10],
        transcript CLOB,
        FOREIGN KEY (symbol) REFERENCES company(symbol),
        PRIMARY KEY(symbol, year, quarter, date)
    )
    ''',
    # Not imposing any constraints because we will probably need to experiment with this
    '''
    CREATE TABLE IF NOT EXISTS label(
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        symbol CHAR[10],
        transcript_date CHAR[10],
        price_day_of_meeting FLOAT,
        avg_value FLOAT,
        delta_days INTEGER,
        total_days INTEGER,
        avg_type CHAR[10],
        label CHAR[20],
        FOREIGN KEY (symbol) REFERENCES company
    )
    '''    
]

# Execute each definition and collect one success flag per table, in the same
# order as `table_definitions`.
table_creation_status = [
    db_util.create_or_drop_table(
        sql=table_definition
    )
    for table_definition in table_definitions
]
print(table_creation_status)

In [None]:
# table_names = [
#     # 'company', 
#     # 'price', 
#     # 'transcript', 
#     'label'
# ]
# drop_definitions = [
#     f'''
#     DROP TABLE IF EXISTS {table_name} 
#     '''
#     for table_name in table_names
# ]

# drop_status = [
#     db_util.create_or_drop_table(drop_definition)
#     for drop_definition in drop_definitions
# ]
# print(drop_status)

## Adding Data

### Add Companies

Running the cells below will insert the companies in the list below into the database

In [None]:
def add_companies(companies: List[Company], db_util: DB_Util):
    """Insert each company into the ``company`` table.

    A message is printed for every company whose insert is reported as
    failed by ``db_util.insert_data``; processing continues with the next one.
    """
    insert_sql = ''' INSERT INTO company(symbol) Values(?) '''
    for company in companies:
        inserted = db_util.insert_data(sql=insert_sql, data=[company.symbol])
        if inserted:
            continue
        print(f"Unable to add company {company.symbol}")

        


In [None]:
# Company records for the ticker symbols covered by the dataset.
companies = [
    Company(symbol=ticker)
    for ticker in ["AAPL", "MSFT", "AMZN", "NVDA", "GOOGL", "GOOG", "BRK.B", "TSLA", "META", "UNH", "XOM", "JNJ", "JPM", "V", "PG", "MA", "CVX", "HD", "LLY", "ABBV"]
]

In [None]:
# Insert the companies defined above into the `company` table.
add_companies(companies,db_util)

### Add Transcript
Functions for adding transcripts

In [None]:
def add_transcript(transcript: Transcript, db_util: DB_Util)->bool:
    """Insert one transcript row; return the status from ``insert_data``."""
    insert_sql = ''' INSERT INTO transcript (symbol, year, quarter, date, transcript) Values(?, ?, ?, ?, ?) '''
    # Values are listed in the same order as the columns in the statement.
    row = [
        transcript.symbol,
        transcript.year,
        transcript.quarter,
        transcript.date,
        transcript.transcript,
    ]
    return db_util.insert_data(sql=insert_sql, data=row)

def add_transcripts(transcipts: List[Transcript], db_util: DB_Util):
    """Insert every transcript, printing a message for each failed insert."""
    for transcript in transcipts:
        if add_transcript(transcript, db_util):
            continue
        print(f"Unable to add transcript {transcript.symbol} {transcript.date}")



### Add Prices
Functions for adding prices

In [None]:
def add_price(price: Price, db_util: DB_Util):
    """Insert one price row; return the status from ``insert_data``."""
    insert_sql = '''INSERT INTO price (symbol, date, opening_value, closing_value) VALUES(?, ?, ?, ?)'''
    # Values are listed in the same order as the columns in the statement.
    row = [
        price.symbol,
        price.date,
        price.opening_price,
        price.closing_price,
    ]
    return db_util.insert_data(sql=insert_sql, data=row)

def add_prices(prices: List[Price], db_util: DB_Util):
    """Insert every price, printing a message for each failed insert."""
    for price in prices:
        if add_price(price, db_util):
            continue
        print(f"Unable to add price from company {price.symbol} on {price.date}")


### Add Labels
Functions for adding labels

In [None]:
def add_label(label: Label, db_util: DB_Util):
    """Insert one label row; return the status from ``insert_data``.

    ``label.id`` is not written; the statement leaves the ``id`` column out.
    """
    insert_sql = '''INSERT INTO label (symbol, transcript_date, price_day_of_meeting, avg_value, delta_days, total_days, avg_type, label) VALUES(?, ?, ?, ?, ?, ?, ?, ?)'''
    # Values are listed in the same order as the columns in the statement.
    row = [
        label.symbol,
        label.transcript_date,
        label.price_day_of_meeting,
        label.avg_value,
        label.delta_days,
        label.total_days,
        label.avg_type,
        label.label,
    ]
    return db_util.insert_data(sql=insert_sql, data=row)

def add_labels(labels: List[Label], db_util: DB_Util):
    """Insert every label, printing a message for each failed insert.

    Args:
        labels: Labels to store.
        db_util: Database helper used for the inserts.
    """
    for label in labels:
        status = add_label(label, db_util)
        if not status:
            # Label has no `date` attribute; the transcript date is the field
            # that identifies the record.
            print(f"Unable to add label from company {label.symbol} on {label.transcript_date} using method {label.avg_type}")


## Pull Data From financialmodelingprep

The functions below build on the database utility and the functions from the previous section to add transcripts, prices, and labels. The database caches results to prevent duplicate runs using the same parameters.

### Pull Transcript Data
Running the two cells below pulls company transcripts.

In [None]:
def pull_company_transcripts(companies: List[Company], years: List[int], quarters: List[int], db_util: DB_Util,api_key: str, db_only: bool = False) -> Tuple[List[Transcript], List[str]]:
    """Collect transcripts for every company/year/quarter combination.

    The database is consulted first. Only when it holds no row for a
    combination, and ``db_only`` is False, is the web API queried; a fetched
    transcript is then stored in the database.

    Args:
        companies: Companies to collect transcripts for.
        years: Fiscal years to cover.
        quarters: Fiscal quarters to cover.
        db_util: Database helper used for look-ups and inserts.
        api_key: Key for the web API.
        db_only: When True, never call the web API.

    Returns:
        A tuple ``(transcripts, failures)``. ``failures`` holds the request
        URL of every API call that could not be turned into a transcript;
        these URLs contain the API key.
    """
    transcripts = []
    failures = []
    for company in companies:
        for year in years:
            for quarter in quarters:
                # Columns are named explicitly so the positional unpacking
                # below does not depend on the table's column order.
                cached_rows = db_util.execute_query(
                    sql='''SELECT t.symbol, t.year, t.quarter, t.date, t.transcript FROM transcript t WHERE t.symbol = ? and t.year = ? and t.quarter = ?''',
                    args=[company.symbol, year, quarter],
                )
                if cached_rows or db_only:
                    print(f"Fetching from DB {company.symbol}-{year}-{quarter}")
                    transcripts.extend(
                        Transcript(
                            symbol=row_symbol,
                            year=row_year,
                            quarter=row_quarter,
                            date=row_date,
                            transcript=row_text,
                        )
                        for row_symbol, row_year, row_quarter, row_date, row_text in cached_rows
                    )
                    continue

                print(f"Fetching from API {company.symbol}-{year}-{quarter}")
                url = generate_transcript_url(company_symbol=company.symbol, quarter=quarter, year=year, api_key=api_key)
                try:
                    # The endpoint answers with a list; the first entry is used.
                    transcript_result = get_jsonparsed_data(url=url)[0]
                    transcript = Transcript(
                        symbol=transcript_result['symbol'],
                        year=transcript_result['year'],
                        quarter=transcript_result['quarter'],
                        date=transcript_result['date'],
                        transcript=transcript_result['content'],
                    )
                    if not add_transcript(transcript=transcript, db_util=db_util):
                        print(f"Unable to add transcript {transcript.symbol} {transcript.date}")
                    transcripts.append(transcript)
                except Exception as e:
                    # Report the cause without echoing the URL, which embeds
                    # the API key; the URL is still returned via `failures`.
                    print(f"Error: {e}, {company.symbol}-{year}-{quarter}")
                    failures.append(url)
                # Pause between API calls.
                time.sleep(0.5)
    return transcripts, failures





In [None]:
# Fiscal years (2013 through 2022) and quarters (1 through 4) to request.
years = list(range(2013, 2023))
quarters = list(range(1, 5))


In [None]:
# Collect transcripts for every company/year/quarter combination; with
# db_only=False, combinations missing from the database are fetched from the API.
transcripts,failures = pull_company_transcripts(
    companies = companies,
    years=years,
    quarters=quarters,
    db_util=db_util,
    api_key=api_key,
    db_only=False
)

### Pull Pricing Data
Running the cells below pulls the historical pricing data from the Financial Modeling Prep API and inserts it into the database.

In [None]:
def pull_transcript_prices_from_web_api(transcripts: List[Transcript], delta: int, db_util: DB_Util, api_key: str) -> Tuple[List[Price],List[Transcript]]:
    """Fetch prices around each transcript's call date and store them.

    For every transcript a date window is derived from the call date and
    requested from the price endpoint. The returned prices are inserted into
    the database.

    Args:
        transcripts: Transcripts whose ``date`` has the form
            ``'%Y-%m-%d %H:%M:%S'``.
        delta: Window length in calendar days. A positive value extends the
            window forward from the anchor date; a negative value extends it
            backwards, so the anchor date becomes the end of the window.
        db_util: Database helper used for the inserts.
        api_key: Key for the web API.

    Returns:
        A tuple ``(prices, errors)`` where ``errors`` lists the transcripts
        whose prices could not be fetched or parsed.
    """
    prices = []
    errors = []
    for transcript in transcripts:
        call_date = datetime.datetime.strptime(transcript.date, '%Y-%m-%d %H:%M:%S').date()
        weekday = call_date.weekday()
        if weekday < 5:
            # Call was on a weekday
            start_date = call_date
        else:
            # Call was on a weekend: step back to the preceding Friday
            # (Saturday=5 -> 1 day, Sunday=6 -> 2 days), matching the anchor
            # date the label generators use.
            start_date = call_date - datetime.timedelta(days=weekday - 4)

        if delta < 0:
            end_date = start_date
            start_date = start_date + datetime.timedelta(days=delta)
        else:
            end_date = start_date + datetime.timedelta(days=delta)

        print(f"{transcript.symbol}  {start_date} - {end_date}")
        pricing_url = generate_price_url(
            company_symbol=transcript.symbol,
            from_date=str(start_date),
            to_date=str(end_date),
            api_key=api_key,
        )
        try:
            # The request is inside the try so that one failed call is
            # recorded in `errors` instead of aborting the whole run.
            pricing_results = get_jsonparsed_data(pricing_url)
            current_prices = [
                Price(
                    symbol=pricing_results['symbol'],
                    date=pricing_result['date'],
                    opening_price=pricing_result['open'],
                    closing_price=pricing_result['close'],
                )
                for pricing_result in pricing_results["historical"]
            ]
            add_prices(prices=current_prices, db_util=db_util)
            prices.extend(current_prices)
        except Exception as e:
            print(f"Error: {e}, {transcript.symbol}-{str(call_date)}-{str(end_date)}")
            errors.append(transcript)
        # Pause between API calls.
        time.sleep(0.5)
    return prices, errors

def pull_prices_from_data_base_transcript_list(transcripts: List[Transcript], offset: int, delta: int, con: sq.Connection) -> List[Price]:
    """Read stored prices for a window around each transcript's call date.

    The window starts ``offset`` days after the call date and spans ``delta``
    days. Both bounds are exclusive: only prices strictly between the start
    and end date are returned.

    Args:
        transcripts: Transcripts whose ``date`` has the form
            ``'%Y-%m-%d %H:%M:%S'``.
        offset: Days added to the call date to obtain the window start.
        delta: Days added to the window start to obtain the window end.
        con: Open SQLite connection.

    Returns:
        The prices of all transcripts, concatenated in input order.
    """
    prices = []
    cursor = con.cursor()
    try:
        for transcript in transcripts:
            call_date = datetime.datetime.strptime(transcript.date, '%Y-%m-%d %H:%M:%S').date()
            start_date = call_date + datetime.timedelta(days=offset)
            end_date = start_date + datetime.timedelta(days=delta)
            # Columns are named explicitly so the unpacking below does not
            # depend on the table's column order.
            rows = cursor.execute(
                '''
                SELECT p.symbol, p.date, p.opening_value, p.closing_value FROM price p WHERE p.symbol = ? and date(p.date) > date(?) and date(p.date) < date(?)
                ''',
                [transcript.symbol, str(start_date), str(end_date)],
            ).fetchall()
            prices.extend(
                Price(
                    symbol=row_symbol,
                    date=row_date,
                    opening_price=row_open,
                    closing_price=row_close,
                )
                for row_symbol, row_date, row_open, row_close in rows
            )
    finally:
        # Release the cursor even if a query or date parse fails.
        cursor.close()
    return prices

def pull_transcript_prices_from_database(transcript: Transcript, offset: int, delta: int, con: sq.Connection) -> List[Price]:
    """Read stored prices for a window around one transcript's call date.

    The window starts ``offset`` days after the call date and spans ``delta``
    days. Both bounds are inclusive.

    Args:
        transcript: Transcript whose ``date`` has the form
            ``'%Y-%m-%d %H:%M:%S'``.
        offset: Days added to the call date to obtain the window start.
        delta: Days added to the window start to obtain the window end.
        con: Open SQLite connection.

    Returns:
        The matching prices, in the order the database returns them.
    """
    call_date = datetime.datetime.strptime(transcript.date, '%Y-%m-%d %H:%M:%S').date()
    start_date = call_date + datetime.timedelta(days=offset)
    end_date = start_date + datetime.timedelta(days=delta)
    cursor = con.cursor()
    try:
        # Columns are named explicitly so the unpacking below does not depend
        # on the table's column order.
        rows = cursor.execute(
            '''
            SELECT p.symbol, p.date, p.opening_value, p.closing_value FROM price p WHERE p.symbol = ? and date(p.date) >= date(?) and date(p.date) <= date(?)
            ''',
            [transcript.symbol, str(start_date), str(end_date)],
        ).fetchall()
    finally:
        # Release the cursor even if the query fails.
        cursor.close()
    return [
        Price(
            symbol=row_symbol,
            date=row_date,
            opening_price=row_open,
            closing_price=row_close,
        )
        for row_symbol, row_date, row_open, row_close in rows
    ]

def pull_prices_from_database(symbol: str, start_date: datetime.date, end_date: datetime.date, db_util: DB_Util) -> List[Price]:
    """Return the stored prices of *symbol* between two dates, oldest first.

    Both ``start_date`` and ``end_date`` are inclusive; rows are ordered by
    date.
    """
    window_query = '''
        SELECT * FROM price p WHERE p.symbol = ? and date(p.date) >= date(?) and date(p.date) <= date(?) ORDER BY date(p.date)
        '''
    bind_values = [symbol, str(start_date), str(end_date)]
    rows = db_util.execute_query(sql=window_query, args=bind_values)

    collected = []
    for row in rows:
        # Row layout follows the price table: symbol, date, opening, closing.
        collected.append(
            Price(
                symbol=row[0],
                date=row[1],
                opening_price=row[2],
                closing_price=row[3],
            )
        )
    return collected
    




In [None]:
# Fetch and store prices for a 30-day window per transcript; `errors` collects
# the transcripts whose price data could not be processed.
prices, errors = pull_transcript_prices_from_web_api(
    transcripts=transcripts,
    delta=30,
    db_util=db_util,
    api_key=api_key
)

In [None]:
# prices = pull_prices_from_data_base_transcript_list(
#     transcripts=transcripts,
#     offset=-7,
#     delta=37,
#     db_util=db_util,
# )

# pull_prices_from_database(
#     symbol = transcripts[0].symbol,
#     start_date = datetime.datetime.strptime(transcripts[1].date,'%Y-%m-%d %H:%M:%S').date(),
#     end_date = datetime.datetime.strptime(transcripts[1].date,'%Y-%m-%d %H:%M:%S').date() + datetime.timedelta(days=2),
#     db_util = db_util
# )

## Generating Labels
Running the cells below generates the ground truths based on the pricing data.

In [None]:
def mean_three_class(mean_price: float, price_day_of: float) -> str:
    """Classify a mean price relative to the call-day price with a 1% band.

    Args:
        mean_price: Mean price over the labelling window.
        price_day_of: Reference price on the day of the call.

    Returns:
        ``"POSITIVE"`` when the mean is more than 1% above the reference,
        ``"NEUTRAL"`` when it is above 99% of the reference and at most 1%
        above it, and ``"NEGATIVE"`` otherwise.
    """
    upper_bound = 1.01 * price_day_of
    lower_bound = 0.99 * price_day_of
    if mean_price > upper_bound:
        return "POSITIVE"
    # A mean exactly on the upper bound belongs to the neutral band; it must
    # not fall through to "NEGATIVE".
    if mean_price > lower_bound:
        return "NEUTRAL"
    return "NEGATIVE"

def standard_deviation_three_class(standard_deviation: float, price_day_of: float)-> str:
    """Classify volatility from the standard deviation relative to the price.

    Returns ``"VOLATILE"`` when the standard deviation exceeds 1% of
    ``price_day_of`` and ``"NOT VOLATILE"`` otherwise.
    """
    relative_spread = standard_deviation / price_day_of
    return "VOLATILE" if relative_spread > 0.01 else "NOT VOLATILE"


def generate_mean_labels(transcripts: List[Transcript], delta: int, db_util: DB_Util, label_method: Callable[[float, float], str], label_string: str):
    """Create and store one mean-price label per transcript.

    For each transcript the stored prices from the anchor date (the call
    date, or the preceding Friday for weekend calls) through ``delta`` days
    later are read in date order. The first row supplies the reference
    closing price; the mean of the opening and closing prices of the
    remaining rows is classified by ``label_method``.

    Transcripts with fewer than two price rows are reported and skipped.

    Args:
        transcripts: Transcripts whose ``date`` has the form
            ``'%Y-%m-%d %H:%M:%S'``.
        delta: Window length in calendar days.
        db_util: Database helper used for look-ups and inserts.
        label_method: Called with keyword arguments ``mean_price`` and
            ``price_day_of``; returns the class name.
        label_string: Name of the labelling method stored with each label.
    """
    for transcript in transcripts:
        call_date = datetime.datetime.strptime(transcript.date, '%Y-%m-%d %H:%M:%S').date()
        weekday = call_date.weekday()
        if weekday < 5:
            # Call was on a weekday
            start_date = call_date
        else:
            # Call was on a weekend: step back to the preceding Friday
            start_date = call_date - datetime.timedelta(days=weekday - 4)

        end_date = start_date + datetime.timedelta(days=delta)
        prices = pull_prices_from_database(transcript.symbol, start_date, end_date, db_util)

        # One row is needed for the reference price and at least one more
        # for the mean. Skip only this transcript; the others are still
        # processed.
        if len(prices) < 2:
            print(f"ERROR {transcript.symbol} {transcript.date}")
            continue

        price_day_of = prices[0].closing_price
        following_prices = prices[1:]
        total_price = sum(price.opening_price + price.closing_price for price in following_prices)
        # Two values (open and close) are summed per following row, so that
        # is the count to divide by.
        mean_price = float(total_price / (2 * len(following_prices)))

        label_class = label_method(mean_price=mean_price, price_day_of=price_day_of)

        transcript_label = Label(
            id=None,
            symbol=transcript.symbol,
            transcript_date=transcript.date,
            price_day_of_meeting=price_day_of,
            avg_value=mean_price,
            delta_days=delta,
            total_days=len(prices),
            avg_type=label_string,
            label=label_class,
        )
        add_label(label=transcript_label, db_util=db_util)


def generate_standard_deviation_label(transcripts: List[Transcript], delta: int, db_util: DB_Util, label_method: Callable[[float, float], str], label_string: str):
    """Create and store one volatility label per transcript.

    For each transcript the stored prices from the anchor date (the call
    date, or the preceding Friday for weekend calls) through ``delta`` days
    later are read in date order. The first row supplies the reference
    closing price; the standard deviation of the opening and closing prices
    of the remaining rows is classified by ``label_method``.

    Transcripts with fewer than two price rows are reported and skipped.

    Args:
        transcripts: Transcripts whose ``date`` has the form
            ``'%Y-%m-%d %H:%M:%S'``.
        delta: Window length in calendar days.
        db_util: Database helper used for look-ups and inserts.
        label_method: Called with keyword arguments ``standard_deviation``
            and ``price_day_of``; returns the class name.
        label_string: Name of the labelling method stored with each label.
    """
    for transcript in transcripts:
        call_date = datetime.datetime.strptime(transcript.date, '%Y-%m-%d %H:%M:%S').date()
        weekday = call_date.weekday()
        if weekday < 5:
            # Call was on a weekday
            start_date = call_date
        else:
            # Call was on a weekend: step back to the preceding Friday
            start_date = call_date - datetime.timedelta(days=weekday - 4)

        end_date = start_date + datetime.timedelta(days=delta)
        prices = pull_prices_from_database(transcript.symbol, start_date, end_date, db_util)

        # One row is needed for the reference price and at least one more
        # for the deviation (np.std of an empty array is nan). Skip only this
        # transcript; the others are still processed.
        if len(prices) < 2:
            print(f"ERROR {transcript.symbol} {transcript.date}")
            continue

        price_day_of = prices[0].closing_price

        prices_list = []
        for price in prices[1:]:
            prices_list.append(price.opening_price)
            prices_list.append(price.closing_price)

        standard_deviation_price = np.std(np.array(prices_list))

        label_class = label_method(standard_deviation=standard_deviation_price, price_day_of=price_day_of)

        transcript_label = Label(
            id=None,
            symbol=transcript.symbol,
            transcript_date=transcript.date,
            price_day_of_meeting=price_day_of,
            avg_value=standard_deviation_price,
            delta_days=delta,
            total_days=len(prices),
            avg_type=label_string,
            label=label_class,
        )
        add_label(label=transcript_label, db_util=db_util)


        

In [None]:
# generate_mean_labels(
#     transcripts,
#     delta=7,
#     db_util=db_util,
#     label_method=mean_three_class,
#     label_string="MEAN3CLASS"
# )

# Generate volatility labels over a 7-day window and store them under the
# method name "STANDARDDEVIATION2CLASS".
generate_standard_deviation_label(
    transcripts,
    delta=7,
    db_util=db_util,
    label_method=standard_deviation_three_class,
    label_string="STANDARDDEVIATION2CLASS"
)



## Saving and Closing Connection

In [None]:
# Close the SQLite connection and drop the helper so later cells cannot use a
# closed connection by accident.
db_util.close_connection()
del db_util