In [10]:

!! pip install -U deep-translator

['Defaulting to user installation because normal site-packages is not writeable',
 'Collecting deep-translator',
 '  Downloading deep_translator-1.11.4-py3-none-any.whl.metadata (30 kB)',
 'Downloading deep_translator-1.11.4-py3-none-any.whl (42 kB)',
 'Installing collected packages: deep-translator',
 'Successfully installed deep-translator-1.11.4',
 '',
 '[notice] A new release of pip is available: 25.0 -> 25.0.1',
 '[notice] To update, run: python.exe -m pip install --upgrade pip']

In [None]:
import xml.etree.ElementTree as ET
import re
import csv
from deep_translator import GoogleTranslator

# Source XML dump and destination CSV
input_path = "epd_data_XML_format.xml"
output_path = "output.csv"

# The CSV header is assembled from column groups, listed in output order.
_dataset_columns = (
    "UUID", "Version", "Name (original)", "Name (en)",
    "Category (original)", "Category (en)", "Compliance", "Location code", "Type",
    "Reference year", "Valid until", "URL", "Declaration owner", "Publication date",
    "Registration number", "Registration authority",
    "Predecessor UUID", "Predecessor Version", "Predecessor URL",
)
_reference_flow_columns = (
    "Ref. quantity", "Ref. unit", "Reference flow UUID", "Reference flow name",
)
_material_property_columns = (
    "Bulk Density (kg/m3)", "Grammage (kg/m2)", "Gross Density (kg/m3)", "Layer Thickness (m)",
    "Productiveness (m2)", "Linear Density (kg/m)", "Weight Per Piece (kg)",
    "Conversion factor to 1kg",
    "Carbon content (biogenic) in kg", "Carbon content (biogenic) - packaging in kg",
)
_module_columns = ("Module", "Scenario", "Scenario Description")
# EN15804+A1 impact indicators (column name == indicator acronym)
_a1_indicator_columns = ("GWP", "ODP", "POCP", "AP", "EP", "ADPE", "ADPF")
# Resource use and waste flows (column name == flow abbreviation)
_flow_columns = (
    "PERE", "PERM", "PERT", "PENRE", "PENRM", "PENRT",
    "SM", "RSF", "NRSF", "FW", "HWD", "NHWD", "RWD",
    "CRU", "MFR", "MER", "EEE", "EET",
)
# EN15804+A2 impact indicators
_a2_indicator_columns = (
    "AP (A2)", "GWPtotal (A2)", "GWPbiogenic (A2)", "GWPfossil (A2)", "GWPluluc (A2)",
    "ETPfw (A2)", "PM (A2)", "EPmarine (A2)", "EPfreshwater (A2)", "EPterrestrial (A2)",
    "HTPc (A2)", "HTPnc (A2)", "IRP (A2)", "SOP (A2)", "ODP (A2)", "POCP (A2)",
    "ADPF (A2)", "ADPE (A2)", "WDP (A2)", "GWP_IOBC_GHG",
)

header = [
    *_dataset_columns,
    *_reference_flow_columns,
    *_material_property_columns,
    *_module_columns,
    *_a1_indicator_columns,
    *_flow_columns,
    *_a2_indicator_columns,
    "",  # trailing unnamed column
]

# Column name -> position in a CSV row
col_index = dict(zip(header, range(len(header))))

# Indicator acronym -> CSV column name.
# Older EN15804+A1 indicators map onto the column of the same name:
indicator_map_A1 = {acronym: acronym for acronym in _a1_indicator_columns}

# Newer EN15804+A2 indicators (split categories and additional metrics):
indicator_map_A2 = dict([
    ("AP", "AP (A2)"),
    ("GWP-total", "GWPtotal (A2)"),
    ("GWP-biogenic", "GWPbiogenic (A2)"),
    ("GWP-fossil", "GWPfossil (A2)"),
    ("GWP-luluc", "GWPluluc (A2)"),
    ("ETP-fw", "ETPfw (A2)"),
    ("PM", "PM (A2)"),
    ("EP-marine", "EPmarine (A2)"),
    ("EP-freshwater", "EPfreshwater (A2)"),
    ("EP-terrestrial", "EPterrestrial (A2)"),
    ("HTP-c", "HTPc (A2)"),
    ("HTP-nc", "HTPnc (A2)"),
    ("IRP", "IRP (A2)"),
    ("ODP", "ODP (A2)"),
    ("POCP", "POCP (A2)"),
    ("ADPF", "ADPF (A2)"),
    ("ADPE", "ADPE (A2)"),
    ("WDP", "WDP (A2)"),
    ("SQP", "SOP (A2)"),                # Soil Quality Potential (land use) is stored under "SOP (A2)"
    ("GWP-IOBC/GHG", "GWP_IOBC_GHG"),   # Biogenic carbon uptake/emission
])

# Resource-use / waste flow abbreviation -> CSV column name (identical 1:1)
flow_map = dict(zip(_flow_columns, _flow_columns))

# XML namespaces, in ElementTree's "{uri}" prefix form, used by the lookups below
_NS_COMMON = "{http://lca.jrc.it/ILCD/Common}"
_NS_PROCESS = "{http://lca.jrc.it/ILCD/Process}"
_NS_EPD2013 = "{http://www.iai.kit.edu/EPD/2013}"
_NS_EPD2019 = "{http://www.indata.network/EPD/2019}"
_XML_LANG = "{http://www.w3.org/XML/1998/namespace}lang"


def _strip_code_punctuation(code):
    """Return `code` without the '-' and '/' separators used in indicator acronyms."""
    return code.replace("-", "").replace("/", "")


def _resolve_indicator_column(code, is_a2):
    """Map an LCIA indicator acronym to its CSV column name, or None if unknown.

    The map matching the dataset's standard (A2 when `is_a2`, else A1) is consulted
    first, then the other map. If neither contains `code` verbatim, the preferred map
    is searched again with '-' and '/' removed from both the code and the keys, so
    that e.g. "GWPtotal" still resolves to the "GWP-total" entry.
    """
    if is_a2:
        preferred, other = indicator_map_A2, indicator_map_A1
    else:
        preferred, other = indicator_map_A1, indicator_map_A2
    if code in preferred:
        return preferred[code]
    if code in other:
        return other[code]
    cleaned = _strip_code_punctuation(code)
    for key, column in preferred.items():
        if _strip_code_punctuation(key) == cleaned:
            return column
    return None


def _translate_to_english(text):
    """Translate `text` to English; on any failure report it and return `text` unchanged."""
    try:
        # The translator is created only when a translation is actually required.
        return GoogleTranslator(source='auto', target='en').translate(text)
    except Exception as e:
        print(f"Translation failed for: {text} → {e}")
        return text  # fallback to original if translation fails


# Helper function: parse one <processDataSet> XML string and return list of CSV rows (as lists of values)
def parse_epd_xml(xml_string):
    """Parse one EPD process data set and return its CSV rows.

    Args:
        xml_string: Text of a single XML document.

    Returns:
        A list of rows (each a list with one value per `header` column), one row per
        life-cycle module that has at least one <amount> entry, in the fixed order
        A1-A3, A4, A5, B1..B7, C1..C4, D. B modules whose amounts are all zero (or
        non-numeric) are omitted. The list is empty when no listed module is present.

    Raises:
        xml.etree.ElementTree.ParseError: if `xml_string` is not well-formed XML.
    """
    root = ET.fromstring(xml_string)  # parse the EPD XML chunk

    # Base values (common to all module rows of this EPD)
    base = [""] * len(header)

    # Basic dataset metadata
    uuid = (root.findtext(f'.//{_NS_COMMON}UUID') or "").strip()
    version = (root.findtext(f'.//{_NS_COMMON}dataSetVersion') or "").strip()
    base[col_index["UUID"]] = uuid
    base[col_index["Version"]] = version

    # Name: the first baseName is the "original"; an explicit English baseName is
    # used as-is, otherwise the original name is machine-translated.
    name_original = ""
    name_en = ""
    name_elem = root.find(f'.//{_NS_PROCESS}name')
    if name_elem is not None:
        for bn in name_elem.findall(f'{_NS_PROCESS}baseName'):
            lang = bn.get(_XML_LANG, "")
            name_text = (bn.text or "").strip()
            if not name_original:
                name_original = name_text
            if lang == "en" and not name_en:
                name_en = name_text
    if not name_en and name_original:
        name_en = _translate_to_english(name_original)

    base[col_index["Name (original)"]] = name_original
    base[col_index["Name (en)"]] = name_en

    # Category classification (original and translated)
    class_info = root.find(f'.//{_NS_PROCESS}classificationInformation')
    if class_info is not None:
        class_node = class_info.find(f'{_NS_COMMON}classification')
        if class_node is not None:
            classes = [cls.text or "" for cls in class_node.findall(f'{_NS_COMMON}class')]
            if classes:
                # Original categories with quotes
                orig_cats = " / ".join(f"'{c}'" for c in classes)
                base[col_index["Category (original)"]] = orig_cats
                # English categories (translate if known, else same as original)
                if class_node.get('name') == 'EPDNorge':
                    # Known translation mapping for EPD Norge categories:
                    trans = []
                    for c in classes:
                        if c == "Bygg":
                            trans.append("'Construction'")
                        elif "Stål" in c:
                            trans.append("'Steel and Aluminium'")
                        else:
                            trans.append(f"'{c}'")
                    base[col_index["Category (en)"]] = " / ".join(trans)
                else:
                    base[col_index["Category (en)"]] = orig_cats

    # Compliance standards
    compliance_texts = []
    for comp in root.findall(f'.//{_NS_COMMON}referenceToComplianceSystem'):
        desc = comp.find(f'{_NS_COMMON}shortDescription')
        if desc is not None and desc.text:
            compliance_texts.append(f"'{desc.text}'")
    base[col_index["Compliance"]] = " / ".join(compliance_texts)

    # Geography/location
    loc_elem = root.find(f'.//{_NS_PROCESS}locationOfOperationSupplyOrProduction')
    if loc_elem is not None:
        base[col_index["Location code"]] = loc_elem.get('location', "")

    # Dataset type (EPD subtype)
    subtype = root.find(f'.//{_NS_EPD2013}subType')
    if subtype is not None:
        base[col_index["Type"]] = (subtype.text or "").strip()

    # Time (reference year and valid until)
    year = root.findtext(f'.//{_NS_COMMON}referenceYear')
    if year:
        base[col_index["Reference year"]] = year.strip()
    valid_until = root.findtext(f'.//{_NS_COMMON}dataSetValidUntil')
    if valid_until:
        # Only the first four characters (the year) are kept
        base[col_index["Valid until"]] = valid_until.strip()[:4]

    # URL (constructed from UUID and version)
    if uuid and version:
        base[col_index["URL"]] = f"processes/{uuid}?version={version}"

    # Declaration owner (organization)
    owner_node = root.find(f'.//{_NS_COMMON}referenceToOwnershipOfDataSet')
    if owner_node is not None:
        owner_name = owner_node.find(f'{_NS_COMMON}shortDescription')
        if owner_name is not None:
            base[col_index["Declaration owner"]] = owner_name.text or ""

    # Publication date of EPD
    pub_date = root.findtext(f'.//{_NS_EPD2019}publicationDateOfEPD')
    if pub_date:
        base[col_index["Publication date"]] = pub_date

    # Registration number and authority
    base[col_index["Registration number"]] = root.findtext(f'.//{_NS_COMMON}registrationNumber') or ""
    reg_auth_node = root.find(f'.//{_NS_COMMON}referenceToRegistrationAuthority')
    if reg_auth_node is not None:
        auth_name = reg_auth_node.find(f'{_NS_COMMON}shortDescription')
        if auth_name is not None:
            base[col_index["Registration authority"]] = auth_name.text or ""

    # Predecessor EPD reference (if any)
    prev_node = root.find(f'.//{_NS_EPD2019}referenceToPreviousEPD')
    if prev_node is not None:
        base[col_index["Predecessor UUID"]] = prev_node.get('refObjectId', "")
        base[col_index["Predecessor Version"]] = prev_node.get('version', "")
        base[col_index["Predecessor URL"]] = prev_node.get('uri', "")

    # Reference flow (declared unit) details
    ref_flow_id = root.findtext(f'.//{_NS_PROCESS}referenceToReferenceFlow')
    if ref_flow_id:
        # Find the exchange entry with this internal ID
        for exch in root.findall(f'.//{_NS_PROCESS}exchange'):
            if exch.get('dataSetInternalID') != ref_flow_id.strip():
                continue
            # Reference flow UUID and name
            flow = exch.find(f'{_NS_PROCESS}referenceToFlowDataSet')
            if flow is not None:
                base[col_index["Reference flow UUID"]] = flow.get('refObjectId', "")
                # Choose English name if available, otherwise the first description
                ref_name = ""
                for nm in flow.findall(f'{_NS_COMMON}shortDescription'):
                    if nm.get(_XML_LANG) == 'en':
                        ref_name = nm.text or ""
                        break
                    if not ref_name:
                        ref_name = nm.text or ""
                base[col_index["Reference flow name"]] = ref_name.strip()
            # Reference flow amount and unit
            base[col_index["Ref. quantity"]] = exch.findtext(f'{_NS_PROCESS}meanAmount') or ""
            unit_ref = exch.find(f'.//{_NS_EPD2013}referenceToUnitGroupDataSet')
            if unit_ref is not None:
                unit_sd = unit_ref.find(f'{_NS_COMMON}shortDescription')
                if unit_sd is not None:
                    base[col_index["Ref. unit"]] = unit_sd.text or ""
            if not base[col_index["Ref. unit"]] and base[col_index["Reference flow name"]]:
                # Derive unit from reference flow name if not explicitly given
                # (a trailing "1 <unit>" at the end of the name)
                match = re.search(r'1\s+(\w+)$', base[col_index["Reference flow name"]])
                if match:
                    base[col_index["Ref. unit"]] = match.group(1)
            break  # only the first matching exchange is used

    # Default missing biogenic carbon content to "0" as in sample
    if not base[col_index["Carbon content (biogenic) in kg"]]:
        base[col_index["Carbon content (biogenic) in kg"]] = "0"
    if not base[col_index["Carbon content (biogenic) - packaging in kg"]]:
        base[col_index["Carbon content (biogenic) - packaging in kg"]] = "0"

    # Determine which modules (life-cycle stages) are present.
    # Magnitudes are accumulated (not signed values) so that positive and negative
    # amounts cannot cancel out and make a populated module look empty.
    module_values = {}
    for amt in root.findall(f'.//{_NS_EPD2013}amount'):
        mod = amt.get(f'{_NS_EPD2013}module')
        if not mod:
            continue
        module_values.setdefault(mod, 0.0)
        try:
            module_values[mod] += abs(float(amt.text))
        except (TypeError, ValueError):
            # Missing or non-numeric amount: counts as zero
            pass

    # Prepare output rows for each relevant module
    module_order = ["A1-A3", "A4", "A5", "B1", "B2", "B3", "B4", "B5", "B6", "B7", "C1", "C2", "C3", "C4", "D"]
    rows_by_module = {}
    for mod in module_order:
        if mod not in module_values:
            continue
        # Skip B modules if all values are zero (not reported in sample)
        if mod.startswith("B") and module_values[mod] < 1e-12:
            continue
        row = base.copy()
        row[col_index["Module"]] = mod
        rows_by_module[mod] = row
    rows = list(rows_by_module.values())  # dict keeps module_order
    if not rows:
        return rows

    # Determine EN15804 standard version (A2 or A1) for indicator mapping
    is_A2 = "15804+A2" in base[col_index["Compliance"]]

    # Pattern for a trailing "(ABBR)" at the end of a short description
    abbrev_pattern = r'\(([^)]+)\)$'

    # Fill resource use and waste indicators from exchanges
    for exch in root.findall(f'.//{_NS_PROCESS}exchange'):
        flow_node = exch.find(f'{_NS_PROCESS}referenceToFlowDataSet')
        if flow_node is None:
            continue
        desc_node = flow_node.find(f'{_NS_COMMON}shortDescription')
        if desc_node is None or not desc_node.text:
            continue
        match = re.search(abbrev_pattern, desc_node.text)
        if not match:
            continue
        abbrev = match.group(1)
        if abbrev not in flow_map:
            continue
        column = col_index[flow_map[abbrev]]
        # Assign this flow's values to the row of each module (if that row exists)
        for amt in exch.findall(f'.//{_NS_EPD2013}amount'):
            row = rows_by_module.get(amt.get(f'{_NS_EPD2013}module'))
            if row is not None:
                row[column] = amt.text or ""

    # Fill LCIA results (impact indicators) from LCIAResult entries
    for lcia in root.findall(f'.//{_NS_PROCESS}LCIAResult'):
        desc = lcia.find(f'.//{_NS_COMMON}shortDescription')
        if desc is None or not desc.text:
            continue
        match = re.search(abbrev_pattern, desc.text)
        if not match:
            continue
        col_name = _resolve_indicator_column(match.group(1).strip(), is_A2)
        if col_name is None:
            continue  # indicator not represented in the CSV
        column = col_index[col_name]
        # Assign indicator values to each module row
        for amt in lcia.findall(f'.//{_NS_EPD2013}amount'):
            row = rows_by_module.get(amt.get(f'{_NS_EPD2013}module'))
            if row is not None:
                row[column] = amt.text or ""
    return rows

# Read the input file and write to CSV
def _iter_epd_documents(lines):
    """Split an iterable of text lines into consecutive XML documents.

    A new document starts at every line whose stripped text begins with "<?xml";
    lines before the first such line are ignored.

    Yields:
        (start_line, xml_text, is_last) tuples, where `start_line` is the 1-based
        line number of the document's "<?xml" line and `is_last` is True only for
        the final document of the input.
    """
    buffer = []      # lines of the document currently being accumulated
    start_line = 0   # line number where that document started
    for line_num, line in enumerate(lines, start=1):
        if line.strip().startswith("<?xml"):
            if buffer:
                yield start_line, "".join(buffer), False
            buffer = [line]
            start_line = line_num
        elif buffer:
            buffer.append(line)
    if buffer:
        yield start_line, "".join(buffer), True


epd_count = 0      # EPDs that produced at least one CSV row
failed_count = 0   # EPDs whose parsing raised an exception
log_path = 'epd_failures.log'
log_lines = []     # one message per failed EPD

with open(input_path, 'r', encoding='utf-8') as infile, \
        open(output_path, 'w', newline='', encoding='utf-8') as outfile:
    writer = csv.writer(outfile, delimiter=';', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(header)

    for epd_start_line, epd_xml, is_last in _iter_epd_documents(infile):
        try:
            epd_rows = parse_epd_xml(epd_xml)
        except Exception as e:
            failed_count += 1
            # Best-effort UUID extraction from the raw text; the tag may carry a
            # namespace prefix (e.g. <common:UUID>).
            uuid_match = re.search(r"<(?:[\w.-]+:)?UUID>(.*?)</(?:[\w.-]+:)?UUID>", epd_xml)
            uuid = uuid_match.group(1).strip() if uuid_match else "UNKNOWN"
            label = "Failed final EPD" if is_last else "Failed EPD"
            log_lines.append(f"{label} at line {epd_start_line}, UUID={uuid}: {str(e)}")
            # Nothing is written for a failed EPD (in particular, rows left over
            # from the previously parsed EPD must not be written again).
            continue
        if epd_rows:
            epd_count += 1
            writer.writerows(epd_rows)

# Write failure log
if log_lines:
    with open(log_path, "w", encoding="utf-8") as log_file:
        log_file.write("\n".join(log_lines))

print(f"✅ Finished! Total EPDs parsed and written to CSV: {epd_count}")
print(f"❌ Failed EPDs: {failed_count}")
if failed_count > 0:
    print(f"📝 Failure log written to: {log_path}")
print(f"📄 Output file saved as: {output_path}")

    


✅ Finished! Total EPDs parsed and written to CSV: 22398
❌ Failed EPDs: 1
📝 Failure log written to: epd_failures.log
📄 Output file saved as: output.csv
