In [3]:
import yfinance as yf
import datetime
import pandas as pd
import plotly.express as px
import requests
import os
import dotenv
import plotly.graph_objects as go
import numpy as np
import time
import datetime
import pandas as pd
from alpha_vantage.timeseries import TimeSeries

In [4]:
# Retrieve names of all stocks in the S&P 500
url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
html = requests.get(url).content
df_list = pd.read_html(html)
df = df_list[0]
sp_500 = df['Symbol'].tolist()

In [5]:
def calculate_stock_performance(previouse_price: float, current_price : float) -> float: 
    """
        Calculates the percent change between the open and close price of a stock
        - Params: 
            - previouse_price: float: open price of the stock
            - current_price: float: close price of the stock
        - Returns: float: percent change
    """
    return round((current_price - previouse_price) / previouse_price * 100, 2)



def get_percent_change(ticker) -> float | None:
    """
        Retrieves the percent change of a stock in the last trading day
        - Params:
            - ticker: str
        - Returns: float: percent change
    """
    try:
        ticker_data = yf.download(tickers=ticker, period='2d', interval='1d')
        ticker_data['percent_change'] = ticker_data['Close'].pct_change()
        current_price = ticker_data['Close'][1]
        previouse_price = ticker_data['Close'][0]
        return calculate_stock_performance(previouse_price, current_price)
    except:
        print('Could not retrieve data for ticker: ' + ticker)
        return None


In [6]:
info = yf.Ticker('AMZN').fast_info
for i in info: 
    print(i, info[i])

currency USD
exchange NMS
timezone America/New_York
shares 10247300096
market_cap 1046351819057.0156
last_price 102.11000061035156
previous_close 102.18000030517578
open 101.16999816894531
day_high 102.41000366210938
day_low 98.08000183105469
regular_market_previous_close 102.18000030517578
last_volume 119384500
fifty_day_average 92.66419998168945
two_hundred_day_average 111.45869525909424
ten_day_average_volume 97178990
three_month_average_volume 81600922
year_high 170.8314971923828
year_low 81.43000030517578
year_change -0.36740110385365643


In [7]:
def get_company_market_cap(ticker: str) -> float | None:
    """
        Retrieves the market cap of a company
        - Params:
            - ticker: str
        - Returns: float: market cap
    """
    try:
        info = yf.Ticker(ticker).fast_info
        return info['market_cap']
    except:
        print(f'Could not retrieve data for ticker: {ticker}')
        return None

In [8]:
def check_company_ticker(ticker: str) -> str:
    """
        Checks if a company ticker is a valid ticker
        - Params:
            - ticker: str: company ticker
        - Returns: str: company ticker
    """
    if ticker.endswith('.B'):
        return ticker[:-2]
    return ticker

In [9]:
def get_stock_data(tickers: list) -> list:
    """
        Retrieves the market cap and percent change of a list of stocks
        - Params:
            - tickers: list: list of stock tickers
        - Returns: list: list of stock data in the form of a dictionary
    """
    companies = []
    for ticker in tickers: 
        try: 
            ticker = check_company_ticker(ticker)
            market_cap = get_company_market_cap(ticker)
            performance = get_percent_change(ticker)
            stock = {
                'ticker': ticker,
                'market_cap': market_cap,
                'performance': performance
            }
            companies.append(stock)
        except:
            print(f'Could not retrieve data for ticker: {ticker}')
        

In [10]:
def write_into_excel(companies):
    """
        Writes the data into an excel file
        - Params:
            - companies: list
    """
    df = pd.DataFrame(companies)
    df.to_excel('companies.xlsx', index=False)

In [11]:
def get_sector(ticker: str)-> str:
    """
        Retrieves the sector of a company
        - Params:
            - ticker: str: company ticker
        - Returns: str: sector
    """
    try:
        info = yf.Ticker(ticker).info
        return info['sector']
    except:
        print(f'Could not retrieve data for ticker: {ticker}')
        return None

In [24]:
import pandas as pd
import os
import dotenv
import requests
import csv



class SecurityDataHandler:
    def __init__(self, ticker: str) -> None:
        self._ticker = ticker
        dotenv.load_dotenv()
        self._api_key = os.environ["ALPHA_VANTAGE_API_KEY"]

    def __str__(self) -> str:
        return f"SecurityDataHandler({self.ticker} {self.api_key})"

    def __repr__(self) -> str:
        return f"SecurityDataHandler({self.ticker}, {self.api_key})"

    @property
    def ticker(self) -> str:
        return self._ticker

    @ticker.setter
    def ticker(self, ticker: str) -> None:
        self._ticker = ticker

    @property
    def api_key(self) -> str:
        return self._api_key

    @api_key.setter
    def api_key(self, api_key: str) -> None:
        self._api_key = api_key

    def get_data(self, interval: str = "1min", outputsize: str = "compact") -> pd.DataFrame:
        url = f'https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol={self.ticker}&interval={interval}&apikey={self._api_key}&outputsize={outputsize}'
        response = requests.get(url)
        data = response.json()
        data = data["Time Series (1min)"]
        data = pd.DataFrame(data).T
        data.index = pd.to_datetime(data.index)
        data = data.astype(float)
        return data

    def get_company_info(self, parameter: str = None, function : str ="OVERVIEW")-> str:
        url = f'https://www.alphavantage.co/query?function={function}&symbol={self.ticker}&apikey={self._api_key}'
        response = requests.get(url)
        data = response.json()
        if parameter is None:
            return data
        return data[parameter]


    def get_full_name(self) -> str:
        return self.get_company_info(parameter="Name")

    def get_description(self) -> str:
        return self.get_company_info(parameter="Description")
    
    def get_exchange(self)-> str:
        return self.get_company_info(parameter="Exchange")
    
    def get_country(self)-> str:
        return self.get_company_info(parameter="Country")
    
    def get_sector(self)-> str:
        return self.get_company_info(parameter="Sector")
    
    def get_industry(self)-> str:
        return self.get_company_info(parameter="Industry")
    
    def get_market_cap(self)-> str:
        return self.get_company_info(parameter="MarketCapitalization")
    
    def get_ebitda(self)-> str:
        return self.get_company_info(parameter="EBITDA")
    
    def get_pe_ratio(self)-> str:
        return self.get_company_info(parameter="PERatio")
    
    def get_eps(self)-> str:
        return self.get_company_info(parameter="EPS")
    
    def get_peg_ratio(self)-> str:
        return self.get_company_info(parameter="PEGRatio")
    
    def get_book_value(self)-> str:
        return self.get_company_info(parameter="BookValue")
    
    def get_dividend_per_share(self)-> str:
        return self.get_company_info(parameter="DividendPerShare")
    
    def get_dividend_yield(self)-> str:
        return self.get_company_info(parameter="DividendYield")
    
    def geT_profit_margin(self)-> str:
        return self.get_company_info(parameter="ProfitMargin")
    
    def get_operating_margin(self)-> str:
        return self.get_company_info(parameter="OperatingMarginTTM")
    
    def get_return_on_assets(self)-> str:
        return self.get_company_info(parameter="ReturnOnAssetsTTM")
    
    def get_return_on_equity(self)-> str:
        return self.get_company_info(parameter="ReturnOnEquityTTM")

    def get_revenue(self)-> str:
        return self.get_company_info(parameter="RevenueTTM")
    
    def get_gross_profit(self)-> str:
        return self.get_company_info(parameter="GrossProfitTTM")
    
    def get_quarterly_earning_growth(self)-> str:
        return self.get_company_info(parameter="QuarterlyEarningsGrowthYOY")
    
    def get_quarterly_revenue_growth(self)-> str:
        return self.get_company_info(parameter="QuarterlyRevenueGrowthYOY")
    
    def get_analyst_target_price(self)-> str:
        return self.get_company_info(parameter="AnalystTargetPrice")
    
    def get_trailing_pe(self)-> str:
        return self.get_company_info(parameter="TrailingPE")
    
    def get_forward_pe(self)-> str:
        return self.get_company_info(parameter="ForwardPE")
    
    def get_price_to_sales_ratio(self)-> str:
        return self.get_company_info(parameter="PriceToSalesRatioTTM")
    
    def get_price_to_book_ratio(self)-> str:
        return self.get_company_info(parameter="PriceToBookRatio")
    
    def get_beta(self)-> str:
        return self.get_company_info(parameter="Beta")
    
    def get_52_week_high(self)-> str:
        return self.get_company_info(parameter="52WeekHigh")
    
    def get_52_week_low(self)-> str:
        return self.get_company_info(parameter="52WeekLow")
    
    def get_50_day_moving_average(self)-> str:
        return self.get_company_info(parameter="50DayMovingAverage")
    
    def get_200_day_moving_average(self)-> str:
        return self.get_company_info(parameter="200DayMovingAverage")
    
    def get_share_outstanding(self)-> str:
        return self.get_company_info(parameter="SharesOutstanding")
    
    def get_dividend_date(self)-> str:
        return self.get_company_info(parameter="DividendDate")
    
    def get_ex_dividend_date(self)-> str:
        return self.get_company_info(parameter="ExDividendDate")
    
    def get_income_statement(self):
        return self.get_company_info(function="INCOME_STATEMENT")
    
    def get_balance_sheet(self):
        return self.get_company_info(function="BALANCE_SHEET")
    
    def get_cash_flow(self):
        return self.get_company_info(function="CASH_FLOW")
    
    def get_earnings(self):
        """
            Get the earnings for a company.
            - returns:
                - dict: The earnings for the company.
        """
        return self.get_company_info(function="EARNINGS")
    
    def get_earnings_calendar(self, horizon: str="3month" ) -> dict | None:
        """
            Get the earnings calendar for a company for a given horizon. 
            - params: 
                - horizon (str): The horizon for which to get the earnings calendar. 
                    - 3month: 3 months
                    - 6month: 6 months
                    - 12month: 12 months
            - returns: 
                - dict: The earnings calendar for the company.
        """
        url = f'https://www.alphavantage.co/query?function=EARNINGS_CALENDAR&symbol={self.ticker}&horizon={horizon}&apikey={self.api_key}'
        with requests.Session() as s:
            download = s.get(url)
            decoded_content = download.content.decode('utf-8')
            cr = csv.reader(decoded_content.splitlines(), delimiter=',')
            my_list = list(cr)
            if len(my_list) > 1:
                return my_list
        return None

    
    def get_sma(self, interval: str = "daily", time_period: int = 10, series_type: str = "close") -> dict | None:
        """
            Get the simple moving average for a given interval. 
            - params: 
                - interval (str): The interval for which to get the simple moving average. 
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - time_period (int): The time period for which to get the simple moving average. 
                - series_type (str): The series type for which to get the simple moving average. 
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns: 
                - dict: The simple moving average for the given interval.
        """
        url = f'https://www.alphavantage.co/query?function=SMA&symbol={self.ticker}&interval={interval}&time_period={time_period}&series_type={series_type}&apikey={self.api_key}'
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None
    
    def get_ema(self, interval: str = "daily", time_period: int = 10, series_type: str = "close") -> dict | None:
        """
            Get the exponential moving average for a given interval.
            - params: 
                - interval (str): The interval for which to get the exponential moving average. 
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - time_period (int): The time period for which to get the exponential moving average. 
                - series_type (str): The series type for which to get the exponential moving average. 
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns:
                - dict: The exponential moving average for the given interval.
        """
        url = f'https://www.alphavantage.co/query?function=EMA&symbol={self.ticker}&interval={interval}&time_period={time_period}&series_type={series_type}&apikey={self.api_key}'
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None
    

class Economic_Calendar:
    def __init__(self, api_key: str) -> None:
        self._api_key = api_key
    
    def __str__(self) -> str:
        return f"api_key: {self._api_key}"
    
    def __repr__(self) -> str:
        return f"Economic_Calendar(api_key={self._api_key})"
    
    @property
    def api_key(self) -> str:
        return self._api_key
    
    @api_key.setter
    def api_key(self, api_key: str) -> None:
        self._api_key = api_key
    
    def filter_period(self, data: pd.DataFrame ,from_period : str, to_period: str, output_format: str)-> dict | pd.DataFrame | None:
        """
            This method will filter out the data for given period and reutrn the data.
            - params:
                - data (pd.DataFrame): The data to filter.
                - from_period (str): The start date.
                - to_period (str): The end date.
                - output_format (str): The output format for the data.
                    - dataframe: dataframe
                    - dict: dict
            - returns:
                - dict | pd.DataFrame: The filtered data.
        """
        if output_format == "dataframe":
            return data.loc[(data["date"] >= from_period) & (data["date"] <= to_period)]
        elif output_format == "dict":
            return data.loc[(data["date"] >= from_period) & (data["date"] <= to_period)].to_dict()
        else:
            return None

    def get_real_gdp(self, interval="annual") -> dict | None:
        """
            Get the real GDP for a given interval. 
            - params: 
                - interval (str): The interval for which to get the real GDP. 
                    - annual: annual
                    - quarterly: quarterly
            - returns: 
                - dict: The real GDP for the given interval.
        """
        url = f'https://www.alphavantage.co/query?function=REAL_GDP&interval={interval}&apikey={self.api_key}'
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None

    def get_gdp_per_capita(self):
        url = f'https://www.alphavantage.co/query?function=REAL_GDP_PER_CAPITA&apikey={self.api_key}'
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None

    def get_cpi(self, interval: str = "monthly") -> dict | None:
        """
            Get the consumer price index for a given interval. 
            - params: 
                - interval (str): The interval for which to get the consumer price index. 
                    - monthly: monthly
                    - semiannual: semiannual
            - returns: 
                - dict: The consumer price index for the given interval.
        """
        url = f'https://www.alphavantage.co/query?function=CPI&interval={interval}&apikey={self.api_key}'
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None

    def get_retail_sales(self) -> dict:
        """
            Get the retail sales.
            - returns:
                - dict: The retail sales.
        """
        url = f'https://www.alphavantage.co/query?function=RETAIL_SALES&apikey={self.api_key}'
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None

    def get_unemployment_rate(self) -> dict | None:
        """
            Get the unemployment rate.
            - returns:
                - dict: The unemployment rate.
        """
        url = f"https://www.alphavantage.co/query?function=UNEMPLOYMENT&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None

    def get_nonfarm_payroll(self) -> dict | None:
        url = f"https://www.alphavantage.co/query?function=NONFARM_PAYROLL&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None


class Technical_Indicators:
    def __init__(self) -> None:
        dotenv.load_dotenv()
        self._api_key = os.getenv("ALPHA_VANTAGE_API_KEY")
    
    @property
    def api_key(self) -> str:
        return self._api_key
    
    @api_key.setter
    def api_key(self, api_key: str) -> None:
        self._api_key = api_key
    
    def __str__(self) -> str:
        return f"api_key: {self._api_key}"
    
    def __repr__(self) -> str:
        return f"Technical_Indicators(api_key={self._api_key})"
    
    def get_moving_average(self, ticker: str, moving_average_type: str= "SMA", interval : str ="1min", time_period: int = 10, series_type: str = "close") -> dict | None:
        """
            Learn more about different moving averages at: https://www.investopedia.com/terms/m/movingaverage.asp
            Get the moving average for a given interval.
            - params:
                - ticker (str): The ticker for which to get the moving average.
                - moving_average_type (str): The moving average type for which to get the moving average.
                    - SMA: simple moving average
                    - EMA: exponential moving average
                    - WMA: weighted moving average
                    - DEMA: double exponential moving average
                    - TEMA: triple exponential moving average
                    - TRIMA: triangular moving average
                    - KAMA: Kaufman adaptive moving average
                    - MAMA: MESA adaptive moving average
                    - T3: triple exponential moving average (T3)
                - interval (str): The interval for which to get the moving average.
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - time_period (int): The time period for which to get the moving average.
                - series_type (str): The series type for which to get the moving average.
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns:
                - dict: The moving average for the given interval.
        """
        url = f"https://www.alphavantage.co/query?function={moving_average_type}&symbol={ticker}&interval={interval}&time_period={time_period}&series_type={series_type}&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None

    def get_macd(self, ticker: str, interval: str = "1min", series_type: str = "close") -> dict | None:
        """
            Learn more about MACD at: https://www.investopedia.com/terms/m/macd.asp
            Get the moving average convergence divergence (MACD) for a given interval.
            - params:
                - ticker (str): The ticker for which to get the moving average convergence divergence (MACD).
                - interval (str): The interval for which to get the moving average convergence divergence (MACD).
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - series_type (str): The series type for which to get the moving average convergence divergence (MACD).
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns:
                - dict: The moving average convergence divergence (MACD) for the given interval.
        """
        url = f"https://www.alphavantage.co/query?function=MACD&symbol={ticker}&interval={interval}&series_type={series_type}&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None
    
    def get_stoch(self, ticker: str, interval: str = "1min", series_type: str = "close") -> dict | None:
        """
            Learn more about stochastic oscillator at: https://www.investopedia.com/terms/s/stochasticoscillator.asp
            Get the stochastic oscillator for a given interval.
            - params:
                - ticker (str): The ticker for which to get the stochastic oscillator.
                - interval (str): The interval for which to get the stochastic oscillator.
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - series_type (str): The series type for which to get the stochastic oscillator.
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns:
                - dict: The stochastic oscillator for the given interval.
        """
        url = f"https://www.alphavantage.co/query?function=STOCH&symbol={ticker}&interval={interval}&series_type={series_type}&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None
    
    def get_rsi(self, ticker: str, interval: str = "1min", time_period: int = 10, series_type: str = "close") -> dict | None:
        """
            Learn more about relative strength index (RSI) at: https://www.investopedia.com/terms/r/rsi.asp
            Get the relative strength index (RSI) for a given interval.
            - params:
                - ticker (str): The ticker for which to get the relative strength index (RSI).
                - interval (str): The interval for which to get the relative strength index (RSI).
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - time_period (int): The time period for which to get the relative strength index (RSI).
                - series_type (str): The series type for which to get the relative strength index (RSI).
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns:
                - dict: The relative strength index (RSI) for the given interval.
        """
        url = f"https://www.alphavantage.co/query?function=RSI&symbol={ticker}&interval={interval}&time_period={time_period}&series_type={series_type}&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None
    
    def get_bolinger_bands(self, ticker, interval: str = "1min", time_period: int = 10, series_type: str = "close") -> dict | None:
        """
            Learn more about bolinger bands at: https://www.investopedia.com/terms/b/bollingerbands.asp
            Get the bolinger bands for a given interval.
            - params:
                - ticker (str): The ticker for which to get the bolinger bands.
                - interval (str): The interval for which to get the bolinger bands.
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - time_period (int): The time period for which to get the bolinger bands.
                - series_type (str): The series type for which to get the bolinger bands.
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns:
                - dict: The bolinger bands for the given interval.
        """
        url = f"https://www.alphavantage.co/query?function=BBANDS&symbol={ticker}&interval={interval}&time_period={time_period}&series_type={series_type}&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None

    def get_atr(self, ticker: str, interval: str = "1min", time_period: int = 10, series_type: str = "close") -> dict | None:
        """
            Learn more about average true range (ATR) at: https://www.investopedia.com/terms/a/atr.asp \n
            Get the average true range (ATR) for a given interval.
            - params:
                - ticker (str): The ticker for which to get the average true range (ATR).
                - interval (str): The interval for which to get the average true range (ATR).
                    - daily: daily
                    - weekly: weekly
                    - monthly: monthly
                - time_period (int): The time period for which to get the average true range (ATR).
                - series_type (str): The series type for which to get the average true range (ATR).
                    - open: open
                    - high: high
                    - low: low
                    - close: close
            - returns:
                - dict: The average true range (ATR) for the given interval.
        """
        url = f"https://www.alphavantage.co/query?function=ATR&symbol={ticker}&interval={interval}&time_period={time_period}&series_type={series_type}&apikey={self.api_key}"
        r = requests.get(url)
        return requests.get(url).json() if r.status_code == 200 else None
    



In [26]:
economic_calendar = Economic_Calendar(api_key=os.environ.get("ALPHA_VANTAGE_API_KEY"))

print(economic_calendar.get_industrial_production())


{'Error Message': 'This API function (INDUSTRIAL_PRODUCTION) does not exist.'}
