## Build Seed Data

Use this notebook to build initial seed data for populating DB.

Requires:
- `hansards-dataproc` repo, with tabulated CSVs
- `hansards-analysis` repo

Run on `hansard-env` kernel

Outputs:
- Meetings Calendar: `df_meetings` to `seed\meetings.parquet` for ParliamentaryCycles
- Sittings: `df_sittings` for Sitting
- Speeches: `df_all` to `seed\speeches.parquet` speech text and tokens
  - `df_sittings` to `seed\sittings.parquet` JSON-formatted speech for each Sitting

In [2]:
import json
import gzip
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
from tqdm import tqdm

import malaya
import nltk
import string
from collections import Counter
import random
import matplotlib.colors as mcolors
from bs4 import BeautifulSoup
import dateparser

from sklearn.feature_extraction.text import CountVectorizer
from malaya.text.function import get_stopwords

# from thefuzz import fuzz
# from thefuzz import process


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

Cannot import beam_search_ops from Tensorflow Addons, ['malaya.jawi_rumi.deep_model', 'malaya.phoneme.deep_model', 'malaya.rumi_jawi.deep_model', 'malaya.stem.deep_model'] will not available to use, make sure Tensorflow Addons version >= 0.12.0
check compatible Tensorflow version with Tensorflow Addons at https://github.com/tensorflow/addons/releases


In [3]:
ANALYSIS_REPO = Path("/Users/alifaiman/Documents/Work/hansards/hansards-analysis")
DATAPROC_REPO = Path("/Users/alifaiman/Documents/Work/hansards/hansards-dataproc")

def process_line_breaks(speech):
    """Processes stray line breaks, while detecting and preserving valid paragraph breaks."""
    if type(speech) is not str:
        return speech
    # List of sentence-ending punctuation
    punctuation = [".", "!", "?", "...", ":", ":-"]

    lines = speech.split("\n")
    processed_lines = []

    for i in range(len(lines)):
        # Check if next line starts with bullet point pattern eg (a), (b)
        is_next_bullet = i < len(lines) - 1 and lines[i + 1].startswith(("(", ") "))

        # Check if line is part of a markdown table
        is_markdown_table = re.match(r"\|\s*[^|]+\s*\|\s*[^|]+\s*\|", lines[i])

        # Check if line is the end of a markdown table
        if "<END_TABLE_MARKER>" in lines[i]:
            lines[i] = lines[i].replace("<END_TABLE_MARKER>", "\n\n")

        # If the line ends with a punctuation mark, is the last line, or next line starts with bullet point, append with newline
        if (
            i == len(lines) - 1
            or lines[i].endswith(tuple(punctuation))
            or is_next_bullet
        ):
            processed_lines.append(lines[i])
            if i != len(lines) - 1:  # Don't add a newline after the last line
                processed_lines.append("\n\n")
        elif is_markdown_table:
            # If line is part of a markdown table, append it with a newline
            processed_lines.append("\n" + lines[i])
        else:
            # Otherwise, append it with a space (to merge with the next line)
            processed_lines.append(lines[i] + " ")

    # Join the processed lines back
    return "".join(processed_lines)


def list_parsed_sittings(root_path):
    """Returns a list of result.csv paths across folders."""
    csv_paths = []
    for path in Path(root_path).rglob("result.csv"):
        if path.is_file() and "archive" not in str(path):
            csv_paths.append(path)
    return csv_paths


def read_tabulated(csv_paths, house):
    """Read parsed and tabulated CSV files.

    Processing applied:
    - Process and fix line breaks
    - Replace table markers with double newlines
    - [TEMP] Fill NaN author with empty string
    - Fill empty
    - Tag annotations with 'is_annotation' column

    Returns:
    - DataFrame of all speeches in csv_paths
    New columns:
    - 'proc_speech': Processed speech text
    - 'house': House of the sitting
    - 'is_annotation': Boolean column to tag annotations
    """
    all_data = []
    for path in csv_paths:
        try:
            df_read = pd.read_csv(path, converters={"timestamp": str})
        except:
            print(f"Error reading {path}")
            continue
        df_read["date"] = Path(path).parent.stem
        #         df_read['date'] = Path(path).stem[3:]
        df_read.reset_index(inplace=True)
        all_data.append(df_read)
    df_all = pd.concat(all_data).reset_index(drop=True)
    df_all["length"] = df_all.speech.str.split().str.len()
    df_all["date"] = pd.to_datetime(df_all.date)
    df_all["proc_speech"] = df_all.speech.map(process_line_breaks)
    # TEMP
    df_all["author"] = df_all["author"].fillna("")

    df_all.level_1 = df_all.level_1.fillna("")
    df_all.level_2 = df_all.level_2.fillna("")
    df_all.level_3 = df_all.level_3.fillna("")

    # remove line breaks from all level headings
    df_all.level_1 = df_all.level_1.str.replace("\n", " ").str.strip()
    df_all.level_2 = df_all.level_2.str.replace("\n", " ").str.strip()
    df_all.level_3 = df_all.level_3.str.replace("\n", " ").str.strip()

    df_all["house"] = house

    # create new is_annotation column - to keep track of authored speeches that are pure annotations
    df_all["is_annotation"] = df_all.author.apply(
        lambda speech: True if speech == "ANNOTATION" else False
    )
    return df_all


def merge_authored_annotations(df_all):
    # fix NaN speeches - merge ANNOTATION rows with preceding row with NaN speaker/author
    df_all = df_all.reset_index(drop=True)
    index_to_remove = []
    rows_to_fill = []

    for index, row in df_all.iterrows():
        if pd.isna(row["proc_speech"]):
            index_to_remove.append(index + 1)
            next_index = index + 1

            if (
                next_index < len(df_all)
                and df_all.at[next_index, "author"] == "ANNOTATION"
            ):
                # print(f"Plan to fill {index} with {df_all.at[next_index, 'proc_speech']}")
                rows_to_fill.append(
                    (index, next_index, df_all.at[next_index, "proc_speech"])
                )

    # Fill the 'speech' column for the selected rows
    for fill_index, next_index, speech_value in rows_to_fill:
        df_all.at[fill_index, "proc_speech"] = speech_value
        df_all.at[fill_index, "is_annotation"] = True

    print(f"before drop: {df_all.shape[0]}")
    df_all = df_all.drop(index_to_remove)
    print(f"after drop: {df_all.shape[0]}")
    return df_all


# for speech data
def add_to_result(levels, data, result):
    # If no heading, append data directly to result
    if not any(levels):
        result.append(data)
        return

    # For cases with headings
    current_level = result
    for level in levels:
        if not level:  # Skip empty levels
            continue

        # Check if current level has the heading
        found = False
        for item in current_level:
            if level in item:
                current_level = item[level]
                found = True
                break

        # If not found, create new entry
        if not found:
            new_entry = {level: []}
            current_level.append(new_entry)
            current_level = new_entry[level]

    # Append the data
    current_level.append(data)


def speeches_to_json(df_sitting):
    result = []
    for index, row in df_sitting.iterrows():
        speech_dict = {
            "speech": row["proc_speech"],
            "author": row["author"],
            "timestamp": row["timestamp"],
            "is_annotation": row["is_annotation"],
            "index": row["index"],
        }

        # Create a list with levels
        levels = [
            level
            for level in [row["level_1"], row["level_2"], row["level_3"]]
            if pd.notna(level)
        ]

        # Add to result
        add_to_result(levels, speech_dict, result)
    return result

In [4]:
# NLP INITIALIZATION BLOCK - COPY (Remember to update this block!) - For tokenization
stopwords_mhc = [
    "ada",
    "adakah",
    "adakan",
    "adalah",
    "adanya",
    "adapun",
    "agak",
    "agar",
    "akan",
    "akhir",
    "aku",
    "akulah",
    "akupun",
    "al",
    "alangkah",
    "allah",
    "amat",
    "antara",
    "antaramu",
    "antaranya",
    "apa",
    "apa-apa",
    "apabila",
    "apakah",
    "apapun",
    "atas",
    "atasmu",
    "atasnya",
    "atau",
    "ataukah",
    "ataupun",
    "bagai",
    "bagaimana",
    "bagaimanakah",
    "bagaimanapun",
    "bagi",
    "bagimu",
    "baginya",
    "bagitu",
    "bahawa",
    "bahkan",
    "bahwa",
    "banyak",
    "banyaknya",
    "barangkali",
    "barangsiapa",
    "bawah",
    "beberapa",
    "begitu",
    "begitupun",
    "belaka",
    "beliau",
    "belum",
    "belumkah",
    "ber",
    "berada",
    "berapa",
    "berhormat",
    "berikan",
    "berikut",
    "berkaitan",
    "berkenaan",
    "berupa",
    "beserta",
    "biarpun",
    "bila",
    "bilakah",
    "bilamana",
    "bilangan",
    "bin",
    "binti",
    "bisa",
    "boleh",
    "bukan",
    "bukankah",
    "bukanlah",
    "che",
    "chuma",
    "cuma",
    "dah",
    "dahulu",
    "dalam",
    "dalamnya",
    "dan",
    "dapat",
    "dapati",
    "dapatkah",
    "dapatlah",
    "dari",
    "daripada",
    "daripadaku",
    "daripadamu",
    "daripadanya",
    "dato",
    "datuk",
    "demi",
    "demikian",
    "demikianlah",
    "dengan",
    "dengannya",
    "di",
    "dia",
    "dialah",
    "didapat",
    "didapati",
    "dimanakah",
    "dua",
    "empat",
    "enam",
    "enche",
    "engkau",
    "engkaukah",
    "engkaulah",
    "engkaupun",
    "hai",
    "hajah",
    "haji",
    "hal",
    "hampir",
    "sebagai",
    "hampir-hampir",
    "hanya",
    "hanyalah",
    "harus",
    "hendak",
    "hendaklah",
    "hingga",
    "ia",
    "iaitu",
    "ialah",
    "ianya",
    "ii",
    "ingin",
    "inginkah",
    "ini",
    "inikah",
    "inilah",
    "itu",
    "itukah",
    "itulah",
    "izin",
    "jadi",
    "jangan",
    "janganlah",
    "jika",
    "jikalau",
    "jua",
    "juapun",
    "juga",
    "jumlah",
    "ka",
    "kadang",
    "kah",
    "kalangan",
    "kalau",
    "kali",
    "kami",
    "kamikah",
    "kamipun",
    "kamu",
    "sentiasa",
    "kamukah",
    "kamupun",
    "kan",
    "kapada",
    "katakan",
    "ke",
    "kedua",
    "kemudian",
    "kenapa",
    "kepada",
    "kerajaan",
    "kerana",
    "ketawa",
    "ketiga",
    "ketika",
    "khusus",
    "kini",
    "kita",
    "ku",
    "kurang",
    "lagi",
    "lah",
    "lain",
    "lalu",
    "lamanya",
    "langsung",
    "lebeh",
    "lebih",
    "lima",
    "macam",
    "macham",
    "maha",
    "mahu",
    "mahukah",
    "mahupun",
    "maka",
    "makin",
    "malah",
    "mana",
    "manakah",
    "manakala",
    "manapun",
    "maseh",
    "masih",
    "masing",
    "masing-masing",
    "md",
    "melainkan",
    "mem",
    "memang",
    "mempunyai",
    "men",
    "mendapat",
    "mendapati",
    "mendapatkan",
    "mengadakan",
    "mengapa",
    "mengapakah",
    "mengenai",
    "menjadi",
    "menyebabkan",
    "menyebabkannya",
    "mereka",
    "merekalah",
    "merekapun",
    "meskipun",
    "mesti",
    "misalnya",
    "mu",
    "mungkin",
    "nak",
    "namun",
    "nanti",
    "nescaya",
    "niscaya",
    "nya",
    "okey",
    "olah",
    "oleh",
    "orang",
    "pada",
    "padahal",
    "padanya",
    "padamu",
    "paling",
    "para",
    "pasti",
    "patut",
    "patutkah",
    "pelbagai",
    "per",
    "pergilah",
    "perkara",
    "perkaranya",
    "perlu",
    "pernah",
    "pertama",
    "ptg",
    "puan",
    "pula",
    "pun",
    "punya",
    "ra",
    "ramai",
    "riuh",
    "sa",
    "sadikit",
    "sahaja",
    "saja",
    "saling",
    "sama",
    "samakah",
    "sama-sama",
    "sambil",
    "sampai",
    "samping",
    "sana",
    "sangat",
    "sangatlah",
    "saperti",
    "satu",
    "saya",
    "se",
    "seandainya",
    "sebab",
    "sebagaimana",
    "sebagainya",
    "sebanyak",
    "sebarang",
    "sebelum",
    "sebelummu",
    "sebelumnya",
    "sebenarnya",
    "sebuah",
    "secara",
    "sedang",
    "sedangkan",
    "sedikit",
    "sedikitpun",
    "segala",
    "sehingga",
    "sejak",
    "sejauh",
    "sekali",
    "sekalian",
    "sekalipun",
    "sekarang",
    "sekejap",
    "sekian",
    "sekiranya",
    "sekitar",
    "sekurang",
    "selain",
    "selalu",
    "selama",
    "selama-lamanya",
    "selepas",
    "seluruh",
    "seluruhnya",
    "semakin",
    "semasa",
    "sementara",
    "semua",
    "semuanya",
    "semula",
    "senantiasa",
    "sendiri",
    "seolah",
    "seolah-olah",
    "seorang",
    "seorangpun",
    "separuh",
    "sepatutnya",
    "seperti",
    "seraya",
    "seri",
    "sering",
    "serta",
    "seseorang",
    "sesiapa",
    "sesuatu",
    "sesudah",
    "sesudahnya",
    "sesungguhnya",
    "sesungguhnyakah",
    "setakat",
    "setelah",
    "seterusnya",
    "setiap",
    "siapa",
    "siapakah",
    "sikit",
    "sini",
    "situ",
    "situlah",
    "sri",
    "suatu",
    "sudah",
    "sudahkah",
    "sunggoh",
    "sungguhpun",
    "supaya",
    "ta",
    "tadi",
    "tadinya",
    "tahu",
    "tahukah",
    "tak",
    "tanpa",
    "tanya",
    "tanyakanlah",
    "tapi",
    "telah",
    "tentang",
    "tentu",
    "tepuk",
    "terdapat",
    "terhadap",
    "terhadapmu",
    "terlalu",
    "termasuk",
    "terpaksa",
    "tersebut",
    "tertentu",
    "terus",
    "terutama",
    "terutamanya",
    "tetapi",
    "tiada",
    "tiadakah",
    "tiadalah",
    "tiap",
    "tiap-tiap",
    "tidak",
    "tidakkah",
    "tidaklah",
    "tiga",
    "tuan",
    "turut",
    "umpama",
    "untok",
    "untuk",
    "untukmu",
    "wahai",
    "walau",
    "walaupun",
    "ya",
    "yaini",
    "yaitu",
    "yakni",
    "yang",
]

custom_sw = [
    "menteri",
    "kementerian",
    "malaysia",
    "negara",
    "di-pertua",
    "peratus",
    "pertua",
    "isu",
    "akta",
    "bahas",
    "rm",
    "di",
    "dasar",
    "dewan",
    "parlimen",
    "ahli",
    "mohon",
]
custom_sp = ["terima kasih", "di pertua"]

# most of the settings here doesn't actually remove - not working
tokenizer = malaya.tokenizer.Tokenizer(
    numbers=False,
    title=False,
    percents=False,
    money=False,
    date=False,
    time=False,
    pukul=False,
    distance=False,
    temperature=False,
    volume=False,
    duration=False,
    weight=False,
)

stopwords_malaya = get_stopwords()
stopwords_bm = list(set(stopwords_mhc + stopwords_malaya + custom_sw))
stopwords_en = nltk.corpus.stopwords.words("english")
stopwords = stopwords_bm + stopwords_en


def preprocess_malaya(speech):
    if pd.isnull(speech):
        return speech
    # remove numbers
    speech = re.sub(r"\b\d+\b", "", speech)
    # remove custom list of stop phrases
    for sp in custom_sp:
        speech = re.sub(sp, "", speech, flags=re.IGNORECASE)
    # remove stopwords, punctuation, tokenise then stem
    toks = [
        tok  # sastrawi.stem(tok)
        for tok in tokenizer.tokenize(speech, lowercase=True)
        if tok not in stopwords
        and all(subtok not in string.punctuation + "–" for subtok in tok)
    ]
    return [x for x in toks if x != ""]

## ParliamentaryCycles (Meetings)

TODO:
- Check if fresh up to July 2024

Data Sources (DR/DN):
- Parsed and cleaned sesions and dates scraped from parlimen website, stored as calendar CSV files in `hansard-analysis` repo: https://github.com/govtechmy/hansards-analysis/blob/main/src_calendar/arkib_dr.csv

Data Sources (KKDR):
- Parse dates calendar from archive HTML file saved from parlimen web. 

Output:
- meetings.parquet

In [5]:
# KKDR calendar pre-processing - dfkkdr
par_num_map = {
    "Parlimen Kelima Belas": 15,
    "Parlimen Keempat Belas": 14,
    "Parlimen Ketiga Belas": 13,
}
num_map = {
    "pertama": 1,
    "kedua": 2,
    "ketiga": 3,
    "keempat": 4,
    "kelima": 5,
    "keenam": 6,
    "ketujuh": 7,
    "kelapan": 8,
    "kesembilan": 9,
}

# expand all archives and save locally: https://www.parlimen.gov.my/hansard-dewan-khas.html?uweb=dr&arkib=yes
html_file = "./data/hansard-kkdr-2.html"
with open(html_file) as fopen:
    data = fopen.read()

soup = BeautifulSoup(data, "lxml")
spans = soup.find_all("span", {"class": "standartTreeRow"})
dates = [s.text for s in spans]
len(spans)

events = [date for date in dates if dateparser.parse(date) is None]

events_by_date = []
for event in events:
    if "Parlimen" in event:
        cur_parlimen = event
        parlimen_name = cur_parlimen.split("(")[0].strip()
    if "Penggal" in event:
        cur_penggal = event
    if "Mesyuarat" in event:
        cur_mesyuarat = event
        meeting_name = cur_mesyuarat.split("(")[0].strip()
        matches = re.findall(r"\d{2}/\d{2}/\d{4}", event)
        start_date, end_date = matches[0], matches[1]
        events_by_date.append(
            {
                "parlimen": parlimen_name,
                "penggal": cur_penggal,
                "mesyuarat": meeting_name,
                "date_start": start_date,
                "date_end": end_date,
            }
        )

dfkkdr = pd.DataFrame(events_by_date)
dfkkdr["parlimen_id"] = dfkkdr.parlimen.map(par_num_map)


# check partial match with keys
def get_partial_match(key, dictionary):
    return [value for k, value in dictionary.items() if k.lower() in key.lower()][0]


dfkkdr["penggal_id"] = dfkkdr.penggal.apply(
    lambda penggal: get_partial_match(penggal, num_map)
)
dfkkdr["mesyuarat_id"] = dfkkdr.mesyuarat.apply(
    lambda mes: get_partial_match(mes, num_map)
)

# add latest mesyuarat
# TODO: automate this
new_record = {
    "parlimen": "Parlimen Kelima Belas",
    "penggal": "Penggal Kedua",
    "mesyuarat": "Mesyuarat Ketiga",
    "date_start": "09/10/2023",
    "date_end": "30/11/2023",
    "parlimen_id": 15,
    "penggal_id": 2,
    "mesyuarat_id": 3,
}
dfkkdr = pd.concat([dfkkdr, pd.DataFrame([new_record])])

dfkkdr["date_start"] = pd.to_datetime(
    dfkkdr["date_start"], format="%d/%m/%Y"
).dt.date
dfkkdr["date_end"] = pd.to_datetime(
    dfkkdr["date_end"], format="%d/%m/%Y"
).dt.date

In [5]:
dfkkdr

Unnamed: 0,parlimen,penggal,mesyuarat,date_start,date_end,parlimen_id,penggal_id,mesyuarat_id
0,Parlimen Kelima Belas,Penggal Kedua,Mesyuarat Pertama,2023-02-13,2023-04-04,15,2,1
1,Parlimen Kelima Belas,Penggal Kedua,Mesyuarat Kedua,2023-05-22,2023-06-15,15,2,2
2,Parlimen Kelima Belas,Penggal Kedua,Mesyuarat Ketiga,2023-10-09,2023-11-30,15,2,3
3,Parlimen Kelima Belas,Penggal Ketiga,Mesyuarat Pertama,2024-02-26,2024-03-27,15,3,1
4,Parlimen Kelima Belas,Penggal Ketiga,Mesyuarat Kedua,2024-06-24,2024-07-18,15,3,2
5,Parlimen Keempat Belas,Penggal Pertama,Mesyuarat Pertama,2018-07-16,2018-08-16,14,1,1
6,Parlimen Keempat Belas,Penggal Pertama,Mesyuarat Kedua,2018-10-15,2018-12-11,14,1,2
7,Parlimen Keempat Belas,Penggal Kedua,Mesyuarat Pertama,2019-03-11,2019-04-11,14,2,1
8,Parlimen Keempat Belas,Penggal Kedua,Mesyuarat Kedua,2019-07-01,2019-07-18,14,2,2
9,Parlimen Keempat Belas,Penggal Kedua,Mesyuarat Ketiga,2019-10-07,2019-12-05,14,2,3


In [7]:
# Get list of all RAW PDFs
# TODO: Generalise this to list S3 bucket
# flat list of PDFs in DR-DDMMYY.pdf format
# on S3: s3://hansards-dataproc/raw/
PDF_DRDN_PATH = Path(
    "/Users/alifaiman/Documents/Work/hansards/hansards-pdf-raw"
)
PDF_KKDR_PATH = Path(
    "/Users/alifaiman/Documents/Work/hansards/hansards-kkdr-pdf"
)

filelist = []
for file in PDF_DRDN_PATH.rglob("*.pdf"):
    filelist.append(file)
for file in PDF_KKDR_PATH.rglob("*.pdf"):
    filelist.append(file)


def rename_pdf(filename):
    match = re.search(r"(DR|DN|KKDR)-(\d{2})(\d{2})(\d{4})", filename, re.IGNORECASE)
    house, day, month, year = match.groups()
    new_filename = f"{house.lower()}_{year}-{month}-{day}"
    date = pd.to_datetime(f"{year}-{month}-{day}")
    return pd.Series([house, new_filename, date])


df_pdfs = pd.DataFrame({"filepath": filelist})
df_pdfs["filename"] = df_pdfs.filepath.apply(lambda path: path.name)
df_pdfs["filename"] = df_pdfs.filename.str[:-4]  # expect original DR-01012020.pdf
df_pdfs[["house", "filename", "date"]] = df_pdfs.filename.apply(
    rename_pdf
)  # rename to dr_2020-01-01
df_pdfs.house = df_pdfs.house.map({"DR": 0, "DN": 1, "KKDR": 2})
df_pdfs.drop(columns=["filepath"], inplace=True)

# DR DN calendar from hansard-analysis repo
dfdr = pd.read_csv(ANALYSIS_REPO / "src_calendar/arkib_dr.csv")
dfdr["house"] = 0
dfdn = pd.read_csv(ANALYSIS_REPO / "src_calendar/arkib_dn.csv")
dfdn["house"] = 1
# KKDR calendar prepared earlier
dfkkdr["house"] = 2
dfkkdr["date_start"] = pd.to_datetime(dfkkdr["date_start"], format="%d/%m/%Y")
dfkkdr["date_end"] = pd.to_datetime(dfkkdr["date_end"], format="%d/%m/%Y")

df_meetings = pd.concat([dfdr, dfdn])
df_meetings["date_start"] = pd.to_datetime(df_meetings["date_start"], format="%d/%m/%Y")
df_meetings["date_end"] = pd.to_datetime(df_meetings["date_end"], format="%d/%m/%Y")
df_meetings = pd.concat([df_meetings, dfkkdr])
df_meetings["key"] = df_meetings.apply(
    lambda sit: f"{sit.parlimen_id}-{sit.penggal_id}-{sit.mesyuarat_id}", axis=1
)

# save to file
df_meetings.to_parquet("../api/seed/meetings.parquet", index=False, compression="gzip")

## Sittings

Data Sources:
- PDF file lists (df_pdf)
- Meetings Calendar (df_meetings)

Output:
- df_sittings

Caveats:
- 2 KKDR sittings fall outside of meeting ranges `["kkdr_2023-12-08", "kkdr_2023-09-05"]`

In [8]:
# merge meetings and PDFs to generate sittings
df_pdfs_dr = df_pdfs[df_pdfs.house == 0]
df_pdfs_dn = df_pdfs[df_pdfs.house == 1]
df_pdfs_kkdr = df_pdfs[df_pdfs.house == 2]
df_meetings_dr = df_meetings[df_meetings.house == 0]
df_meetings_dn = df_meetings[df_meetings.house == 1]
df_meetings_kkdr = df_meetings[df_meetings.house == 2]
print(
    "PDF files DR",
    len(df_pdfs_dr),
    "DN",
    len(df_pdfs_dn),
    "KKDR",
    len(df_pdfs_kkdr),
    "| Total",
    len(df_pdfs),
)

dr_tmp = (
    df_meetings_dr.assign(key=1)
    .merge(df_pdfs_dr.assign(key=1), on="key")
    .drop("key", axis=1)
)
df_sittings_dr = dr_tmp[
    (dr_tmp["date"] >= dr_tmp["date_start"]) & (dr_tmp["date"] <= dr_tmp["date_end"])
]

dn_tmp = (
    df_meetings_dn.assign(key=1)
    .merge(df_pdfs_dn.assign(key=1), on="key")
    .drop("key", axis=1)
)
df_sittings_dn = dn_tmp[
    (dn_tmp["date"] >= dn_tmp["date_start"]) & (dn_tmp["date"] <= dn_tmp["date_end"])
]

kkdr_tmp = (
    df_meetings_kkdr.assign(key=1)
    .merge(df_pdfs_kkdr.assign(key=1), on="key")
    .drop("key", axis=1)
)
df_sittings_kkdr = kkdr_tmp[
    (kkdr_tmp["date"] >= kkdr_tmp["date_start"])
    & (kkdr_tmp["date"] <= kkdr_tmp["date_end"])
]

# manually add KKDR sittings outside of parliamentary cycle range
sittings_out_of_range = {
    "kkdr_2023-12-08": {
        "parlimen": "Parlimen Kelima Belas",
        "parlimen_id": 15,
        "penggal": "Penggal Kedua",
        "penggal_id": 2,
        "mesyuarat": "Mesyuarat Ketiga",
        "mesyuarat_id": 3,
    },
    "kkdr_2023-09-05": {
        "parlimen": "Parlimen Kelima Belas",
        "parlimen_id": 15,
        "penggal": "Penggal Kedua",
        "penggal_id": 2,
        "mesyuarat": "Mesyuarat Kedua",
        "mesyuarat_id": 2,
    },
}
sittings_to_append = []
# for oor_filename, oor_data in sittings_out_of_range.items():
#     oor_filemeta = df_pdfs[df_pdfs.filename == oor_filename].iloc[0]
#     oor_data["house"] = oor_filemeta.house
#     oor_data["date"] = oor_filemeta.date
#     oor_data["filename"] = oor_filename
#     sittings_to_append.append(oor_data)

df_append = pd.DataFrame(sittings_to_append)
df_sittings_kkdr = pd.concat([df_sittings_kkdr, df_append]).reset_index(drop=True)

print(
    "Combined files DR",
    len(df_sittings_dr),
    "DN",
    len(df_sittings_dn),
    "KKDR",
    len(df_sittings_kkdr),
    "| Total",
    len(df_sittings_dr) + len(df_sittings_dn) + len(df_sittings_kkdr),
)

df_sittings_dr["house"] = 0
df_sittings_dn["house"] = 1
df_sittings_kkdr["house"] = 2
df_sittings = pd.concat([df_sittings_dr, df_sittings_dn, df_sittings_kkdr])
# df_sittings["date"] = df_sittings.date.dt.date

PDF files DR 34 DN 20 KKDR 34 | Total 88
Combined files DR 34 DN 19 KKDR 33 | Total 86


## Speeches

Data Sources:
- Parsed and tabulated `result.csv` files in `hansard-dataproc` repo

Outputs:
- `speeches.parquet`: All speeches for `Speech` data model, with speech_tokens
- `sittings.parquet`: Sittings from df_sittings with Pre-formatted speeches in JSON by Sitting (`speech_data`)

In [9]:
# runtime: ~30s
# read both DR and DN
dr_tabulated = DATAPROC_REPO / "src/tabulated/DR"
dn_tabulated = DATAPROC_REPO / "src/tabulated/DN"
kkdr_tabulated = DATAPROC_REPO / "src/tabulated/KKDR"

# get list of sittings
dr_csv_paths = list_parsed_sittings(dr_tabulated)
dn_csv_paths = list_parsed_sittings(dn_tabulated)
kkdr_csv_paths = list_parsed_sittings(kkdr_tabulated)

# read speeches
df_speech_dr = read_tabulated(dr_csv_paths, 0)
df_speech_dn = read_tabulated(dn_csv_paths, 1)
df_speech_kkdr = read_tabulated(kkdr_csv_paths, 2)
df_all = pd.concat([df_speech_dr, df_speech_dn, df_speech_kkdr])
df_all.reset_index(inplace=True)

df_all = merge_authored_annotations(df_all)
df_all.proc_speech = df_all.proc_speech.fillna("")

before drop: 1069974
after drop: 1051645


In [10]:
# # tokenize speeches into speech_tokens

# without parallelization: takes 15-20 mins
df_all["speech_tokens"] = df_all.proc_speech.apply(lambda text: preprocess_malaya(text))
# import dask.dataframe as dd
# from dask.distributed import Client, LocalCluster
# import dask

# dask.config.set({"dataframe.convert-string": False})

# cluster = LocalCluster()  # Launches a scheduler and workers locally
# client = Client(cluster)  # Connect to distributed cluster and override default
# print(f"Started cluster at {cluster.dashboard_link}")

# # Shut down the Dask client and cluster
# # client.close()
# # cluster.close()

In [10]:
# # runtime: ~5 mins
# ddf = dask.dataframe.from_pandas(
#     df_all, npartitions=6
# )
# # Apply the function using Dask's map_partitions
# ddf["speech_tokens"] = ddf["proc_speech"].map_partitions(
#     lambda df: df.apply(preprocess_malaya), meta=("speech_tokens", "object")
# )

# # Compute the results and convert back to pandas DataFrame
# df_all = ddf.compute()

In [11]:
df_sittings

Unnamed: 0,parlimen,parlimen_id,penggal,penggal_id,mesyuarat,mesyuarat_id,date_start,date_end,house_x,filename,house_y,date,house
4896,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Pertama,1,2024-02-26,2024-03-27,0,dr_2024-02-27,0,2024-02-27,0
4897,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Pertama,1,2024-02-26,2024-03-27,0,dr_2024-03-19,0,2024-03-19,0
4899,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Pertama,1,2024-02-26,2024-03-27,0,dr_2024-03-21,0,2024-03-21,0
4901,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Pertama,1,2024-02-26,2024-03-27,0,dr_2024-03-14,0,2024-03-14,0
4903,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Pertama,1,2024-02-26,2024-03-27,0,dr_2024-03-12,0,2024-03-12,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
28,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Kedua,2,2024-06-24,2024-07-18,2,kkdr_2024-07-03,2,2024-07-03,2
29,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Kedua,2,2024-06-24,2024-07-18,2,kkdr_2024-07-02,2,2024-07-02,2
30,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Kedua,2,2024-06-24,2024-07-18,2,kkdr_2024-07-10,2,2024-07-10,2
31,Parlimen Kelima Belas,15,Penggal Ketiga,3,Mesyuarat Kedua,2,2024-06-24,2024-07-18,2,kkdr_2024-07-17,2,2024-07-17,2


In [12]:
# pre-formatted speech data
for house, date in df_all.groupby(["house", "date"]).groups:
    one_sitting = df_all[(df_all.house == house) & (df_all.date == date)]
    speech_data = speeches_to_json(
        one_sitting
    )
    # save to df_sittings date, house
    matching_sitting = df_sittings.loc[
        (df_sittings.house == house) & (df_sittings.date == date), :
    ]
    if len(matching_sitting) == 1:
        df_sittings.loc[
            (df_sittings.house == house) & (df_sittings.date == date), "speech_data"
        ] = json.dumps(speech_data).encode("utf-8")
    else:
        print(f"Error: Matching: {len(matching_sitting)}: {house} {date}")



Error: Matching: 0: 0 2008-04-28 00:00:00
Error: Matching: 0: 0 2008-04-29 00:00:00
Error: Matching: 0: 0 2008-04-30 00:00:00
Error: Matching: 0: 0 2008-05-05 00:00:00
Error: Matching: 0: 0 2008-05-06 00:00:00
Error: Matching: 0: 0 2008-05-07 00:00:00
Error: Matching: 0: 0 2008-05-08 00:00:00
Error: Matching: 0: 0 2008-05-12 00:00:00
Error: Matching: 0: 0 2008-05-13 00:00:00
Error: Matching: 0: 0 2008-05-14 00:00:00
Error: Matching: 0: 0 2008-05-15 00:00:00
Error: Matching: 0: 0 2008-05-20 00:00:00
Error: Matching: 0: 0 2008-05-21 00:00:00
Error: Matching: 0: 0 2008-05-22 00:00:00
Error: Matching: 0: 0 2008-05-26 00:00:00
Error: Matching: 0: 0 2008-05-27 00:00:00
Error: Matching: 0: 0 2008-05-28 00:00:00
Error: Matching: 0: 0 2008-05-29 00:00:00
Error: Matching: 0: 0 2008-06-23 00:00:00
Error: Matching: 0: 0 2008-06-24 00:00:00
Error: Matching: 0: 0 2008-06-25 00:00:00
Error: Matching: 0: 0 2008-06-26 00:00:00
Error: Matching: 0: 0 2008-06-30 00:00:00
Error: Matching: 0: 0 2008-07-01 0

In [13]:
# save sittings to parquet
df_sittings.to_parquet("../api/seed/sittings.parquet", compression="gzip", index=False)
df_all.to_parquet("../api/seed/speeches.parquet", compression="gzip", index=False)

## Author/AuthorHistory

Should be seeded from parliament DB, this seed generation is currently manual from open data.

Config required:
- Start dates of each GE

Data Sources:
- `candidates_master.csv` - from thevesh with UID generated

Outputs:
- `authors.parquet`
- `author_history.parquet`

In [14]:
# switch to candidate master CSV file generated on 7th Feb 2024
cand_master = pd.read_csv(
    "data/candidates_master.csv"
)
cand_master["name"] = cand_master["name"].str.upper()
# only winners
cand_master = cand_master[cand_master["result"] == 1]
# trim party names to abbreviations
cand_master.party = cand_master.party.apply(
    lambda x: re.search(r"\((.*?)\)", x).group(1)
)
# area names for matching to attendance
cand_master["area_name"] = cand_master.parlimen.str[5:].str.strip()

# unique authors
df_authors = cand_master[
    ["uid", "name", "sex", "birth_year", "ethnicity"]
].drop_duplicates(subset="uid", keep="last")
df_authors["sex"] = df_authors.sex.map({"male": "m", "female": "f"})
df_authors = df_authors.rename(columns={"uid": "new_author_id"})

df_authors.to_parquet("../api/seed/authors.parquet", compression="gzip", index=False)

In [15]:
# generate author history
start_dates = {
    12: pd.to_datetime("2008-03-08"),
    13: pd.to_datetime("2013-05-05"),
    14: pd.to_datetime("2018-05-09"),
    15: pd.to_datetime("2022-11-19"),
}

cand_master_merged = cand_master.copy()

# Add start date based on GE
cand_master_merged["start_date"] = cand_master_merged["ge"].map(start_dates)

# For end_date, we need to shift the start_date to the next term for each constituency.
# If a rep continues, the end_date will be the start of their next term; otherwise, it's their current term's end.
# First, sort values to ensure correct shifting
winners_df = cand_master_merged.sort_values(by=["parlimen", "ge"])

# Generate end_date by shifting start_date. The shift needs to be done within each group of 'parlimen'.
cand_master_merged["end_date"] = cand_master_merged.groupby("parlimen")[
    "start_date"
].shift(-1)


# Check if we need to merge continuous terms for the same rep in the same constituency
# This involves grouping by 'parlimen' and 'name' and checking if consecutive terms exist without a break.
def merge_continuous_terms(group):
    continuous_terms = []
    for _, row in group.iterrows():
        if (
            not continuous_terms
            or row["start_date"] != continuous_terms[-1]["end_date"]
        ):
            continuous_terms.append(
                {
                    "ge": row["ge"],
                    "parlimen": row["parlimen"],
                    "uid": row["uid"],
                    "party": row["party"],
                    "start_date": row["start_date"],
                    "end_date": row["end_date"],
                }
            )
        else:
            continuous_terms[-1]["end_date"] = row["end_date"]
    return pd.DataFrame(continuous_terms)


# Function to merge continuous terms, now considering party changes as a break in continuity.
def merge_terms_with_party_switches(group):
    terms_with_party_switches = []
    for _, row in group.iterrows():
        # Check for continuous terms and party consistency
        if (
            not terms_with_party_switches
            or row["start_date"] != terms_with_party_switches[-1]["end_date"]
            or row["party"] != terms_with_party_switches[-1]["party"]
        ):
            terms_with_party_switches.append(
                {
                    "ge": row["ge"],
                    "parlimen": row["parlimen"],
                    "name": row["name"],
                    "uid": row["uid"],
                    "party": row["party"],
                    "start_date": row["start_date"],
                    "end_date": row["end_date"],
                }
            )
        else:
            terms_with_party_switches[-1]["end_date"] = row["end_date"]
    return pd.DataFrame(terms_with_party_switches)


# Apply the function to each group of 'parlimen', 'name', and 'party'
merged_terms_df = pd.concat(
    [
        merge_terms_with_party_switches(group)
        for _, group in cand_master_merged.groupby(["parlimen", "uid", "party"])
    ]
)
merged_terms_df.reset_index(drop=True, inplace=True)

# party, area (parlimen), start_date, end_date, current
df_author_hist = merged_terms_df[["uid", "parlimen", "start_date", "end_date", "party"]]
df_author_hist = df_author_hist.rename(
    columns={"uid": "new_author_id", "parlimen": "area"}
)

df_author_hist.to_parquet(
    "../api/seed/author_history.parquet", compression="gzip", index=False
)

## Attendance

Data Source:
- Parsed attendance txt files in `hansards-dataproc` (only for DR)
- Candidate master list from Author seed
- Prod: From parliament DB

Outputs:
- `attendance.parquet`

In [16]:
# read parsed attendance txt files
dr_tabulated = DATAPROC_REPO / "src/tabulated/DR"
all_attn_paths = list(Path(dr_tabulated).rglob("*.txt"))

df_attn_files = pd.DataFrame({"paths": all_attn_paths})
df_attn_files["date"] = df_attn_files.paths.apply(lambda path: path.parts).str[-2]
df_attn_files["type"] = (
    df_attn_files.paths.apply(lambda path: path.parts).str[-1].str[:-4]
)
df_attn_files["house"] = df_attn_files.paths.apply(lambda path: path.parts).str[-3]

# setup area names from GE15 ballots data (only works for 15th parliament)
df15_names = cand_master[cand_master["ge"] >= 14]
area_names = [
    "(" + "".join(x.split()).lower() + ")" for x in df15_names.area_name.tolist()
]

attendance = {}
absence = {}

for date in df_attn_files.date.unique():
    attn_path = df_attn_files[
        (df_attn_files.date == date) & (df_attn_files.type == "attended")
    ].paths.iloc[0]
    abs_path = df_attn_files[
        (df_attn_files.date == date) & (df_attn_files.type == "absent")
    ].paths.iloc[0]
    with open(attn_path, "r") as f:
        attended_text = f.read()
    with open(abs_path, "r") as f:
        absent_text = f.read()

    attendance[date] = [1 if area in attended_text else 0 for area in area_names]
    absence[date] = [1 if area in absent_text else 0 for area in area_names]

# check results
# check = pd.DataFrame(attendance) + pd.DataFrame(absence)
# # names that have no attendance/absent records
# check[check.sum(axis=1) < len(check.columns)]

df_attendance = pd.DataFrame(attendance)
df_attendance["name"] = df15_names.name.tolist()
df_attendance["parlimen"] = df15_names.parlimen.tolist()
df_attendance["party"] = df15_names.party.tolist()
df_attendance = df_attendance.melt(
    id_vars=["name", "parlimen", "party"], var_name="date", value_name="attendance"
)
df_attendance["date"] = pd.to_datetime(df_attendance.date)
# join with demog to get uid for linking to correct AuthorHistory
df_attendance = df_attendance.merge(
    cand_master[cand_master["ge"] == 15][["uid", "parlimen"]], on="parlimen"
)

# drop duplicates due to multiple names per uid (find out how this happens?)
df_attendance = df_attendance.drop_duplicates(subset=["uid", "date"])

df_attendance.to_parquet(
    "../api/seed/attendance.parquet", compression="gzip", index=False
)