In [15]:
#import needed libraries 
from fastapi import FastAPI, Query, Path,HTTPException
from typing import Optional
from datetime import date
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os

In [3]:
#load environment variables
load_dotenv()

PGHOST = os.getenv("PGHOST")
PGPORT = os.getenv("PGPORT", "5432")
PGDATABASE = os.getenv("PGDATABASE")
PGUSER = os.getenv("PGUSER")
PGPASSWORD = os.getenv("PGPASSWORD")

In [4]:
# Create SQLAlchemy engine 
connection_string = f"postgresql+psycopg2://{PGUSER}:{PGPASSWORD}@{PGHOST}:{PGPORT}/{PGDATABASE}"
print("Connection psql string:", connection_string)

engine = create_engine(connection_string, pool_pre_ping=True)

Connection psql string: postgresql+psycopg2://kir2351763:password@localhost:5432/mse_db


In [5]:
company_sector_mapping = {
    'AIRTEL': 'Telecommunication',
    'BHL': 'Hospitality',
    'FDHB': 'Finance',
    'FMBCH': 'Finance',
    'ICON': 'Construction',
    'ILLOVO': 'Agriculture',
    'MPICO': 'Construction',
    'NBM': 'Finance',
    'NBS': 'Finance',
    'NICO': 'Finance',
    'NITL': 'Finance',
    'OMU': 'Finance',
    'PCL': 'Investments',
    'STANDARD': 'Finance',
    'SUNBIRD': 'Hospitality',
    'TNM': 'Telecommunication'
}

In [6]:
# Initialize FastAPI app
app = FastAPI()

In [7]:
# Home end-point
@app.get("/")
def home():
    return {"message": "Welcome to the MSE API!"}

In [8]:
#First end-point: Call with Query parameters
@app.get("/companies")
def get_companies(sector: Optional[str] = Query(None, description="Filter companies by sector")):
    """Get list of companies with optional sector filtering."""
    df=pd.read_sql("SELECT * FROM tickers", con=engine)
    df['Sector'] = df['ticker'].map(company_sector_mapping)
    df=df[['ticker','name','Sector','date_listed']]
    if sector:
        df=df[df['Sector']==sector]
    data=df.to_dict(orient='records')
    return {'count':len(data), 'data':data}


In [9]:
#Second end-point: Call with Path parameters
@app.get("/companies/{ticker}")
def get_company_details(ticker:str):
    """Get company details and total records for a given ticker symbol."""
    #counter details
    df=pd.read_sql("SELECT * FROM tickers t JOIN daily_prices p ON t.counter_id=p.counter_id", con=engine)
    df['Sector'] = df['ticker'].map(company_sector_mapping)
    df=[df['ticker']==ticker]
    df=df[['ticker','name','Sector','date_listed']]
    company_details=df.to_dict(orient='records')
    records=len(df)
    return {'Company details':company_details,'Total records':records}

In [18]:
def get_company_details(ticker:str):
    """Get company details and total records for a given ticker symbol."""
    #ticker details
    query = text("""
    SELECT t.ticker, t.name, t.date_listed, p.counter_id
    FROM tickers t
    JOIN daily_prices p ON t.counter_id = p.counter_id
    WHERE t.ticker = :ticker
    """)
    df = pd.read_sql(query, con=engine, params={"ticker": ticker})
    if df.empty:
        raise HTTPException(status_code=404, detail=f"No company found for ticker '{ticker}'.")
    df['Sector'] = df['ticker'].map(company_sector_mapping)
    df = df[df['ticker'] == ticker]
    df=df[['ticker','name','Sector','date_listed']]
    company_details = df[['ticker', 'name', 'Sector', 'date_listed']].drop_duplicates().to_dict(orient='records')
    total_records = len(df)
    return {'Company details':company_details,'Total records':total_records} 

In [19]:
get_company_details("AIRTEL")

{'Company details': [{'ticker': 'AIRTEL',
   'name': 'AIRTEL MALAWI PLC',
   'Sector': 'Telecommunication',
   'date_listed': datetime.date(2020, 2, 24)}],
 'Total records': 1209}

In [None]:
#Third end-point
@app.get("/prices/daily")
def get_daily_prices(
    ticker: str = Query(..., description="Stock ticker symbol"),
    start_date: Optional[date] = Query(None, description="Start date (YYYY-MM-DD)"),
    end_date: Optional[date] = Query(None, description="End date (YYYY-MM-DD)"),
    limit: Optional[int] = Query(100, description="Maximum records to return")
    ): 
    """ Get daily price data for a given ticker symbol within an optional date range and limit."""
    #fetch counter name from counter table
    df=pd.read_sql("SELECT * FROM tickers t JOIN daily_prices p ON t.counter_id=p.counter_id", con=engine)
    df=df[df['ticker']==ticker]
    df.rename(columns={['open_mwk','high_mwk','low_mwk','close_mwk','volume','trade_date']:['open',' high','low','close','volume','trade_date']}, inplace=True)

    # Filter by date range
    if start_date:
        df = df[df['trade_date'] >= start_date]
    if end_date:
        df = df[df['trade_date'] <= end_date]

    # Apply limit (max 1000)
    limit = min(limit or 100, 1000)
    df = df.head(limit)
    df = df.fillna('') 
    return {"Company": ticker, "data":df.to_dict(orient='records')}

In [None]:
def get_daily_prices_by_date_range(
    ticker: str = Query(..., description="Stock ticker symbol"),
    start_date: Optional[date] = None,
    end_date: Optional[date] = None,
    limit: Optional[int] = 100):
    """ Get daily price data for a given ticker symbol within an optional date range and limit."""
    #fetch counter name from counter table
    query = text("""
    SELECT  p.trade_date, p.open_mwk, p.high_mwk, p.low_mwk, p.close_mwk, p.volume
    FROM tickers t
    JOIN daily_prices p ON t.counter_id = p.counter_id
    WHERE t.ticker = :ticker
    """)
    df = pd.read_sql(query, con=engine, params={'ticker': ticker})
    df.rename(columns={
    'open_mwk': 'open',
    'high_mwk': 'high',
    'low_mwk': 'low',
    'close_mwk': 'close',
    'volume': 'volume',
    'trade_date': 'trade_date'
    }, inplace=True)
    print(df)
    df['trade_date'] = pd.to_datetime(df['trade_date'])
    # Filter by date range
    
    if start_date:
        df = df[df['trade_date'] >=start_date]
    if end_date:
        df = df[df['trade_date'] <=end_date]

    # Apply limit (max 1000)
    limit = min(limit or 100, 1000)
    df = df.head(limit)
    df = df.fillna('') 
    return {"Company": ticker, "data":df.to_dict(orient='records')}
get_daily_prices_by_date_range("NICO")

      trade_date  open     high      low    close    volume
0     2025-09-19   2.0  1739.93  1739.91  1739.92  102670.0
1     2025-09-18   2.0  1739.98  1739.92  1739.94   94047.0
2     2025-09-17   2.0  1739.98  1739.78  1739.96   32696.0
3     2025-09-15   2.0  1740.00  1739.97  1739.99   28044.0
4     2025-09-12   2.0  1740.01  1739.99  1740.00  165086.0
...          ...   ...      ...      ...      ...       ...
1771  2017-07-10   2.0  1350.00  1350.00  1350.00  150000.0
1772  2017-07-07   2.0      NaN      NaN      NaN       NaN
1773  2017-07-04   2.0      NaN      NaN      NaN       NaN
1774  2017-06-30   2.0      NaN      NaN      NaN       NaN
1775  2017-06-29   2.0      NaN      NaN      NaN       NaN

[1776 rows x 6 columns]
ggoing for date range
start date None


{'Company': 'NICO',
 'data': [{'trade_date': Timestamp('2025-09-19 00:00:00'),
   'open': 2.0,
   'high': 1739.93,
   'low': 1739.91,
   'close': 1739.92,
   'volume': 102670.0},
  {'trade_date': Timestamp('2025-09-18 00:00:00'),
   'open': 2.0,
   'high': 1739.98,
   'low': 1739.92,
   'close': 1739.94,
   'volume': 94047.0},
  {'trade_date': Timestamp('2025-09-17 00:00:00'),
   'open': 2.0,
   'high': 1739.98,
   'low': 1739.78,
   'close': 1739.96,
   'volume': 32696.0},
  {'trade_date': Timestamp('2025-09-15 00:00:00'),
   'open': 2.0,
   'high': 1740.0,
   'low': 1739.97,
   'close': 1739.99,
   'volume': 28044.0},
  {'trade_date': Timestamp('2025-09-12 00:00:00'),
   'open': 2.0,
   'high': 1740.01,
   'low': 1739.99,
   'close': 1740.0,
   'volume': 165086.0},
  {'trade_date': Timestamp('2025-09-11 00:00:00'),
   'open': 2.0,
   'high': 1740.01,
   'low': 1740.0,
   'close': 1740.01,
   'volume': 78775.0},
  {'trade_date': Timestamp('2025-09-10 00:00:00'),
   'open': 2.0,
   'hi

In [None]:
#Fourth end-point
@app.get("/prices/range")
def get_daily_prices(
    ticker: str = Query(..., description="Stock ticker symbol"),
    year: int = Query(..., description="Year"),
    month: Optional[int] = Query(None, description="Month of the year"),
    ):
    """ Get daily price data for a given ticker symbol within a specified year and optional month."""
    #fetch counter name from counter table
    df=pd.read_sql("SELECT p.trade_date,p.open_mwk,p.high_mwk,p.low_mwk,p.close_mwk,p.volume FROM tickers t JOIN daily_prices p ON t.counter_id=p.counter_id", con=engine)
    df.columns=['Period','open',' high','low','close','Total Volume']

    df['Period'] = pd.to_datetime(df['Period'])
    df = df[df['Period'].dt.year == year]
    if month:
        df = df[df['Period'].dt.month == month]

    df = df.fillna('') 
    return {"Company": ticker, "data":df.to_dict(orient='records')}


In [44]:
def get_daily_prices_by_period(
    ticker: str = Query(..., description="Stock ticker symbol"),
    year: int = Query(..., description="Year"),
    month: Optional[int] = Query(default=None, description="Month of the year"),
):
    """Get daily price data for a given ticker symbol within a specified year and optional month."""
    
    query = text("""
        SELECT p.trade_date, p.open_mwk, p.high_mwk, p.low_mwk, p.close_mwk, p.volume
        FROM tickers t
        JOIN daily_prices p ON t.counter_id = p.counter_id
        WHERE t.ticker = :ticker
    """)
    
    df = pd.read_sql(query, con=engine, params={"ticker": ticker})
    
    df.columns = ['Period', 'Open', 'High', 'Low', 'Close', 'TotalVolume']
    df['Period'] = pd.to_datetime(df['Period'])
    
    df = df[df['Period'].dt.year == year]
    print(df)
    print("month",month)
    print("*********************")
    if month:
        df = df[df['Period'].dt.month == month]
    print(df)
    df['Period'] = df['Period'].dt.strftime('%Y-%m-%d')
    df = df.fillna('')
    
    return {"Company": ticker, "data": df.to_dict(orient='records')}

get_daily_prices_by_period('NICO',2025)

        Period  Open     High      Low    Close  TotalVolume
0   2025-09-19   2.0  1739.93  1739.91  1739.92     102670.0
1   2025-09-18   2.0  1739.98  1739.92  1739.94      94047.0
2   2025-09-17   2.0  1739.98  1739.78  1739.96      32696.0
3   2025-09-15   2.0  1740.00  1739.97  1739.99      28044.0
4   2025-09-12   2.0  1740.01  1739.99  1740.00     165086.0
..         ...   ...      ...      ...      ...          ...
171 2025-01-08   2.0   419.00   419.00   419.00      11933.0
172 2025-01-07   2.0   419.02   419.00   419.00      12885.0
173 2025-01-06   2.0   419.02   419.01   419.01       7503.0
174 2025-01-03   2.0   419.02   419.02   419.02      86914.0
175 2025-01-02   2.0   419.02   419.01   419.02      20714.0

[176 rows x 6 columns]
month annotation=NoneType required=False default=None description='Month of the year' json_schema_extra={}
*********************
Empty DataFrame
Columns: [Period, Open, High, Low, Close, TotalVolume]
Index: []


{'Company': 'NICO', 'data': []}

In [None]:
#Fifth end-point
@app.get("/prices/latest")
def get_recent_prices(
    ticker: Optional[str] = Query(None, description="Stock ticker symbol"),
    ):
    """ Get the most recent price data for a given ticker symbol, including latest price,
    previous price, and percentage change."""
     #fetch counter name from counter table
    df=pd.read_sql("SELECT  p.trade_date,p.open_mwk,p.high_mwk,p.low_mwk,p.close_mwk,p.volume FROM tickers t JOIN daily_prices p ON t.counter_id=p.counter_id", con=engine)
    df.columns=['trade_date','open',' high','low','close','Total Volume']

    df['trade_date'] = pd.to_datetime(df['trade_date'])
    df_sorted = df.sort_values(by='trade_date', ascending=False).reset_index(drop=True)

    # Latest price
    latest = df_sorted.iloc[0]
    latest_date = latest['trade_date']
    latest_price = latest['close']
    
    # Previous price
    if len(df_sorted) > 1:
        prev_price = df_sorted.iloc[1]['close']
        change = latest_price - prev_price
        change_percentage = (change / prev_price) * 100 if prev_price != 0 else 0
    else:
        prev_price = None
        change = None
        change_percentage = None
    return {
        "ticker": ticker,
        "latest_date": latest_date,
        "latest_price": latest_price,
        "previous_price": prev_price,
        "change": change,
        "change_percentage": str(round(change_percentage,3))+'%'
    }

In [14]:

import uvicorn
uvicorn.run("app", host="127.0.0.1", port=8000, reload=True)

INFO:     Will watch for changes in these directories: ['/home/kir2351763/RRA/other trainings/DSCBI/assignments/mse-api-assignment/notebooks']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [12585] using WatchFiles
ERROR:    Error loading ASGI app. Import string "app" must be in format "<module>:<attribute>".
INFO:     Stopping reloader process [12585]
