In [2]:
import csv
import os
import time
from datetime import datetime, timedelta

import pandas as pd
import requests
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

In [3]:
load_dotenv()

# FMP market-data key plus local MySQL settings, all read from .env
fmp_api_key = os.getenv('FMP_API_KEY')
db_name, db_password = (os.getenv(env_var) for env_var in ('DB_NAME', 'DB_PASSWORD'))

# ticker lists live under data-ref/, relative to the current working directory
snp_path, nas_path = (
    os.path.join('data-ref', csv_name) for csv_name in ('snp500.csv', 'nasdaq.csv')
)

In [4]:
# Assemble the URL from parts: URL.create escapes the password, whereas an f-string URL
# breaks if the password contains characters such as '@', '/' or ':'.
mysql_url = URL.create(
    drivername="mysql+mysqlconnector",
    username="root",
    password=db_password,
    host="127.0.0.1",
    port=3306,
    database=db_name,
)
mysql_engine = create_engine(mysql_url)
conn = mysql_engine.connect()
list(conn.execute(text("show tables;")))

[]

In [4]:
from datetime import datetime

def format_date_time_object(
    datetime_object: datetime,
    with_time: bool = False
) -> str:
    """
    DEPS:
        datetime -- standard library
    DEF:
        render a datetime as an ISO-style string, date only by default.

    ARGS:
        datetime_object: the value to render
        with_time: truthy to append the time of day as HH:MM:SS

    RETURNS:
        "YYYY-MM-DD", or "YYYY-MM-DD HH:MM:SS" when with_time is truthy
    """
    pattern = "%Y-%m-%d %H:%M:%S" if with_time else "%Y-%m-%d"
    return datetime_object.strftime(pattern)

In [5]:
def api_call_get(
    url: str,
    params: dict,
    headers: dict = None,
    return_other_than_json: bool = False,
    timeout: float = None
) -> object:
    """
    DEPS:
        requests
    DEF:
        send a GET request to `url` with query `params` (and optional `headers`) and return
        the decoded json body, or the raw response object when asked for it
    ARGS:
        url: the url endpoint you want to call
        params: {key: value} pairs sent as query-string parameters
        headers: optional {key: value} pairs for more advanced config and authorization
        return_other_than_json: when True, return the `requests.Response` itself (after the
            status check) instead of its decoded json, which allows for more inspection
        timeout: seconds to wait for the server, passed to `requests.get`; None (the
            default, and the previous behavior) waits indefinitely
    RETURNS:
        - the decoded json (e.g. a list of dicts) on HTTP 200
        - the string "empty response" when no headers were given and the json body is []
        - the Response object when `return_other_than_json` is True
        - None for a non-200 status that `raise_for_status` does not treat as an error
    RAISES:
        requests.HTTPError for 4xx/5xx responses (via `raise_for_status`)
    """
    # requests treats headers=None exactly like omitting the argument, so one call
    # serves both the with- and without-headers cases.
    response = requests.get(url, params=params, headers=headers, timeout=timeout)

    if response.status_code != 200:
        # Raises for 4xx/5xx; for any other non-200 code it returns None, which is passed on.
        return response.raise_for_status()

    if return_other_than_json:
        return response

    payload = response.json()  # decode the body once
    # NOTE(review): the empty-payload sentinel has only ever applied when no headers were
    # passed; kept that way so existing callers see the same values.
    if headers is None and payload == []:
        return "empty response"
    return payload


In [6]:
def _fmp_get(base_url: str, symbol: str, start: datetime, end: datetime) -> object:
    """Request `symbol` from an FMP endpoint for the date range start..end (dates only)."""
    params = {
        "symbol": symbol,
        "apikey": fmp_api_key,
        "from": format_date_time_object(start),
        "to": format_date_time_object(end),
    }
    return api_call_get(base_url, params=params)


def call_eod(symbol, years: int = 5):
    """
    DEPS:
        api_call_get, format_date_time_object, dateutil.relativedelta, notebook-level `fmp_api_key`
    DEF:
        fetch daily end-of-day bars for `symbol` from FMP's historical-price-eod/full
        endpoint, from `years` years ago (plus one day) through today
    ARGS:
        symbol: ticker, e.g. "PLTR"
        years: look-back window; default 5, noted as the most history the API provides
    RETURNS:
        whatever api_call_get returns (decoded json list, "empty response", or None)
    """
    today = datetime.now()
    # 5 years is the max amount of data available from the API; the extra day presumably
    # keeps `from` inside that limit -- confirm against the FMP plan limits.
    start = today - relativedelta(years=years) + timedelta(days=1)
    return _fmp_get(
        "https://financialmodelingprep.com/stable/historical-price-eod/full",
        symbol,
        start,
        today,
    )


def call_eod_5_min(symbol, day: datetime = None):
    """
    DEPS:
        api_call_get, format_date_time_object, notebook-level `fmp_api_key`
    DEF:
        fetch 5-minute intraday bars for `symbol` from FMP's historical-chart/5min endpoint
        for a single calendar day
    ARGS:
        symbol: ticker, e.g. "PLTR"
        day: the day to request; None means today according to the local clock
    RETURNS:
        whatever api_call_get returns (decoded json list, "empty response", or None)
    """
    if day is None:
        day = datetime.now()
    return _fmp_get(
        "https://financialmodelingprep.com/stable/historical-chart/5min",
        symbol,
        day,
        day,
    )

In [12]:
symbol = 'PLTR'  # ticker whose 5-minute bars for today are fetched
response = call_eod_5_min(symbol)

In [16]:
# Tag every bar with its ticker, then build the "<symbol>_<date>" key column.
df = pd.DataFrame(response).assign(symbol=f'{symbol}')
df['date_time_id'] = df['symbol'] + '_' + df['date']
df

Unnamed: 0,date,open,low,high,close,volume,symbol,date_time_id
0,2025-04-10 15:55:00,88.7900,87.8769,88.7900,88.540,2588027,PLTR,PLTR_2025-04-10 15:55:00
1,2025-04-10 15:50:00,87.8277,87.8277,88.7300,88.715,2000806,PLTR,PLTR_2025-04-10 15:50:00
2,2025-04-10 15:45:00,86.9250,86.7600,87.3200,87.315,1444357,PLTR,PLTR_2025-04-10 15:45:00
3,2025-04-10 15:40:00,87.7250,86.8781,87.7300,86.880,1528924,PLTR,PLTR_2025-04-10 15:40:00
4,2025-04-10 15:35:00,87.8550,87.6500,88.2099,87.710,1109924,PLTR,PLTR_2025-04-10 15:35:00
...,...,...,...,...,...,...,...,...
73,2025-04-10 09:50:00,89.9400,89.3100,90.3500,89.470,2159591,PLTR,PLTR_2025-04-10 09:50:00
74,2025-04-10 09:45:00,88.6950,88.6950,90.1800,89.925,2372610,PLTR,PLTR_2025-04-10 09:45:00
75,2025-04-10 09:40:00,89.5795,88.3501,89.5795,88.830,2679434,PLTR,PLTR_2025-04-10 09:40:00
76,2025-04-10 09:35:00,89.4600,89.1200,90.1300,89.525,3073888,PLTR,PLTR_2025-04-10 09:35:00


In [None]:
def transform_and_commit_data(
    response: object,
    symbol: str = None,
    engine=None,
    table_name: str = 'raw_ingestion_test_5_min'
) -> None:
    """
    DEPS:
        pandas; an SQLAlchemy engine (defaults to the notebook-level `mysql_engine`)
        or a sqlite3 connection
    DEF:
        transform the api json into a df, build the `date_time_id` primary-key column
        ("<symbol>_<date>") as the first column, and append the rows to a sql table
    ARGS:
        response: decoded api json -- a list of row dicts with a 'date' key and, unless
            `symbol` is passed, a 'symbol' key
        symbol: optional ticker written into the 'symbol' column, for payloads without
            one (such as the 5-minute bars shown in this notebook)
        engine: connectable handed to DataFrame.to_sql; None means `mysql_engine`
        table_name: destination table; defaults to the previously hard-coded name
    RETURNS:
        None. Rows are appended to `table_name`. An empty payload -- [], {}, or a string
        such as the "empty response" sentinel from api_call_get -- is skipped with a
        printed message instead of raising.
        NOTE(review): nothing deduplicates on `date_time_id`, so re-ingesting the same
        symbol/dates appends duplicate rows.
    """
    if isinstance(response, str) or (isinstance(response, (list, dict)) and len(response) == 0):
        print(f'skipping write to {table_name}: no rows in response ({response!r})')
        return None

    if engine is None:
        engine = mysql_engine

    df = pd.DataFrame(response)
    if symbol is not None:
        df['symbol'] = f'{symbol}'
    df['date_time_id'] = df['symbol'] + '_' + df['date']
    # Move the key column to the front so it leads the table schema.
    cols = ['date_time_id'] + [col for col in df.columns if col != 'date_time_id']
    df = df[cols]
    # to_sql runs the insert in its own transaction on `engine` and commits it, so the
    # old `conn.commit()` on a separate notebook connection was not needed.
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists='append',
        index=False,
        chunksize=1000
    )

In [17]:
def transform_and_commit_data_2(
    response: object,
    symbol: str = None,
    engine=None,
    table_name: str = 'raw_ingestion_test_5_min'
) -> None:
    """
    DEPS:
        pandas; an SQLAlchemy engine (defaults to the notebook-level `rds_engine`)
        or a sqlite3 connection
    DEF:
        transform the api json into a df, stamp every row with `symbol`, build the
        `date_time_id` key ("<symbol>_<date>") and append the rows to a sql table
    ARGS:
        response: decoded api json -- a list of row dicts with a 'date' key
        symbol: ticker written into the 'symbol' column; None falls back to the
            notebook-level `symbol` variable (the previous, implicit behavior)
        engine: connectable handed to DataFrame.to_sql; None means `rds_engine`
        table_name: destination table; defaults to the previously hard-coded name
    RETURNS:
        None. Rows are appended to `table_name`. An empty payload -- [], {}, or a string
        such as the "empty response" sentinel from api_call_get -- is skipped with a
        printed message instead of raising.
    """
    if isinstance(response, str) or (isinstance(response, (list, dict)) and len(response) == 0):
        print(f'skipping write to {table_name}: no rows in response ({response!r})')
        return None

    if symbol is None:
        # Backward compatibility: callers used to rely on the global `symbol` implicitly.
        # The parameter shadows that name, so read it from the module namespace.
        if 'symbol' not in globals():
            raise NameError("pass symbol=... or define a notebook-level `symbol`")
        symbol = globals()['symbol']
    if engine is None:
        engine = rds_engine

    df = pd.DataFrame(response)
    df['symbol'] = f'{symbol}'
    df['date_time_id'] = df['symbol'] + '_' + df['date']
    # to_sql runs the insert in its own transaction on `engine` and commits it, so the
    # old `conn.commit()` on a separate notebook connection was not needed.
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists='append',
        index=False,
        chunksize=1000
    )

In [19]:
import mysql.connector
import os
from dotenv import load_dotenv
import time

load_dotenv()

rds_db_connection = os.getenv('RDS_DB_CONNECTION_NAME')
rds_db_password = os.getenv('RDS_DB_PASSWORD')

# Assemble the URL from parts: URL.create escapes the password, whereas an f-string URL
# breaks if the password contains characters such as '@', '/' or ':'.
rds_url = URL.create(
    drivername="mysql+mysqlconnector",
    username="admin",
    password=rds_db_password,
    host=rds_db_connection,
    port=3306,
    database="test_db",
)
rds_engine = create_engine(rds_url)
# NOTE: this rebinds `conn`, so later cells that use `conn` query the RDS database rather
# than the local MySQL connection opened earlier in the notebook.
conn = rds_engine.connect()

In [24]:
# Read back everything appended to the 5-minute staging table.
pd.read_sql(
    sql="\nselect * from raw_ingestion_test_5_min\n",
    con=conn,
)

Unnamed: 0,date,open,low,high,close,volume,symbol,date_time_id
0,2025-04-10 15:55:00,88.7900,87.8769,88.7900,88.540,2588027,PLTR,PLTR_2025-04-10 15:55:00
1,2025-04-10 15:50:00,87.8277,87.8277,88.7300,88.715,2000806,PLTR,PLTR_2025-04-10 15:50:00
2,2025-04-10 15:45:00,86.9250,86.7600,87.3200,87.315,1444357,PLTR,PLTR_2025-04-10 15:45:00
3,2025-04-10 15:40:00,87.7250,86.8781,87.7300,86.880,1528924,PLTR,PLTR_2025-04-10 15:40:00
4,2025-04-10 15:35:00,87.8550,87.6500,88.2099,87.710,1109924,PLTR,PLTR_2025-04-10 15:35:00
...,...,...,...,...,...,...,...,...
73,2025-04-10 09:50:00,89.9400,89.3100,90.3500,89.470,2159591,PLTR,PLTR_2025-04-10 09:50:00
74,2025-04-10 09:45:00,88.6950,88.6950,90.1800,89.925,2372610,PLTR,PLTR_2025-04-10 09:45:00
75,2025-04-10 09:40:00,89.5795,88.3501,89.5795,88.830,2679434,PLTR,PLTR_2025-04-10 09:40:00
76,2025-04-10 09:35:00,89.4600,89.1200,90.1300,89.525,3073888,PLTR,PLTR_2025-04-10 09:35:00


In [22]:
def transform_and_commit_data_2(
    response: object,
    symbol: str = None,
    engine=None,
    table_name: str = 'raw_ingestion_test_5_min'
) -> None:
    """
    DEPS:
        pandas; an SQLAlchemy engine (defaults to the notebook-level `rds_engine`)
        or a sqlite3 connection
    DEF:
        transform the api json into a df, stamp every row with `symbol`, build the
        `date_time_id` key ("<symbol>_<date>") and append the rows to a sql table
    ARGS:
        response: decoded api json -- a list of row dicts with a 'date' key
        symbol: ticker written into the 'symbol' column; None falls back to the
            notebook-level `symbol` variable (the previous, implicit behavior)
        engine: connectable handed to DataFrame.to_sql; None means `rds_engine`
        table_name: destination table; defaults to the previously hard-coded name
    RETURNS:
        None. Rows are appended to `table_name`. An empty payload -- [], {}, or a string
        such as the "empty response" sentinel from api_call_get -- is skipped with a
        printed message instead of raising.
    """
    if isinstance(response, str) or (isinstance(response, (list, dict)) and len(response) == 0):
        print(f'skipping write to {table_name}: no rows in response ({response!r})')
        return None

    if symbol is None:
        # Backward compatibility: callers used to rely on the global `symbol` implicitly.
        # The parameter shadows that name, so read it from the module namespace.
        if 'symbol' not in globals():
            raise NameError("pass symbol=... or define a notebook-level `symbol`")
        symbol = globals()['symbol']
    if engine is None:
        engine = rds_engine

    df = pd.DataFrame(response)
    df['symbol'] = f'{symbol}'
    df['date_time_id'] = df['symbol'] + '_' + df['date']
    # to_sql runs the insert in its own transaction on `engine` and commits it, so the
    # old `conn.commit()` on a separate notebook connection was not needed.
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists='append',
        index=False,
        chunksize=1000
    )

# Fetch today's 5-minute PLTR bars and append them to the RDS staging table.
symbol = 'PLTR'
response = call_eod_5_min(symbol)
transform_and_commit_data_2(response)

In [69]:
def _read_first_column(path: str) -> list:
    """Return the first field of every non-empty row of a CSV file, in file order."""
    with open(path, 'r', newline='') as f:
        # csv.reader yields [] for blank lines; skip them instead of raising IndexError.
        return [row[0] for row in csv.reader(f) if row]


# NOTE(review): if either CSV has a header row, the header text is kept as a "ticker" -- confirm.
nas = _read_first_column(nas_path)
snp = _read_first_column(snp_path)

# Nasdaq tickers that are not in the S&P 500 list; order and duplicates from `nas` are kept.
# A set gives O(1) membership checks instead of scanning the whole `snp` list per ticker.
snp_lookup = set(snp)
not_in = [q for q in nas if q not in snp_lookup]

In [None]:

def _log_commit(symbol: str, elapsed: float, count: int) -> None:
    """Print one progress line; the ticker is padded to width 5 so the columns line up."""
    print(f'commited {symbol:<5} to table -- time: {elapsed:.2f} -- count {count}')


def _ingest_symbols(symbols: list, count: int) -> int:
    """Fetch EOD data via call_eod for each symbol, write it, and log per-symbol timing.

    Returns the updated running counter so numbering continues across batches.
    """
    loop_start = time.time()
    for s in symbols:
        start_time = time.time()
        transform_and_commit_data(call_eod(s))
        _log_commit(s, time.time() - start_time, count)
        count += 1
    whole_loop = time.time() - loop_start
    print(f'whole loop has taken {round(whole_loop, 2)}')
    return count


# Requires the symbol-list cell (which defines `snp` and `not_in`) to have run first.
# NOTE(review): transform_and_commit_data writes to its default table, while the row-count
# queries below read `raw_ingestion_test` -- confirm the intended destination table.
count = _ingest_symbols(snp, 1)
count = _ingest_symbols(not_in, count)

NameError: name 'snp' is not defined

In [76]:
# Total rows ingested into the daily table.
pd.read_sql(
    sql=(
        " \n"
        "\n"
        "SELECT COUNT(date_time_id)\n"
        "FROM raw_ingestion_test\n"
        "\n"
    ),
    con=conn,
)

Unnamed: 0,COUNT(date_time_id)
0,1293836


In [77]:
# On-disk size (data + indexes) of the daily table, in MB.
# The schema and table names are bound as parameters instead of being formatted into the SQL
# text, which also avoids relying on MySQL reading double-quoted strings as literals.
table_size_sql = text("""
SELECT
    table_name AS `Table`,
    round(((data_length + index_length) / 1024 / 1024), 2) `Size (MB)`
FROM information_schema.TABLES
WHERE table_schema = :schema
AND table_name = :tbl;
""")
pd.read_sql(table_size_sql, conn, params={"schema": db_name, "tbl": "raw_ingestion_test"})

Unnamed: 0,Table,Size (MB)
0,raw_ingestion_test,154.7


In [86]:
# Rows for one ticker whose changePercent is at least the threshold.
# NOTE(review): the output shows each matching day twice, consistent with the same data
# having been appended more than once -- there is no uniqueness constraint on date_time_id.
min_change_pct = 20.0
screen_symbol = "PLTR"
big_moves_sql = text("""
select *
from raw_ingestion_test
where
    changePercent >= :min_change_pct
    and symbol = :symbol
""")
pd.read_sql(big_moves_sql, conn, params={"min_change_pct": min_change_pct, "symbol": screen_symbol})

Unnamed: 0,date_time_id,symbol,date,open,high,low,close,volume,change,changePercent,vwap
0,PLTR_2022-02-24,PLTR,2022-02-24,9.75,11.87,9.74,11.83,118793047,2.08,21.33,11.04
1,PLTR_2021-01-22,PLTR,2021-01-22,25.85,32.65,25.82,32.58,157240221,6.73,26.03,29.79
2,PLTR_2020-11-25,PLTR,2020-11-25,23.96,29.75,23.69,29.05,203256795,5.09,21.24,27.19
3,PLTR_2022-02-24,PLTR,2022-02-24,9.75,11.87,9.74,11.83,118793047,2.08,21.33,11.04
4,PLTR_2021-01-22,PLTR,2021-01-22,25.85,32.65,25.82,32.58,157240221,6.73,26.03,29.79
5,PLTR_2020-11-25,PLTR,2020-11-25,23.96,29.75,23.69,29.05,203256795,5.09,21.24,27.19


In [7]:
import json
import boto3

# Explicit credentials come from .env; the region is pinned.
region_name = 'us-east-1'
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

s3 = boto3.client(
    's3',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [8]:
# Daily EOD history for one ticker, using call_eod's default look-back window.
symbol = 'PLTR'
response = call_eod(symbol)

In [None]:
def upload_to_s3(
    data,
    key: str = 'pltr-5-eod-data.json',
    bucket_name: str = None,
    client=None
) -> None:
    """
    DEPS:
        json, os, boto3 (the notebook-level `s3` client by default)
    DEF:
        serialize `data` to json and store it in S3 as a single object
    ARGS:
        data: any json-serializable object, e.g. the list returned by call_eod
        key: object key; defaults to the previously hard-coded 'pltr-5-eod-data.json'
        bucket_name: target bucket; None reads the S3_BUCKET_NAME environment variable
        client: object with a boto3-style `put_object`; None uses the notebook-level `s3`
    RETURNS:
        None; prints a confirmation after put_object returns
    RAISES:
        ValueError when no bucket name is given and S3_BUCKET_NAME is unset or empty
    """
    if bucket_name is None:
        bucket_name = os.getenv('S3_BUCKET_NAME')
    if not bucket_name:
        # Fail with a clear message instead of a parameter-validation error from boto3.
        raise ValueError('S3 bucket name missing: pass bucket_name or set S3_BUCKET_NAME')
    if client is None:
        client = s3
    client.put_object(
        Bucket=bucket_name,
        Key=key,
        Body=json.dumps(data),
        ContentType='application/json'
    )
    print('Data uploaded successfully to S3')

In [10]:
# Display the raw payload returned by call_eod(symbol) above, to inspect its fields before upload.
response

[{'symbol': 'PLTR',
  'date': '2025-04-10',
  'open': 88.4,
  'high': 90.78,
  'low': 84.14,
  'close': 88.59,
  'volume': 123826122,
  'change': 0.19,
  'changePercent': 0.21493,
  'vwap': 87.98},
 {'symbol': 'PLTR',
  'date': '2025-04-09',
  'open': 78.19,
  'high': 93.33,
  'low': 77.27,
  'close': 92.01,
  'volume': 187244300,
  'change': 13.82,
  'changePercent': 17.67,
  'vwap': 85.2},
 {'symbol': 'PLTR',
  'date': '2025-04-08',
  'open': 83.29,
  'high': 86.11,
  'low': 75.22,
  'close': 77.32,
  'volume': 135345600,
  'change': -5.97,
  'changePercent': -7.17,
  'vwap': 80.49},
 {'symbol': 'PLTR',
  'date': '2025-04-07',
  'open': 66.65,
  'high': 81.8,
  'low': 66.12,
  'close': 77.84,
  'volume': 169083700,
  'change': 11.19,
  'changePercent': 16.79,
  'vwap': 73.1},
 {'symbol': 'PLTR',
  'date': '2025-04-04',
  'open': 80.07,
  'high': 80.98,
  'low': 71.93,
  'close': 74.01,
  'volume': 147323200,
  'change': -6.06,
  'changePercent': -7.57,
  'vwap': 76.75},
 {'symbol': '

In [11]:
# The upload helper defined in this notebook is `upload_to_s3`; `upload_data_to_s3` is not
# defined in these cells, so the old call raised NameError on a fresh kernel.
upload_to_s3(response)

Data uploaded successfully to S3
