In [None]:
import pandas as pd

# Input locations for the sample metadata.
# NOTE(review): these are absolute, machine-specific paths; the notebook only
# runs unchanged on a machine with the same directory layout.
# `excel_file` is only assigned here; nothing else in this cell reads it.
excel_file = '/home/aakash/NIC/nic-metadata-cleaning/sample_catalog_metadata/NIC_metadata_curation_master.xlsx'
catalog_df = pd.read_csv('/home/aakash/NIC/nic-metadata-cleaning/sample_catalog_metadata/nic_sample_catalog.csv')
resource_df = pd.read_csv('/home/aakash/NIC/nic-metadata-cleaning/sample_datasets_metadata/nic_sample_dataset.csv')

# Show the column names of both tables.
print("Catalog metadata columns:", catalog_df.columns.tolist())
print("Resource metadata columns:", resource_df.columns.tolist())

# Show the first record of each table as a dict.
# `.iloc[0]` raises IndexError if the corresponding CSV has no data rows.
print("\nFirst catalog entry:\n", catalog_df.iloc[0].to_dict())
print("\nFirst resource entry:\n", resource_df.iloc[0].to_dict())

KPI Evaluation Implementation

In [None]:
def evaluate_catalog_row(row, resources=None):
    """Score one catalog row against the catalog-level KPIs.

    Parameters
    ----------
    row : pandas.Series
        One catalog record. 'title', 'body:value', 'published_date',
        'field_ministry_department:name', 'node_alias' and 'keywords' are
        read with ``row[...]`` (KeyError if the column is absent); the other
        fields are read with ``row.get``.
    resources : pandas.DataFrame, optional
        Distribution table with 'catalog_title', 'datafile', 'datafile_url'
        and 'file_format' columns. Defaults to the notebook-level
        ``resource_df`` so ``catalog_df.apply(evaluate_catalog_row, axis=1)``
        keeps working unchanged.

    Returns
    -------
    dict
        KPI name -> 0/1 flag. Every key is always present.
    """
    if resources is None:
        resources = resource_df

    def _filled(value):
        # A value counts as present when it is neither NaN/None nor blank.
        return pd.notna(value) and str(value).strip() != ""

    def _link_of(res):
        # Prefer the explicit URL column, fall back to the file column.
        if pd.notna(res['datafile_url']):
            return str(res['datafile_url'])
        if pd.notna(res['datafile']):
            return str(res['datafile'])
        return None

    results = {}

    # 1. Metadata field coverage
    required_fields = ['title', 'body:value', 'published_date',
                       'field_ministry_department:name', 'node_alias']
    all_fields_present = all(_filled(row[f]) for f in required_fields)
    results['FieldCoverage'] = 1 if all_fields_present else 0

    # Distributions are linked to a catalog entry through its title.
    dist_entries = resources[resources['catalog_title'] == row['title']]
    has_distribution = len(dist_entries) > 0

    # 2. DCAT compliance: required fields plus at least one distribution
    results['DCATCompliance'] = 1 if (all_fields_present and has_distribution) else 0

    # 3. Controlled vocabulary usage: every controlled field must be filled
    controlled_fields = ['field_asset_jurisdiction:name', 'field_ds_govt_type',
                         'field_ministry_department:name', 'field_sector:name']
    controlled = all(_filled(row.get(f)) for f in controlled_fields)
    results['ControlledVocab'] = 1 if controlled else 0

    # 4. Jurisdiction-level completeness
    results['JurisdictionComplete'] = 1 if _filled(row.get('field_asset_jurisdiction:name')) else 0

    # 5. Legislation field coverage
    results['LegislationCoverage'] = 0  # e.g., would be 1 if an 'applicableLegislation' field is not empty

    # 6. Distribution integrity: at least one distribution with a link and a format.
    # The flag is assigned once, after scanning, so a valid distribution is
    # always recorded (previously the assignment was skipped by `break`).
    valid_dist = any(
        (pd.notna(res['datafile']) or pd.notna(res['datafile_url'])) and _filled(res['file_format'])
        for _, res in dist_entries.iterrows()
    )
    results['DistributionIntegrity'] = 1 if valid_dist else 0

    # 7. Multilingual metadata coverage: any non-ASCII character in title/description
    title_text = str(row['title']) if pd.notna(row['title']) else ""
    desc_text = str(row['body:value']) if pd.notna(row['body:value']) else ""
    multilingual = any(ord(ch) > 127 for ch in title_text + desc_text)
    results['Multilingual'] = 1 if multilingual else 0

    # 9. Metadata-to-data link validity: every distribution link must be http(s)
    all_links_ok = has_distribution and all(
        (_link_of(res) or "").startswith(('http://', 'https://'))
        for _, res in dist_entries.iterrows()
    )
    results['LinkValidity'] = 1 if all_links_ok else 0

    # 10. Metadata coverage for Dublin Core
    dc_fields = ['title', 'body:value', 'published_date',
                 'field_ministry_department:name', 'keywords']
    dc_fields_present = all(_filled(row[f]) for f in dc_fields)
    results['DublinCoreCoverage'] = 1 if dc_fields_present else 0

    # 11. Catalog sector field accuracy: non-empty and not the generic "All".
    # NaN is mapped to "" first; str(NaN) would otherwise yield "nan" and pass.
    sector_raw = row.get('field_sector:name')
    sector_val = str(sector_raw).strip() if pd.notna(sector_raw) else ""
    if sector_val == "" or sector_val.lower() == "all":
        results['SectorAccuracy'] = 0
    else:
        results['SectorAccuracy'] = 1
    return results


In [None]:
def evaluate_resource_row(row):
    """Score one resource (distribution) row against the resource-level KPIs.

    Parameters
    ----------
    row : pandas.Series
        One resource record. 'title', 'file_format', 'datafile',
        'datafile_url' and 'note' are read with ``row[...]`` (KeyError if the
        column is absent); the remaining fields are read with ``row.get``.

    Returns
    -------
    dict
        KPI name -> 0/1 flag.
    """
    def _filled(value):
        # A value counts as present when it is neither NaN/None nor blank.
        return pd.notna(value) and str(value).strip() != ""

    results = {}

    # 1. Metadata field coverage: title, format and a data link
    has_title = _filled(row['title'])
    has_format = _filled(row['file_format'])
    has_data_link = pd.notna(row['datafile']) or pd.notna(row['datafile_url'])
    complete = has_title and has_format and has_data_link
    results['FieldCoverage'] = 1 if complete else 0

    # 2. DCAT compliance (same criteria as field coverage at this level)
    results['DCATCompliance'] = 1 if complete else 0

    # 3. Controlled vocabulary usage: format must contain "/" (type/subtype
    # shape) and a resource category must be present.
    has_mime_format = pd.notna(row['file_format']) and "/" in str(row['file_format'])
    has_category = _filled(row.get('resource_category'))
    results['ControlledVocab'] = 1 if (has_mime_format and has_category) else 0

    # 4. Jurisdiction-level completeness: govt_type is required, and
    # state-level resources must also name their department.
    govt_type = row.get('govt_type')
    if not _filled(govt_type):
        jur_ok = False
    elif str(govt_type).strip().lower() == 'state':
        jur_ok = bool(_filled(row.get('state_department')))
    else:
        jur_ok = True
    results['JurisdictionComplete'] = 1 if jur_ok else 0

    # 5. Legislation field coverage
    results['LegislationCoverage'] = 0

    # 6. Distribution integrity: the resource itself has a link and a format
    results['DistributionIntegrity'] = 1 if (has_format and has_data_link) else 0

    # 7. Multilingual metadata coverage: any non-ASCII character in title/note
    title_text = str(row['title']) if pd.notna(row['title']) else ""
    note_text = str(row['note']) if pd.notna(row['note']) else ""
    multilingual = any(ord(ch) > 127 for ch in title_text) or any(ord(ch) > 127 for ch in note_text)
    results['Multilingual'] = 1 if multilingual else 0

    # 8. Harvestability: an API reference is recorded or the format is JSON.
    # NaN is truthy, so it survives `or ""` and is filtered as the text 'nan'.
    api_flag = str(row.get('external_api_reference') or "").strip().lower()
    fmt = str(row.get('file_format') or "").strip().lower()
    if (api_flag and api_flag != 'nan') or ('json' in fmt):
        results['Harvestability'] = 1
    else:
        results['Harvestability'] = 0

    # 9. Metadata-to-data link validity: the preferred link must be http(s)
    url = None
    if pd.notna(row['datafile_url']):
        url = str(row['datafile_url'])
    elif pd.notna(row['datafile']):
        url = str(row['datafile'])
    link_ok = url is not None and url.startswith(('http://', 'https://'))
    results['LinkValidity'] = 1 if link_ok else 0

    # 10. Metadata coverage for Dublin Core
    results['DublinCoreCoverage'] = 1 if (has_title and has_format) else 0

    # 11. Sector accuracy: non-empty and not the generic "All".
    # NaN is mapped to "" first; str(NaN) would otherwise yield "nan" and pass.
    sector_raw = row.get('sector')
    sector_val = str(sector_raw).strip() if pd.notna(sector_raw) else ""
    if sector_val == "" or sector_val.lower() == "all":
        results['SectorAccuracy'] = 0
    else:
        results['SectorAccuracy'] = 1

    return results

In [None]:
# Score every catalog row, add the per-row KPI total as the last column, then
# put the catalog title in front so each score line can be identified.
catalog_scores = catalog_df.apply(evaluate_catalog_row, axis=1, result_type='expand')
catalog_scores = catalog_scores.assign(TotalScore=catalog_scores.sum(axis=1))
catalog_scores.insert(0, 'CatalogTitle', catalog_df['title'])

# Same three steps for the resource table.
resource_scores = resource_df.apply(evaluate_resource_row, axis=1, result_type='expand')
resource_scores = resource_scores.assign(TotalScore=resource_scores.sum(axis=1))
resource_scores.insert(0, 'ResourceTitle', resource_df['title'])



In [None]:
# Print a heading followed by the first five scored rows (index column
# hidden), first for catalogs and then for resources.
for heading, scores in (
    ("KPI scores for first 5 catalog:", catalog_scores),
    ("\nKPI scores for first 5 resources:", resource_scores),
):
    print(heading)
    print(scores.head(5).to_string(index=False))


In [None]:
# Calculate and print overall fulfillment rates for each KPI.
# Identifier/aggregate columns are skipped by NAME rather than by position, so
# no KPI column is dropped when the column layout changes (the previous
# `columns[2:-1]` slice silently skipped the first resource KPI).
NON_KPI_COLUMNS = ('CatalogTitle', 'DatasetTitle', 'ResourceTitle', 'TotalScore', 'Category')


def _print_kpi_rates(scores, line_prefix=""):
    """Print '<kpi>: fulfilled/total (pct%)' for every numeric KPI column.

    `scores` is a DataFrame of 0/1 KPI flags; `line_prefix` is prepended to
    each printed line. A table with zero rows reports 0.0% instead of dividing
    by zero.
    """
    total = len(scores)
    for kpi in scores.columns:
        if kpi in NON_KPI_COLUMNS or not pd.api.types.is_numeric_dtype(scores[kpi]):
            continue
        fulfilled = int(scores[kpi].sum())
        rate = fulfilled / total * 100 if total else 0
        print(f"{line_prefix}{kpi}: {fulfilled}/{total} ({rate:.1f}%)")


total_catalogs = len(catalog_scores)
print("Overall Catalog KPI Fulfillment Rates:")
_print_kpi_rates(catalog_scores, line_prefix="- ")

total_resources = len(resource_scores)
print("\nOverall Resource KPI Fulfillment Rates:")
_print_kpi_rates(resource_scores)

# Print average total scores (mean of the per-row KPI sums).
avg_catalog_score = catalog_scores['TotalScore'].mean()
avg_resource_score = resource_scores['TotalScore'].mean()
print(f"\nAverage TotalScore - catalogs: {avg_catalog_score:.2f}, resources: {avg_resource_score:.2f}")



In [None]:
import csv

def kpi_fulfillment_summary(df, level_name, exclude_cols=("CatalogTitle","DatasetTitle","ResourceTitle","TotalScore","Type","HTTP_Status")):
    """Summarise how many rows fulfil each KPI column of a score table.

    Parameters
    ----------
    df : pandas.DataFrame
        Score table whose KPI columns hold 0/1 flags.
    level_name : str
        Label written into the first field of every summary row.
    exclude_cols : iterable of str
        Columns that are not KPIs (identifiers, totals, ...). The default now
        also lists "CatalogTitle" so the catalog title column is no longer
        reported as a KPI with 0 fulfilled rows.

    Returns
    -------
    list of list
        One ``[level_name, kpi, fulfilled, total, percent]`` entry per KPI
        column, with percent rounded to one decimal place.
    """
    summary = []
    kpi_cols = [c for c in df.columns if c not in exclude_cols]
    total = len(df)
    for kpi in kpi_cols:
        if total == 0:
            # Empty table: report zeros rather than dividing by zero.
            fulfilled, pct = 0, 0.0
        else:
            # Non-numeric cells are coerced to NaN and counted as unfulfilled.
            fulfilled = int(pd.to_numeric(df[kpi], errors="coerce").fillna(0).astype(int).sum())
            pct = (fulfilled / total) * 100
        summary.append([level_name, kpi, fulfilled, total, round(pct, 1)])
    return summary

# Imported here because this cell runs before the notebook's later pathlib import.
from pathlib import Path

# Generate summaries
catalog_summary = kpi_fulfillment_summary(catalog_scores, "Catalog")
resource_summary = kpi_fulfillment_summary(resource_scores, "Resource")

# Write to CSV. The output folder is created first so a fresh checkout does not
# fail with FileNotFoundError.
summary_path = Path("kpi_result/kpi_fulfillment_summary.csv")
summary_path.parent.mkdir(parents=True, exist_ok=True)
with summary_path.open("w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["Type", "KPI", "Fulfilled", "Total", "Percent"])
    writer.writerows(catalog_summary + resource_summary)

print("Wrote kpi_fulfillment_summary.csv")


In [None]:
import requests
from collections import Counter
import os
from datetime import datetime

# Function to check HTTP status of URLs
def check_url_status(url):
    """Issue a HEAD request (following redirects, 10 s timeout) for `url`.

    Returns the integer HTTP status code, or the string
    ``"Error: <ExceptionClassName>"`` when the request fails. Only the
    exception class is reported: the full exception text embeds the URL and
    object addresses, which made every failure a distinct key and defeated
    any count-by-outcome aggregation.
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=10)
    except requests.RequestException as e:
        return f"Error: {type(e).__name__}"
    return response.status_code

# Tally the outcome of every non-null link, walking the 'datafile' column
# first and 'datafile_url' second.
response_code_counter = Counter(
    check_url_status(link)
    for column in ('datafile', 'datafile_url')
    for link in resource_df[column].dropna()
)

# Build the report in memory, then write it out in one go.
log_file = "link_stats.log"
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
report_lines = [
    "Link Statistics Log\n",
    f"Generated on: {now}\n\n",
    "HTTP Response Code Counts:\n",
]
report_lines.extend(f"- {code}: {count}\n" for code, count in response_code_counter.items())
with open(log_file, "w") as log:
    log.writelines(report_lines)

print(f"Link statistics logged to {log_file}")

In [None]:

# import pandas as pd
# from pathlib import Path

# # Exact file name (as provided)
# MAPPING_XLSX = Path("/home/aakash/NIC/nic-metadata-cleaning/sample_catalog_metadata/NIC_metadata_curation_master.xlsx")

# # Exact sheet names (as in the workbook)
# SHEET_CAT  = "catalog_metadata_mapping_table"
# SHEET_DS   = "dataset_metadata_mapping_table"
# SHEET_DIST = "distribution_metadata_mapping_t"

# # Read exactly the sheets we need
# df_cat_map  = pd.read_excel(MAPPING_XLSX, sheet_name=SHEET_CAT)
# df_ds_map   = pd.read_excel(MAPPING_XLSX, sheet_name=SHEET_DS)
# df_dist_map = pd.read_excel(MAPPING_XLSX, sheet_name=SHEET_DIST)

# # Keep only rows with a defined Mapping Type (as in the sheets)
# for _df in (df_cat_map, df_ds_map, df_dist_map):
#     _df.dropna(subset=["Mapping Type"], inplace=True)
#     _df.reset_index(drop=True, inplace=True)

# def categorize_mapping(df_map: pd.DataFrame, nic_col: str):
#     """
#     Bucket NIC fields into exact / partial / unmapped using:
#     - 'Mapping Type' (exact, partial/closest, no match/no direct)
#     - NIC field column passed explicitly (no autodetection).
#     """
#     exact_fields, partial_fields, unmapped_fields = [], [], []

#     for _, row in df_map.iterrows():
#         mtype = str(row["Mapping Type"]).lower()
#         field_name = row[nic_col]

#         # Normalize the field name for empty/NaN
#         if field_name is None or (isinstance(field_name, float) and pd.isna(field_name)):
#             field_name = "(None)"  # standard field exists but not implemented in NIC yet
#         else:
#             field_name = str(field_name).strip()

#         if "exact" in mtype:
#             exact_fields.append(field_name)
#         elif "partial" in mtype or "closest" in mtype:
#             partial_fields.append(field_name)
#         elif "no match" in mtype or "no direct" in mtype:
#             unmapped_fields.append(field_name)

#     return exact_fields, partial_fields, unmapped_fields


# #Catalog sheet "Equivalent OGD Catalog Metadata"
# #Dataset sheet "Equivalent OGD Dataset Metadata"

# exact_cat,  partial_cat,  unmapped_cat  = categorize_mapping(df_cat_map,  "Equivalent OGD Catalog Metadata")
# exact_ds,   partial_ds,   unmapped_ds   = categorize_mapping(df_ds_map,   "Equivalent OGD Dataset Metadata")
# exact_dist, partial_dist, unmapped_dist = categorize_mapping(df_dist_map, "Equivalent OGD Dataset Metadata")

# # (Optional) Append a short summary to evaluation.log
# from pathlib import Path
# log_path = Path("evaluation.log")
# with log_path.open("a", encoding="utf-8") as log:
#     log.write("## Metadata Field Mapping to Standards (DCAT v3 and Dublin Core)\n\n")

#     log.write("### Catalog Metadata Fields Mapping\n")
#     log.write(f"- Exact Matches ({len(exact_cat)}): " + (", ".join(exact_cat) if exact_cat else "None") + "\n")
#     log.write(f"- Partial/Close Matches ({len(partial_cat)}): " + (", ".join(partial_cat) if partial_cat else "None") + "\n")
#     log.write(f"- Problematic/Unmapped ({len(unmapped_cat)}): " + (", ".join(unmapped_cat) if unmapped_cat else "None") + "\n\n")

#     log.write("### Dataset Metadata Fields Mapping\n")
#     log.write(f"- Exact Matches ({len(exact_ds)}): " + (", ".join(exact_ds) if exact_ds else "None") + "\n")
#     log.write(f"- Partial/Close Matches ({len(partial_ds)}): " + (", ".join(partial_ds) if partial_ds else "None") + "\n")
#     log.write(f"- Problematic/Unmapped ({len(unmapped_ds)}): " + (", ".join(unmapped_ds) if unmapped_ds else "None") + "\n\n")
# # 
#     log.write("### Distribution Metadata Fields Mapping\n")
#     log.write(f"- Exact Matches ({len(exact_dist)}): " + (", ".join(exact_dist) if exact_dist else "None") + "\n")
#     log.write(f"- Partial/Close Matches ({len(partial_dist)}): " + (", ".join(partial_dist) if partial_dist else "None") + "\n")
#     log.write(f"- Problematic/Unmapped ({len(unmapped_dist)}): " + (", ".join(unmapped_dist) if unmapped_dist else "None") + "\n\n")

# print("Buckets ready: exact_*, partial_*, unmapped_*; summary appended to evaluation.log")


In [None]:

import pandas as pd
from pathlib import Path

# Exact file/sheet/column names as in your workbook
MAPPING_XLSX = Path("/home/aakash/NIC/nic-metadata-cleaning/sample_catalog_metadata/NIC_metadata_curation_master.xlsx")
# if not MAPPING_XLSX.exists():
#     raise FileNotFoundError("NIC_metadata_curation_master (1).xlsx not found in working directory.")

SHEET_CAT  = "catalog_metadata_mapping_table"
SHEET_DS   = "dataset_metadata_mapping_table"
SHEET_DIST = "distribution_metadata_mapping_table"

# Excel itself limits sheet names to 31 characters, so a long logical name
# (SHEET_DIST has 35) may be stored truncated in the workbook.
EXCEL_SHEET_NAME_LIMIT = 31


def _resolve_sheet_name(available, wanted):
    """Return `wanted` if the workbook has it, else its 31-char truncation if
    that exists; otherwise return `wanted` unchanged so pandas reports the
    missing sheet."""
    if wanted in available:
        return wanted
    truncated = wanted[:EXCEL_SHEET_NAME_LIMIT]
    return truncated if truncated in available else wanted


# Open the workbook once, read the three mapping sheets, and keep only rows
# with a Mapping Type recorded (index renumbered from 0).
with pd.ExcelFile(MAPPING_XLSX) as _workbook:
    df_cat_map, df_ds_map, df_dist_map = [
        _workbook.parse(sheet_name=_resolve_sheet_name(_workbook.sheet_names, _sheet))
        .dropna(subset=["Mapping Type"])
        .reset_index(drop=True)
        for _sheet in (SHEET_CAT, SHEET_DS, SHEET_DIST)
    ]

# Fixed NIC column names per sheet
CAT_NIC_COL   = "Equivalent OGD Catalog Metadata"
DS_NIC_COL    = "Equivalent OGD Dataset Metadata"
DIST_NIC_COL  = "Equivalent OGD Dataset Metadata"  # as given

# Fixed Standard columns per sheet
CAT_DCAT_COL     = "DCAT v3 Mapping"
CAT_DUBLIN_COL   = "Dublin Core Element"

DS_DCAT_COL      = "DCAT v3 Mapping"
DS_DUBLIN_COL    = "Dublin Core Element"  # dataset sheet's Dublin Core column label in your file

DIST_DCAT_COL    = "DCAT v3 Mapping"
DIST_DUBLIN_COL  = "Dubline core element"  # spelling as in sheet

def _is_missing(val) -> bool:
    return (val is None) or (isinstance(val, float) and pd.isna(val)) or (isinstance(val, str) and val.strip() == "")

def proposed_fields(df_map: pd.DataFrame, nic_col: str, std_col: str,
                    level_label: str, standard_label: str):
    """Collect standard fields that have no NIC/OGD counterpart.

    A mapping row qualifies when its `nic_col` cell is missing while its
    `std_col` cell is filled (both judged by `_is_missing`).

    Returns a pair:
      - sorted list of the distinct standard field names that qualify;
      - DataFrame with one record per qualifying row (empty, but with the
        same column labels, when nothing qualifies).

    Raises KeyError when `nic_col`, `std_col` or "Mapping Type" is not a
    column of `df_map`; the first absent one, in that order, is reported.
    """
    output_columns = [
        "Metadata Level", "Standard", "Standard Field",
        "OGD/NIC Equivalent", "Mapping Type", "Notes/Action",
    ]

    # Fail loudly on a schema mismatch instead of guessing column names.
    absent = next((c for c in (nic_col, std_col, "Mapping Type") if c not in df_map.columns), None)
    if absent is not None:
        raise KeyError(f"Expected column '{absent}' not found in {level_label} mapping sheet.")

    records = []
    for _, entry in df_map.iterrows():
        standard_value = entry[std_col]
        if not _is_missing(entry[nic_col]) or _is_missing(standard_value):
            continue
        # The action column is optional; a missing or NaN note becomes "".
        note = entry.get("Action for NIC", "")
        records.append({
            "Metadata Level": level_label,
            "Standard": standard_label,
            "Standard Field": str(standard_value).strip(),
            "OGD/NIC Equivalent": "(None)",
            "Mapping Type": str(entry["Mapping Type"]).strip(),
            "Notes/Action": note if pd.notna(note) else "",
        })

    if not records:
        return [], pd.DataFrame(columns=output_columns)

    df_proposed = pd.DataFrame(records)
    return sorted(df_proposed["Standard Field"].unique()), df_proposed

# Proposed fields per metadata level: DCAT v3 and Dublin Core side by side.
cat_dcat_list,  cat_dcat_df  = proposed_fields(df_cat_map,  CAT_NIC_COL,  CAT_DCAT_COL,   "Catalog",      "DCAT v3")
cat_dc_list,    cat_dc_df    = proposed_fields(df_cat_map,  CAT_NIC_COL,  CAT_DUBLIN_COL, "Catalog",      "Dublin Core")

ds_dcat_list,   ds_dcat_df   = proposed_fields(df_ds_map,   DS_NIC_COL,   DS_DCAT_COL,    "Dataset",      "DCAT v3")
ds_dc_list,     ds_dc_df     = proposed_fields(df_ds_map,   DS_NIC_COL,   DS_DUBLIN_COL,  "Dataset",      "Dublin Core")

dist_dcat_list, dist_dcat_df = proposed_fields(df_dist_map, DIST_NIC_COL, DIST_DCAT_COL,   "Distribution", "DCAT v3")
dist_dc_list,   dist_dc_df   = proposed_fields(df_dist_map, DIST_NIC_COL, DIST_DUBLIN_COL, "Distribution", "Dublin Core")

# One tidy table per standard, levels stacked in Catalog/Dataset/Distribution order.
df_proposed_dcat   = pd.concat([cat_dcat_df, ds_dcat_df, dist_dcat_df], ignore_index=True)
df_proposed_dublin = pd.concat([cat_dc_df,   ds_dc_df,   dist_dc_df],   ignore_index=True)

# Assemble the log section first, then append it to evaluation.log in one write.
proposed_report = [
    "### Proposed New Fields (present in DCAT/Dublin, missing in OGD)\n\n",
    # DCAT v3
    "#### DCAT v3\n",
    f"- Catalog ({len(cat_dcat_list)}): " + (", ".join(cat_dcat_list) if cat_dcat_list else "None") + "\n",
    f"- Dataset ({len(ds_dcat_list)}): " + (", ".join(ds_dcat_list) if ds_dcat_list else "None") + "\n",
    f"- Distribution ({len(dist_dcat_list)}): " + (", ".join(dist_dcat_list) if dist_dcat_list else "None") + "\n\n",
    # Dublin Core
    "#### Dublin Core\n",
    f"- Catalog ({len(cat_dc_list)}): " + (", ".join(cat_dc_list) if cat_dc_list else "None") + "\n",
    f"- Dataset ({len(ds_dc_list)}): " + (", ".join(ds_dc_list) if ds_dc_list else "None") + "\n",
    f"- Distribution ({len(dist_dc_list)}): " + (", ".join(dist_dc_list) if dist_dc_list else "None") + "\n\n",
]

log_path = Path("evaluation.log")
with log_path.open("a", encoding="utf-8") as log:
    log.writelines(proposed_report)

print("Prepared proposed-field outputs: df_proposed_dcat and df_proposed_dublin; summary appended to evaluation.log")


In [None]:

import pandas as pd
import matplotlib.pyplot as plt

# Keep the NIC column names if they are already defined in this session;
# otherwise fall back to the defaults (no files are touched here).
if "CAT_NIC_COL" not in globals():
    CAT_NIC_COL = "Equivalent OGD Catalog Metadata"
if "DS_NIC_COL" not in globals():
    DS_NIC_COL = "Equivalent OGD Dataset Metadata"
if "DIST_NIC_COL" not in globals():
    DIST_NIC_COL = "Equivalent OGD Dataset Metadata"


def _is_missing(v):
    if isinstance(v, str):
        return v.strip() == ""
    return pd.isna(v)
def _proposed_list(df_prop: pd.DataFrame, level: str) -> list[str]:
    if df_prop is None or df_prop.empty:
        return []
    return sorted(df_prop.loc[df_prop["Metadata Level"] == level, "Standard Field"].dropna().astype(str).str.strip().unique())

# Reuse the per-level lists if they already exist in this session; otherwise
# rebuild them from the stacked proposal tables. Only the catalog list is
# probed for each standard; all three levels are rebuilt together.
if "cat_dcat_list" not in globals():
    cat_dcat_list, ds_dcat_list, dist_dcat_list = (
        _proposed_list(df_proposed_dcat, "Catalog"),
        _proposed_list(df_proposed_dcat, "Dataset"),
        _proposed_list(df_proposed_dcat, "Distribution"),
    )

if "cat_dc_list" not in globals():
    cat_dc_list, ds_dc_list, dist_dc_list = (
        _proposed_list(df_proposed_dublin, "Catalog"),
        _proposed_list(df_proposed_dublin, "Dataset"),
        _proposed_list(df_proposed_dublin, "Distribution"),
    )


def _counts_from_map(df_map: pd.DataFrame, nic_col: str) -> dict[str, int]:
    m = df_map["Mapping Type"].astype(str).str.lower()
    exact    = m.str.contains("exact", na=False)
    partial  = m.str.contains("partial|closest", na=False)
    unmapped = m.str.contains("no match|no direct", na=False)
    return {
        "exact":    int(exact.sum()),
        "partial":  int(partial.sum()),
        "unmapped": int(unmapped.sum()),
        "total":    int((exact | partial | unmapped).sum())
    }

# Bucket counts for each metadata level, one row per level.
buckets = {
    level: _counts_from_map(frame, nic_col)
    for level, frame, nic_col in (
        ("Catalog",      df_cat_map,  CAT_NIC_COL),
        ("Dataset",      df_ds_map,   DS_NIC_COL),
        ("Distribution", df_dist_map, DIST_NIC_COL),
    )
}
df_counts = pd.DataFrame(buckets).T[["exact","partial","unmapped","total"]].astype(int)

# Coverage %: each bucket as a share of the level total (0 where the total is 0).
df_pct = pd.DataFrame({
    bucket: (df_counts[bucket] / df_counts["total"]).where(df_counts["total"]>0, 0.0) * 100.0
    for bucket in ["exact","partial","unmapped"]
})

# Transformation Effort Score (Exact=0, Partial=1, Unmapped=3):
# weighted bucket sum divided by the level total (a zero total is treated as 1).
weights = {"exact": 0, "partial": 1, "unmapped": 3}
weighted_effort = df_counts[["exact","partial","unmapped"]].mul(pd.Series(weights)).sum(axis=1)
safe_totals = df_counts["total"].replace({0:1})
df_tes = (
    weighted_effort
    .div(safe_totals)
    .rename("TES")
    .reset_index()
    .rename(columns={"index":"Level"})
)

# Number of proposed fields per level for each standard.
levels = ["Catalog","Dataset","Distribution"]
dcat_proposed = (cat_dcat_list, ds_dcat_list, dist_dcat_list)
dublin_proposed = (cat_dc_list, ds_dc_list, dist_dc_list)
df_proposed_counts = pd.DataFrame({
    "Level": levels,
    "DCAT v3": list(map(len, dcat_proposed)),
    "Dublin Core": list(map(len, dublin_proposed)),
})

# DCAT↔Dublin overlap & Jaccard
def _jacc(a, b):
    A, B = set(a), set(b)
    inter = len(A & B)
    union = len(A | B) or 1
    return inter, union, inter/union

# Overlap between the DCAT and Dublin Core proposals at each level.
over_cat = _jacc(cat_dcat_list, cat_dc_list)
over_ds = _jacc(ds_dcat_list, ds_dc_list)
over_dist = _jacc(dist_dcat_list, dist_dc_list)

# Transpose the three (intersection, union, jaccard) triples into columns.
intersections, unions, jaccards = zip(over_cat, over_ds, over_dist)
df_overlap = pd.DataFrame({
    "Level": levels,
    "Intersection": list(intersections),
    "Union":        list(unions),
    "Jaccard":      list(jaccards),
})

# Duplication diagnostics (same NIC field mapped >1 time)
def _dup_counts(df_map, nic_col):
    """Measure reuse of values in `df_map[nic_col]`, ignoring missing cells.

    Returns (rows, fields): `fields` is the number of distinct values that
    occur more than once, `rows` the total number of rows holding them.
    """
    nic_fields = pd.Series(df_map[nic_col])
    present = nic_fields[~nic_fields.map(_is_missing)]
    occurrences = present.value_counts()
    reused = occurrences[occurrences > 1]
    rows_involved = reused.sum()
    fields_involved = reused.shape[0]
    return rows_involved, fields_involved

# Reuse statistics of the NIC field column at each metadata level.
(dup_cat_total, dup_cat_unique), (dup_ds_total, dup_ds_unique), (dup_dist_total, dup_dist_unique) = (
    _dup_counts(df_cat_map,  CAT_NIC_COL),
    _dup_counts(df_ds_map,   DS_NIC_COL),
    _dup_counts(df_dist_map, DIST_NIC_COL),
)

repeated_rows = [dup_cat_total, dup_ds_total, dup_dist_total]
reused_fields = [dup_cat_unique, dup_ds_unique, dup_dist_unique]
df_dup = pd.DataFrame({
    "Level": levels,
    "Repeated Mappings (rows)": repeated_rows,
    "Fields Reused (unique)":   reused_fields,
})
# Every chart is drawn on an explicitly created Axes. DataFrame.plot() opens
# its own figure when no `ax` is passed, so the earlier `plt.figure()` calls
# left an empty extra figure in the output for charts 1 and 2.

# 1) Mapping counts
fig, ax = plt.subplots()
df_counts[["exact","partial","unmapped"]].plot(kind="bar", ax=ax)
ax.set_title("Mapping Counts by Level (Exact / Partial / Unmapped)")
ax.set_xlabel("Metadata Level")
ax.set_ylabel("Count")
ax.legend(title="Category")
fig.tight_layout()
plt.show()

# 2) Coverage %
fig, ax = plt.subplots()
df_pct.plot(kind="bar", ax=ax)
ax.set_title("Coverage % by Level (Exact / Partial / Unmapped)")
ax.set_xlabel("Metadata Level")
ax.set_ylabel("Percentage (%)")
ax.legend(title="Category")
fig.tight_layout()
plt.show()

# 3) Transformation Effort Score
fig, ax = plt.subplots()
ax.bar(df_tes["Level"], df_tes["TES"])
ax.set_title("Transformation Effort Score (TES) by Level")
ax.set_xlabel("Metadata Level")
ax.set_ylabel("TES (0=easy → higher=harder)")
fig.tight_layout()
plt.show()

# 4) Proposed fields: DCAT v3 vs Dublin Core (two bars side by side per level)
fig, ax = plt.subplots()
x = range(len(df_proposed_counts["Level"]))
w = 0.35  # bar width; each pair is centred on its tick
ax.bar([i - w/2 for i in x], df_proposed_counts["DCAT v3"], w, label="DCAT v3")
ax.bar([i + w/2 for i in x], df_proposed_counts["Dublin Core"], w, label="Dublin Core")
ax.set_title("Proposed New Fields by Level")
ax.set_xlabel("Metadata Level")
ax.set_ylabel("Count")
ax.set_xticks(list(x))
ax.set_xticklabels(df_proposed_counts["Level"])
ax.legend()
fig.tight_layout()
plt.show()

# 5) DCAT↔Dublin Jaccard
fig, ax = plt.subplots()
ax.bar(df_overlap["Level"], df_overlap["Jaccard"])
ax.set_title("Proposal Similarity (DCAT vs Dublin) by Level")
ax.set_xlabel("Metadata Level")
ax.set_ylabel("Jaccard Index (0–1)")
fig.tight_layout()
plt.show()

# 6) Duplication diagnostics
fig, ax = plt.subplots()
ax.bar(df_dup["Level"], df_dup["Fields Reused (unique)"])
ax.set_title("NIC Field Reuse (Unique Fields Appearing >1x)")
ax.set_xlabel("Metadata Level")
ax.set_ylabel("Count of Reused NIC Fields")
fig.tight_layout()
plt.show()

# ----------------------- SAVE STATS & APPEND TO LOG -----------------------
# (table, target file, whether the index is written) — only the two tables
# indexed by metadata level keep their index.
for stats_frame, csv_name, keep_index in (
    (df_counts,          "mapping_counts_by_level.csv",     True),
    (df_pct,             "mapping_coverage_percent.csv",    True),
    (df_tes,             "transformation_effort_score.csv", False),
    (df_proposed_counts, "proposed_fields_counts.csv",      False),
    (df_overlap,         "proposed_fields_overlap.csv",     False),
    (df_dup,             "nic_field_duplication_stats.csv", False),
):
    stats_frame.to_csv(csv_name, index=keep_index)

# Build the log section in memory, then append it to the evaluation log.
summary_lines = ["### Transformation Stats & Visuals Summary\n\n"]
for lvl, row in df_counts.iterrows():
    summary_lines.append(f"- {lvl}: Exact={row['exact']}, Partial={row['partial']}, Unmapped={row['unmapped']}, Total={row['total']}\n")
summary_lines.append("\n")
for _, r in df_tes.iterrows():
    summary_lines.append(f"- TES {r['Level']}: {r['TES']:.2f}\n")
summary_lines.append("\nSaved CSVs: mapping_counts_by_level.csv, mapping_coverage_percent.csv, transformation_effort_score.csv, "
                     "proposed_fields_counts.csv, proposed_fields_overlap.csv, nic_field_duplication_stats.csv\n\n")

with log_path.open("a", encoding="utf-8") as log:
    log.writelines(summary_lines)

print("Visuals rendered. Stats saved and summary appended to evaluation.log.")
