In [1]:
import os
import requests
import json
from datetime import datetime
from typing import Union, List
import pandas as pd
from dotenv import load_dotenv
import sqlalchemy as sa
load_dotenv()


True

In [2]:
class APIRequestError(Exception):
    def __init__(self, status_code, message, function_name):
        self.status_code = status_code
        self.message = message
        self.function_name = function_name
        super().__init__(f"HTTP error {self.status_code} occurred in {self.function_name}: {self.message}")


In [3]:
#Primero: funcion para traer serie intradiaria en 60 min del stock que necesito

def intraday_stock_serie(symbol:str, interval:str):   
    endpoint = 'TIME_SERIES_INTRADAY'
    adjusted=True
    extended_hours=False
    size = 'compact'
    parameters_market_data = {'function':endpoint, 'symbol':symbol, 'interval':interval, 'extended_hours':extended_hours,
            'adjusted':adjusted,'outputsize':size,'apikey':token }
    try:
        r = requests.get(base_url, params=parameters_market_data)
        r.raise_for_status()  
        data = r.json()
        if "Error Message" in data:
            error_message = data["Error Message"]
            raise APIRequestError(r.status_code, error_message, "intraday_stock_serie")
        else:
            data = data[f'Time Series ({interval})']
            return data
    except requests.exceptions.HTTPError as http_err:
        raise APIRequestError(http_err.response.status_code, http_err, "intraday_stock_serie")
    except Exception as err:
        raise APIRequestError(500, str(err), "intraday_stock_serie")

In [9]:
#Segundo: funcion para traer noticias relacionadas a ese stock

def getSentiment(
    symbol: str,
    topics: Union[str, List[str]]
):
    
    # Convierto topics en un solo string si vino en una lista de strings
    if isinstance(topics, list):
        topics = ','.join(topics)
    
    parameters_news_sentiment_data = {
        'function': 'NEWS_SENTIMENT',
        'tickers': symbol,
        'topics': topics,
        'apikey': token
    }
    
    try:
        r = requests.get(base_url, params=parameters_news_sentiment_data)
        r.raise_for_status() 
        data = r.json()
        data_feed = data['feed']
        data_sentiment = []
        for i in data_feed:
            for item in i['ticker_sentiment']:
                if item['ticker'] == symbol:
                    # Formateo time_published 
                    time_published = datetime.strptime(i['time_published'], '%Y%m%dT%H%M%S')
                    formatted_time_published = time_published.strftime('%Y-%m-%d %H:%M')
                    data_sentiment.append({
                        'ticker': item['ticker'],
                        'time_published': formatted_time_published,
                        'source_domain': i['source_domain'],
                        'relevance_score': item['relevance_score'],
                        'ticker_sentiment_label': item['ticker_sentiment_label']
                    })
        return data_sentiment
    except requests.exceptions.HTTPError as http_err:
        raise APIRequestError(http_err.response.status_code, http_err, "getSentiment")
    except Exception as err:
        raise APIRequestError(500, str(err), "getSentiment")



In [10]:
# 3 Unifico las funciones de market data y news data en una sola 
def get_stock_data(tickers, interval, topics):
    stock_data_frames = {}
    # Get database connection parameters
    DB_NAME = os.environ.get('DB_NAME')
    DB_USER = os.environ.get('DB_USER')
    DB_PWD = os.environ.get('DB_PWD')
    DB_PORT = os.environ.get('DB_PORT')
    DB_HOST = os.environ.get('DB_HOST')
    dbschema = f'{DB_USER}'

    # Create the connection engine outside the loop
    conn = sa.create_engine(
        f"postgresql://{DB_USER}:{DB_PWD}@{DB_HOST}:{DB_PORT}/{DB_NAME}",
        connect_args={'options': f'-csearch_path={dbschema}'}
    )

    for ticker in tickers:
        try:
            # Traigo intraday stock data
            intraday_data = intraday_stock_serie(ticker, interval)
            
            # Traigo sentiment data
            sentiment_data = getSentiment(ticker, topics)
            
            # Convierto en pandas dataframe los precios de intraday stock
            df_intraday = pd.DataFrame.from_dict(intraday_data, orient='index')
            df_intraday.columns = ['open_price', 'high_price', 'low_price', 'close_price', 'volume']
            df_intraday.reset_index(inplace=True)
            df_intraday.rename(columns={'index': 'date'}, inplace=True)
            df_intraday['date'] = pd.to_datetime(df_intraday['date'])
            
            intraday_table_name = f'stock_intraday_prices_{ticker}'
            
            #Creo tabla de precios en redshift
            conn.execute(f"""
                CREATE TABLE {intraday_table_name} (
                    date TIMESTAMP,
                    open_price FLOAT,
                    high_price FLOAT,
                    low_price FLOAT,
                    close_price FLOAT,
                    volume INT
                )
                DISTKEY(date);
            """)
    

            #Voy con el dataframe de news sentiment
            df_sentiment = pd.DataFrame(sentiment_data)
            sentiment_table_name = f'stock_sentiment_{ticker}'
            
            #Creo tabla de sentiment en redshift
            conn.execute(f"""
            CREATE TABLE {sentiment_table_name}  (
                ticker VARCHAR,
                time_published TIMESTAMP,
                source_domain VARCHAR,
                relevance_score VARCHAR,
                tiker_sentiment_label VARCHAR
            );
            """)

            
            # Guardo los df en diccionarios
            stock_data_frames[ticker] = {
                'intraday_data': df_intraday,
                'sentiment_data': df_sentiment
            }
            
        except APIRequestError as api_err:
            print(f"{api_err.function_name}: API Request Error - Status Code {api_err.status_code}: {api_err.message}")
            # You can handle the error based on the status code here.
            # For example, you may choose to skip the stock if the error is not recoverable.
            continue
    return stock_data_frames

In [11]:
#Advantage api configuration parameters
base_url = os.environ.get('BASE_URL')
token = os.environ.get('API_TOKEN')
tickers = ['AAPL','IBM']
interval = '60min'
topics = 'technology, manufacturing, financial_markets'


In [12]:
data_frames_by_ticker = get_stock_data(tickers, interval, topics)

In [14]:
data_frames_by_ticker['AAPL']['intraday_data']

Unnamed: 0,date,open_price,high_price,low_price,close_price,volume
0,2023-07-28 19:00:00,195.8500,195.9500,195.7800,195.9500,20654
1,2023-07-28 18:00:00,195.8000,195.8600,195.7200,195.8400,22108
2,2023-07-28 17:00:00,195.6800,195.8300,195.6200,195.7750,359879
3,2023-07-28 16:00:00,195.8400,195.8700,195.3000,195.6700,15958267
4,2023-07-28 15:00:00,195.9250,196.1500,195.4200,195.8400,8229424
...,...,...,...,...,...,...
95,2023-07-21 04:00:00,192.9800,193.4000,192.9800,193.1700,28633
96,2023-07-20 19:00:00,192.9900,193.1200,192.5500,192.9900,84071
97,2023-07-20 18:00:00,192.9200,193.1700,192.8000,192.9850,87339
98,2023-07-20 17:00:00,195.1000,195.1000,189.9220,192.9000,112415
