<a href="https://colab.research.google.com/github/geethasinghekavini/-NextJS-Project/blob/main/ColomboStockExchange.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [23]:
import requests
import csv
from datetime import datetime

# Root of the CSE web API; endpoint names are appended directly to it.
BASE = "https://www.cse.lk/api/"
# Output file name; the timestamp is taken once, when this line runs.
CSV_OUT = "cse_company_info_{}.csv".format(datetime.now().strftime("%Y%m%d_%H%M%S"))
# Letters passed one at a time to the "alphabetical" endpoint.
LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

# One shared session so every request carries the same headers.
# NOTE(review): Origin/Referer presumably make the calls look like they come
# from the site's own pages -- confirm whether the API actually requires them.
SESSION = requests.Session()
SESSION.headers["Accept"] = "application/json, text/javascript, */*; q=0.01"
SESSION.headers["User-Agent"] = "Mozilla/5.0"
SESSION.headers["Origin"] = "https://www.cse.lk"
SESSION.headers["Referer"] = "https://www.cse.lk/"

# Dict keys that may hold a symbol, tried in this order; first truthy one wins.
SYMBOL_KEYS = ("symbol", "Symbol", "symbolCode", "tradingCode", "ticker")


def extract_symbols(obj):
    """Return the sorted, de-duplicated symbols found in a decoded payload.

    Accepted shapes:
      - a list whose items are dicts (symbol read from the first truthy
        SYMBOL_KEYS entry, converted with ``str``) or plain strings;
      - a dict holding such lists as direct values, or as values of a dict
        nested exactly one level down.

    Anything else (deeper nesting, other types) contributes nothing.
    """
    def symbols_in(entries):
        # Yield one symbol per usable entry; unusable entries are skipped.
        for entry in entries:
            if isinstance(entry, str):
                # Some APIs return pure symbol strings
                yield entry
            elif isinstance(entry, dict):
                for key in SYMBOL_KEYS:
                    value = entry.get(key)
                    if value:
                        yield str(value)
                        break

    if isinstance(obj, list):
        candidate_lists = [obj]
    elif isinstance(obj, dict):
        candidate_lists = []
        for value in obj.values():
            if isinstance(value, list):
                candidate_lists.append(value)
            elif isinstance(value, dict):
                # nested dicts may contain lists too (one level only)
                candidate_lists.extend(
                    inner for inner in value.values() if isinstance(inner, list)
                )
    else:
        candidate_lists = []

    return sorted({sym for entries in candidate_lists for sym in symbols_in(entries)})

def get_symbols(letter, timeout=30):
    """Fetch the symbols listed under one alphabet letter.

    Sends a form-encoded POST to the ``alphabetical`` endpoint and extracts
    the symbols from the JSON reply with ``extract_symbols``.

    Args:
        letter: Value sent as the ``alphabet`` form field.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout a stalled connection would block the whole crawl.

    Returns:
        A sorted list of symbols. An empty list is returned (after printing a
        diagnostic) on a network error, a non-200 status, or an undecodable
        body, and also when the reply simply contains no symbols.
    """
    try:
        r = SESSION.post(
            BASE + "alphabetical", data={"alphabet": letter}, timeout=timeout
        )
    except requests.RequestException as e:
        # Connection errors and timeouts must not abort the remaining letters.
        print(f"[{letter}] request failed: {e}")
        return []

    if r.status_code != 200:
        print(f"[{letter}] HTTP {r.status_code}")
        return []

    try:
        data = r.json()
    except ValueError as e:  # JSON decode errors derive from ValueError
        print(f"[{letter}] JSON error: {e}")
        return []

    syms = extract_symbols(data)
    if not syms:
        # Debug hint: show the top-level keys so you can see the shape
        if isinstance(data, dict):
            print(f"[{letter}] No symbols found. Top-level keys: {list(data.keys())}")
        else:
            print(f"[{letter}] No symbols found. Type: {type(data).__name__}")
    return syms

def get_company_info(symbol, timeout=30):
    """Fetch the company summary payload for one symbol.

    Sends a form-encoded POST to the ``companyInfoSummery`` endpoint (spelling
    as used by the API URL in this file).

    Args:
        symbol: Value sent as the ``symbol`` form field.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        The decoded JSON as a dict, with a ``"symbol"`` entry added when the
        payload has none. A non-dict payload is wrapped as ``{"raw": payload}``.
        ``None`` is returned (after printing a diagnostic) on a network error,
        a non-200 status, or an undecodable body.
    """
    try:
        r = SESSION.post(
            BASE + "companyInfoSummery", data={"symbol": symbol}, timeout=timeout
        )
    except requests.RequestException as e:
        # One unreachable symbol should not abort the whole run.
        print(f"{symbol}: request failed: {e}")
        return None

    if r.status_code != 200:
        print(f"{symbol}: HTTP {r.status_code}")
        return None

    # Only the decode can raise here, so keep the try body to that one call.
    try:
        data = r.json()
    except ValueError as e:  # JSON decode errors derive from ValueError
        print(f"{symbol}: JSON error {e}")
        return None

    if not isinstance(data, dict):
        data = {"raw": data}
    data.setdefault("symbol", symbol)
    return data

def main():
    """Collect every symbol A–Z, fetch each company's info, and write a CSV.

    The CSV header is ``symbol`` followed by every other key seen in any
    fetched payload, in sorted order. Nothing is written when no payload
    could be fetched.
    """
    # 1) gather symbols A–Z
    collected = set()
    for letter in LETTERS:
        found = get_symbols(letter)
        print(f"{letter}: {len(found)} symbols")
        collected.update(found)
    all_symbols = sorted(collected)
    print(f"Total unique symbols: {len(all_symbols)}")

    # 2) fetch company info, dropping symbols whose fetch failed
    rows = [info for info in map(get_company_info, all_symbols) if info]
    if not rows:
        print("No data fetched.")
        return

    # 3) write CSV; the header is the union of keys across all rows
    keys = {key for row in rows for key in row}
    fieldnames = ["symbol", *sorted(keys - {"symbol"})]
    with open(CSV_OUT, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)

    print(f"Wrote {len(rows)} rows → {CSV_OUT}")


if __name__ == "__main__":
    main()


A: 32 symbols
B: 14 symbols
C: 48 symbols
D: 10 symbols
E: 10 symbols
F: 2 symbols
G: 4 symbols
H: 25 symbols
I: 1 symbols
J: 7 symbols
K: 11 symbols
L: 27 symbols
M: 15 symbols
N: 7 symbols
O: 4 symbols
P: 11 symbols
[Q] No symbols found. Top-level keys: ['reqAlphabetical']
Q: 0 symbols
R: 16 symbols
S: 31 symbols
T: 19 symbols
U: 6 symbols
V: 5 symbols
W: 3 symbols
[X] No symbols found. Top-level keys: ['reqAlphabetical']
X: 0 symbols
Y: 1 symbols
[Z] No symbols found. Top-level keys: ['reqAlphabetical']
Z: 0 symbols
Total unique symbols: 309
Wrote 309 rows → cse_company_info_20250903_043140.csv


In [25]:
import requests
import csv
from datetime import datetime

# OPTIONAL: pandas is only used for the extra .xlsx export; any failure while
# importing it just switches that export off.
try:
    import pandas as pd
except Exception:
    PANDAS_OK = False
else:
    PANDAS_OK = True

# Root of the CSE web API; endpoint names are appended directly to it.
BASE = "https://www.cse.lk/api/"
# Output file names; the timestamp is taken once, when this line runs.
CSV_OUT = "cse_company_info_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".csv"
XLSX_OUT = CSV_OUT.replace(".csv", ".xlsx")
# Letters passed one at a time to the "alphabetical" endpoint.
LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

# One shared session so every request carries the same headers.
SESSION = requests.Session()
SESSION.headers["Accept"] = "application/json, text/javascript, */*; q=0.01"
SESSION.headers["User-Agent"] = "Mozilla/5.0"
SESSION.headers["Origin"] = "https://www.cse.lk"
SESSION.headers["Referer"] = "https://www.cse.lk/"

# Dict keys that may hold a symbol, tried in this order.
SYMBOL_KEYS = ("symbol", "Symbol", "symbolCode", "tradingCode", "ticker")


def flatten(obj, parent_key="", sep="."):
    """Collapse nested dicts/lists into one dict mapping key paths to scalars.

    - dict entries extend the path with their key, joined by `sep`
    - list entries extend the path with their index (key.0, key.1, ...)
    - a scalar is stored under its path, or under "value" when the path is
      empty (e.g. a bare scalar passed at the top level)

    Empty dicts and lists contribute no entries.
    """
    def walk(node, prefix):
        # Pick the (label, child) pairs to descend into; scalars end the walk.
        if isinstance(node, dict):
            children = node.items()
        elif isinstance(node, list):
            children = enumerate(node)
        else:
            yield (prefix or "value"), node
            return
        for label, child in children:
            path = f"{prefix}{sep}{label}" if prefix else str(label)
            yield from walk(child, path)

    return dict(walk(obj, parent_key))


def extract_symbols(obj):
    """Return the sorted, de-duplicated symbols found in a decoded payload.

    Accepted shapes:
      - a list whose items are dicts (symbol read from the first truthy
        SYMBOL_KEYS entry, converted with ``str``) or plain strings;
      - a dict holding such lists as direct values, or as values of a dict
        nested exactly one level down.
    """
    found = set()

    def harvest(entries):
        # Add the symbol of every usable entry to `found`.
        for entry in entries:
            if isinstance(entry, str):
                found.add(entry)
                continue
            if not isinstance(entry, dict):
                continue
            for name in SYMBOL_KEYS:
                value = entry.get(name)
                if value:
                    found.add(str(value))
                    break

    if isinstance(obj, list):
        harvest(obj)
    elif isinstance(obj, dict):
        for outer in obj.values():
            if isinstance(outer, list):
                lists_here = [outer]
            elif isinstance(outer, dict):
                lists_here = [inner for inner in outer.values() if isinstance(inner, list)]
            else:
                lists_here = []
            for entries in lists_here:
                harvest(entries)

    return sorted(found)


def get_symbols(letter, timeout=30):
    """Fetch the symbols listed under one alphabet letter.

    Sends a form-encoded POST to the ``alphabetical`` endpoint and extracts
    the symbols from the JSON reply with ``extract_symbols``.

    Args:
        letter: Value sent as the ``alphabet`` form field.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout a stalled connection would block the whole crawl.

    Returns:
        A sorted list of symbols. An empty list is returned (after printing a
        diagnostic) on a network error, a non-200 status, or an undecodable
        body, and also when the reply simply contains no symbols.
    """
    try:
        r = SESSION.post(
            BASE + "alphabetical", data={"alphabet": letter}, timeout=timeout
        )
    except requests.RequestException as e:
        # Connection errors and timeouts must not abort the remaining letters.
        print(f"[{letter}] request failed: {e}")
        return []

    if r.status_code != 200:
        print(f"[{letter}] HTTP {r.status_code}")
        return []

    try:
        data = r.json()
    except ValueError as e:  # JSON decode errors derive from ValueError
        print(f"[{letter}] JSON error: {e}")
        return []

    syms = extract_symbols(data)
    if not syms:
        # Show the payload's shape so an unexpected format is easy to spot.
        if isinstance(data, dict):
            print(f"[{letter}] No symbols found. Top-level keys: {list(data.keys())}")
        else:
            print(f"[{letter}] No symbols found. Type: {type(data).__name__}")
    return syms


def get_company_info(symbol, timeout=30):
    """Fetch the company summary payload for one symbol.

    Sends a form-encoded POST to the ``companyInfoSummery`` endpoint (spelling
    as used by the API URL in this file).

    Args:
        symbol: Value sent as the ``symbol`` form field.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        The decoded JSON as a dict. A ``"symbol"`` entry is added when absent,
        and a top-level ``"name"`` is copied from ``companyName``,
        ``CompanyName`` or ``Name`` when one of those exists at the top level.
        A non-dict payload is wrapped as ``{"raw": payload}``. ``None`` is
        returned (after printing a diagnostic) on a network error, a non-200
        status, or an undecodable body.
    """
    try:
        r = SESSION.post(
            BASE + "companyInfoSummery", data={"symbol": symbol}, timeout=timeout
        )
    except requests.RequestException as e:
        # One unreachable symbol should not abort the whole run.
        print(f"{symbol}: request failed: {e}")
        return None

    if r.status_code != 200:
        print(f"{symbol}: HTTP {r.status_code}")
        return None

    # Only the decode can raise here, so keep the try body to that one call.
    try:
        data = r.json()
    except ValueError as e:  # JSON decode errors derive from ValueError
        print(f"{symbol}: JSON error {e}")
        return None

    if not isinstance(data, dict):
        data = {"raw": data}
    # guarantee symbol and a friendly name field exist
    data.setdefault("symbol", symbol)
    # some payloads may use different keys for name; try to unify
    if "name" not in data:
        for k in ("companyName", "CompanyName", "Name"):
            if k in data:
                data["name"] = data[k]
                break
    return data


def main():
    """Collect every symbol A–Z, fetch and flatten company info, export it.

    Writes a CSV with one row per symbol and one column per flattened key
    path. When pandas imported successfully, the same table is also written
    as .xlsx; a missing Excel engine skips that step instead of crashing
    after the CSV has already been produced.

    Flattened keys are dotted paths, so the company name and the "preferred"
    leading columns are matched on the last path segment as well as on the
    exact key. A payload whose name sits inside a nested object therefore
    still fills the ``name`` column.
    """
    name_leaves = ("name", "companyName", "CompanyName", "Name")
    preferred_leaves = ("symbol", "name", "lastTradedPrice", "closingPrice",
                        "marketCap", "turnover", "tdyTurnover", "tdyShareVolume")

    def leaf(key):
        # Last segment of a flattened key path ("a.b.name" -> "name").
        return key.rsplit(".", 1)[-1]

    # 1) gather symbols A–Z
    all_symbols = []
    for letter in LETTERS:
        syms = get_symbols(letter)
        print(f"{letter}: {len(syms)} symbols")
        all_symbols.extend(syms)
    all_symbols = sorted(set(all_symbols))
    print(f"Total unique symbols: {len(all_symbols)}")

    # 2) fetch + flatten company info
    flat_rows = []
    for idx, sym in enumerate(all_symbols, 1):
        info = get_company_info(sym)
        if not info:
            continue
        flat = flatten(info)  # ensures no JSON blobs end up in a single cell
        # keep simple top-level 'symbol' & 'name' too (nice for Excel filters)
        flat.setdefault("symbol", info.get("symbol"))
        if not flat.get("name"):
            # No usable top-level name: fall back to the first non-empty
            # nested value whose leaf key looks like a name.
            flat["name"] = next(
                (v for k, v in flat.items() if leaf(k) in name_leaves and v),
                None,
            )
        flat_rows.append(flat)

        # progress line with name if available
        print(f"[{idx}/{len(all_symbols)}] {sym}  {flat['name'] or ''}")

    if not flat_rows:
        print("No data fetched.")
        return

    # 3) make a stable header across all rows
    all_keys = set()
    for row in flat_rows:
        all_keys.update(row)
    ordered_keys = sorted(all_keys)
    # put the most useful columns first: the exact key, then any nested key
    # path ending in the same leaf name
    preferred = []
    for want in preferred_leaves:
        if want in all_keys:
            preferred.append(want)
        preferred.extend(k for k in ordered_keys if k != want and leaf(k) == want)
    chosen = set(preferred)
    other_cols = [k for k in ordered_keys if k not in chosen]
    fieldnames = preferred + other_cols

    # 4) write CSV
    with open(CSV_OUT, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        w.writeheader()
        w.writerows(flat_rows)
    print(f" Wrote {len(flat_rows)} rows → {CSV_OUT}")

    # 5) optional: write XLSX if pandas is available
    if not PANDAS_OK:
        print(" pandas not installed — skipped .xlsx export (CSV is ready).")
        return
    df = pd.DataFrame(flat_rows, columns=fieldnames)
    try:
        df.to_excel(XLSX_OUT, index=False)
    except ImportError as e:
        # pandas needs a separate Excel writer package; its absence should
        # not turn a successful CSV export into a traceback.
        print(f" Excel engine unavailable ({e}) — skipped .xlsx export (CSV is ready).")
    else:
        print(f" Also wrote Excel file → {XLSX_OUT}")


if __name__ == "__main__":
    main()


A: 32 symbols
B: 14 symbols
C: 48 symbols
D: 10 symbols
E: 10 symbols
F: 2 symbols
G: 4 symbols
H: 25 symbols
I: 1 symbols
J: 7 symbols
K: 11 symbols
L: 27 symbols
M: 15 symbols
N: 7 symbols
O: 4 symbols
P: 11 symbols
[Q] No symbols found. Top-level keys: ['reqAlphabetical']
Q: 0 symbols
R: 16 symbols
S: 31 symbols
T: 19 symbols
U: 6 symbols
V: 5 symbols
W: 3 symbols
[X] No symbols found. Top-level keys: ['reqAlphabetical']
X: 0 symbols
Y: 1 symbols
[Z] No symbols found. Top-level keys: ['reqAlphabetical']
Z: 0 symbols
Total unique symbols: 309
[1/309] AAF.N0000  
[2/309] AAF.P0000  
[3/309] AAIC.N0000  
[4/309] ABAN.N0000  
[5/309] ABL.N0000  
[6/309] ACAP.N0000  
[7/309] ACL.N0000  
[8/309] ACME.N0000  
[9/309] AEL.N0000  
[10/309] AFS.N0000  
[11/309] AFSL.N0000  
[12/309] AGAL.N0000  
[13/309] AGPL.N0000  
[14/309] AGST.N0000  
[15/309] AGST.X0000  
[16/309] AHPL.N0000  
[17/309] AHUN.N0000  
[18/309] AINS.N0000  
[19/309] ALHP.N0000  
[20/309] ALLI.N0000  
[21/309] ALUM.N0000  
[2

In [1]:
import requests, time, csv, json, ast
from datetime import datetime

# Root of the CSE web API; endpoint names are appended directly to it.
BASE = "https://www.cse.lk/api/"
# Output file name; the timestamp is taken once, when this line runs.
CSV_OUT = datetime.now().strftime("cse_company_summary_%Y%m%d_%H%M%S.csv")
# Letters passed one at a time to the "alphabetical" endpoint.
LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

# One shared session so every request carries the same headers.
SESSION = requests.Session()
SESSION.headers["Accept"] = "application/json, text/javascript, */*; q=0.01"
SESSION.headers["User-Agent"] = "Mozilla/5.0"
SESSION.headers["Origin"] = "https://www.cse.lk"
SESSION.headers["Referer"] = "https://www.cse.lk/"

# -------------------------------------------------------------------
# Helpers for extracting nested or stringified JSON blocks
# -------------------------------------------------------------------
# A dict containing ANY of these keys is treated as the company detail block.
TARGET_HINTS = {
    # identity
    "name", "companyName", "Name",
    "isin", "symbol",
    # prices
    "lastTradedPrice", "closingPrice", "previousClose",
    "hiTrade", "lowTrade",
    # turnover and volumes
    "tdyTurnover", "turnover",
    "tdyShareVolume", "shareVolume",
    "tdyTradeVolume", "tradeVolume",
    # market capitalisation
    "marketCap", "marketCapitalization", "marketCapPercentage",
    # beta values
    "betaValueAspi", "betaValuesAspi", "betaAspi",
    "betaValueSL20", "betaValuesSL20", "betaSl20",
}

def try_parse_maybe_json(x):
    """Parse a string that looks like a serialized container; else return x.

    Only strings that, after stripping whitespace, are wrapped in ``{...}`` or
    ``[...]`` are parsed: first as JSON, then as a Python literal (for text
    such as ``"{'a': 1}"`` produced by ``str(dict)``). Every other value is
    returned unchanged. That includes non-strings and ordinary strings such
    as ``"123"`` or ``"None"``, which must stay strings rather than being
    turned into numbers or ``None``.

    Args:
        x: Any value.

    Returns:
        The parsed object when parsing succeeds, otherwise ``x`` itself.
    """
    if not isinstance(x, str):
        return x
    s = x.strip()
    looks_like_container = (
        (s.startswith("{") and s.endswith("}"))
        or (s.startswith("[") and s.endswith("]"))
    )
    if not looks_like_container:
        # Also covers empty/blank strings; avoids evaluating plain text.
        return x

    # JSON-style
    try:
        return json.loads(s)
    except (ValueError, RecursionError):
        pass

    # Python literal-style. literal_eval only accepts literals, but malformed
    # input can raise any of the exceptions listed here.
    try:
        return ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return x

def deep_find_detail_block(obj):
    """Depth-first search for the first dict holding any TARGET_HINTS key.

    Each visited value is first passed through ``try_parse_maybe_json`` so
    stringified containers are searched too. A matching dict is returned as
    soon as it is met (parents before children, siblings in iteration order).
    Returns None when nothing matches.
    """
    parsed = try_parse_maybe_json(obj)

    if isinstance(parsed, dict):
        if any(hint in parsed for hint in TARGET_HINTS):
            return parsed
        children = parsed.values()
    elif isinstance(parsed, list):
        children = parsed
    else:
        # Scalars (and anything else) cannot contain a detail block.
        return None

    for child in children:
        match = deep_find_detail_block(child)
        if isinstance(match, dict):
            return match
    return None

# -------------------------------------------------------------------
# API calls
# -------------------------------------------------------------------
def extract_symbols(payload):
    """Return the sorted, de-duplicated symbols from an /alphabetical reply.

    A list payload contributes the "symbol" of each dict item and every plain
    string item. A dict payload contributes the "symbol" of dict items found
    in any of its list values (plain strings there are ignored). Any other
    payload yields an empty list.
    """
    def symbols_of(entries, allow_plain_strings):
        # Yield the symbol carried by each usable entry.
        for entry in entries:
            if isinstance(entry, dict) and "symbol" in entry:
                yield entry["symbol"]
            elif allow_plain_strings and isinstance(entry, str):
                yield entry

    found = set()
    if isinstance(payload, list):
        found.update(symbols_of(payload, True))
    elif isinstance(payload, dict):
        for value in payload.values():
            if isinstance(value, list):
                found.update(symbols_of(value, False))
    return sorted(found)

def get_symbols(letter, timeout=30):
    """Fetch the symbols listed under one alphabet letter and report the count.

    Sends a form-encoded POST to the ``alphabetical`` endpoint and extracts
    the symbols from the JSON reply with ``extract_symbols``.

    Args:
        letter: Value sent as the ``alphabet`` form field.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout a stalled connection would block the whole crawl.

    Returns:
        A sorted list of symbols. An empty list is returned (after printing a
        diagnostic) on a network error, a non-200 status, or an undecodable
        body.
    """
    try:
        r = SESSION.post(
            BASE + "alphabetical", data={"alphabet": letter}, timeout=timeout
        )
    except requests.RequestException as e:
        # Connection errors and timeouts must not abort the remaining letters.
        print(f"[{letter}] request failed: {e}")
        return []

    if r.status_code != 200:
        print(f"[{letter}] HTTP {r.status_code}")
        return []

    try:
        data = r.json()
    except ValueError:  # JSON decode errors derive from ValueError
        print(f"[{letter}] JSON parse error")
        return []

    syms = extract_symbols(data)
    print(f"{letter}: {len(syms)} symbols")
    return syms

def _first_present(data, *keys):
    """Return the first value under `keys` that is neither None nor ""."""
    for key in keys:
        value = data.get(key)
        # Explicit check instead of truthiness so a real 0 is kept.
        if value is not None and value != "":
            return value
    return None


def get_company_summary(symbol, timeout=30):
    """Fetch one company's summary and reduce it to a fixed set of columns.

    Sends a form-encoded POST to the ``companyInfoSummery`` endpoint, locates
    the detail dict inside the reply with ``deep_find_detail_block`` (falling
    back to the top-level dict), and maps it onto the output columns.

    Args:
        symbol: Value sent as the ``symbol`` form field.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        A dict with the keys name, symbol, isin, lastTradedPrice,
        previousClose, turnover, shareVolume, tradeVolume, dayHigh, dayLow,
        marketCap and marketCapPercentage (missing values are None).
        ``{"symbol": symbol}`` is returned when the reply holds no usable dict.
        ``None`` is returned (after printing a diagnostic) on a network error,
        a non-200 status, or an undecodable body.
    """
    try:
        r = SESSION.post(
            BASE + "companyInfoSummery", data={"symbol": symbol}, timeout=timeout
        )
    except requests.RequestException as e:
        # One unreachable symbol should not abort the whole run.
        print(f"{symbol}: request failed: {e}")
        return None

    if r.status_code != 200:
        print(f"{symbol}: HTTP {r.status_code}")
        return None

    try:
        payload = r.json()
    except ValueError:  # JSON decode errors derive from ValueError
        print(f"{symbol}: JSON parse error")
        return None

    # Dive into nested or stringified structures
    data = deep_find_detail_block(payload) or (payload if isinstance(payload, dict) else None)
    if not isinstance(data, dict):
        return {"symbol": symbol}

    # unify name (an empty name should fall through to the next spelling)
    name = data.get("name") or data.get("companyName") or data.get("Name")

    return {
        "name": name,
        "symbol": data.get("symbol", symbol),
        "isin": data.get("isin"),

        "lastTradedPrice": data.get("lastTradedPrice"),
        "previousClose": data.get("previousClose"),

        # "tdy*" keys are preferred; the plain key is the fallback. A value of
        # 0 is legitimate and must not trigger the fallback.
        "turnover": _first_present(data, "tdyTurnover", "turnover"),
        "shareVolume": _first_present(data, "tdyShareVolume", "shareVolume"),
        "tradeVolume": _first_present(data, "tdyTradeVolume", "tradeVolume"),

        "dayHigh": data.get("hiTrade"),
        "dayLow": data.get("lowTrade"),

        "marketCap": _first_present(data, "marketCap", "marketCapitalization"),
        "marketCapPercentage": data.get("marketCapPercentage"),
    }

# -------------------------------------------------------------------
# Main
# -------------------------------------------------------------------
def main():
    """Collect every symbol A–Z, fetch each summary, and write them to a CSV.

    Progress is reported after every 20th symbol. Nothing is written when no
    summary could be fetched.
    """
    # 1) Gather all symbols A–Z
    unique = set()
    for letter in LETTERS:
        unique.update(get_symbols(letter))
    symbols = sorted(unique)
    total = len(symbols)
    print(f"✅ Total symbols found: {total}")

    # 2) Fetch summaries, skipping symbols whose fetch failed
    rows = []
    for position, sym in enumerate(symbols, start=1):
        summary = get_company_summary(sym)
        if summary:
            rows.append(summary)
        if position % 20 == 0:
            print(f"   …fetched {position}/{total}")

    if not rows:
        print("❌ No data fetched.")
        return

    # 3) Save to CSV with a fixed column order
    cols = [
        "name", "symbol", "isin",
        "lastTradedPrice", "previousClose",
        "turnover", "shareVolume", "tradeVolume",
        "dayHigh", "dayLow",
        "marketCap", "marketCapPercentage",
    ]
    with open(CSV_OUT, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=cols)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)

    print(f"🎉 Done! Wrote {len(rows)} rows → {CSV_OUT}")


if __name__ == "__main__":
    main()


A: 32 symbols
B: 14 symbols
C: 48 symbols
D: 10 symbols
E: 10 symbols
F: 2 symbols
G: 4 symbols
H: 25 symbols
I: 1 symbols
J: 7 symbols
K: 11 symbols
L: 27 symbols
M: 15 symbols
N: 7 symbols
O: 4 symbols
P: 11 symbols
Q: 0 symbols
R: 16 symbols
S: 31 symbols
T: 19 symbols
U: 6 symbols
V: 5 symbols
W: 3 symbols
X: 0 symbols
Y: 1 symbols
Z: 0 symbols
✅ Total symbols found: 309
   …fetched 20/309
   …fetched 40/309
   …fetched 60/309
   …fetched 80/309
   …fetched 100/309
   …fetched 120/309
   …fetched 140/309
   …fetched 160/309
   …fetched 180/309
   …fetched 200/309
   …fetched 220/309
   …fetched 240/309
   …fetched 260/309
   …fetched 280/309
   …fetched 300/309
🎉 Done! Wrote 309 rows → cse_company_summary_20250904_034334.csv
