In [9]:
!pip install pandas numpy pdfplumber
from pathlib import Path
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os
import re



In [12]:
# Project folders, resolved relative to the parent of the current working directory.
DIR_WORKSPACE = Path.cwd().parents[0]
DIR_DATA = DIR_WORKSPACE.joinpath("data")
DIR_REPORTS_CSV = DIR_DATA.joinpath("mse-daily-data")      # input: one CSV per daily report
DIR_OUTPUT = DIR_DATA.joinpath("combined_output_data")     # output: combined CSV

In [None]:
# Collect and combine all daily-report CSV files into a single DataFrame.
# Path.glob yields files in arbitrary order; sorting keeps the concatenation
# order (and therefore the ids generated below) reproducible between runs.
all_csv_files = sorted(DIR_REPORTS_CSV.glob("*.csv"))  # all CSVs in that directory
df_list = [pd.read_csv(f) for f in all_csv_files]

# Defined before the branch so later cells can reference the path even when
# this run found no CSV files.
output_file = DIR_OUTPUT / "combined_reports.csv"

if df_list:  # make sure it's not empty
    combined_df = pd.concat(df_list, ignore_index=True)
    # Renumber rows 1..N. Using len(combined_df) (not the column's length)
    # also works when the source files carry no 'counter_id' column.
    combined_df['counter_id'] = range(1, len(combined_df) + 1)
    print("Combined shape:", combined_df.shape)

    # Save to a single CSV, creating the output folder on first run
    DIR_OUTPUT.mkdir(parents=True, exist_ok=True)
    combined_df.to_csv(output_file, index=False)
    print("Combined CSV saved at:", output_file)
else:
    print("No CSV files found in", DIR_REPORTS_CSV)


Combined shape: (17872, 19)
Combined CSV saved at: d:\Documents\AIMS_DSCBI_Training\mse-api-assignment\data\combined_output_data\combined_reports.csv


In [None]:
# Quick overview of the combined data.
# A notebook cell only renders its LAST bare expression, so every summary
# except the final preview is emitted explicitly.
print("Shape:", combined_df.shape)
combined_df.info()  # prints dtypes and non-null counts itself
print(combined_df.describe())
print(combined_df['counter'].value_counts())
combined_df.head(100)

In [None]:
# Load the combined CSV file into a DataFrame.
# The path is rebuilt from DIR_OUTPUT rather than taken from `output_file`,
# which the combining cell only defines when it actually found CSV files.
combined_stock_df = pd.read_csv(DIR_OUTPUT / "combined_reports.csv")
print(combined_stock_df.head())
print(combined_stock_df["counter"].unique())

# Ticker symbol -> full company name
company_map = {
    "AIRTEL": "AIRTEL MALAWI PLC",
    "BHL": "BLANTYRE HOTELS PLC",
    "FDHB": "FDH BANK PLC",
    "FMBCH": "FMB CAPITAL HOLDINGS PLC",
    "ICON": "ICON PROPERTIES PLC",
    "ILLOVO": "ILLOVO SUGAR MALAWI PLC",
    "MPICO": "MPICO PLC",
    "NBM": "NATIONAL BANK OF MALAWI",
    "NBS": "NBS BANK PLC",
    "NICO": "NICO HOLDINGS PLC",
    "NITL": "NATIONAL INVESTMENT TRUST PLC",
    "OMU": "OLD MUTUAL LIMITED",
    "PCL": "PRESS CORPORATION PLC",
    "STANDARD": "STANDARD BANK MALAWI PLC",
    "SUNBIRD": "SUNBIRD TOURISM PLC",
    "TNM": "TELEKOM NETWORKS MALAWI PLC"
}
# Add a new column with the full name
combined_stock_df["name"] = combined_stock_df["counter"].map(company_map)

# Series.map leaves NaN for symbols that are missing from company_map;
# report them instead of letting blank names pass through unnoticed.
unmapped_counters = combined_stock_df.loc[
    combined_stock_df["name"].isna(), "counter"
].unique()
if len(unmapped_counters):
    print("Counters without a company name:", sorted(map(str, unmapped_counters)))

# NOTE(review): this relabels the per-day trade_date / buy_price columns as
# date_listed / listing_price -- confirm that is the intended meaning.
combined_stock_df = combined_stock_df.rename(columns={
    "counter": "ticker",
    "trade_date": "date_listed",
    "buy_price": "listing_price"
})

In [2]:
from dotenv import load_dotenv
import os

load_dotenv()

# Database connection settings, read from the environment (.env loaded above).
PGHOST = os.getenv("PGHOST", "").strip()
# Keep only the digits of PGPORT so stray quotes/whitespace are tolerated.
# Fall back to 5432 when the variable is unset OR holds no digits at all;
# previously a non-empty value without digits reached int("") and raised
# ValueError.
_pgport_digits = ''.join(filter(str.isdigit, os.getenv("PGPORT", "")))
PGPORT = int(_pgport_digits) if _pgport_digits else 5432
PGDATABASE = os.getenv("PGDATABASE", "").strip()
PGUSER = os.getenv("PGUSER", "").strip()

# Optional: print to confirm
print(PGHOST, PGPORT, PGDATABASE, PGUSER)


localhost 5432 mse_database postgres


# sql tools for create and link database and tables
D:\Documents\AIMS_DSCBI_Training\mse-api-assignment>psql -U postgres
CREATE DATABASE mse_database; # Create a database
postgres=# \c mse_database # connect to database or postgres=# \connect mse_database
mse_database=# \dt # check content(tables) of the db
## Create tables under database (mse_database)
CREATE TABLE IF NOT EXISTS counters (
    counter_id TEXT PRIMARY KEY,
    ticker TEXT NOT NULL,
    name TEXT NOT NULL,
    date_listed DATE,
    listing_price NUMERIC(10,2)
);

CREATE TABLE IF NOT EXISTS prices_daily (
    counter_id TEXT REFERENCES counters(counter_id),
    trade_date DATE,
    open_mwk NUMERIC(10,2),
    high_mwk NUMERIC(10,2),
    low_mwk NUMERIC(10,2),
    close_mwk NUMERIC(10,2),
    volume BIGINT,
    PRIMARY KEY (counter_id, trade_date)
);
# populate the tables(counters) of database 
INSERT INTO counters (counter_id, ticker, name, date_listed, listing_price)
OVERRIDING SYSTEM VALUE
VALUES
  (1,  'AIRTEL',   'Airtel Malawi plc',                          '2020-02-24', 12.69),
  (2,  'BHL',      'Blantyre Hotels plc',                        '1997-03-25',  0.84),
  (3,  'FDHB',     'FDH Bank plc',                               '2020-08-03', 10.00),
  (4,  'FMBCH',    'FMB Capital Holdings plc',                   '2017-09-18', 45.01),
  (5,  'ICON',     'Icon Properties plc',                        '2019-01-21',  8.75),
  (6,  'ILLOVO',   'Illovo Sugar Malawi plc',                    '1997-11-10',  2.25),
  (7,  'MPICO',    'Malawi Property Investment Company plc',     '2007-08-28',  1.00),
  (8,  'NBM',      'National Bank of Malawi plc',                '2000-08-21',  4.00),
  (9,  'NBS',      'NBS Bank plc',                               '2007-06-25',  2.60),
  (10, 'NICO',     'NICO Holdings plc',                          '1996-11-11',  2.00),
  (11, 'NITL',     'National Investment Trust plc',              '2005-03-21',  2.65),
  (12, 'OMU',      'Old Mutual Limited',                         '2018-06-26', 1580.22),
  (13, 'PCL',      'Press Corporation plc',                      '1998-09-09', 14.89),
  (14, 'STANDARD', 'Standard Bank Malawi plc',                   '1998-06-29',  3.25),
  (15, 'SUNBIRD',  'Sunbird Tourism plc',                        '2002-08-21',  2.60),
  (16, 'TNM',      'Telekom Networks Malawi plc',                '2008-11-25',  5.00);
## check content of counter table
  mse_database=# \d counters
## drop tables from database
mse_database=# DROP TABLE counters CASCADE; # also drops objects that depend on the table (e.g. the foreign key from prices_daily)
mse_database=# DROP TABLE counters;
  
## To browse the data from table counters
mse_database=# SELECT * FROM counters
mse_database-# LIMIT 10;
mse_database=# SELECT counter_id, trade_date FROM prices_daily;
mse_database=# \d+ prices_daily #check general information of table


## Update existing counter_id values in counters table
UPDATE counters SET counter_id = 'MWAIRT001156' WHERE ticker = 'AIRTEL';
UPDATE counters SET counter_id = 'MWBHL0010029' WHERE ticker = 'BHL';
UPDATE counters SET counter_id = 'MWFDHB001166' WHERE ticker = 'FDHB';
UPDATE counters SET counter_id = 'MWFMB0010138' WHERE ticker = 'FMBCH';
UPDATE counters SET counter_id = 'MWICON001146' WHERE ticker = 'ICON';
UPDATE counters SET counter_id = 'MWILLV010032' WHERE ticker = 'ILLOVO';
UPDATE counters SET counter_id = 'MWMPI0010116' WHERE ticker = 'MPICO';
UPDATE counters SET counter_id = 'MWNBM0010074' WHERE ticker = 'NBM';
UPDATE counters SET counter_id = 'MWNBS0010105' WHERE ticker = 'NBS';
UPDATE counters SET counter_id = 'MWNICO010014' WHERE ticker = 'NICO';
UPDATE counters SET counter_id = 'MWNITL010091' WHERE ticker = 'NITL';
UPDATE counters SET counter_id = 'ZAE000255360' WHERE ticker = 'OMU';
UPDATE counters SET counter_id = 'MWPCL0010053' WHERE ticker = 'PCL';
UPDATE counters SET counter_id = 'MWSTD0010041' WHERE ticker = 'STANDARD';
UPDATE counters SET counter_id = 'MWSTL0010085' WHERE ticker = 'SUNBIRD';
UPDATE counters SET counter_id = 'MWTNM0010126' WHERE ticker = 'TNM';


Step 1: Drop existing table (optional)
DROP TABLE IF EXISTS prices_daily;

CREATE TABLE IF NOT EXISTS prices_daily (
    counter_id BIGINT,
    counter TEXT,
    daily_range_high NUMERIC(15,2),
    daily_range_low NUMERIC(15,2),
    buy_price NUMERIC(15,2),
    sell_price NUMERIC(15,2),
    previous_closing_price NUMERIC(15,2),
    today_closing_price NUMERIC(15,2),
    volume_traded NUMERIC(15,2),
    dividend_mk NUMERIC(15,2),
    dividend_yield_pct NUMERIC(15,2),
    earnings_yield_pct NUMERIC(15,2),
    pe_ratio NUMERIC(15,2),
    pbv_ratio NUMERIC(15,2),
    market_capitalization_mkmn NUMERIC(20,2),
    profit_after_tax_mkmn NUMERIC(20,2),
    num_shares_issue NUMERIC(20,2),
    trade_date DATE,
    print_time TEXT,
    PRIMARY KEY (counter_id, trade_date)
);
## add variable column into table
ALTER TABLE prices_daily
ADD COLUMN IF NOT EXISTS sell_price NUMERIC(15,2);

# link whole csv file to sql database directly
COPY prices_daily FROM 'D:\\Documents\\AIMS_DSCBI_Training\\mse-api-assignment\\data\\combined_output_data\\combined_reports.csv' DELIMITER ',' CSV HEADER;

# link csv file to sql database (directly import specific variables)
\copy prices_daily(counter_id, counter, daily_range_high, daily_range_low, buy_price, sell_price, previous_closing_price, today_closing_price, volume_traded, dividend_mk, dividend_yield_pct, earnings_yield_pct, pe_ratio, pbv_ratio, market_capitalization_mkmn, profit_after_tax_mkmn, num_shares_issue, trade_date, print_time) FROM 'D:/Documents/AIMS_DSCBI_Training/mse-api-assignment/data/combined_output_data/combined_reports.csv' CSV HEADER;


### change the decimal formats of figures
ALTER TABLE prices_daily
ALTER COLUMN volume_traded TYPE NUMERIC(15,2);


In [4]:
import pandas as pd
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
from sqlalchemy import text

In [None]:

from sqlalchemy.engine import URL

# Read the listing CSV from the project's data folder instead of a
# machine-specific absolute path.
df = pd.read_csv(DIR_DATA / "misc" / "listing.csv")

# Normalise date_listed to YYYY-MM-DD strings; values that cannot be parsed
# are coerced to NaT and end up as missing values.
# NOTE(review): the source dates are reportedly DD-MM-YY -- if so, pass
# dayfirst=True (or an explicit format=) so day and month are not swapped.
df['date_listed'] = pd.to_datetime(df['date_listed'], errors='coerce')
df['date_listed'] = df['date_listed'].dt.strftime('%Y-%m-%d')

# Create the SQLAlchemy engine from the environment-derived settings rather
# than embedding the user name and password in the notebook. The password is
# taken from PGPASSWORD and omitted from the URL when that variable is unset.
engine = create_engine(
    URL.create(
        "postgresql+psycopg2",
        username=PGUSER or None,
        password=os.getenv("PGPASSWORD") or None,
        host=PGHOST or None,
        port=PGPORT,
        database=PGDATABASE or None,
    )
)

# Write to SQL (an existing "tickers" table is replaced)
mse_counters = df.to_sql("tickers", engine, if_exists="replace", index=False)
print("✅ Data successfully written to SQL table tickers")
print(mse_counters)

# Verify by querying the table
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM tickers LIMIT 5;"))
    for row in result:
        print(row)

In [6]:
# Load the tickers table from the SQL database back into a DataFrame
df = pd.read_sql(sql="SELECT * FROM tickers", con=engine)
print(df.head(5))

     counter_id   ticker                      name date_listed  listing_price
0  MWAIRT001156  AIRTELL         AIRTEL MALAWI PLC  2020-02-24          12.69
1  MWBHL0010029      BHL       BLANTYRE HOTELS PLC  1997-03-25           0.84
2  MWFDHB001166     FDHB              FDH BANK PLC  2020-08-03          10.00
3  MWFMB0010138    FMBCH  FMB CAPITAL HOLDINGS PLC  2017-09-18          45.01
4  MWICON001146     ICON       ICON PROPERTIES PLC  2019-01-21           8.75


In [None]:
# Classify sectors based on company names and add the result to the tickers
# DataFrame as a new column 'Sector'.

# Ordered (keyword, sector) rules. The first keyword found in the upper-cased
# company name wins, so the order is significant (e.g. "BANK" is tested
# before "HOLDING").
SECTOR_RULES = (
    ("BANK", "Banking and Financial"),
    ("HOTELS", "Tourism and Hospitality"),
    ("HOLDING", "Capital & Stock Market"),
    ("AIRTEL", "Telecommunication"),
    ("SUGAR", "Manufacturing Industry"),
    ("NETWORKS", "Telecommunication"),
    ("PRESS", "Media"),
    ("MUTUAL", "Insurance"),
    ("PROPERTIES", "Real Estate"),
    ("TOURISM", "Tourism"),
)


def classify_sector(name):
    """Return the sector label for a company name.

    Matching is a case-insensitive substring test against SECTOR_RULES, in
    order. Names that match no keyword, and values that are not strings
    (e.g. NaN for a missing name), yield "Not Classified".
    """
    if not isinstance(name, str):
        return "Not Classified"
    upper_name = name.upper()  # case-insensitive matching
    for keyword, sector in SECTOR_RULES:
        if keyword in upper_name:
            return sector
    return "Not Classified"  # default value if no keyword matched


Sector = [classify_sector(x) for x in df["name"].values]

df["Sector"] = Sector
df.head()



Unnamed: 0,counter_id,ticker,name,date_listed,listing_price,Sector
0,MWAIRT001156,AIRTELL,AIRTEL MALAWI PLC,2020-02-24,12.69,Telecommunication
1,MWBHL0010029,BHL,BLANTYRE HOTELS PLC,1997-03-25,0.84,Tourism and Hospitality
2,MWFDHB001166,FDHB,FDH BANK PLC,2020-08-03,10.0,Banking and Financial
3,MWFMB0010138,FMBCH,FMB CAPITAL HOLDINGS PLC,2017-09-18,45.01,Capital & Stock Market
4,MWICON001146,ICON,ICON PROPERTIES PLC,2019-01-21,8.75,Real Estate


In [26]:
# Store the sector labels in the tickers table.
# One (sector, ticker) parameter set per DataFrame row, sent in a single
# execute call instead of one round trip per row.
sector_updates = [
    {"sector": sector, "ticker": ticker}
    for sector, ticker in zip(df["Sector"], df["ticker"])
]

# engine.begin() opens a transaction that commits on success and rolls back
# on error, so the schema change and the updates are applied together and no
# explicit conn.commit() call is required.
with engine.begin() as conn:
    conn.execute(text("ALTER TABLE tickers ADD COLUMN IF NOT EXISTS sector TEXT;"))
    if sector_updates:  # nothing to send when the DataFrame is empty
        conn.execute(
            text("UPDATE tickers SET sector = :sector WHERE ticker = :ticker"),
            sector_updates,
        )


In [None]:
# Overwrite the SQL "tickers" table with the current DataFrame contents
df.to_sql(name="tickers", con=engine, if_exists="replace", index=False)

16

In [33]:
# Read the combined daily-report CSV from the project's output folder
# instead of a machine-specific absolute path.
df = pd.read_csv(DIR_OUTPUT / "combined_reports.csv")

# Columns to load, in the order they should appear in the SQL table.
# NOTE(review): trade_date is written exactly as read from the CSV (no date
# conversion happens here) -- convert it first if a DATE column is wanted.
headers_vars = [
    'counter_id', 'counter', 'daily_range_high', 'daily_range_low', 'buy_price',
    'sell_price', 'previous_closing_price', 'today_closing_price', 'volume_traded', 'dividend_mk',
    'dividend_yield_pct', 'earnings_yield_pct', 'pe_ratio', 'pbv_ratio',
    'market_capitalization_mkmn', 'profit_after_tax_mkmn', 'num_shares_issue',
    'trade_date', 'print_time']

df = df[headers_vars]

# The SQLAlchemy `engine` created when the tickers table was loaded is reused
# here, so the connection string (and its credentials) is not repeated.

# Write to SQL (an existing "daily_prices" table is replaced)
mse_dailyPrices = df.to_sql("daily_prices", engine, if_exists="replace", index=False)
print(mse_dailyPrices)
print("✅ Data successfully written to SQL table 'daily_prices'")


872
✅ Data successfully written to SQL table 'daily_prices'


# Week 3: API Development
- Implement all 3-5 required API endpoints
- Add input validation using Pydantic models
- Add query parameters for filtering

### Create and activate a virtual environment
- `python -m venv venv`
- `.\venv\Scripts\activate`
## Install the required packages
- `pip install -r requirements.txt`
## Run the API script
- `(venv) D:\Documents\AIMS_DSCBI_Training\mse-api-assignment>python -m uvicorn api_access:app --reload`

In [None]:
# Imports and application setup for the API module
import os
from typing import Optional, List
import pandas as pd
import psycopg2
# FastAPI's Path (path-parameter helper) is imported under an alias: the
# pathlib import below binds the bare name `Path`, which would otherwise
# silently replace the FastAPI one.
from fastapi import FastAPI, Query, HTTPException
from fastapi import Path as PathParam
from dotenv import load_dotenv
from pathlib import Path
import numpy as np

load_dotenv()  # load .env so the PG* settings are available through os.getenv
app = FastAPI()

In [None]:
# ==========================================================
# RETURNING DATA FROM POSTGRESQL MSE_DATABASE
# =========================================================

# =============================================================
# Load environment variables from .env file
# =============================================================
load_dotenv()
# Get database connection details from environment variables
PGHOST = os.getenv("PGHOST", "").strip()
# Keep only the digits of PGPORT so stray quotes/whitespace are tolerated.
# Fall back to 5432 when the variable is unset OR holds no digits at all;
# previously a non-empty value without digits reached int("") and raised
# ValueError.
_pgport_digits = ''.join(filter(str.isdigit, os.getenv("PGPORT", "")))
PGPORT = int(_pgport_digits) if _pgport_digits else 5432
PGDATABASE = os.getenv("PGDATABASE", "").strip()
PGUSER = os.getenv("PGUSER", "").strip()

# =============================================================
# HELPER FUNCTION TO CONNECT QUERY to sql database
# =============================================================
def run_query(sql: str, params: tuple = ()):
    """Run `sql` against the configured PostgreSQL database and return the rows.

    Args:
        sql: Query text; placeholders are filled from `params`.
        params: Values passed through to pandas.read_sql for the placeholders.

    Returns:
        A list with one dict per result row (column name -> value), in which
        NaN and +/-infinity have been replaced by None.
    """
    connection = psycopg2.connect(
        host=PGHOST,
        port=PGPORT,
        dbname=PGDATABASE,
        user=PGUSER,
    )
    try:
        frame = pd.read_sql(sql, connection, params=params)
        # Swap NaN / inf for None before the rows are handed back
        cleaned = frame.replace({np.nan: None, np.inf: None, -np.inf: None})
    finally:
        # Always release the connection, including when the query fails
        connection.close()
    records = cleaned.to_dict(orient="records")
    return records

# =============================================================
# Define the ENDPOINTS: each FastAPI route below runs a SQL query against the database and returns the resulting rows
# ===========================================================================================

@app.get("/")
def Home():
    """Root endpoint: return a fixed welcome message."""
    greeting = "WELCOME TO MALAWI STOCK EXCHANGE DATABASE"
    return {"message": greeting}

@app.get("/companies")
def companies():
    """Return every row of the tickers table."""
    return run_query("SELECT * FROM tickers")

# Get companies by sector
@app.get("/companies/{sector}")
def get_companies(sector: str):
    """Return the tickers rows that belong to the given sector.

    The sector is taken from the URL path (``/companies/<sector>``) and is
    compared case-insensitively with the "Sector" column of the tickers table.
    """
    wanted_sector = sector.lower()
    query = 'SELECT * FROM tickers WHERE LOWER("Sector") = %s'
    return run_query(query, (wanted_sector,))
    
# Registered under /companies/ticker/... because the bare pattern
# /companies/{counter} is identical to /companies/{sector}: with both
# declared, only the first-declared route ever receives requests.
@app.get("/companies/ticker/{counter}")
def get_company_prices(counter: str):
    """Return one company's listing details and its number of daily price rows.

    Args:
        counter: Ticker symbol of the company; matched case-insensitively
            against tickers.ticker.

    Returns:
        A list of row dicts (empty when the ticker is unknown) with the keys
        ``counter``, ``name``, ``Sector``, ``Date Listed`` and
        ``price_entries`` (0 when no daily price rows match).
    """
    # tickers exposes the symbol as `ticker` and the date as `date_listed`;
    # both are aliased to the response keys this endpoint is meant to return.
    # The join is on the symbol because daily_prices.counter_id is a generated
    # row number while tickers.counter_id holds an identifier string, so the
    # two id columns cannot be compared.
    sql = """
    SELECT
        t.ticker AS counter,
        t.name,
        t."Sector",
        t.date_listed AS "Date Listed",
        COUNT(d.counter) AS price_entries
    FROM tickers AS t
    LEFT JOIN daily_prices AS d
        ON LOWER(d.counter) = LOWER(t.ticker)
    WHERE LOWER(t.ticker) = LOWER(%s)
    GROUP BY
        t.ticker,
        t.name,
        t."Sector",
        t.date_listed
    LIMIT 50;
    """
    return run_query(sql, (counter,))

@app.get("/daily_prices/{counter}")
def get_counter(counter: str):
    """Return up to 50 daily price rows for one counter.

    Args:
        counter: Ticker symbol; matched case-insensitively against
            daily_prices.counter.

    Returns:
        A list of row dicts (empty when nothing matches). Each row carries the
        price columns plus ``price_entries``, the total number of rows stored
        for the counter (counted before the LIMIT is applied).
    """
    # Plain filtered select: the rows are returned individually, so no
    # self-join or GROUP BY is needed. The window COUNT supplies the overall
    # row total, and ORDER BY makes the LIMIT deterministic.
    sql = """
    SELECT
        t.counter_id,
        t.counter,
        t."trade_date",
        t."daily_range_high",
        t."daily_range_low",
        t."buy_price",
        t."sell_price",
        t."previous_closing_price",
        t."today_closing_price",
        t."volume_traded",
        COUNT(*) OVER () AS price_entries
    FROM daily_prices AS t
    WHERE LOWER(t.counter) = LOWER(%s)
    ORDER BY t.counter_id
    LIMIT 50;
    """
    return run_query(sql, (counter,))