In [9]:
from fileinput import filename
from typing import List, Optional
from pathlib import Path
import pandas as pd
import numpy as np
import pdfplumber
import re
import os
from datetime import datetime
import logging
import sys

In [5]:

# Data folders are resolved relative to the parent of the current working directory.
DIR_DATA = Path.cwd().parent / "data"
DIR_REPORTS_PDF = DIR_DATA / "mse-daily-reports"
DIR_REPORTS_CSV = DIR_DATA / "mse-daily-data"

# Single source of truth for the report being processed, so the PDF and CSV
# file names can never drift apart.
REPORT_STEM = "mse-daily-09-05-2025"

FILE_PDF = DIR_REPORTS_PDF / f"{REPORT_STEM}.pdf"
FILE_CSV = DIR_REPORTS_CSV / f"{REPORT_STEM}.csv"

In [35]:
import re
from datetime import date, time, datetime
from pathlib import Path
import pdfplumber

# Month map (handles "Sep" and "Sept")
_MONTHS = {
    'jan':1,'january':1,'feb':2,'february':2,'mar':3,'march':3,'apr':4,'april':4,
    'may':5,'jun':6,'june':6,'jul':7,'july':7,'aug':8,'august':8,
    'sep':9,'sept':9,'september':9,'oct':10,'october':10,
    'nov':11,'november':11,'dec':12,'december':12
}

def _mkdate(y, m, d):  # y,m,d may be str
    return date(int(y), int(m), int(d))

def _norm_text(s: str) -> str:
    return re.sub(r'\s+', ' ', s or '').strip()

def _parse_date_str(s: str, day_first: bool = True):
    """Parse the first recognisable date from free text.

    Patterns are tried in this order; the first one that yields a real calendar
    date wins. Only years 2000-2099 are recognised.

    1. day, month name, year   ("5 September 2025", "05 Sep 2025", "5th Sept, 2025")
    2. month name, day, year   ("September 5, 2025", "Sept 5th 2025")
    3. ISO-like, year first    ("2025-09-05", "2025/09/05", "2025.09.05")
    4. numeric, year last      ("05-09-2025", "05/09/2025", "5.9.2025")

    Parameters
    ----------
    s : str
        Text to search. ``None`` or empty text gives ``None``.
    day_first : bool
        Applies to pattern 4 only: True reads the first number as the day,
        False reads it as the month.

    Returns
    -------
    datetime.date | None
        The parsed date, or ``None`` when nothing matches or the matched numbers
        do not form a valid date (e.g. "31 February 2025").
    """
    def _safe_date(y, m, d):
        # An impossible calendar date must not abort parsing; report "no date"
        # so the remaining patterns still get a chance.
        try:
            return _mkdate(y, m, d)
        except ValueError:
            return None

    s = _norm_text(s)

    # 1) 5 September 2025 | 05 Sep 2025 | 5 Sept, 2025 | 5th September 2025
    m = re.search(r'(?i)\b(\d{1,2})(?:st|nd|rd|th)?\s+([A-Za-z]{3,9}),?\s+(20\d{2})\b', s)
    if m:
        d, mon, y = m.groups()
        mon_num = _MONTHS.get(mon.lower())
        if mon_num:
            parsed = _safe_date(y, mon_num, d)
            if parsed is not None:
                return parsed

    # 2) September 5, 2025 | Sep 05 2025 | Sept 5th 2025
    m = re.search(r'(?i)\b([A-Za-z]{3,9})\s+(\d{1,2})(?:st|nd|rd|th)?,?\s+(20\d{2})\b', s)
    if m:
        mon, d, y = m.groups()
        mon_num = _MONTHS.get(mon.lower())
        if mon_num:
            parsed = _safe_date(y, mon_num, d)
            if parsed is not None:
                return parsed

    # 3) ISO-like: 2025-09-05 / 2025/09/05 / 2025.09.05
    m = re.search(r'\b(20\d{2})[-/.](\d{1,2})[-/.](\d{1,2})\b', s)
    if m:
        y, mth, d = m.groups()
        parsed = _safe_date(y, mth, d)
        if parsed is not None:
            return parsed

    # 4) Numeric: 05-09-2025 | 05/09/2025 | 5.9.2025
    m = re.search(r'\b(\d{1,2})[-/.](\d{1,2})[-/.](20\d{2})\b', s)
    if m:
        a, b, y = m.groups()
        # day-first by default (MSE style)
        d, mth = (a, b) if day_first else (b, a)
        parsed = _safe_date(y, mth, d)
        if parsed is not None:
            return parsed

    return None

def _parse_time_str(s: str):
    """Parse the first recognisable clock time from free text.

    A 12-hour time with am/pm (seconds optional) is looked for first, then a
    24-hour time with seconds, then a 24-hour time without seconds.

    Returns ``datetime.time``, or ``None`` when nothing matches or the first
    match is not a valid time.
    """
    s = _norm_text(s)

    # 12-hour clock, e.g. "02:39:52 pm" or "2:39 pm"
    twelve_hour = re.search(r'(?i)\b(\d{1,2}):(\d{2})(?::(\d{2}))?\s*(am|pm)\b', s)
    if twelve_hour:
        hour_txt, minute_txt, second_txt, meridiem = twelve_hour.groups()
        # 12 am is hour 0 and 12 pm is hour 12, so map 12 -> 0 before adding the pm offset
        hour = 0 if int(hour_txt) == 12 else int(hour_txt)
        if meridiem.lower() == 'pm':
            hour += 12
        minute = int(minute_txt)
        second = int(second_txt) if second_txt else 0
        try:
            return time(hour, minute, second)
        except ValueError:
            return None

    # 24-hour clock: with seconds first ("14:39:52"), then without ("14:39")
    for pattern in (
        r'\b([01]?\d|2[0-3]):([0-5]\d)(?::([0-5]\d))\b',
        r'\b([01]?\d|2[0-3]):([0-5]\d)\b',
    ):
        found = re.search(pattern, s)
        if found:
            try:
                return time(*map(int, found.groups()))
            except ValueError:
                return None

    return None

def extract_print_date_time(pdf_path: str | Path, search_pages: int = 2, day_first: bool = True):
    """
    Extract ONLY the 'Print Date' and 'Print Time' from the PDF text.

    The text of the first `search_pages` pages (always at least one) is joined.
    The rest of the line after a "Print Date" / "Print Time" label is parsed
    when the label is present; otherwise the whole text is searched.

    Parameters
    ----------
    pdf_path : str | Path
        PDF file to read.
    search_pages : int
        Number of leading pages to scan; values below 1 are treated as 1.
    day_first : bool
        Forwarded to the date parser: for all-numeric dates such as 05/09/2025,
        True reads day/month and False reads month/day.

    Returns
    -------
    {
      'date': datetime.date | None,
      'time': datetime.time | None,
      'raw_date': str | None,  # snippet matched after the label (if any)
      'raw_time': str | None
    }
    """
    pdf_path = Path(pdf_path)

    with pdfplumber.open(pdf_path) as pdf:
        n = min(max(search_pages, 1), len(pdf.pages))
        # Join the page texts so a label and its value stay in one string
        text = "\n".join((page.extract_text() or "") for page in pdf.pages[:n])

    # Prefer labeled fields
    m = re.search(r'(?is)Print\s*Date\s*:?\s*([^\n\r]+)', text)
    raw_date_snip = m.group(1) if m else None
    m = re.search(r'(?is)Print\s*Time\s*:?\s*([^\n\r]+)', text)
    raw_time_snip = m.group(1) if m else None

    # `day_first` must reach the parser, otherwise the caller's choice is ignored
    d = _parse_date_str(raw_date_snip or text, day_first=day_first)
    t = _parse_time_str(raw_time_snip or text)

    return {'date': d, 'time': t, 'raw_date': (raw_date_snip or None), 'raw_time': (raw_time_snip or None)}

In [None]:
# --- Example usage: read the print stamp of the configured report ---
info = extract_print_date_time(FILE_PDF)
print(f"Date: {info['date']}")
print(f"Time: {info['time']}")


Date: 2025-09-05
Time: 14:39:52


In [26]:
def to_numeric_clean(val):
    """
    Clean and convert a single cell value to a float.

    Rules
    -----
    - None, empty / whitespace-only text and the literal "none" (any case) -> NaN
    - commas are removed: "1,234.5" -> 1234.5
    - parentheses mean negative, padding inside is ignored:
      "(123.45)" or "( 123.45 )" -> -123.45
    - a trailing percent sign is dropped and the number is NOT rescaled:
      "1.44%" -> 1.44
    - anything that still is not a valid float -> NaN

    Returns
    -------
    float
    """
    if val is None:
        return np.nan
    text = str(val).strip()
    if text == "" or text.lower() == "none":
        return np.nan

    def _drop_percent(t):
        # "12.5%" and "12.5 %" both become "12.5"
        return t[:-1].rstrip() if t.endswith("%") else t

    # Remove commas
    text = _drop_percent(text.replace(",", ""))
    # Handle parentheses as negatives
    if text.startswith("(") and text.endswith(")"):
        text = "-" + _drop_percent(text[1:-1].strip())
    try:
        return float(text)
    except ValueError:
        return np.nan

def clean_cell(x):
    """Normalise one raw table cell.

    ``None`` stays ``None``. Any other value is converted to text, whitespace
    runs are collapsed to one space, the ends are trimmed, and en/em dashes are
    replaced by '-'. An empty result becomes ``None``.
    """
    if x is None:
        return None
    text = re.sub(r'\s+', ' ', str(x)).strip()
    for dash in ('–', '—'):
        text = text.replace(dash, '-')
    return text or None

def is_numericish(s: Optional[str]) -> bool:
    """Return True when ``s`` looks like a plain number.

    After trimming and removing commas, the text must be an optional sign, then
    digits with an optional decimal part (or a bare ".5" style fraction), then
    an optional trailing '%'. ``None`` is never numeric.
    """
    if s is None:
        return False
    candidate = str(s).strip().replace(",", "")
    return re.fullmatch(r"[-+]?(\d+(\.\d+)?|\.\d+)(%?)", candidate) is not None

def is_header_like(row: list) -> bool:
    """Header-like = many text cells, few numeric cells.

    Only non-empty cells are considered. The row qualifies when at least a
    quarter of them (and at least one) contain a letter, and no more than half
    of them are numeric-looking. A row without non-empty cells is not header-like.
    """
    filled = [cell for cell in row if cell is not None and str(cell).strip() != ""]
    if not filled:
        return False
    total = len(filled)
    numeric_count = len([cell for cell in filled if is_numericish(cell)])
    alpha_count = len([cell for cell in filled if re.search(r"[A-Za-z]", str(cell))])
    enough_text = alpha_count >= max(1, total // 4)
    mostly_non_numeric = numeric_count / total <= 0.5
    return enough_text and mostly_non_numeric

def normalize_to_width(rows: list[list], width: int) -> list[list]:
    """Return copies of ``rows`` forced to exactly ``width`` cells each.

    Short rows are right-padded with ``None``; long rows are truncated.
    """
    # Pad first (a non-positive repeat count adds nothing), then cut to width.
    return [(list(row) + [None] * (width - len(row)))[:width] for row in rows]

def _first_table_on_page(page):
    """Return the first usable raw table found on `page`, or None.

    Detection strategies are tried in order (ruled lines, strict ruled lines,
    text alignment). A table is usable when it has at least 2 rows and at least
    one row with 2 or more cells.
    """
    strategies = [
        dict(vertical_strategy="lines", horizontal_strategy="lines",
             snap_tolerance=3, join_tolerance=3, edge_min_length=3),
        dict(vertical_strategy="lines_strict", horizontal_strategy="lines_strict"),
        dict(vertical_strategy="text", horizontal_strategy="text"),
    ]
    for ts in strategies:
        try:
            for raw in page.extract_tables(table_settings=ts) or []:
                if raw and len(raw) >= 2 and max(len(r) for r in raw) >= 2:
                    return raw
        except Exception as exc:
            # Best effort: a strategy that fails on this page is skipped, but the
            # reason is kept at debug level instead of vanishing silently.
            logging.getLogger(__name__).debug("Table strategy %s failed: %s", ts, exc)
    return None


def _unique_column_names(detected):
    """Turn a detected header row into unique, non-empty column names.

    Empty cells become "col_<position>"; a repeated name gets a numeric suffix
    ("name_2" for its second occurrence, and so on).
    """
    cols = []
    seen = {}
    for i, name in enumerate(detected):
        name = name or f"col_{i+1}"
        name = re.sub(r'\s+', ' ', name).strip()
        if name in seen:
            seen[name] += 1
            name = f"{name}_{seen[name]}"
        else:
            seen[name] = 1
        cols.append(name)
    return cols


def extract_first_table(pdf_path: str | Path,
                        out_csv: Optional[str | Path] = None,
                        header: Optional[List[str]] = None,
                        skip_header_rows: int = 0,
                        auto_skip_header_like: bool = True) -> pd.DataFrame:
    """
    Extract the first table of a PDF as a cleaned, numeric DataFrame.

    The first page on which a table is detected is used. If `header` is
    provided, we will:
      - optionally auto-skip any header-like rows at the top
      - then force DataFrame columns to `header`
    Without `header`, the first non-empty row supplies the column names and
    `skip_header_rows` is not used.

    After the columns are named, the last row is dropped (it holds weighted
    averages), the frame is reduced to a fixed list of output columns, and every
    column except 'counter' is converted to float.

    Parameters
    ----------
    pdf_path : str | Path
    out_csv : str | Path, optional
        If given, the result is also written there (parent folders are created).
    header : List[str], optional
        Hardcoded column names to use. Must contain every output column.
    skip_header_rows : int
        Force skipping this many rows from the top of the table before data.
    auto_skip_header_like : bool
        If True, skip leading header-like rows automatically.

    Returns
    -------
    pandas.DataFrame
        The cleaned table, or an empty DataFrame when no table is found.

    Raises
    ------
    KeyError
        If the named columns do not include every expected output column.
    """
    pdf_path = Path(pdf_path)
    out_csv = Path(out_csv) if out_csv else None

    # Output schema and order of the returned frame
    output_cols = ['counter', 'daily_range_high', 'daily_range_low',
                   'buy', 'sell', 'previous_closing_price', 'today_closing_price',
                   'volume_traded', 'dividend_mk', 'dividend_yield_pct',
                   'earnings_yield_pct', 'pe_ratio', 'pbv_ratio', 'market_capitalization_mkmn',
                   'profit_after_tax_mkmn', 'num_shares_issue']

    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            raw = _first_table_on_page(page)
            if raw is None:
                continue

            rows = [[clean_cell(c) for c in row] for row in raw]
            rows = [r for r in rows if any(r)]
            if not rows:
                continue

            if header:
                start_idx = 0
                if auto_skip_header_like:
                    # Skip all consecutive header-like rows from the top
                    for r in rows:
                        if not is_header_like(r):
                            break
                        start_idx += 1
                # Ensure at least skip_header_rows are skipped
                start_idx = max(start_idx, skip_header_rows)
                cols = list(header)
            else:
                # Fallback: auto-detect header = first non-empty row
                cols = _unique_column_names(rows[0])
                start_idx = 1

            # Build DataFrame
            data_rows = normalize_to_width(rows[start_idx:], len(cols))
            df = pd.DataFrame(data_rows, columns=cols).dropna(how="all")

            # Drop last row as it contains weighted averages
            if len(df) > 1:
                df = df.iloc[:-1]

            # Fail with an actionable message instead of pandas' generic KeyError
            missing = [c for c in output_cols if c not in df.columns]
            if missing:
                raise KeyError(
                    f"Extracted table lacks expected column(s) {missing}; "
                    f"available columns are {list(df.columns)}. "
                    "Pass `header` with the expected column names."
                )

            # .copy() gives an independent frame, so the assignments below never
            # write into a slice of the intermediate DataFrame.
            df = df[output_cols].copy()

            # Convert to numeric where possible
            for c in df.columns:
                if c != "counter":  # leave counter as string
                    df[c] = df[c].apply(to_numeric_clean)

            if out_csv:
                out_csv.parent.mkdir(parents=True, exist_ok=True)
                df.to_csv(out_csv, index=False)
                print(f"✅ First table extracted and saved to {out_csv}")
            return df

    print("⚠️ No table found in PDF.")
    return pd.DataFrame()

In [27]:
# Column names applied positionally (left to right) to the raw extracted table
# when passed as extract_first_table(header=COLS).
# 'ser_no' is not in that function's output column list, so it does not appear
# in the returned DataFrame.
COLS = ['ser_no', 'daily_range_high', 'daily_range_low', 
         'counter', 'buy', 'sell', 'previous_closing_price', 
        'today_closing_price', 'volume_traded', 'dividend_mk', 'dividend_yield_pct',
        'earnings_yield_pct', 'pe_ratio', 'pbv_ratio', 'market_capitalization_mkmn',
        'profit_after_tax_mkmn', 'num_shares_issue']

In [28]:
# Extract the first table of the configured PDF, naming the raw columns with COLS.
# No CSV is written because `out_csv` is not passed.
df = extract_first_table(FILE_PDF, header=COLS)

In [30]:
def weighted_average(df: pd.DataFrame, value_col: str, weight_col: str) -> float:
    """
    Compute a weighted average for any numeric column in a DataFrame.

    Rows where either the value or the weight is missing are ignored entirely,
    so a missing value never contributes its weight to the denominator.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing the data.
    value_col : str
        Column name with the values of interest (e.g., 'dividend_mk', 'dividend_pct').
    weight_col : str
        Column name with the weights (e.g., 'volume_traded').

    Returns
    -------
    float
        Weighted average of the value_col, or NaN if the usable weights sum to 0
        (including when there are no usable rows).
    """
    values = df[value_col].astype(float)
    weights = df[weight_col].astype(float)

    # pandas sums skip NaN: without this mask a NaN value would drop out of the
    # numerator while its weight still counted in the denominator.
    valid = values.notna() & weights.notna()
    values = values[valid]
    weights = weights[valid]

    total_weight = weights.sum()
    if total_weight == 0:
        return np.nan

    return (values * weights).sum() / total_weight

In [31]:
# Check the column dtypes: extract_first_table converts every column except 'counter' to float
df.dtypes

counter                        object
daily_range_high              float64
daily_range_low               float64
buy                           float64
sell                          float64
previous_closing_price        float64
today_closing_price           float64
volume_traded                 float64
dividend_mk                   float64
dividend_yield_pct            float64
earnings_yield_pct            float64
pe_ratio                      float64
pbv_ratio                     float64
market_capitalization_mkmn    float64
profit_after_tax_mkmn         float64
num_shares_issue              float64
dtype: object

In [32]:
# Columns to summarise as weighted averages
cols_weighted = ['dividend_mk', 'dividend_yield_pct', 'earnings_yield_pct',
                 'pe_ratio', 'pbv_ratio', 'market_capitalization_mkmn',
                 'profit_after_tax_mkmn', 'num_shares_issue']

# Each column is weighted by the 'volume_traded' column and printed to 4 decimals
for col in cols_weighted:
    wa = weighted_average(df, col, 'volume_traded')
    print(f"Weighted average of {col}: {wa:.4f}")

Weighted average of dividend_mk: 8.2070
Weighted average of dividend_yield_pct: 0.3297
Weighted average of earnings_yield_pct: 2.9126
Weighted average of pe_ratio: 40.0991
Weighted average of pbv_ratio: 12.7717
Weighted average of market_capitalization_mkmn: 1119128.5596
Weighted average of profit_after_tax_mkmn: 27990.2145
Weighted average of num_shares_issue: 9210336168.6222


In [33]:
# Display the cleaned table
df

Unnamed: 0,counter,daily_range_high,daily_range_low,buy,sell,previous_closing_price,today_closing_price,volume_traded,dividend_mk,dividend_yield_pct,earnings_yield_pct,pe_ratio,pbv_ratio,market_capitalization_mkmn,profit_after_tax_mkmn,num_shares_issue
0,AIRTEL,138.83,138.82,0.0,138.83,138.84,138.82,53640.0,2.0,1.44,2.8,35.74,47.55,1527020.0,42722.11,11000000000.0
1,BHL,,,15.01,0.0,15.0,15.0,0.0,0.0,0.0,-1.55,-64.36,1.36,88173.82,-1370.11,5878255000.0
2,FDHB,619.93,619.87,619.84,619.87,619.97,619.9,133897.0,4.73,0.76,1.73,57.76,43.92,4277949.27,74063.0,6901031000.0
3,FMBCH,1650.41,1650.4,1650.41,0.0,1650.4,1650.41,8900.0,3.64,0.22,2.91,34.31,12.33,4057120.38,118254.74,2458250000.0
4,ICON,17.95,17.95,0.0,17.95,17.95,17.95,42822.0,0.29,1.62,20.37,4.91,0.82,119906.0,24424.49,6680000000.0
5,ILLOVO,1791.42,1791.42,1791.42,0.0,1791.41,1791.42,1440.0,5.0,0.28,1.77,56.47,8.59,1278078.55,22632.0,713444400.0
6,MPICO,,,19.51,0.0,19.51,19.51,0.0,0.43,2.2,19.05,5.25,0.69,44834.91,8540.17,2298047000.0
7,NBM,8200.0,7919.95,7919.95,7919.94,7919.93,8196.77,122414.0,126.35,1.54,2.67,37.42,14.26,3827332.06,102283.0,466931700.0
8,NBS,1024.89,1024.83,1024.84,1024.85,1024.91,1024.86,296586.0,10.9,1.06,2.45,40.87,26.61,2982930.21,72991.0,2910573000.0
9,NICO,1740.0,1739.99,1514.17,1740.0,1740.01,1740.0,106674.0,20.0,1.15,3.97,25.2,11.66,1814891.51,72009.0,1043041000.0


In [17]:
# Sum of the 'volume_traded' column, shown as an integer
int(df.volume_traded.sum())

2769006

In [None]:
# Same 17 names, in the same order, as the COLS list defined earlier in this notebook.
cols = ['ser_no', 'daily_range_high', 'daily_range_low', 
        'counter', 'buy', 'sell', 'previous_closing_price', 
        'today_closing_price', 'volume_traded', 'dividend_mk', 'dividend_yield_pct',
        'earnings_yield_pct', 'pe_ratio', 'pbv_ratio', 'market_capitalization_mkmn',
        'profit_after_tax_mkmn', 'num_shares_issue']

In [None]:
# Rename the DataFrame columns positionally to the predefined names in `cols`.
# NOTE(review): the rename is by position, not by name. A `df` returned by
# extract_first_table is already named and starts with 'counter', while `cols`
# starts with 'ser_no', so running this on such a frame shifts every label by one.
# Confirm this cell is still wanted.

# Only the leading columns that exist on both sides can be renamed
min_length = min(len(df.columns), len(cols))
if len(df.columns) != len(cols):
    print(f"Column count mismatch: df has {len(df.columns)} columns, cols has {len(cols)} columns")

# Renamed leading columns + any extra df columns kept under their current names.
# This always yields exactly len(df.columns) names, so the assignment cannot fail
# with a length mismatch when df has fewer columns than `cols`.
df.columns = list(cols[:min_length]) + list(df.columns[min_length:])

# Preview the DataFrame with new column names
print("DataFrame with renamed columns:")
print(df.head())

In [None]:
# Plain-text preview of the first rows of df
print(df.head())

In [None]:
# Display the current state of df
df

In [None]:
# FIXME: this cell held an unfinished assignment ("cols =" with no right-hand side),
# which is a SyntaxError and stops a Run All. It is disabled until a value is
# supplied; `cols` keeps whatever value the earlier cell gave it.
# cols = ...

In [None]:
# Extract the first table from the uploaded PDF and save it to CSV

import pdfplumber
import pandas as pd
import re
from pathlib import Path
# from caas_jupyter_tools import display_dataframe_to_user


def clean_cell(x):
    """Return a tidied text version of one table cell, or None when it is empty.

    NOTE: this re-defines the `clean_cell` from the earlier table-extraction cell
    with the same behaviour.
    """
    if x is None:
        return None
    # Normalize whitespace and remove stray newlines
    collapsed = re.sub(r'\s+', ' ', str(x)).strip()
    # Replace en/em dashes with a plain hyphen
    cleaned = collapsed.replace('–', '-').replace('—', '-')
    if cleaned == '':
        return None
    return cleaned

def header_from_rows(rows, max_scan: int = 5):
    """
    Pick the row that looks most like a header: among the first `max_scan` rows,
    the one with the most non-empty cells (the earliest row wins a tie).

    Parameters
    ----------
    rows : list of rows (each a list of cells)
    max_scan : int
        How many leading rows to consider.

    Returns
    -------
    (header, start_index_for_data)
        The chosen row and the index of the row right after it.
        ([], 0) when there is nothing to scan.
    """
    candidates = rows[:max(max_scan, 0)]
    if not candidates:
        # Without this guard an empty table would fail on rows[None]
        return [], 0

    def _filled(row):
        return sum(1 for c in row if c is not None and str(c).strip() != '')

    # max() returns the first maximal item, which keeps the earliest row on ties
    best_idx = max(range(len(candidates)), key=lambda i: _filled(candidates[i]))
    return rows[best_idx], best_idx + 1

# Placeholder: the extraction cell further down replaces this with the first
# table's DataFrame, or with an empty DataFrame if no table is found.
first_table_df = None



In [None]:
# Extract the first table and save it as CSV.
# `pdf_path` / `out_csv` are not defined as notebook variables in the visible cells
# (they are only parameter names), so the configured paths are used instead.
# COLS is passed because extract_first_table selects its output columns by those names.
df = extract_first_table(FILE_PDF, out_csv=FILE_CSV, header=COLS)

In [None]:
# Stand-alone extraction of the first table into `first_table_df`, with column names
# auto-detected via header_from_rows.
# NOTE: this cell rebinds the notebook-level names `df` and `cols` (among others),
# replacing the values set by earlier cells.
with pdfplumber.open(FILE_PDF) as pdf:
    for page in pdf.pages:
        # Try multiple strategies to improve table detection
        strategies = [
            dict(vertical_strategy="lines", horizontal_strategy="lines"),
            dict(vertical_strategy="lines_strict", horizontal_strategy="lines_strict"),
            dict(vertical_strategy="text", horizontal_strategy="text"),
        ]
        # Tables from ALL strategies are accumulated (there is no early exit),
        # so the same table may be present more than once.
        tables = []
        for ts in strategies:
            try:
                t = page.extract_tables(table_settings=ts)
                if t:
                    tables.extend(t)
            except Exception as e:
                # Continue trying with other strategies; the error is discarded (`e` is unused)
                pass
        if tables:
            # Use the first detected table on the first page that has any tables
            raw = tables[0]
            # Clean cells
            rows = [[clean_cell(c) for c in row] for row in raw]
            # Drop completely empty rows
            rows = [row for row in rows if any(cell is not None for cell in row)]
            if not rows:
                # Nothing usable in this table: move on to the next page
                continue
            header, start_idx = header_from_rows(rows)
            # Empty header cells become "col_<position>"; repeated names get a
            # numeric suffix ("name_2" for the second occurrence, and so on)
            cols = []
            seen = {}
            for i, name in enumerate(header):
                name = name or f"col_{i+1}"
                name = re.sub(r'\s+', ' ', name).strip()
                if name in seen:
                    seen[name] += 1
                    name = f"{name}_{seen[name]}"
                else:
                    seen[name] = 1
                cols.append(name)
            # Data starts right after the chosen header row
            data = rows[start_idx:]
            # Normalize row lengths to header length (pad with None or truncate)
            norm_data = []
            for r in data:
                if len(r) < len(cols):
                    r = r + [None] * (len(cols) - len(r))
                elif len(r) > len(cols):
                    r = r[:len(cols)]
                norm_data.append(r)
            df = pd.DataFrame(norm_data, columns=cols)
            # Drop rows that are entirely NaN
            df = df.dropna(how="all")
            # Keep the first table only (same object as `df`, not a copy)
            first_table_df = df
            break

# If we didn't manage to extract a table, create an empty placeholder DataFrame
if first_table_df is None:
    first_table_df = pd.DataFrame()


In [None]:
# NOTE(review): `headers` is not assigned anywhere in the visible notebook, so this
# cell raises NameError on a fresh kernel. Presumably `header` (set by the
# extraction cell) was intended; confirm or delete this cell.
headers 

In [None]:
# (rows, columns) of the auto-detected first table
first_table_df.shape