## Extract generic and missing categories from SQL database

In [None]:
import sqlite3
import json
import os
import pandas as pd
import re
from IPython.display import display  # For Jupyter Notebook display

# SQLite database file path
DB_PATH = "../../data/pipeline2/sql/epd_database.sqlite"
OUTPUT_FILE_PATH = "../../data/pipeline2/sql/filtered_epd_data05.csv"

# Allowed Classifications (normalized)
ALLOWED_CATEGORIES = {
    "construction products, infrastructure and buildings",
    "construction products",
    ""  # Handling missing categories
}

# Synonym & Unit Normalization Dictionary
SYNONYM_DICT = {
    "kilogram": "kg",
    "kilograms": "kg",
    "gram": "g",
    "grams": "g",
    "liter": "L",
    "liters": "L",
    "metre": "m",
    "meter": "m",
    "centimetre": "cm",
    "centimeter": "cm",
    "millimetre": "mm",
    "millimeter": "mm"
}

def clean_text(text, lowercase=True):
    """Cleans and normalizes text: removes newlines, extra spaces, normalizes punctuation, and standardizes synonyms/units."""
    if not isinstance(text, str):
        return "N/A"

    text = text.strip().replace("\n", " ")          # Remove newlines and trim spaces
    text = re.sub(r'\s+', ' ', text)                  # Remove excessive spaces
    if lowercase:
        text = text.lower()                           # Lowercase text if desired
    # Normalize punctuation (replace "&" with "and")
    text = text.replace("&", "and")
    # Remove special characters (except spaces and common separators)
    text = re.sub(r'[^a-zA-Z0-9.:,\-_/()\s]', '', text)
    # Normalize synonyms and units
    for key, value in SYNONYM_DICT.items():
        text = text.replace(key, value)
    return text

def extract_epd_data(json_data):
    """Extracts key fields from the EPD JSON document, including additional fields."""
    try:
        process_info = json_data.get("processInformation", {})
        dataset_info = process_info.get("dataSetInformation", {})
        classification_info = dataset_info.get("classificationInformation", {}).get("classification", [])
        geography_info = process_info.get("geography", {}).get("locationOfOperationSupplyOrProduction", {})
        technology_info = process_info.get("technology", {})
        modelling_info = json_data.get("modellingAndValidation", {})
        exchanges_info = json_data.get("exchanges", {})

        # Extract key fields
        uuid = dataset_info.get("UUID", "")
        base_name = clean_text(dataset_info.get("name", {}).get("baseName", [{}])[0].get("value", ""), lowercase=False)
        classification = ""
        database_name = ""
        if classification_info:
            classification = clean_text(classification_info[0]["class"][0].get("value", ""))
            database_name = clean_text(classification_info[0].get("name", ""))
        geographic_location = geography_info.get("location", "")
        technology_description = ""
        if "technologyDescriptionAndIncludedProcesses" in technology_info:
            tech_desc_list = technology_info["technologyDescriptionAndIncludedProcesses"]
            if tech_desc_list:
                technology_description = clean_text(tech_desc_list[0].get("value", ""), lowercase=False)
        technological_applicability = ""
        if "technologicalApplicability" in technology_info:
            tech_applicability_list = technology_info["technologicalApplicability"]
            if tech_applicability_list and isinstance(tech_applicability_list, list) and len(tech_applicability_list) > 0:
                technological_applicability = clean_text(tech_applicability_list[0].get("value", ""), lowercase=False)
        lca_method_details = ""
        lci_method_allocation = modelling_info.get("LCIMethodAndAllocation", {})
        ref_to_lca_method_details = lci_method_allocation.get("referenceToLCAMethodDetails", [])
        if ref_to_lca_method_details and isinstance(ref_to_lca_method_details, list) and len(ref_to_lca_method_details) > 0:
            short_desc_list = ref_to_lca_method_details[0].get("shortDescription", [])
            if short_desc_list and isinstance(short_desc_list, list) and len(short_desc_list) > 0:
                lca_method_details = clean_text(short_desc_list[0].get("value", ""), lowercase=False)
        flow_dataset_short_desc = ""
        exchanges = exchanges_info.get("exchange", [])
        if exchanges and isinstance(exchanges, list) and len(exchanges) > 0:
            ref_to_flow_dataset = exchanges[0].get("referenceToFlowDataSet", {})
            short_desc_list = ref_to_flow_dataset.get("shortDescription", [])
            if short_desc_list and isinstance(short_desc_list, list) and len(short_desc_list) > 0:
                flow_dataset_short_desc = clean_text(short_desc_list[0].get("value", ""), lowercase=False)
        flow_property_name = ""
        if exchanges and isinstance(exchanges, list) and len(exchanges) > 0:
            flow_properties = exchanges[0].get("flowProperties", [])
            if flow_properties and isinstance(flow_properties, list) and len(flow_properties) > 0:
                name_list = flow_properties[0].get("name", [])
                if name_list and isinstance(name_list, list) and len(name_list) > 0:
                    flow_property_name = clean_text(name_list[0].get("value", ""), lowercase=False)
        flow_property_mean_value = ""
        if exchanges and isinstance(exchanges, list) and len(exchanges) > 0:
            flow_properties = exchanges[0].get("flowProperties", [])
            if flow_properties and isinstance(flow_properties, list) and len(flow_properties) > 0:
                flow_property_mean_value = flow_properties[0].get("meanValue", "")
        flow_property_reference_unit = ""
        if exchanges and isinstance(exchanges, list) and len(exchanges) > 0:
            flow_properties = exchanges[0].get("flowProperties", [])
            if flow_properties and isinstance(flow_properties, list) and len(flow_properties) > 0:
                flow_property_reference_unit = flow_properties[0].get("referenceUnit", "")
        material_properties_cols = {}
        if exchanges:
            mat_props = exchanges[0].get("materialProperties", [])
            for idx, mp in enumerate(mat_props, start=1):
                mp_name  = clean_text(mp.get("name", ""),  lowercase=False)
                mp_value = mp.get("value", "")
                mp_unit  = mp.get("unit", "")
                material_properties_cols[f"Material Property {idx}"] = f"{mp_name}: {mp_value} {mp_unit}".strip()

        
        # Filter the data based on allowed classifications.
        if classification in ALLOWED_CATEGORIES:
            record = {
                "EPD Name": base_name,
                "Technology Description": technology_description,
                "Classification": classification,
                "Database Name": database_name,
                "Technological Applicability": technological_applicability,
                "LCA Method Details": lca_method_details,
                "Flow Dataset Short Description": flow_dataset_short_desc,
                "Flow Property Name": flow_property_name,
                "Flow Property Mean Value": flow_property_mean_value,
                "Flow Property Reference Unit": flow_property_reference_unit,
                "Geographic Location": geographic_location,
                "UUID": uuid,
            }

            # add dynamic material-property columns
            record.update(material_properties_cols)
            return record
        else:
            return None

    except Exception as e:
        print(f"Error extracting data: {e}")
        return None

def fetch_epd_data():
    """Fetches and filters EPD data from SQLite."""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        # Fetch all rows from the table
        cursor.execute("SELECT document FROM epd_documents")
        rows = cursor.fetchall()

        extracted_data = []
        unique_classifications = set()

        for row in rows:
            try:
                json_data = json.loads(row[0])
                epd_info = extract_epd_data(json_data)
                if epd_info:
                    extracted_data.append(epd_info)
                    unique_classifications.add(epd_info["Classification"])
            except json.JSONDecodeError:
                print("Skipping invalid JSON record.")

        conn.close()

        print("\n### Unique Classifications Found ###")
        for cls in sorted(unique_classifications):
            print(f"- {cls}")

        df = pd.DataFrame(extracted_data)
        return df

    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
        return None

# Run extraction and display results.
df_epd = fetch_epd_data()

if df_epd is not None and not df_epd.empty:
    display(df_epd)
    output_dir = os.path.dirname(OUTPUT_FILE_PATH)
    os.makedirs(output_dir, exist_ok=True)
    try:
        df_epd.to_csv(OUTPUT_FILE_PATH, index=False, encoding="utf-8")
        print(f"Filtered EPD data saved to: {OUTPUT_FILE_PATH}")
    except Exception as e:
        print(f"Error saving CSV file: {e}")
else:
    print("No matching EPDs found.")


## Check Category Distribuition

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import os

# Define file path
file_path = "../../data/pipeline2/sql/filtered_epd_data05.csv"

# Check if the file exists
if not os.path.exists(file_path):
    print(f"File not found: {file_path}")
else:
    # Read the CSV file
    df_epd = pd.read_csv(file_path)

    # Replace missing values in the Classification column with "Missing"
    df_epd["Classification"] = df_epd["Classification"].fillna("Missing")

    # Get classification counts
    classification_counts = df_epd["Classification"].value_counts()

    # Plot the classification distribution
    plt.figure(figsize=(12, 6))
    ax = classification_counts.plot(kind="bar", edgecolor="black", color="green")

    # Formatting the plot
    plt.xlabel("Category")
    plt.ylabel("Count")
    plt.title("Distribution of Problematic EPD Categories (Including Missing)")
    plt.xticks(rotation=0)
    plt.grid(axis="y", linestyle="--", alpha=0.7)

    # Add count labels inside the bars
    for p in ax.patches:
        # Get bar dimensions
        width = p.get_width()
        height = p.get_height()
        x, y = p.get_xy()
        # Place label in the middle of the bar
        ax.text(x + width/2, y + height/2, int(height), 
                horizontalalignment='center', 
                verticalalignment='center', 
                color='white', fontsize=12)

    # Show the plot
    plt.show()

    # Print category counts in text formats
    print("\n### Category Counts ###")
    for classification, count in classification_counts.items():
        print(f"{classification}: {count}")


## Check Character and Token Counts of `technologyDescriptionAndIncludedProcesses`

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import tiktoken
import numpy as np
import statistics
import os

# File path for the processed EPD data
file_path = "../../data/pipeline2/sql/filtered_epd_data.csv"

# Check if the file exists
if not os.path.exists(file_path):
    print(f"File not found: {file_path}")
else:
    # Read the CSV file
    df_epd = pd.read_csv(file_path)

    # Combine relevant text columns into a single string per row
    df_epd["tech_descr"] = df_epd[["Technology Description"]].astype(str).agg(" ".join, axis=1)

    # Compute character and token counts
    lengths = df_epd["tech_descr"].apply(len).tolist()

    # Initialize tokenizer
    enc = tiktoken.get_encoding("o200k_base")
    token_counts = [len(enc.encode(text)) for text in df_epd["tech_descr"]]

    # Basic statistics
    mean_length = statistics.mean(lengths) if lengths else 0
    mean_tokens = statistics.mean(token_counts) if token_counts else 0

    median_length = np.median(lengths) if lengths else 0
    median_tokens = np.median(token_counts) if token_counts else 0

    std_length = np.std(lengths, ddof=1) if len(lengths) > 1 else 0
    std_tokens = np.std(token_counts, ddof=1) if len(token_counts) > 1 else 0

    percentiles_length = np.percentile(lengths, [25, 50, 75, 90, 95]) if lengths else [0] * 5
    percentiles_tokens = np.percentile(token_counts, [25, 50, 75, 90, 95]) if token_counts else [0] * 5

    iqr_length = percentiles_length[2] - percentiles_length[0]
    iqr_tokens = percentiles_tokens[2] - percentiles_tokens[0]

    # Print statistical measures
    print(f"Mean character count: {mean_length:.2f}")
    print(f"Median character count: {median_length:.2f}")
    print(f"Standard deviation (chars): {std_length:.2f}")
    print(f"25th, 50th, 75th, 90th, 95th percentiles (chars): {percentiles_length}")
    print(f"IQR (chars): {iqr_length:.2f}\n")

    print(f"Mean token count: {mean_tokens:.2f}")
    print(f"Median token count: {median_tokens:.2f}")
    print(f"Standard deviation (tokens): {std_tokens:.2f}")
    print(f"25th, 50th, 75th, 90th, 95th percentiles (tokens): {percentiles_tokens}")
    print(f"IQR (tokens): {iqr_tokens:.2f}\n")

    # Plot histograms for visual inspection
    plt.figure(figsize=(10, 5))
    plt.hist(lengths, bins=20, edgecolor="black")
    plt.title("Character Count Distribution")
    plt.xlabel("Character Count")
    plt.ylabel("Frequency")
    plt.show()

    plt.figure(figsize=(10, 5))
    plt.hist(token_counts, bins=20, edgecolor="black")
    plt.title("Token Count Distribution")
    plt.xlabel("Token Count")
    plt.ylabel("Frequency")
    plt.show()

    # Print histogram data in numeric format
    def print_histogram_data(data, bins=20, title="Histogram"):
        counts, bin_edges = np.histogram(data, bins=bins)
        print(title)
        for i in range(len(counts)):
            lower = bin_edges[i]
            upper = bin_edges[i + 1]
            print(f"{lower:5.1f} - {upper:5.1f}: {counts[i]}")

    print_histogram_data(lengths, bins=20, title="Character Count Histogram Data")
    print_histogram_data(token_counts, bins=20, title="Token Count Histogram Data")


# Fetch EN ÖKOBAUDAT Categories from API

In [None]:
import requests
import xml.etree.ElementTree as ET
from urllib.parse import quote
import xml.dom.minidom

# Optimize to not go past level 2
# Problems getting all German categories because of umlauts in the URL 

# Base URL for the API endpoints
BASE_URL = "https://www.oekobaudat.de/OEKOBAU.DAT/resource/processes/categories/"

def get_categories(path):
    """
    Fetches categories from the API.
    - If path is empty, gets top-level categories.
    - Otherwise, retrieves subcategories for the given path.
    """
    if path:
        encoded_path = "/".join(quote(segment) for segment in path.split("/"))
        url = f"{BASE_URL}{encoded_path}/subcategories/?catSystem=oekobau.dat&lang=en&sort=id"
    else:
        url = f"{BASE_URL}?catSystem=oekobau.dat&lang=en&sort=id"
    
    print(f"Fetching URL: {url}")
    response = requests.get(url)
    response.raise_for_status()  # Raise error for bad responses
    return ET.fromstring(response.content)

def build_tree(path, xml_parent, level=0):
    """
    Recursively builds the XML tree:
    - 'path' is used to construct the API URL.
    - 'xml_parent' is the XML element to which child categories will be added.
    - 'level' indicates the recursion depth (for debugging).
    """
    indent = "  " * level
    print(f"{indent}Processing path: '{path or 'Top-level'}'")
    
    try:
        root = get_categories(path)
    except Exception as e:
        print(f"{indent}Error retrieving categories for path '{path}': {e}")
        return

    ns = {"sapi": "http://www.ilcd-network.org/ILCD/ServiceAPI"}
    for cat in root.findall(".//sapi:category", ns):
        cat_name = cat.text.strip() if cat.text else "Unknown"
        cat_id = cat.get("classId", "N/A")
        print(f"{indent}  Found category: {cat_name} (ID: {cat_id})")
        
        new_elem = ET.SubElement(xml_parent, "category", id=cat_id, name=cat_name)
        new_path = cat_name if not path else f"{path}/{cat_name}"
        build_tree(new_path, new_elem, level + 1)

def main():
    # Register namespaces for the output file
    ET.register_namespace("", "http://lca.jrc.it/ILCD/Categories")
    ET.register_namespace("common", "http://lca.jrc.it/ILCD/Common")
    ET.register_namespace("xsi", "http://www.w3.org/2001/XMLSchema-instance")
    
    # Create the root element <CategorySystem> with attribute name="OEKOBAU.DAT"
    root_elem = ET.Element("CategorySystem", {"name": "OEKOBAU.DAT"})
    # Create the <categories> container with dataType="Process"
    categories_elem = ET.SubElement(root_elem, "categories", {"dataType": "Process"})
    
    print("Starting category tree traversal...")
    build_tree("", categories_elem)
    
    # Convert the ElementTree to a string and pretty print it using minidom
    xml_str = ET.tostring(root_elem, encoding="utf-8")
    parsed = xml.dom.minidom.parseString(xml_str)
    pretty_xml_str = parsed.toprettyxml(indent="    ", encoding="UTF-8")
    
    # Write the pretty XML to file
    with open("../../data/pipeline2/xml/OEKOBAU.DAT_Categories_EN_API.xml", "wb") as f:
        f.write(pretty_xml_str)
    
    print("Finished. Categories saved to categories.xml")

if __name__ == "__main__":
    main()
