In [None]:
from xml.etree.ElementTree import ElementTree

import duckdb
from datetime import datetime
from duckdb import FatalException
from pathlib import Path
from tqdm import tqdm

In [None]:
# Connect to sql database
con = duckdb.connect(database='../data/database/german-parliament.duckdb', read_only=False)

In [None]:
def create_db(reset_db=False):
    """
    Creates the necessary tables in the DuckDB database for storing plenary minutes.
    This function creates the 'party', 'speaker', and 'plenary_minute' tables, as well as sequences for party and speaker IDs.

    Args:
        reset_db (bool): If True, drops existing tables and sequences before creating new ones.
                          Defaults to False.

    Returns:
        None
    """
    if reset_db:
        con.execute("DROP TABLE IF EXISTS speech;")
        con.execute("DROP TABLE IF EXISTS speaker;")
        con.execute("DROP TABLE IF EXISTS party;")
        con.execute("DROP TABLE IF EXISTS plenary_minute;")
        con.execute("DROP TABLE IF EXISTS session;")


        con.execute("DROP SEQUENCE IF EXISTS party_id_seq;")
        con.execute("DROP SEQUENCE IF EXISTS speaker_id_seq;")
        con.execute("DROP SEQUENCE IF EXISTS legislative_period_id_seq;")
        con.execute("DROP SEQUENCE IF EXISTS session_id_seq;")
        con.execute("DROP SEQUENCE IF EXISTS speech_id_seq;")

    # Create a sequence for party IDs to ensure unique IDs for each party. It's like an auto-incrementing primary key.
    con.execute("CREATE SEQUENCE IF NOT EXISTS party_id_seq START 1;")
    # Create the party which will be used to identify a party
    con.execute(
        """
        CREATE TABLE IF NOT EXISTS party (
            id INTEGER DEFAULT nextval('party_id_seq') PRIMARY KEY,
            -- If we insert a party we just use DEFAULT to get the next value from the sequence.
            name VARCHAR UNIQUE NOT NULL
        );
        """
    )

    # Create a sequence for speaker IDs to ensure unique IDs for each speaker.
    con.execute("CREATE SEQUENCE IF NOT EXISTS speaker_id_seq START 1;")
    # Create the speaker which will be used to identify a speaker
    con.execute("""
        CREATE TABLE IF NOT EXISTS speaker (
            harmonised_id INTEGER DEFAULT nextval('speaker_id_seq') UNIQUE,
            -- harmonised_id is used to identify the speaker in the XML file of all periods. The speaker id provided by the Bundestag for periods >= 19 is not speaker unique!
            -- If we insert a speaker we just use DEFAULT to get the next value from the sequence.
            name VARCHAR NOT NULL,
            party_id INTEGER REFERENCES party(id) NOT NULL,
            PRIMARY KEY (name, party_id)
        );
        """
    )

    # Create a sequence for legislative periods to ensure unique IDs for each legislative period.
    con.execute("CREATE SEQUENCE IF NOT EXISTS legislative_period_id_seq START 1;")
    # Create the session table to store information about legislative periods and sessions.
    con.execute("""
        CREATE TABLE IF NOT EXISTS session (
            id INTEGER DEFAULT nextval('legislative_period_id_seq') UNIQUE NOT NULL,
            legislative_period INTEGER NOT NULL,
            session_no INTEGER NOT NULL,
            date DATE NOT NULL,
            primary key (legislative_period, session_no)
        );
        """
    )

    # Create a sequence for plenary minute IDs to ensure unique IDs for each plenary minute.
    con.execute("CREATE SEQUENCE IF NOT EXISTS speech_id_seq START 1;")
    con.execute("""
        CREATE TABLE IF NOT EXISTS speech (
            id VARCHAR DEFAULT nextval('speech_id_seq') PRIMARY KEY,
            -- id is a unique identifier for each speech, generated from the sequence for speeches for periods < 19.
            -- For newer periods the bundestag provides a unique id for each speech. Those begin with "ID"
            session_id INTEGER REFERENCES session(id) NOT NULL,
            speaker_harmonised_id INTEGER REFERENCES speaker(harmonised_id), -- we can use harmonised_id, since this id is available for all periods
            role VARCHAR, -- Role of the speaker, e.g. 'mp'
            position VARCHAR, -- Position of the speaker, e.g. 'Präsident', 'Alterspräsident', etc. | In newer periods this is named Rolle!
            content TEXT NOT NULL
        );
        """
    )
    con.commit()

In [None]:
def parse_xml(file_path):
    """
    Parses an XML file and returns the root element.

    Args:
        file_path (str): The path to the XML file.

    Returns:
        xml.etree.ElementTree.Element: The root element of the parsed XML tree,
                                         or None if parsing fails.
    """
    try:
        tree = ElementTree.parse(file_path)
        root = tree.getroot()
        return root
    except ElementTree.ParseError as e:
        print(f"Error parsing XML file: {e}")
        return None
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None

In [None]:
def insert_session(legislative_period, session, date):
    """ Inserts session information into the session table.

    Args:
        meta_data (xml.etree.ElementTree.Element): The title statement element containing legislative period and session information, as well as the date.

    Raises:
        Exception: If the titleStmt is None or if the publicationStmt is not found in the XML file.

    Returns:
        None
    """

    sql = """
        INSERT INTO session (legislative_period, session_no, date)
        VALUES (?, ?,?)
        ON CONFLICT DO NOTHING;
    """

    params = (
        legislative_period,
        session,
        date
    )
    con.execute(sql, params)


def insert_party(attributes):
    """ Inserts party information into the party table. If the party already exists, it does nothing.

    Args:
        attributes (dict): A dictionary containing the party name under the key "party".

    """
    sql = "INSERT INTO party (name) VALUES (?) ON CONFLICT DO NOTHING;" # ID is automatically generated by the sequence.
    con.execute(sql, (attributes["party"],))


def insert_speaker(attributes):
    """ Inserts speaker information into the speaker table. If the speaker already exists, it does nothing.

    Args:
        attributes (dict): A dictionary containing the speaker's name and party under the keys "name" and "party".

    """
    sql = """
            INSERT INTO speaker (name, party_id)
            VALUES (
                ?,                                    -- speaker’s full name
                (SELECT id FROM party WHERE name = ?) -- look-up party once, safely
            )
            ON CONFLICT DO NOTHING;
            """

    con.execute(sql, (attributes["name"], attributes["party"]))




def insert_speech(speech:ElementTree.Element, attributes:dict, legislative_period:str, session:str, speech_id=None):
    """ Inserts speech information into the speech table. If the speech already exists, it does nothing.
    Args:
        speech (xml.etree.ElementTree.Element): The speech element containing the content and speaker information.
        attributes (dict): A dictionary containing the speaker's name, party, role, and position.
        legislative_period (str): The legislative period of the session.
        session (str): The session number of the session.
        speech_id (str, optional): The unique identifier for the speech. Defaults to None.
                                For speeches from periods >= 19 we will pass a speech_id,
                                which is the id provided by the Bundestag.

    Returns:
        None
    """
    speech_string = ElementTree.tostring(speech, encoding='unicode') if speech is not None else ''

    party_id = con.execute(f"""
                    SELECT id FROM party WHERE name = '{attributes['party']}';
                """).fetchone()[0]
    speaker_id = con.execute(f"""
                    SELECT harmonised_id FROM speaker WHERE name = '{attributes['name']}' AND party_id = {party_id};
                """).fetchone()[0]
    session_id = con.execute(f"""
                    SELECT id FROM session WHERE legislative_period = '{legislative_period}' AND session_no = '{session}';
                 """).fetchone()[0]

    speaker_role = attributes.get("role")
    speaker_role = None if speaker_role == 'NA' else speaker_role  # Convert 'NA' to None for SQL NULL
    speaker_position = attributes.get("position")
    speaker_position = None if speaker_position == 'NA' else speaker_position  # Convert 'NA' to None for SQL NULL
    # values to bind — order must match the placeholders
    if speech_id is None:
        # If we have no speech_id, we know that this is a speech from a period < 19 and we can use our own, automatically generating id.
        sql = """
        INSERT INTO speech
               (session_id, speaker_harmonised_id, role, position, content)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT DO NOTHING;
        """
        params = (
        session_id,
        speaker_id,
        speaker_role,
        speaker_position,
        speech_string
        )

        con.execute(sql, params)
    else:
        # If we have a speech_id, we know that this is a speech from a period >= 19 and we can insert the new_id speech id instead of using our proprietary id.
        sql = """
        INSERT INTO speech
               (id, session_id, speaker_harmonised_id, role, position, content)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT DO NOTHING;
        """
        params = (
            speech_id,
            session_id,
            speaker_id,
            speaker_role,
            speaker_position,
            speech_string
        )
        con.execute(sql, params)

def get_speaker_attributes(speaker:ElementTree.Element) -> dict:
    """
    Extracts speaker attributes from the speaker data.

    Args:
        speaker (xml.etree.ElementTree.Element): The speaker data element.

    Returns:
        dict: A dictionary containing the speaker's name, party, and ID.
    """
    speaker_data = speaker.find('name')
    # Extract the speaker's first and last name @todo maybe get rid of the title or save it in another column!
    speaker_title = speaker_data.find('titel')
    speaker_title = "" if speaker_title is None else speaker_title.text
    # Check if first name exists
    speaker_first_name = speaker_data.find('vorname')
    speaker_first_name = speaker_first_name.text if speaker_first_name is not None else None
    # Check if last name exist
    speaker_last_name = speaker_data.find('nachname')
    speaker_last_name = speaker_last_name.text if speaker_last_name is not None else None
    # @todo strip the name and get rid of \n
    speaker_full_name = None
    if speaker_first_name is None and speaker_last_name is None:
        #sometimes speeches are broken. See speech: ID196901400
        speaker_full_name = "NA"
    else:
        speaker_full_name = speaker_title + " " + speaker_first_name + " " + speaker_last_name
    # Extract the speakers party
    speaker_party = speaker_data.find('fraktion')
    if speaker_party is not None:
        speaker_party = speaker_party.text
    else:
        speaker_party = 'NA' # Use 'NA' if no party is found

    # Get the speaker position, e.g. "Alterspräsident"
    speaker_position = speaker_data.find('rolle')
    if speaker_position is not None:
        speaker_position = speaker_position.find("rolle_lang").text
    else:
        speaker_position = None

    # Return the speaker attributes as a dictionary
    return {"party": speaker_party, "name": speaker_full_name, "position": speaker_position}


def parse_old_plenary_minute(root):
    """ Parses the plenary minute XML root element and extracts speeches, speakers, and parties and inserts them into the database.
        This method is used for periods < 19, since the XML structure changed after period 18.
        The XML for those periods is provided by https://github.com/PolMine/GermaParlTEI

    Args:
        root (xml.etree.ElementTree.Element): The root element of the XML tree containing the data of a single plenary minute.

    Raises:
        Exception: If the XML root element is None.

    Returns:
        None

    """
    if root is not None:
        # Get the meta data information, i.e. date, session, etc.
        meta_data = root.find('teiHeader').find('fileDesc')

        titleStmt = meta_data.find('titleStmt')
        publicationStmt = meta_data.find('publicationStmt')

        legislative_period = titleStmt.find('legislativePeriod').text
        session = titleStmt.find('sessionNo').text
        date = publicationStmt.find('date').text

        # Insert session entry into the db
        insert_session(legislative_period, session, date)
        #Get the actual content of the plenary minute
        plenary_minute = root.find('text').find('body').findall('div')
        speeches = []
        # Get all 'div' elements in the 'body' of the 'text'. Those are the elements that contain the actual plenary minute content.
        for div in plenary_minute:
            speeches.extend(div.findall('sp'))

        for speech in speeches:
            # Get attributes of the speech element
            attributes = speech.attrib
            # Insert party of speaker into db
            insert_party(attributes)
            # Insert speaker (politician) into db
            insert_speaker(attributes)
            # Insert the speech into the db
            insert_speech(speech, attributes, legislative_period, session)
    else:
        raise Exception("Missing XML root element.")

def parse_new_plenary_minute(root:ElementTree.Element):
    """ Parses plenary minute XML root element and extracts speeches, speakers, and parties and inserts them into the database.
        This function is used for periods >= 19, since the XML structure changed.

    Args:
        root (xml.etree.ElementTree.Element): The root element of the XML tree containing the data of a single plenary minute.

    Returns:
        None
    """

    if root is not None:
        # Get metadata
        kopfdaten = root.find("vorspann").find("kopfdaten")
        # Get period and session no
        plenarprotokoll_nummer = kopfdaten.find("plenarprotokoll-nummer")
        legislative_period = plenarprotokoll_nummer.find("wahlperiode").text
        session_number_tmp = plenarprotokoll_nummer.find("sitzungsnr").text
        # Sometime the editors of the xml added (neu) -> maybe if the original xml was faulty?
        session_number = session_number_tmp.replace(" (neu)", "")
        # Get the date
        raw_date = kopfdaten.find("veranstaltungsdaten").find('datum').attrib.get("date")
        date = datetime.strptime(raw_date, "%d.%m.%Y")
        # Insert the session into the database
        insert_session(legislative_period, session_number, date) #@ todo this will be the existing function
        # Get the session data where the speeches are stored
        tagesordnungspunkte = root.find("sitzungsverlauf").findall("tagesordnungspunkt")
        # Iterate over each Tagesordnungspunkt (agenda item)
        for top in tagesordnungspunkte:
            speeches = top.findall('rede')
            for speech in speeches:
                # This will extract the speaker data and feeds it into the database.
                speech_id = speech.attrib.get('id')
                # Finds the section where the speech of the speaker begins, this avoid comments from other before the speech. Then it get's the speaker's information
                speaker = speech.find(".//p[@klasse='redner']").find('redner')
                # try:
                attributes = get_speaker_attributes(speaker)
                insert_party(attributes) # passed as list to ensure compatibility with the insert function @todo existing function
                insert_speaker(attributes) #@ todo existing function
                insert_speech(speech, attributes, legislative_period, session_number, speech_id) #@todo existing function
                # except AttributeError as e:
                #     print(ElementTree.tostring(speech, encoding='unicode'), "\n\n")
                #     traceback.print_tb(e.__traceback__)
                #     con.close()

def parse_legislative_period(path:str, period:int):
    """ Parses the plenary minutes for a given legislative period and inserts the data into the database.
        The XML files to be processed are stored in the path provided, e.g. '../data/plenary_minutes/wahlperiode_01'.

    Args:
        path (str): The path to the directory containing the XML files for the legislative period.
        period (int): The legislative period number.

    Returns:
        None
    """
    print(f"Parsing legislative period at path: {path}")
    # We materialise the generator once, to obtain all entries, so we can call len() on it
    files = list(Path(path).rglob("*"))
    for file in tqdm(files, total=len(files), desc="Processing files"):
        if file.suffix == ".xml":
            root = parse_xml(file)
            # Depending on the period we parse the XML differently, since the structure of the XML changed after period 18.
            if period <= 18:
                parse_old_plenary_minute(root)
            elif period > 18:
                parse_new_plenary_minute(root)
    print(f"Finished parsing legislative period at path: {path}")



def main():
    """
    Main function to create the database and parse plenary minutes for all legislative periods from 1 to 21.
    It creates the necessary database tables and sequences, then iterates through each legislative period,
    parsing the XML files and inserting the data into the database.
    """
    # Create database tables
    create_db(False)
    # Parse each period between period 1 and 21
    for period in range(1, 22):
        try:
            con.execute("BEGIN TRANSACTION;")
            parse_legislative_period(f'../data/plenary_minutes/wahlperiode_{period:02d}', period) # 02:d adjusts the number to be two digits, e.g. 1 -> 01.
            con.execute("COMMIT;")
        except FatalException as e:
            con.execute("ROLLBACK;")
            print(f"Error processing legislative period {period}: {e}")


In [None]:
if __name__ == "__main__":
    main()

# Always close connection to ensure that we won't have problems reconnecting to the db!
con.close()