# Fetch Companies Listed on PSX

In [21]:
from dataclasses import dataclass
from time import perf_counter
from typing import Optional

import requests
from bs4 import BeautifulSoup

## Config

In [22]:
# Listed-companies page of the PSX website; it is downloaded once to read the
# sector list and the XID value from it.
PSX_COMPANY_LISTINGS_URL = "https://www.psx.com.pk/psx/resources-and-tools/listings/listed-companies"
# CSS class of the page element that is searched for the sector <select> and
# the XID <input>.
SECTOR_CONTAINER_CLASS = "notice-update-div"

## Fetch Sectors

In [49]:
@dataclass
class Sector:
    """A sector entry read from the sector drop-down of the listings page."""

    # "value" attribute of the <option>; sent as the "sector" query parameter
    # when searching for the sector's companies.
    id: str
    # Visible text of the <option>, with surrounding whitespace stripped.
    name: str


@dataclass
class ListedCompany:
    """A company returned by the per-sector company search."""

    # Id of the Sector that was queried when this company was returned.
    sector_id: str
    # Taken from the "symbol_code" field of the search result.
    symbol: str
    # Taken from the "company_name" field of the search result.
    name: str


@dataclass
class AddressBook:
    """Contact details of one company, as scraped from its address-book page.

    Except for ``company_symbol``, every field is filled from the table row
    with the matching label and is an empty string when that row is missing.
    """

    # Symbol the address book was requested for (not read from the page).
    company_symbol: str
    # Row labelled "Company".
    company_name: str
    # Row labelled "Representative".
    representative: str
    # Row labelled "Designation".
    designation: str
    # Row labelled "Address".
    address: str
    # Row labelled "Phone".
    phone: str
    # Row labelled "Phone 2".
    phone2: str
    # Row labelled "Fax".
    fax: str
    # Row labelled "Date of Listing"; kept as the raw text from the page.
    date_of_listing: str
    # Row labelled "Email".
    email: str
    # Row labelled "URL".
    url: str
    # Row labelled "Registrar".
    registrar: str

In [24]:
def http_get_sync(
    session: requests.Session,
    url: str,
    headers: Optional[dict[str, str]] = None,
    params: Optional[dict[str, str]] = None,
    timeout: Optional[float] = 30.0,
) -> requests.Response:
    """Send a blocking GET request through *session* and return the response.

    Args:
        session: Session used to issue the request.
        url: Address to fetch.
        headers: Optional extra request headers.
        params: Optional query-string parameters.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may wait forever on a stalled connection,
            so a finite default is used. Pass ``None`` to wait indefinitely.

    Returns:
        The response, after its status code has been checked.

    Raises:
        ValueError: If the request fails, times out, or comes back with an
            HTTP error status. The original ``requests`` exception is chained
            as the cause.
    """
    try:
        response = session.get(url, headers=headers, params=params, timeout=timeout)
        # Turn 4xx/5xx answers into exceptions so callers never parse an error page.
        response.raise_for_status()
    except requests.exceptions.RequestException as err:
        raise ValueError(f"Error! Could not fetch the page from {url}.") from err
    return response


def fetch_page_content(response: requests.Response, parser="lxml") -> BeautifulSoup:
    """Parse the text body of *response* into a BeautifulSoup document.

    Args:
        response: HTTP response whose ``text`` is the markup to parse.
        parser: Name of the parser handed to BeautifulSoup.

    Returns:
        The parsed document.
    """
    markup = response.text
    document = BeautifulSoup(markup, parser)
    return document


def get_element_by_class(soup: BeautifulSoup, class_name: str) -> BeautifulSoup:
    """Return the first element in *soup* that carries the given CSS class.

    Args:
        soup: Parsed HTML to search.
        class_name: CSS class to look for.

    Returns:
        The first matching element.

    Raises:
        ValueError: If no element with that class exists.
    """
    match = soup.find(class_=class_name)
    if match is not None:
        return match
    raise ValueError(f"Class '{class_name}' not found in the provided HTML.")


def get_xid(soup: BeautifulSoup) -> str:
    """Read the value of the ``<input id="XID">`` field found inside *soup*.

    Args:
        soup: Parsed HTML that should contain the XID input.

    Returns:
        The field's ``value`` attribute.

    Raises:
        ValueError: If the input is missing, or present without a value.
    """
    xid_input = soup.find("input", attrs={"id": "XID"})
    if xid_input:
        token = xid_input.get("value")
        if token:
            return token
        raise ValueError("Error! 'XID' input field is present but has no value.")
    raise ValueError("Error! Could not find the 'XID' input field.")


def extract_sectors(sector_container: BeautifulSoup) -> list[Sector]:
    """Build the list of sectors from the ``<select name="sector">`` drop-down.

    Options whose value is ``"0"`` are left out (presumably the placeholder
    entry of the drop-down -- confirm against the page).

    Args:
        sector_container: Parsed HTML that should contain the sector drop-down.

    Returns:
        One ``Sector`` per remaining option, in document order.

    Raises:
        ValueError: If the drop-down is missing or holds no options at all.
    """
    dropdown = sector_container.find("select", attrs={"name": "sector"})
    if not dropdown:
        raise ValueError("Error! Could not find the sector options.")

    options = dropdown.find_all("option")
    if not options:
        raise ValueError("Error! There aren't any sectors in the sector options.")

    parsed = []
    for option in options:
        option_value = option.get("value")
        if option_value == "0":
            continue
        parsed.append(Sector(id=option_value, name=option.text.strip()))
    return parsed

In [5]:
# Download the listed-companies page once and parse it; the parsed page is
# what the sector list and the XID value are read from.
with requests.Session() as session:
    listings_response = http_get_sync(session, url=PSX_COMPANY_LISTINGS_URL)
    soup = fetch_page_content(listings_response)

In [6]:
# Narrow the page down to the element that holds the sector form.
sector_container = get_element_by_class(soup, class_name=SECTOR_CONTAINER_CLASS)
# XID value read from the page; it is sent as the "XID" query parameter with
# the company and address-book requests.
xid = get_xid(sector_container)
# All sectors offered by the drop-down (options with value "0" are skipped).
sectors = extract_sectors(sector_container)

## Fetch Companies

In [None]:
# Endpoint queried once per sector; its response is parsed as JSON.
SEARCH_COMPANY_URL = "https://www.psx.com.pk/psx/custom-templates/companiesSearch-sector"

# Request headers sent with the search and address-book requests.
# NOTE(review): "X-Requested-With" and "Referer" presumably make the request
# look like the page's own AJAX call -- confirm whether the site requires them.
HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Accept": "application/json",
    "X-Requested-With": "XMLHttpRequest",
    "Referer": PSX_COMPANY_LISTINGS_URL,
}

In [8]:
def fetch_listed_companies(
    session: requests.Session,
    sector: Sector,
    search_url: str,
    headers: dict[str, str],
    xid: str,
) -> list[ListedCompany]:
    """Fetch every company the search endpoint reports for one sector.

    Args:
        session: Session used to issue the request.
        sector: Sector whose companies are requested.
        search_url: Address of the company-search endpoint.
        headers: Request headers to send.
        xid: XID value sent along with the request.

    Returns:
        One ``ListedCompany`` per entry of the JSON response, in response order.

    Raises:
        ValueError: If the HTTP request fails (raised by ``http_get_sync``).
        KeyError: If an entry lacks "symbol_code" or "company_name".
    """

    def as_company(record) -> ListedCompany:
        # Map one JSON record onto the dataclass, tagging it with the sector.
        return ListedCompany(
            sector_id=sector.id,
            symbol=record["symbol_code"],
            name=record["company_name"],
        )

    query = {"sector": sector.id, "XID": xid}
    payload = http_get_sync(session, search_url, headers, query).json()
    return [as_company(record) for record in payload]
    
    

In [None]:
# Collect the companies of every sector into one flat list, reusing a single
# HTTP session for all requests.
companies = []
with requests.Session() as session:
    for sector in sectors:
        sector_companies = fetch_listed_companies(
            session, sector, SEARCH_COMPANY_URL, HEADERS, xid
        )
        companies += sector_companies

## Fetch Addresses

In [26]:
# Address-book page; requested once per company symbol to scrape its contact details.
SEARCH_COMPANY_ADDRESS_URL = "https://www.psx.com.pk/psx/resources-and-tools/Address-Book"

In [42]:
def extract_address_book(address_details: BeautifulSoup, company_symbol: str) -> AddressBook:
    """Turn the label/value table of an address-book page into an ``AddressBook``.

    Every ``<tr>`` with exactly two ``<td>`` cells is read as a label/value
    pair; other rows are ignored. When a label occurs more than once, the
    last occurrence wins. Labels that are absent yield empty strings.

    Args:
        address_details: Parsed address-book page.
        company_symbol: Symbol the page was requested for.

    Returns:
        The populated ``AddressBook``.
    """
    cells_per_row = (row.find_all("td") for row in address_details.find_all("tr"))
    details = {
        label.get_text(strip=True): value.get_text(strip=True)
        for label, value in (cells for cells in cells_per_row if len(cells) == 2)
    }

    # AddressBook field name -> label used for it on the page.
    label_for_field = {
        "company_name": "Company",
        "representative": "Representative",
        "designation": "Designation",
        "address": "Address",
        "phone": "Phone",
        "phone2": "Phone 2",
        "fax": "Fax",
        "date_of_listing": "Date of Listing",
        "email": "Email",
        "url": "URL",
        "registrar": "Registrar",
    }
    scraped = {
        field: details.get(label, "") for field, label in label_for_field.items()
    }
    return AddressBook(company_symbol=company_symbol, **scraped)

In [50]:
# Scrape the address-book page of every company, reusing one HTTP session.
address_books = []
with requests.Session() as session:
    for company in companies:
        # NOTE(review): the query key is spelled "adrress" here; confirm this is
        # the parameter name the site expects before "correcting" it.
        params = {"adrress": company.symbol, "XID": xid}
        address_response = http_get_sync(
            session, url=SEARCH_COMPANY_ADDRESS_URL, headers=HEADERS, params=params
        )
        address_details = fetch_page_content(address_response)
        address_books.append(extract_address_book(address_details, company.symbol))

In [None]:
# Take a performance-counter reading as the starting point for timing.
start_time = perf_counter()