In [185]:
import os
import pickle
from collections import defaultdict
from functools import partial
from typing import Dict, List, Optional

import pandas as pd
import requests
from tqdm import tqdm

# Alpha Vantage credentials are read from the environment so that the secret is
# never stored in the notebook (a literal key previously sat here in a comment;
# treat that key as leaked and rotate it).
# When ALPHAVANTAGE_API_KEY is unset or empty, API_KEY is None, which is the
# value this notebook used before and makes the pipeline read the pickled feeds.
API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY") or None
# Tickers the pipeline builds signals for.
TICKERS = ["TSLA", "IBM"]
# Start / end date strings (YYYY-MM-DD).
RANGE = ["2023-07-01", "2023-08-31"]
INTERVAL = "DAILY"
CALCULATIONS = ["MEAN", "STDDEV", "CORRELATION"]

# API functions

In [200]:
class APIs:
    """
    Static wrappers around the Alpha Vantage query endpoint.

    Each public method builds the query parameters for one Alpha Vantage
    ``function``, sends a GET request to ``base_url`` and returns the decoded
    JSON body as-is (no validation of the payload is performed here).
    """

    base_url = "https://www.alphavantage.co/query"
    # Seconds before a request is abandoned. Without a timeout `requests` waits
    # indefinitely, which would hang the notebook on a stalled connection.
    request_timeout = 30

    @staticmethod
    def _get_json(params: dict) -> dict:
        """
        Send one GET request to ``base_url`` with ``params`` and return the decoded JSON body.
        """
        response = requests.get(APIs.base_url, params=params, timeout=APIs.request_timeout)
        return response.json()

    @staticmethod
    def _get_comma_seperated(_list: List[str]) -> str:
        """
        Helper function for turning a list of strings into a single comma-separated string for an API call.

        Args:
            _list: the items to join; must contain at least one item.

        Returns:
            The items joined with "," (a single item is returned unchanged).

        Raises:
            ReferenceError: if ``_list`` is empty.
        """
        # The empty case must be rejected first: previously it was routed to
        # `_list[0]` (IndexError) and this error could never be reached.
        if len(_list) == 0:
            raise ReferenceError(f"Need to specify at least one item. Received {_list}.")
        return ",".join(_list)

    @staticmethod
    def get_news_sentiment(api: str, tickers: List[str]) -> Dict[str, dict]:
        """
        Fetch the NEWS_SENTIMENT feed for ``tickers``.

        Args:
            api: Alpha Vantage API key.
            tickers: ticker symbols; sent as one comma-separated ``tickers`` value.
                NOTE(review): per the Alpha Vantage docs a multi-ticker filter
                appears to select articles mentioning all of them together - confirm.

        Returns a data JSON of the format:

        {
        "items": "50",
        "sentiment_score_definition": "x <= -0.35: Bearish; -0.35 < x <= -0.15: Somewhat-Bearish; -0.15 < x <= 0.15: Neutral; 0.15 < x <= 0.35: Somewhat-Bullish; x > 0.35: Bullish",
        "relevance_score_definition": "0 < x <= 1, with a higher score indicating higher relevance.",
        "feed": [
            {
            "title": "Sample News Title",
            "url": "https://www.example.com/news-article",
            "time_published": "20250328T200000",
            "authors": ["Author Name"],
            "summary": "Brief summary of the news article.",
            "banner_image": "https://www.example.com/image.jpg",
            "source": "News Source",
            "category_within_source": "Finance",
            "source_domain": "example.com",
            "topics": [
                {
                "topic": "Financial Markets",
                "relevance_score": "0.9"
                }
            ],
            "overall_sentiment_score": "0.25",
            "overall_sentiment_label": "Somewhat-Bullish",
            "ticker_sentiment": [
                {
                "ticker": "AAPL",
                "relevance_score": "0.8",
                "ticker_sentiment_score": "0.3",
                "ticker_sentiment_label": "Somewhat-Bullish"
                }
            ]
            }
            // Additional articles...
        ]
        }
        """
        params = {
            "apikey": api,
            # NEWS_SENTIMENT filters on `tickers`; the `symbol` key used before
            # is not a parameter of this function, so the filter was not applied.
            "tickers": APIs._get_comma_seperated(tickers),
            "function": "NEWS_SENTIMENT",
        }
        return APIs._get_json(params)

    @staticmethod
    def get_insider_moves(api: str, tickers: List[str]) -> Dict[str, dict]:
        """
        Fetch INSIDER_TRANSACTIONS for ``tickers`` in a single request.

        Args:
            api: Alpha Vantage API key.
            tickers: ticker symbols; sent as one comma-separated ``symbol`` value.

        NOTE(review): the sample below carries a single "symbol", so this
        endpoint presumably accepts one symbol per request; a comma-separated
        value may yield an empty result. Confirm and consider one request per
        ticker (that would change the return shape, so it is not done here).

        Returns a data JSON of the format:

        {
        "symbol": "AAPL",
        "insider_trades": [
            {
            "name": "John Doe",
            "relationship": "CEO",
            "transaction_date": "2025-03-27",
            "transaction_code": "P",
            "transaction_amount": "1000",
            "transaction_price": "150.00",
            "transaction_share": "100",
            "post_transaction_share": "10100"
            }
            // Additional transactions...
        ]
        }

        """
        params = {
            "apikey": api,
            "symbol": APIs._get_comma_seperated(tickers),
            "function": "INSIDER_TRANSACTIONS",
        }
        return APIs._get_json(params)

    @staticmethod
    def get_window_analytics(
        api: str, 
        tickers: List[str],
        n_month: int,
        calculations: List[str],
        window: Optional[int] = 20,
        interval: Optional[str] = "DAILY",
    ) -> Dict[str, dict]:
        """
        Fetch ANALYTICS_SLIDING_WINDOW results computed on close prices.

        Args:
            api: Alpha Vantage API key.
            tickers: ticker symbols; sent comma-separated as ``SYMBOLS``.
            n_month: look-back length; sent as ``RANGE`` = "<n_month>month".
            calculations: calculation names; sent comma-separated as ``CALCULATIONS``.
            window: sent as ``WINDOW_SIZE``.
            interval: sent as ``INTERVAL``.

        NOTE(review): the sample below describes an intraday OHLCV time series
        and was probably copied from another endpoint; confirm the real
        ANALYTICS_SLIDING_WINDOW payload before relying on it.

        Returns a data JSON of the form:

        {
        "Meta Data": {
            "1. Information": "Intraday (5min) open, high, low, close prices and volume",
            "2. Symbol": "AAPL",
            "3. Last Refreshed": "2025-03-28 20:00:00",
            "4. Interval": "5min",
            "5. Output Size": "Compact",
            "6. Time Zone": "US/Eastern"
        },
        "Time Series (5min)": {
            "2025-03-28 20:00:00": {
            "1. open": "150.00",
            "2. high": "151.00",
            "3. low": "149.50",
            "4. close": "150.50",
            "5. volume": "1000"
            }
            // Additional time intervals...
        }
        }

        """
        params = {
            "SYMBOLS": APIs._get_comma_seperated(tickers),
            "RANGE": f"{n_month}month",
            "INTERVAL": interval,
            "OHLC": "close",
            "WINDOW_SIZE": window,
            "CALCULATIONS": APIs._get_comma_seperated(calculations),
            "apikey": api,
            "function": "ANALYTICS_SLIDING_WINDOW",
        }
        return APIs._get_json(params)
    
    @staticmethod
    def get_technical_indicator(
            api: str,
            tickers: List[str],
            indicator: str = "RSI", 
            interval: str = "daily", 
            time_period: int = 14,
        ) -> Dict[str, dict]:
        """
        Fetch one technical indicator per ticker (one request per ticker).

        Args:
            api: Alpha Vantage API key.
            tickers: ticker symbols to query.
            indicator: sent as the Alpha Vantage ``function`` name.
            interval: sent as ``interval``.
            time_period: sent as ``time_period``.

        Returns:
            A ``defaultdict(dict)`` mapping each ticker to a JSON of the form:

        {
            "Meta Data": {
                "1: Symbol": "IBM",
                "2: Indicator": "Relative Strength Index (RSI)",
                "3: Last Refreshed": "2025-03-28 20:00:00",
                "4: Interval": "daily",
                "5: Time Period": 14,
                "6: Series Type": "close",
                "7: Time Zone": "US/Eastern Time"
            },
            "Technical Analysis: RSI": {
                "2025-03-28": {
                    "RSI": "56.1234"
                },
                "2025-03-27": {
                    "RSI": "54.9876"
                }
                // Additional data points...
            }
        }
        """
        output = defaultdict(dict)
        for ticker in tickers:
            params = {
                "function": indicator,
                "symbol": ticker,
                "interval": interval,
                "time_period": time_period,
                "series_type": "close",
                "apikey": api,
            }
            output[ticker] = APIs._get_json(params)
        return output

    @staticmethod
    def get_fundamentals(api: str, tickers: List[str]) -> Dict[str, dict]:
        """
        Fetch the OVERVIEW (company fundamentals) record per ticker (one request per ticker).

        Args:
            api: Alpha Vantage API key.
            tickers: ticker symbols to query.

        Returns:
            A ``defaultdict(dict)`` mapping each ticker to a JSON of the form:

        {
            "Symbol": "IBM",
            "AssetType": "Common Stock",
            "Name": "International Business Machines Corporation",
            "Description": "IBM is a global technology company...",
            "CIK": "0000051143",
            "Exchange": "NYSE",
            "Currency": "USD",
            "Country": "USA",
            "Sector": "Technology",
            "Industry": "Information Technology Services",
            "Address": "1 New Orchard Road, Armonk, NY, USA",
            "FiscalYearEnd": "December",
            "LatestQuarter": "2025-03-31",
            "MarketCapitalization": "120000000000",
            "EBITDA": "15000000000",
            "PERatio": "14.50",
            "PEGRatio": "1.20",
            "BookValue": "20.50",
            "DividendPerShare": "6.00",
            "DividendYield": "0.045",
            "EPS": "10.00",
            "RevenuePerShareTTM": "50.00",
            "ProfitMargin": "0.15",
            "OperatingMarginTTM": "0.18",
            "ReturnOnAssetsTTM": "0.05",
            "ReturnOnEquityTTM": "0.20",
            "RevenueTTM": "75000000000",
            "GrossProfitTTM": "35000000000",
            "DilutedEPSTTM": "9.50",
            "QuarterlyEarningsGrowthYOY": "0.05",
            "QuarterlyRevenueGrowthYOY": "0.03",
            "AnalystTargetPrice": "140.00",
            "TrailingPE": "14.00",
            "ForwardPE": "13.00",
            "PriceToSalesRatioTTM": "1.60",
            "PriceToBookRatio": "6.80",
            "EVToRevenue": "1.80",
            "EVToEBITDA": "10.00",
            "Beta": "1.10",
            "52WeekHigh": "150.00",
            "52WeekLow": "100.00",
            "50DayMovingAverage": "120.00",
            "200DayMovingAverage": "125.00",
            "SharesOutstanding": "1000000000",
            "DividendDate": "2025-04-15",
            "ExDividendDate": "2025-03-30"
        }
        """
        output = defaultdict(dict)

        for ticker in tickers:
            params = {
                "function": "OVERVIEW",
                "symbol": ticker,
                "apikey": api,
            }
            output[ticker] = APIs._get_json(params)
        
        return output

# Information processing classes

In [None]:
class INPUT_FIELDS:
    """Class for keeping track of the field (column) names used in the signals table."""
    # Used as the index of InformationCollationPipeline.signals_df.
    ticker = "ticker"
    # Filled by InformationCollationPipeline.process_sentiment.
    sentiment_score = "sentiment_score"
    # Filled by process_sentiment from the sentiment score.
    direction = "direction"
    # Declared as a signals_df column; no code visible in this notebook fills it.
    size = "size"
    # Filled by process_sentiment together with `direction`.
    conviction = "conviction"
    # Declared as a signals_df column; no code visible in this notebook fills it.
    execution_time = "execution_time"
    # Declared as a signals_df column; no code visible in this notebook fills it.
    valid_period = "valid_period"

In [None]:
class InformationCollationPipeline:
    """
    Builds a per-ticker signals table from Alpha Vantage feeds.

    ``signals_df`` is indexed by ticker and holds one column for every
    ``INPUT_FIELDS`` entry other than the ticker itself. When ``api_key`` is
    falsy, each ``process_*`` step reads a pickled feed from ``pickled_feeds/``
    instead of calling the API.
    """
    
    def __init__(self, api_key: Optional[str], tickers: List[str]) -> None:
        """
        Args:
            api_key: Alpha Vantage API key; a falsy value selects the pickled feeds.
            tickers: ticker symbols; they become the index of ``signals_df``.
        """
        self.api_key = api_key
        self.tickers = tickers

        # One all-NaN row per ticker, indexed by ticker.
        self.signals_df = pd.DataFrame(
            index=pd.Index(tickers, name=INPUT_FIELDS.ticker),
            columns=[
                INPUT_FIELDS.sentiment_score,
                INPUT_FIELDS.direction,
                INPUT_FIELDS.size,
                INPUT_FIELDS.conviction,
                INPUT_FIELDS.execution_time,
                INPUT_FIELDS.valid_period,
            ],
        )

    def run(self) -> pd.DataFrame:
        """Placeholder: not implemented yet (the body is ``...``, so it currently returns None)."""
        ...

    def process_analytics(
        self,
        n_month: int = 2,
        calculations: Optional[List[str]] = None,
    ) -> None:
        """
        Load the sliding-window analytics feed (further processing is not implemented yet).

        Args:
            n_month: look-back length in months forwarded to the API. The default
                of 2 is an assumption matching the two-month span of the
                notebook's ``RANGE`` setting - adjust as needed.
            calculations: calculation names forwarded to the API; defaults to the
                notebook-level ``CALCULATIONS`` setting.
        """
        if self.api_key:
            # `n_month` and `calculations` are required by get_window_analytics;
            # the previous call omitted both and raised TypeError.
            feed_json = APIs.get_window_analytics(
                self.api_key,
                self.tickers,
                n_month=n_month,
                calculations=CALCULATIONS if calculations is None else calculations,
            )
        else:
            # NOTE: pickle.load can execute arbitrary code; only load trusted cache files.
            with open("pickled_feeds/sliding_window_analytics.pkl", 'rb') as f:
                feed_json = pickle.load(f)

        ...

    def process_fundamentals(self) -> None:
        """Load the company fundamentals feed (further processing is not implemented yet)."""
        if self.api_key:
            # Previously this called get_window_analytics (copy-paste slip), which
            # fetches a different feed and also fails on missing arguments.
            feed_json = APIs.get_fundamentals(self.api_key, self.tickers)
        else:
            # NOTE: pickle.load can execute arbitrary code; only load trusted cache files.
            with open("pickled_feeds/fundamentals.pkl", 'rb') as f:
                feed_json = pickle.load(f)

        ...

    def process_sentiment(self) -> None:
        """
        Fill ``sentiment_score``, ``direction`` and ``conviction`` in ``signals_df``.

        The score of a ticker is the relevance-weighted mean of its
        ``ticker_sentiment_score`` values over every article of the news feed
        that mentions it. Tickers without any mention keep NaN in all three columns.
        """
        if self.api_key:
            feed_json = APIs.get_news_sentiment(self.api_key, self.tickers)
        else:
            # NOTE: pickle.load can execute arbitrary code; only load trusted cache files.
            with open("pickled_feeds/news_sentiment.pkl", 'rb') as f:
                feed_json = pickle.load(f)
        
        # Intermediate columns live in a scratch frame so that `signals_df` never
        # exposes them, even if one of the steps below raises.
        scratch = pd.DataFrame(index=self.signals_df.index)
        scratch["_news_occurences"] = self._scrape_feed(feed_json)
        scratch[["_all_sentiment_scores", "_all_relevance_scores"]] = scratch.apply(
            lambda row: self._collect_sentiment_scores(feed_json, row),
            axis=1,
        )

        self.signals_df[INPUT_FIELDS.sentiment_score] = self._aggregate_scores(scratch)

        label_columns = [INPUT_FIELDS.direction, INPUT_FIELDS.conviction]
        scored = self.signals_df[INPUT_FIELDS.sentiment_score].dropna()
        if scored.empty:
            # `Series.apply` on an empty Series yields no columns, and assigning
            # that to two columns raises "Columns must be same length as key".
            self.signals_df[label_columns] = float("nan")
        else:
            # Assignment aligns on the ticker index; unscored tickers become NaN.
            self.signals_df[label_columns] = scored.apply(self._convert_score)
    
    @staticmethod
    def _scrape_feed(feed_json: Dict[str, dict]) -> pd.Series:
        """
        Map every ticker mentioned in the feed to the positions of the articles mentioning it.

        Returns:
            Series named "_news_occurences", indexed by ticker, whose values are
            lists of indices into ``feed_json["feed"]``.
        """
        ticker_occurrences = defaultdict(list)
        for article_pos, article_dict in enumerate(tqdm(feed_json["feed"])):
            for ticker_dict in article_dict["ticker_sentiment"]:
                positions = ticker_occurrences[ticker_dict["ticker"]]
                # Record each article once per ticker: a repeated position would
                # make _collect_sentiment_scores visit the article again and
                # count all of its entries for that ticker a second time.
                if not positions or positions[-1] != article_pos:
                    positions.append(article_pos)

        return pd.Series(ticker_occurrences, name="_news_occurences", dtype=object)
    
    @staticmethod
    def _collect_sentiment_scores(feed_json: Dict[str, dict], row: pd.Series) -> pd.Series:
        """
        Gather the sentiment and relevance scores of one ticker.

        Args:
            feed_json: news feed containing a "feed" list of articles.
            row: a row whose ``name`` is the ticker and whose "_news_occurences"
                entry is the list of article positions (anything that is not a
                list, e.g. NaN, is treated as "no articles").

        Returns:
            Series with two equally long lists of floats under
            "_all_sentiment_scores" and "_all_relevance_scores".
        """
        tot_sentiment = []
        tot_relevance = []

        occurrences = row["_news_occurences"]
        if not isinstance(occurrences, list):
            occurrences = []

        for article_pos in occurrences:
            for ticker_dict in feed_json["feed"][article_pos]["ticker_sentiment"]:
                if row.name == ticker_dict["ticker"]:
                    tot_sentiment.append(float(ticker_dict["ticker_sentiment_score"]))
                    tot_relevance.append(float(ticker_dict["relevance_score"]))

        return pd.Series({
            "_all_sentiment_scores": tot_sentiment,
            "_all_relevance_scores": tot_relevance
        })
    
    @staticmethod
    def _aggregate_scores(row: pd.DataFrame) -> pd.Series:
        """
        Compute the relevance-weighted mean sentiment per ticker.

        Args:
            row: despite the name, a DataFrame indexed by ticker with list-valued
                columns "_all_sentiment_scores" and "_all_relevance_scores"
                (lists of equal length within a row).

        Returns:
            Float Series indexed by ticker holding
            sum(sentiment * relevance) / sum(relevance). Tickers with no scores,
            or whose relevance sums to zero, are omitted.
        """
        score_columns = ["_all_sentiment_scores", "_all_relevance_scores"]
        # One row per (ticker, article); an empty list explodes to a single NaN row.
        exploded = row[score_columns].explode(score_columns).astype(float)
        exploded["_weighted_sentiment"] = (
            exploded["_all_sentiment_scores"] * exploded["_all_relevance_scores"]
        )

        # min_count=1 keeps all-NaN groups as NaN instead of 0, so "no data" can
        # be told apart from a genuine weighted sentiment of exactly 0 (which the
        # previous blanket 0 -> None replacement discarded).
        totals = (
            exploded
            .groupby(level=0)
            [["_weighted_sentiment", "_all_relevance_scores"]]
            .sum(min_count=1)
        )
        relevance_total = totals["_all_relevance_scores"]
        usable = relevance_total.notna() & relevance_total.ne(0)
        return totals.loc[usable, "_weighted_sentiment"].divide(relevance_total[usable])
    
    @staticmethod
    def _convert_score(sentiment_score: float) -> pd.Series:
        """
        Translate a sentiment score into a trade direction and a conviction label.

        The bands follow the feed's sentiment score definition:
        x <= -0.35 | -0.35 < x <= -0.15 | -0.15 < x <= 0.15 | 0.15 < x <= 0.35 | x > 0.35.
        A NaN score yields NaN for both fields instead of falling through to the
        "sell / very strong" branch.
        """
        if pd.isna(sentiment_score):
            direction, conviction = float("nan"), float("nan")
        elif sentiment_score > 0.35:
            direction, conviction = "buy", "very strong"
        elif sentiment_score > 0.15:
            direction, conviction = "buy", "good"
        elif sentiment_score > -0.15:
            direction, conviction = "hold", "small"
        elif sentiment_score > -0.35:
            direction, conviction = "sell", "good"
        else:
            direction, conviction = "sell", "very strong"

        return pd.Series({INPUT_FIELDS.direction: direction, INPUT_FIELDS.conviction: conviction})
        

# Load the cached news-sentiment feed into the notebook-level `feed_json`.
# NOTE(review): this object is not passed to the pipeline below; with a falsy
# API_KEY, process_sentiment() opens the same pickle itself, so this load looks
# redundant unless another cell reads `feed_json` - confirm before removing.
# NOTE: pickle.load can execute arbitrary code; only load trusted cache files.
with open("pickled_feeds/news_sentiment.pkl", 'rb') as f:
    feed_json = pickle.load(f)

# Build the signals table for the configured tickers and fill its sentiment columns.
_obj = InformationCollationPipeline(api_key=API_KEY, tickers=TICKERS)
_obj.process_sentiment()
# Last expression of the cell: displays the resulting table.
_obj.signals_df

100%|██████████| 36/36 [00:00<00:00, 692637.36it/s]


Unnamed: 0_level_0,sentiment_score,direction,size,conviction,execution_time,holding_period
ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
TSLA,0.128741,hold,,small,,
IBM,,,,,,


In [152]:
# Inspect the cached insider-transactions feed.
# NOTE: pickle.load can execute arbitrary code; only load trusted cache files.
with open("pickled_feeds/insider_moves.pkl", 'rb') as f:
    it_json = pickle.load(f)

# Last expression of the cell: displays the loaded object (the recorded output is an empty dict).
it_json

{}