### Imports and settings

In [1]:
# Cell 1: Imports & Settings
import re
import json
import random
import warnings
from datetime import datetime
from typing import List, Dict, Optional
from collections import defaultdict, Counter

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Use seaborn's "whitegrid" style for every figure drawn later in the notebook.
sns.set_style("whitegrid")
# Hide DeprecationWarning messages whose originating module name matches "pymongo";
# warnings from other modules and of other categories are still shown.
warnings.filterwarnings("ignore", category=DeprecationWarning, module="pymongo")

# Visible confirmation that the import/settings cell ran to completion.
print("Libraries loaded.")

Libraries loaded.


### Main analysis class

In [2]:
# Cell 2: CombinedParliamentaryAnalyzer Class
class CombinedParliamentaryAnalyzer:
    """Complete missing Hansard document fields and tally text patterns per decade.

    The analyzer works on plain dictionaries (one per Hansard document). It
    reads the keys ``header``, ``attendance``, ``discussion_start``,
    ``full_text``, ``content_text``, ``text`` and ``hansardDate``.

    Two pieces of state live on the instance and are never reset by the
    methods below, so they accumulate when the same instance is reused:

    * ``completion_stats``  - per-field counters (``found`` / ``created`` / ``failed``)
    * ``pattern_discoveries`` - per-decade ``Counter`` objects for headers,
      honorifics and dominant language
    """

    # Fields that complete_all_documents() guarantees on every returned document.
    _COMPLETED_FIELDS = ('header', 'attendance', 'discussion_start')

    # datetime.weekday() returns 0 for Monday, hence Isnin first.
    _MALAY_DAYS = ('Isnin', 'Selasa', 'Rabu', 'Khamis', 'Jumaat', 'Sabtu', 'Ahad')
    _MALAY_MONTHS = ('Januari', 'Februari', 'Mac', 'April', 'Mei', 'Jun',
                     'Julai', 'Ogos', 'September', 'Oktober', 'November', 'Disember')

    # Attendance block patterns, tried in order; matching is case-insensitive.
    _MODERN_ATTENDANCE_PATTERNS = (
        r'KEHADIRAN\s+AHLI[\s-]*AHLI\s+PARLIMEN[\s\S]*?(?=Ahli[\s-]*Ahli\s+Yang\s+Tidak|Senator|PERTANYAAN)',
        r'Ahli[\s-]*Ahli\s+Yang\s+Hadir\s*[:\-][\s\S]*?(?=Ahli[\s-]*Ahli\s+Yang\s+Tidak|Senator)',
    )
    _HISTORICAL_ATTENDANCE_PATTERNS = (
        r'PRESENT\s*[:\-][\s\S]*?(?=ABSENT|QUESTIONS|The sitting)',
        r'MEMBERS\s+PRESENT[\s\S]*?(?=MEMBERS\s+ABSENT|ABSENT)',
    )
    # A parsed block is only accepted when it yields at least this many entries.
    _MIN_MODERN_ENTRIES = 50
    _MIN_HISTORICAL_ENTRIES = 30

    def __init__(self, honorific_dict: Dict):
        """Store the honorific lookup and initialise the counters.

        Args:
            honorific_dict: maps a raw honorific spelling (as it appears in the
                text) to the standard form recorded in results.
        """
        self.honorific_dict = honorific_dict
        self.pattern_discoveries = {
            'decade_headers': defaultdict(Counter),
            'honorific_variations': defaultdict(Counter),
            'language_code_switching': defaultdict(Counter),
        }
        self.completion_stats = {
            'header': {'found': 0, 'created': 0, 'failed': 0},
            'attendance': {'found': 0, 'created': 0, 'failed': 0},
            'discussion_start': {'found': 0, 'created': 0, 'failed': 0}
        }

    def run_complete_analysis(self, docs: List[Dict], sample_size: int = 500,
                              seed: Optional[int] = None) -> Dict:
        """Complete every document, then analyse patterns on a random sample.

        Args:
            docs: Hansard documents as dictionaries.
            sample_size: upper bound on the number of documents sampled for
                pattern analysis.
            seed: optional seed for the sampling step; ``None`` keeps the
                original behaviour of using the global ``random`` state.

        Returns:
            A dict with ``data_completion`` (counts, completion statistics,
            field coverage) and ``pattern_analysis`` (see
            ``analyze_document_patterns``).
        """
        completed = self.complete_all_documents(docs)
        patterns = self.analyze_document_patterns(completed, sample_size, seed=seed)
        return {
            'data_completion': {
                'original_document_count': len(docs),
                'completed_document_count': len(completed),
                'completion_statistics': self.completion_stats,
                'field_coverage': self.verify_field_coverage(completed)
            },
            'pattern_analysis': patterns
        }

    def complete_all_documents(self, docs: List[Dict]) -> List[Dict]:
        """Return shallow copies of ``docs`` with the three completed fields set.

        Progress is printed every 100 documents and a completion summary is
        printed at the end. The input dictionaries are not modified.
        """
        result = []
        for i, doc in enumerate(docs):
            if i % 100 == 0:
                print(f"  Completing {i+1}/{len(docs)}")
            d = doc.copy()
            d['header'] = self.ensure_header(doc)
            d['attendance'] = self.ensure_attendance(doc)
            d['discussion_start'] = self.ensure_discussion_start(doc)
            result.append(d)
        self.print_completion_summary(len(docs))
        return result

    def ensure_header(self, doc: Dict) -> str:
        """Return the document header, deriving one when it is missing.

        Order of preference: the existing ``header`` (more than 5 characters
        after stripping), a header extracted from ``full_text``, then a header
        built from metadata. The first case counts as ``found``, the others as
        ``created``.
        """
        if (h := doc.get('header', '')) and len(h.strip()) > 5:
            self.completion_stats['header']['found'] += 1
            return h
        if (txt := doc.get('full_text', '')):
            if (ex := self.extract_header_from_text(txt)):
                self.completion_stats['header']['created'] += 1
                return ex
        fb = self.create_header_from_metadata(doc) or "DEWAN RAKYAT"
        self.completion_stats['header']['created'] += 1
        return fb

    def ensure_attendance(self, doc: Dict) -> List[Dict]:
        """Return the attendance list, extracting it from text when missing.

        An existing truthy ``attendance`` value counts as ``found``; a list
        parsed from ``full_text`` / ``content_text`` / ``text`` counts as
        ``created``. When nothing can be parsed a one-element placeholder list
        is returned and the attempt is counted as ``failed``.
        """
        if doc.get('attendance'):
            self.completion_stats['attendance']['found'] += 1
            return doc['attendance']
        for field in ['full_text', 'content_text', 'text']:
            if (txt := doc.get(field, '')):
                if (ex := self.extract_attendance_from_text(txt)):
                    self.completion_stats['attendance']['created'] += 1
                    return ex
        # The placeholder carries no attendance data, so record it as a failure
        # rather than as a created value; this keeps the completion summary honest.
        self.completion_stats['attendance']['failed'] += 1
        return [{'note': 'Attendance not found'}]

    def ensure_discussion_start(self, doc: Dict) -> str:
        """Return the opening of the discussion, deriving one when missing.

        Order of preference: the existing ``discussion_start`` (more than 5
        characters after stripping), a snippet extracted from ``full_text``,
        then a generic sentence that includes ``hansardDate`` when present.
        """
        if (s := doc.get('discussion_start', '')) and len(s.strip()) > 5:
            self.completion_stats['discussion_start']['found'] += 1
            return s
        if (txt := doc.get('full_text', '')):
            if (ex := self.extract_discussion_start_from_text(txt)):
                self.completion_stats['discussion_start']['created'] += 1
                return ex
        date = doc.get('hansardDate', '')
        fb = f"Mesyuarat dimulakan pada {date}" if date else "Mesyuarat dimulakan"
        self.completion_stats['discussion_start']['created'] += 1
        return fb

    def extract_header_from_text(self, txt: str) -> Optional[str]:
        """Collect header-looking lines from the first 20 lines of ``txt``.

        A line qualifies when it is shorter than 120 characters and is either
        all upper-case, contains one of the header keywords, or starts with a
        numbering prefix such as ``Bil. 12``. Scanning stops at the first
        non-header line that looks like a speaker turn.

        Returns:
            The qualifying lines joined with newlines, or ``None`` if there are none.
        """
        lines = txt.split('\n')[:20]
        header = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if (len(line) < 120 and
                (line.isupper() or
                 any(k in line.upper() for k in ['DEWAN', 'PARLIMEN', 'MESYUARAT', 'BIL']) or
                 re.match(r'^(Bil|DR|Page|No)[\.\s]*\d+', line, re.I))):
                header.append(line)
            elif ':' in line and any(t in line for t in ['Yang Berhormat', 'Dato']):
                break
        return '\n'.join(header) if header else None

    def create_header_from_metadata(self, doc: Dict) -> str:
        """Build a fallback header from ``hansardDate``.

        The first line is always ``DEWAN RAKYAT``. When ``hansardDate`` is a
        ``datetime`` or a string starting with ``YYYY-MM-DD`` a second line
        with the Malay weekday and month name is added; any other truthy value
        is appended verbatim via ``str()``.
        """
        parts = ['DEWAN RAKYAT']
        date = doc.get('hansardDate')
        if date:
            try:
                # datetime values are used as-is; strings are parsed from their
                # first ten characters.
                parsed = date if isinstance(date, datetime) else datetime.strptime(date[:10], '%Y-%m-%d')
            except (TypeError, ValueError):
                # Unparseable or non-sliceable value: keep it as text instead of dropping it.
                parts.append(str(date))
            else:
                day_name = self._MALAY_DAYS[parsed.weekday()]
                month_name = self._MALAY_MONTHS[parsed.month - 1]
                parts.append(f"{day_name}, {parsed.day} {month_name} {parsed.year}")
        return '\n'.join(parts)

    def extract_discussion_start_from_text(self, txt: str) -> Optional[str]:
        """Find the first speaker turn and return it with one line of context either side.

        Lines that are blank, shorter than 20 characters, or contain ``DEWAN``
        or ``KEHADIRAN`` are skipped. A speaker turn is a line containing a
        colon and either "yang berhormat" or "tuan speaker" (case-insensitive).
        """
        lines = txt.split('\n')
        for i, line in enumerate(lines):
            line = line.strip()
            if not line or len(line) < 20 or any(k in line.upper() for k in ['DEWAN', 'KEHADIRAN']):
                continue
            if any(p in line.lower() for p in ['yang berhormat', 'tuan speaker']) and ':' in line:
                return '\n'.join(lines[max(0, i-1):i+2]).strip()
        return None

    def extract_attendance_from_text(self, txt: str) -> Optional[List[Dict]]:
        """Locate and parse an attendance block inside ``txt``.

        Modern (Malay) patterns are tried first, then historical (English)
        ones. A parsed block is accepted only when it has at least 50 (modern)
        or 30 (historical) entries.

        Returns:
            The parsed entries of the first accepted block, or ``None``.
        """
        strategies = (
            (self._MODERN_ATTENDANCE_PATTERNS, self._parse_modern_attendance, self._MIN_MODERN_ENTRIES),
            (self._HISTORICAL_ATTENDANCE_PATTERNS, self._parse_historical_attendance, self._MIN_HISTORICAL_ENTRIES),
        )
        for patterns, parser, minimum in strategies:
            for pattern in patterns:
                match = re.search(pattern, txt, re.I)
                if match:
                    parsed = parser(match.group(0))
                    if len(parsed) >= minimum:
                        return parsed
        return None

    def _parse_modern_attendance(self, block: str) -> List[Dict]:
        """Parse a numbered attendance list (``1. Title Name (Constituency)``).

        Non-numbered lines following a numbered line are treated as its
        continuation. Returns de-duplicated entry dictionaries.
        """
        entries = []
        lines = [l.strip() for l in block.split('\n') if l.strip()]
        i = 0
        while i < len(lines):
            line = lines[i]
            # NOTE(review): this is a substring test, so a numbered entry whose text
            # contains e.g. "AHLI" (as in "Dahlia") is skipped too - confirm intended.
            if any(h in line.upper() for h in ['KEHADIRAN', 'AHLI', 'HADIR']):
                i += 1
                continue
            m = re.match(r'(\d+)\.\s*(.+)', line)
            if not m:
                i += 1
                continue
            num, content = m.groups()
            i += 1
            # Fold wrapped lines into the current entry until the next numbered line.
            while i < len(lines) and not re.match(r'^\d+\.', lines[i]):
                content += " " + lines[i].strip()
                i += 1
            # The first bracketed or parenthesised group is taken as the constituency.
            const = re.search(r'[\[\(]([^]\)]+)[\]\)]', content)
            constituency = const.group(1).strip() if const else ""
            name_part = re.sub(r'[\[\(][^]\)]+[\]\)]', '', content).strip()
            title, name = self._split_title_name(name_part)
            entries.append({
                'number': int(num),
                'title': title,
                'name': name or name_part,
                'constituency': constituency,
                'party': self._extract_party(constituency),
                'format': 'modern'
            })
        return self._deduplicate_entries(entries)

    def _parse_historical_attendance(self, block: str) -> List[Dict]:
        """Parse an English-style attendance list (``Name (Constituency)`` per line).

        Lines containing ``PRESENT`` or ``MEMBERS`` are skipped. Entries are
        numbered in the order they are found; ``party`` is always ``None``.
        """
        entries = []
        for line in block.split('\n'):
            line = line.strip()
            if any(skip in line.upper() for skip in ['PRESENT', 'MEMBERS']):
                continue
            # The optional prefix already covers lines without "The Honourable",
            # so a second prefix-less attempt would never find anything new.
            m = re.match(r"(?:The Honourable )?([A-Z]['A-Z\s]+)\s*\(([^)]+)\)", line, re.I)
            if not m:
                continue
            name_part, constituency = m.groups()
            title, name = self._split_title_name(name_part)
            entries.append({
                'number': len(entries) + 1,
                'title': title,
                'name': name or name_part,
                'constituency': constituency,
                'party': None,
                'format': 'historical'
            })
        return self._deduplicate_entries(entries)

    @staticmethod
    def _honorific_regex(raw: str) -> str:
        """Return a regex for ``raw`` anchored at word boundaries where that is meaningful.

        ``\\b`` is only added on a side of the honorific that is a word
        character. An honorific ending in punctuation (e.g. ``Dato'``) followed
        by a space would otherwise never match, because there is no word
        boundary between two non-word characters.
        """
        prefix = r'\b' if re.match(r'\w', raw) else ''
        suffix = r'\b' if re.search(r'\w\Z', raw) else ''
        return f"{prefix}{re.escape(raw)}{suffix}"

    def _honorific_patterns(self, longest_first: bool = False) -> List[tuple]:
        """Return ``(regex, standard_form)`` pairs for every non-empty honorific key."""
        items = list(self.honorific_dict.items())
        if longest_first:
            items.sort(key=lambda kv: len(kv[0]), reverse=True)
        return [(self._honorific_regex(raw), std) for raw, std in items if raw]

    def _split_title_name(self, text: str) -> tuple[str, str]:
        """Separate known honorifics from a personal name.

        Honorifics are tried longest first and removed from the working text
        as they are found, so a short honorific that is part of a longer one
        already matched (``Dato`` inside ``Dato Seri``) is not reported twice.

        Returns:
            ``(titles, name)`` where ``titles`` is the space-joined standard
            forms of the matched honorifics and ``name`` is what remains, with
            whitespace collapsed.
        """
        matched_titles = []
        remaining = text
        for pattern, std in self._honorific_patterns(longest_first=True):
            remaining, hits = re.subn(pattern, '', remaining, flags=re.I)
            if hits:
                matched_titles.append(std)
        title_str = ' '.join(matched_titles)
        name = re.sub(r'\s+', ' ', remaining.strip())
        return title_str, name

    def _extract_party(self, const: str) -> Optional[str]:
        """Return a 2-6 letter upper-case code following a dash, or inside brackets, if any."""
        m = re.search(r'-\s*([A-Z]{2,6})\b', const) or re.search(r'\[([A-Z]{2,6})\]', const)
        return m.group(1) if m else None

    def _deduplicate_entries(self, entries: List[Dict]) -> List[Dict]:
        """Drop entries whose lower-cased name and constituency were already seen, keeping order."""
        seen = set()
        unique = []
        for e in entries:
            key = f"{e['name'].lower()}_{e['constituency'].lower()}"
            if key not in seen:
                seen.add(key)
                unique.append(e)
        return unique

    def analyze_document_patterns(self, docs: List[Dict], sample_size: int = 500,
                                  seed: Optional[int] = None) -> Dict:
        """Tally header, honorific and language patterns on a random sample.

        Documents without ``full_text`` or judged unusable by
        ``_assess_document_quality`` are skipped.

        Args:
            docs: documents to sample from.
            sample_size: maximum number of documents to sample.
            seed: when given, sampling uses a private ``random.Random(seed)`` so
                the sample is reproducible; ``None`` uses the global generator.

        Returns:
            A dict with ``processing_summary`` (processed count and per-decade
            distribution) and ``discovered_patterns`` (the instance counters).
        """
        rng = random.Random(seed) if seed is not None else random
        sample = rng.sample(docs, min(sample_size, len(docs)))
        processed = 0
        decade_dist = defaultdict(int)
        for doc in sample:
            txt = doc.get('full_text', '')
            if not txt:
                continue
            if not self._assess_document_quality(txt)['usable']:
                continue
            decade = self._extract_decade_from_document(doc)
            decade_dist[decade] += 1
            self._analyze_patterns(decade, txt)
            processed += 1
        return {
            'processing_summary': {
                'successfully_processed': processed,
                'decade_distribution': dict(decade_dist)
            },
            'discovered_patterns': dict(self.pattern_discoveries)
        }

    def _analyze_patterns(self, decade: str, txt: str):
        """Update the per-decade counters from one document.

        Header candidates come from the first 25 lines, honorifics from the
        first 15 speaker lines, and the dominant language from the whole text.
        """
        for line in txt.split('\n')[:25]:
            line = line.strip()
            if not line:
                continue
            if self._is_header(line):
                self.pattern_discoveries['decade_headers'][decade][line] += 1
        # Build the patterns once per document rather than once per line.
        patterns = self._honorific_patterns()
        for line in self._extract_speakers(txt)[:15]:
            for pattern, std in patterns:
                if re.search(pattern, line, re.I):
                    self.pattern_discoveries['honorific_variations'][decade][std] += 1
        self._analyze_language(txt, decade)

    def _is_header(self, line: str) -> bool:
        """Return True for a short line that is upper-case or contains a header keyword."""
        return len(line) < 120 and (line.isupper() or any(k in line.upper() for k in ['DEWAN','PARLIMEN','BIL']))

    def _extract_speakers(self, txt: str) -> List[str]:
        """Return stripped lines that look like speaker turns.

        A speaker line is non-empty, at most 250 characters, does not start
        with ``(``, contains a colon and mentions at least one known honorific.
        """
        patterns = [pattern for pattern, _ in self._honorific_patterns()]
        speaker_lines = []
        for line in txt.split('\n'):
            line = line.strip()
            if not line or len(line) > 250 or line.startswith('('):
                continue
            if ':' not in line:
                continue
            if any(re.search(pattern, line, re.I) for pattern in patterns):
                speaker_lines.append(line)
        return speaker_lines

    def _analyze_language(self, txt: str, decade: str):
        """Classify the document as malay / english / mixed from stop-word counts.

        Nothing is recorded when the combined count of the eight marker words
        is 10 or fewer. One language wins when its count exceeds 1.5 times the
        other's; otherwise the document is counted as mixed.
        """
        lowered = txt.lower()  # lower-case once instead of once per marker word
        malay = sum(len(re.findall(rf'\b{w}\b', lowered)) for w in ['yang','dan','adalah','akan'])
        eng = sum(len(re.findall(rf'\b{w}\b', lowered)) for w in ['the','and','is','with'])
        total = malay + eng
        if total > 10:
            if malay > eng * 1.5:
                self.pattern_discoveries['language_code_switching'][decade]['malay'] += 1
            elif eng > malay * 1.5:
                self.pattern_discoveries['language_code_switching'][decade]['english'] += 1
            else:
                self.pattern_discoveries['language_code_switching'][decade]['mixed'] += 1

    def _assess_document_quality(self, txt: str) -> Dict:
        """Score a document and report whether it is usable for pattern analysis.

        The score is 40% word count (capped at 2000 words) plus 60% the share
        of four parliamentary phrases present. A document is usable when the
        score exceeds 0.25 and it has more than 50 words.
        """
        wc = len(txt.split())
        lowered = txt.lower()
        rel = sum(1 for w in ['yang berhormat','dewan rakyat','parlimen','soalan'] if w in lowered) / 4.0
        score = max(0.0, min(1.0, wc/2000)*0.4 + rel*0.6)
        return {'usable': score > 0.25 and wc > 50}

    def _extract_decade_from_document(self, doc: Dict) -> str:
        """Return the decade label (e.g. ``"1980s"``) found in ``hansardDate``.

        The first year between 1950 and 2029 appearing in ``str(hansardDate)``
        is used; ``"unknown"`` is returned when the date is missing or holds no
        such year.
        """
        date = doc.get('hansardDate')
        if not date:
            return "unknown"
        match = re.search(r'(19[5-9]\d|20[0-2]\d)', str(date))
        if match:
            # The pattern only captures four digits, so int() cannot fail here.
            year = int(match.group(1))
            return f"{(year // 10) * 10}s"
        return "unknown"

    def print_completion_summary(self, total: int):
        """Print, per field, the share of documents whose value was found or created.

        Args:
            total: number of documents processed; 0 prints 0.0% for every
                field instead of dividing by zero.
        """
        print("\nFIELD COMPLETION")
        for f, s in self.completion_stats.items():
            cov = (s['found'] + s['created']) / total * 100 if total else 0.0
            print(f"{f.title():15}: {cov:5.1f}%")

    def verify_field_coverage(self, docs: List[Dict]) -> Dict:
        """Return, per completed field, the percentage of documents with a truthy value.

        Any truthy value counts, including the attendance placeholder. An
        empty ``docs`` list yields 0.0 for every field.
        """
        total = len(docs)
        coverage = {}
        for field in self._COMPLETED_FIELDS:
            present = sum(1 for d in docs if d.get(field))
            coverage[field] = {'percentage': present / total * 100 if total else 0.0}
        return coverage

    def display_combined_results(self, res: Dict):
        """Print a short summary of a ``run_complete_analysis`` result."""
        print("\n" + "="*60)
        print("ANALYSIS SUMMARY")
        print("="*60)
        c = res['data_completion']
        print(f"Docs: {c['original_document_count']}")
        for f, cov in c['field_coverage'].items():
            print(f"  {f:18}: {cov['percentage']:5.1f}%")
        print(f"Pattern docs: {res['pattern_analysis']['processing_summary']['successfully_processed']}")

### EDA

In [3]:
# Cell 3: EDA Function
def run_eda(results: Dict, out_dir: str = "eda_plots"):
    """Render the exploratory plots for an analysis result and save them as PNG files.

    Up to four files are written into ``out_dir`` (created if missing):
    ``decade.png``, ``headers.png``, ``honorifics.png`` and ``language.png``.
    A plot is skipped when there is no data for it.

    Args:
        results: the dictionary returned by
            ``CombinedParliamentaryAnalyzer.run_complete_analysis``; only its
            ``pattern_analysis`` entry is read.
        out_dir: directory that receives the image files.
    """
    import os
    os.makedirs(out_dir, exist_ok=True)
    patterns = results['pattern_analysis']['discovered_patterns']
    summary = results['pattern_analysis']['processing_summary']

    def save_current_figure(filename: str) -> None:
        # Always release the figure, even when laying out or saving fails.
        try:
            plt.tight_layout()
            plt.savefig(os.path.join(out_dir, filename))
        finally:
            plt.close()

    def plot_top_entries(counters, label: str, title: str, filename: str, figsize, max_len=None) -> None:
        # One horizontal bar per (decade, entry) for the five most common entries of each decade.
        rows = [(decade, entry[:max_len] if max_len else entry, freq)
                for decade, counter in counters.items()
                for entry, freq in counter.most_common(5)]
        if not rows:
            return
        frame = pd.DataFrame(rows, columns=['Decade', label, 'Freq'])
        plt.figure(figsize=figsize)
        sns.barplot(data=frame, x='Freq', y=label, hue='Decade')
        plt.title(title)
        save_current_figure(filename)

    distribution = summary['decade_distribution']
    if distribution:
        # The distribution is keyed in the order decades were first sampled;
        # sort so the x-axis reads chronologically ("unknown" sorts last).
        decades = sorted(distribution, key=str)
        plt.figure(figsize=(8,4))
        sns.barplot(x=decades, y=[distribution[d] for d in decades])
        plt.title("Documents per Decade")
        plt.xticks(rotation=45)
        save_current_figure("decade.png")

    plot_top_entries(patterns['decade_headers'], 'Header', "Top Headers",
                     "headers.png", (10,6), max_len=60)
    plot_top_entries(patterns['honorific_variations'], 'Honorific', "Top Honorifics",
                     "honorifics.png", (9,5))

    # One row per decade holding that decade's most frequent language label.
    dominant = [(decade, counter.most_common(1)[0][0] if counter else 'unknown')
                for decade, counter in patterns['language_code_switching'].items()]
    if dominant:
        frame = pd.DataFrame(dominant, columns=['Decade','Lang'])
        plt.figure(figsize=(7,4))
        sns.countplot(data=frame, x='Decade', hue='Lang')
        plt.title("Language Dominance")
        save_current_figure("language.png")

    print(f"EDA saved to {out_dir}/")

### Main execution

In [None]:
# Cell 4: Main Execution
import traceback
import os
from dotenv import load_dotenv
def run_combined_analysis():
    """Load data from MongoDB, run the full analysis, and save the JSON report and plots.

    The connection string is read from ``MONGODB_URI`` in
    ``../3_app_system/backend/.env`` (relative to the current working
    directory). Honorifics come from the ``honorific_dictionary`` collection
    and documents from ``hansard_core500``, both in the ``MyParliament``
    database.

    Returns:
        The analysis result dictionary, or ``None`` when any step fails (the
        error and traceback are printed instead of being raised).
    """
    print("HANSARD ANALYZER (with honorific_dictionary)")
    print("="*55)
    client = None
    try:
        from pymongo import MongoClient
        # Find the correct .env path relative to this notebook
        env_path = os.path.abspath(os.path.join(os.getcwd(), "../3_app_system/backend/.env"))
        if not os.path.exists(env_path):
            raise FileNotFoundError(f".env file not found at {env_path}")
        load_dotenv(env_path)

        URI = os.getenv("MONGODB_URI")
        if not URI:
            raise ValueError("MONGODB_URI not found in .env file")
        client = MongoClient(URI)
        db = client["MyParliament"]

        # === Load honorific_dictionary ===
        honorific_docs = list(db["honorific_dictionary"].find({}))
        honorific_dict = {}
        for doc in honorific_docs:
            categories = doc.get("categories", {})
            for cat_name, titles in categories.items():
                for title in titles:
                    # Each title maps to itself: the raw spelling is also the standard form.
                    honorific_dict[title] = title
        print(f"Loaded {len(honorific_dict)} honorifics from categories")

        # === Load hansard ===
        docs = list(db["hansard_core500"].find({}))
        print(f"Loaded {len(docs)} hansard docs")

        analyzer = CombinedParliamentaryAnalyzer(honorific_dict)
        results = analyzer.run_complete_analysis(docs)
        analyzer.display_combined_results(results)

        # === Save ===
        with open("combined_parliament_analysis.json", "w", encoding="utf-8") as f:
            # default=str serialises values json cannot encode natively.
            json.dump({
                "data_completion": results["data_completion"],
                "pattern_analysis": results["pattern_analysis"]
            }, f, indent=2, default=str, ensure_ascii=False)
        print("Saved: combined_parliament_analysis.json")

        run_eda(results)

        print("\nDone. Ready for cleaning.")
        return results
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None
    finally:
        # Release the MongoDB connection on both the success and the error path.
        if client is not None:
            client.close()

In [10]:
# Cell 5: Run Analysis
# Runs the whole pipeline; `results` is None when run_combined_analysis() hit an error
# (the error and traceback are printed by that function).
results = run_combined_analysis()

HANSARD ANALYZER (with honorific_dictionary)
Loaded 20 honorifics from categories
Loaded 500 hansard docs
  Completing 1/500
  Completing 101/500
  Completing 201/500
  Completing 301/500
  Completing 401/500

FIELD COMPLETION
Header         : 100.0%
Attendance     : 100.0%
Discussion_Start: 100.0%

ANALYSIS SUMMARY
Docs: 500
  header            : 100.0%
  attendance        : 100.0%
  discussion_start  : 100.0%
Pattern docs: 500
Saved: combined_parliament_analysis.json
EDA saved to eda_plots/

Done. Ready for cleaning.
