In [5]:
# AI/ML Data Format Alignment Pipeline for Jupyter
# Author: You
# Purpose: Map data2 into data1's schema and export outputdata as CSV or Excel

import io
import os
import re
import difflib
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any, Optional

try:
    import ipywidgets as widgets
    from IPython.display import display
    IPYW_AVAILABLE = True
except Exception:
    IPYW_AVAILABLE = False

# -------------------------
# Helpers
# -------------------------

def _normalize_colname(s: str) -> str:
    if s is None:
        return ""
    s = s.strip().lower()
    s = re.sub(r"[\s\-_]+", " ", s)
    s = re.sub(r"[^a-z0-9 %]", "", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def _infer_dtype_series(s: pd.Series) -> str:
    # Try boolean
    unique_lower = set(str(x).strip().lower() for x in s.dropna().unique()[:100])
    bool_set = {"true","false","yes","no","y","n","0","1"}
    if unique_lower and unique_lower.issubset(bool_set):
        return "boolean"

    # Try datetime
    try:
        pd.to_datetime(s.dropna().sample(min(len(s.dropna()), 200), random_state=0), errors="raise", infer_datetime_format=True)
        return "datetime"
    except Exception:
        pass

    # Try integer
    try:
        x = pd.to_numeric(s.dropna(), errors="raise")
        if np.all(np.mod(x, 1) == 0):
            return "integer"
        return "float"
    except Exception:
        pass

    return "string"

def infer_schema(df: pd.DataFrame) -> Dict[str, str]:
    schema = {}
    for c in df.columns:
        schema[c] = _infer_dtype_series(df[c])
    return schema

def _coerce_to_dtype(series: pd.Series, target: str) -> Tuple[pd.Series, float]:
    s = series.copy()
    success_ratio = 1.0

    if target == "string":
        s = s.astype(str)
        return s, success_ratio

    if target == "boolean":
        mapping = {
            "true": True, "false": False,
            "yes": True, "no": False,
            "y": True, "n": False,
            "1": True, "0": False,
            "True": True, "False": False
        }
        original = s.copy()
        s = s.astype(str).str.strip()
        s = s.map(lambda x: mapping.get(x, np.nan))
        success_ratio = 1.0 - (s.isna() & ~original.isna()).mean()
        return s, success_ratio

    if target in ("integer", "float"):
        coerced = pd.to_numeric(s, errors="coerce")
        success_ratio = 1.0 - coerced.isna().mean()
        if target == "integer":
            # keep as Int64 to allow NA
            s = coerced.round().astype("Int64")
        else:
            s = coerced.astype(float)
        return s, success_ratio

    if target == "datetime":
        coerced = pd.to_datetime(s, errors="coerce", infer_datetime_format=True)
        success_ratio = 1.0 - coerced.isna().mean()
        return coerced, success_ratio

    # default
    return s, 1.0

def best_column_match(target_col: str, candidates: List[str], threshold: float = 0.75) -> Optional[str]:
    # exact normalized match first
    norm_target = _normalize_colname(target_col)
    norm_map = {_normalize_colname(c): c for c in candidates}
    if norm_target in norm_map:
        return norm_map[norm_target]
    # difflib similarity
    best = difflib.get_close_matches(norm_target, list(norm_map.keys()), n=1, cutoff=threshold)
    if best:
        return norm_map[best[0]]
    return None

def build_column_mapping(
    data1_cols: List[str],
    data2_cols: List[str],
    cutoff: float = 0.75
) -> Dict[str, Optional[str]]:
    mapping = {}
    unmatched_data2 = set(data2_cols)
    # first pass: exact normalized matches
    norm_to_data2 = {_normalize_colname(c): c for c in data2_cols}
    for c in data1_cols:
        norm = _normalize_colname(c)
        if norm in norm_to_data2:
            mapping[c] = norm_to_data2[norm]
            unmatched_data2.discard(norm_to_data2[norm])
        else:
            mapping[c] = None
    # second pass: fuzzy for those still None
    for c in data1_cols:
        if mapping[c] is None:
            match = best_column_match(c, list(unmatched_data2), threshold=cutoff)
            if match is not None:
                mapping[c] = match
                unmatched_data2.discard(match)
    return mapping

def align_to_schema(
    df_data2: pd.DataFrame,
    schema_data1: Dict[str, str],
    mapping: Dict[str, Optional[str]],
    fill_missing_with: Any = np.nan
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    report = {
        "matched_columns": [],
        "unmatched_data1_columns": [],
        "unmapped_data2_columns": [],
        "type_coercion_success": {},
    }

    # Track unmapped data2
    mapped_targets = set([v for v in mapping.values() if v is not None])
    report["unmapped_data2_columns"] = [c for c in df_data2.columns if c not in mapped_targets]

    aligned = pd.DataFrame()
    for col_data1, dtype in schema_data1.items():
        source_col = mapping.get(col_data1, None)
        if source_col is not None and source_col in df_data2.columns:
            series = df_data2[source_col]
            # basic string cleanup if target string
            if dtype == "string":
                series = series.astype(str).str.strip()
            coerced, success = _coerce_to_dtype(series, dtype)
            aligned[col_data1] = coerced
            report["matched_columns"].append((col_data1, source_col, round(success, 4)))
            report["type_coercion_success"][col_data1] = round(success, 4)
        else:
            aligned[col_data1] = fill_missing_with
            report["unmatched_data1_columns"].append(col_data1)

    # reorder columns to data1 order
    aligned = aligned[list(schema_data1.keys())]
    return aligned, report

def print_report(schema: Dict[str, str], mapping: Dict[str, Optional[str]], report: Dict[str, Any]) -> None:
    print("Data1 schema")
    print(pd.DataFrame(list(schema.items()), columns=["column", "dtype"]).to_string(index=False))
    print("\nColumn mapping data1 -> data2")
    mapping_rows = []
    for k, v in mapping.items():
        mapping_rows.append((k, v if v is not None else "None"))
    print(pd.DataFrame(mapping_rows, columns=["data1_column", "data2_source"]).to_string(index=False))

    print("\nMatched columns with coercion success")
    if report["matched_columns"]:
        print(pd.DataFrame(report["matched_columns"], columns=["data1_column", "data2_source", "success"]).to_string(index=False))
    else:
        print("None")

    print("\nUnmatched data1 columns")
    print(report["unmatched_data1_columns"] if report["unmatched_data1_columns"] else "None")

    print("\nUnmapped data2 columns")
    print(report["unmapped_data2_columns"] if report["unmapped_data2_columns"] else "None")

    if report.get("type_coercion_success"):
        successes = list(report["type_coercion_success"].values())
        overall = round(float(np.mean(successes)), 4) if successes else 0.0
        print(f"\nApproximate overall type coercion success: {overall}")

def _read_any(file_bytes: bytes, filename: str) -> pd.DataFrame:
    name = filename.lower()
    if name.endswith(".csv"):
        return pd.read_csv(io.BytesIO(file_bytes))
    if name.endswith(".xlsx") or name.endswith(".xls"):
        return pd.read_excel(io.BytesIO(file_bytes))
    raise ValueError("Unsupported file format. Use .csv or .xlsx")

def _read_path(path: str) -> pd.DataFrame:
    if path.lower().endswith(".csv"):
        return pd.read_csv(path)
    if path.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(path)
    raise ValueError("Unsupported file format. Use .csv or .xlsx")

def _write_output(df: pd.DataFrame, out_path: str) -> str:
    if out_path.lower().endswith(".csv"):
        df.to_csv(out_path, index=False)
    elif out_path.lower().endswith((".xlsx", ".xls")):
        df.to_excel(out_path, index=False)
    else:
        raise ValueError("Output file must be .csv or .xlsx")
    return os.path.abspath(out_path)

# -------------------------
# UI: either widgets upload or direct paths
# -------------------------

print("If you see upload widgets below, use them. Otherwise, set file paths in the fallback section.")

upload_data1 = None
upload_data2 = None
out_format = None

if IPYW_AVAILABLE:
    fu1 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data1")
    fu2 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data2")
    fmt = widgets.Dropdown(options=["CSV", "Excel"], value="CSV", description="Output")
    run_btn = widgets.Button(description="Run Pipeline", button_style="primary")
    out_area = widgets.Output()

    def on_run_clicked(_):
        with out_area:
            out_area.clear_output()
            if len(fu1.value) == 0 or len(fu2.value) == 0:
                print("Please upload both files.")
                return
            # read
            key1 = list(fu1.value.keys())[0]
            key2 = list(fu2.value.keys())[0]
            f1 = fu1.value[key1]
            f2 = fu2.value[key2]
            df1 = _read_any(f1["content"], key1)
            df2 = _read_any(f2["content"], key2)

            print("Files loaded")
            print(f"data1 shape: {df1.shape}")
            print(f"data2 shape: {df2.shape}")

            schema = infer_schema(df1)
            mapping = build_column_mapping(list(df1.columns), list(df2.columns), cutoff=0.75)
            aligned, rep = align_to_schema(df2, schema, mapping, fill_missing_with=np.nan)

            print_report(schema, mapping, rep)

            # choose output path in working dir
            if fmt.value == "CSV":
                out_path = "outputdata.csv"
            else:
                out_path = "outputdata.xlsx"

            abs_path = _write_output(aligned, out_path)
            print(f"\nOutput written to: {abs_path}")

    run_btn.on_click(on_run_clicked)

    display(fu1, fu2, fmt, run_btn, out_area)

# -------------------------
# Fallback: use local file paths if widgets are not available
# -------------------------

# Uncomment and set these paths if running in a non-widget environment:
# data1_path = "path/to/your/data1.xlsx"  # or .csv
# data2_path = "path/to/your/data2.csv"   # or .xlsx
# output_path = "outputdata.csv"          # or .xlsx

# If you use fallback, run this block after setting paths above:
# df1 = _read_path(data1_path)
# df2 = _read_path(data2_path)
# schema = infer_schema(df1)
# mapping = build_column_mapping(list(df1.columns), list(df2.columns), cutoff=0.75)
# aligned, rep = align_to_schema(df2, schema, mapping, fill_missing_with=np.nan)
# print_report(schema, mapping, rep)
# abs_path = _write_output(aligned, output_path)
# print(f"\nOutput written to: {abs_path}")

If you see upload widgets below, use them. Otherwise, set file paths in the fallback section.


FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data1')

FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data2')

Dropdown(description='Output', options=('CSV', 'Excel'), value='CSV')

Button(button_style='primary', description='Run Pipeline', style=ButtonStyle())

Output()

In [6]:
import pandas as pd
import difflib
import numpy as np

# Load files
data1 = pd.read_excel("format.xlsx")   # or pd.read_csv("data1.csv")
data2 = pd.read_excel("Infor_file.xlsx")   # or pd.read_csv("data2.csv")

# Use only column names from data1 (schema)
schema_columns = list(data1.columns)

# Prepare an empty DataFrame with schema
output_df = pd.DataFrame(columns=schema_columns)

# Try to map each schema column to the closest match in data2
column_mapping = {}
for col in schema_columns:
    best_match = difflib.get_close_matches(col, data2.columns, n=1, cutoff=0.6)
    if best_match:
        column_mapping[col] = best_match[0]
    else:
        column_mapping[col] = None

print("Column mapping (data1 -> data2):")
print(column_mapping)

# Fill the output_df according to mapping
for col in schema_columns:
    if column_mapping[col] is not None:
        output_df[col] = data2[column_mapping[col]]
    else:
        output_df[col] = np.nan  # blank if not found

# Save output file
output_df.to_excel("outputdata.xlsx", index=False)
print("Output file saved as outputdata.xlsx")

Column mapping (data1 -> data2):
{'Student_Name': 'Student_Name', 'Student_Surename': 'Student_Surename', 'Student_Category': 'Student_Category', 'Student_Address': 'Student_Address', 'Student_Batch': 'Student_Batch', 'Student_Specilization': 'Student_Specilization', 'Student_Cell_No': 'Student_Cell_No', 'Stu_EmailID': 'Stu_EmailID'}
Output file saved as outputdata.xlsx


In [10]:
# AI/ML Data Format Alignment Pipeline for Jupyter
# Author: You
# Purpose: User uploads data1 (schema) and data2 (content), program maps data2 into data1's schema
#          and generates outputdata.csv or outputdata.xlsx

import io
import os
import re
import difflib
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any, Optional

try:
    import ipywidgets as widgets
    from IPython.display import display
    IPYW_AVAILABLE = True
except Exception:
    IPYW_AVAILABLE = False

# -------------------------
# Helpers
# -------------------------

def _normalize_colname(s: str) -> str:
    if s is None:
        return ""
    s = s.strip().lower()
    s = re.sub(r"[\s\-_]+", " ", s)
    s = re.sub(r"[^a-z0-9 %]", "", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def _infer_dtype_series(s: pd.Series) -> str:
    unique_lower = set(str(x).strip().lower() for x in s.dropna().unique()[:100])
    bool_set = {"true","false","yes","no","y","n","0","1"}
    if unique_lower and unique_lower.issubset(bool_set):
        return "boolean"
    try:
        pd.to_datetime(s.dropna().sample(min(len(s.dropna()), 200), random_state=0), errors="raise", infer_datetime_format=True)
        return "datetime"
    except Exception:
        pass
    try:
        x = pd.to_numeric(s.dropna(), errors="raise")
        if np.all(np.mod(x, 1) == 0):
            return "integer"
        return "float"
    except Exception:
        pass
    return "string"

def infer_schema(df: pd.DataFrame) -> Dict[str, str]:
    return {c: _infer_dtype_series(df[c]) for c in df.columns}

def _coerce_to_dtype(series: pd.Series, target: str) -> Tuple[pd.Series, float]:
    s = series.copy()
    success_ratio = 1.0
    if target == "string":
        return s.astype(str), success_ratio
    if target == "boolean":
        mapping = {"true": True, "false": False,"yes": True, "no": False,"y": True, "n": False,"1": True, "0": False}
        original = s.copy()
        s = s.astype(str).str.strip()
        s = s.map(lambda x: mapping.get(x, np.nan))
        success_ratio = 1.0 - (s.isna() & ~original.isna()).mean()
        return s, success_ratio
    if target in ("integer", "float"):
        coerced = pd.to_numeric(s, errors="coerce")
        success_ratio = 1.0 - coerced.isna().mean()
        if target == "integer":
            s = coerced.round().astype("Int64")
        else:
            s = coerced.astype(float)
        return s, success_ratio
    if target == "datetime":
        coerced = pd.to_datetime(s, errors="coerce", infer_datetime_format=True)
        success_ratio = 1.0 - coerced.isna().mean()
        return coerced, success_ratio
    return s, 1.0

def best_column_match(target_col: str, candidates: List[str], threshold: float = 0.75) -> Optional[str]:
    norm_target = _normalize_colname(target_col)
    norm_map = {_normalize_colname(c): c for c in candidates}
    if norm_target in norm_map:
        return norm_map[norm_target]
    best = difflib.get_close_matches(norm_target, list(norm_map.keys()), n=1, cutoff=threshold)
    if best:
        return norm_map[best[0]]
    return None

def build_column_mapping(data1_cols: List[str], data2_cols: List[str], cutoff: float = 0.75) -> Dict[str, Optional[str]]:
    mapping = {}
    unmatched_data2 = set(data2_cols)
    norm_to_data2 = {_normalize_colname(c): c for c in data2_cols}
    for c in data1_cols:
        norm = _normalize_colname(c)
        if norm in norm_to_data2:
            mapping[c] = norm_to_data2[norm]
            unmatched_data2.discard(norm_to_data2[norm])
        else:
            mapping[c] = None
    for c in data1_cols:
        if mapping[c] is None:
            match = best_column_match(c, list(unmatched_data2), threshold=cutoff)
            if match is not None:
                mapping[c] = match
                unmatched_data2.discard(match)
    return mapping

def align_to_schema(df_data2: pd.DataFrame, schema_data1: Dict[str, str], mapping: Dict[str, Optional[str]], fill_missing_with: Any = np.nan) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    report = {"matched_columns": [],"unmatched_data1_columns": [],"unmapped_data2_columns": [],"type_coercion_success": {}}
    mapped_targets = set([v for v in mapping.values() if v is not None])
    report["unmapped_data2_columns"] = [c for c in df_data2.columns if c not in mapped_targets]
    aligned = pd.DataFrame()
    for col_data1, dtype in schema_data1.items():
        source_col = mapping.get(col_data1, None)
        if source_col is not None and source_col in df_data2.columns:
            series = df_data2[source_col]
            if dtype == "string":
                series = series.astype(str).str.strip()
            coerced, success = _coerce_to_dtype(series, dtype)
            aligned[col_data1] = coerced
            report["matched_columns"].append((col_data1, source_col, round(success, 4)))
            report["type_coercion_success"][col_data1] = round(success, 4)
        else:
            aligned[col_data1] = fill_missing_with
            report["unmatched_data1_columns"].append(col_data1)
    aligned = aligned[list(schema_data1.keys())]
    return aligned, report

def print_report(schema: Dict[str, str], mapping: Dict[str, Optional[str]], report: Dict[str, Any]) -> None:
    print("Data1 schema")
    print(pd.DataFrame(list(schema.items()), columns=["column", "dtype"]).to_string(index=False))
    print("\nColumn mapping data1 -> data2")
    print(pd.DataFrame([(k, v if v is not None else "None") for k,v in mapping.items()], columns=["data1_column","data2_source"]).to_string(index=False))
    print("\nMatched columns with coercion success")
    if report["matched_columns"]:
        print(pd.DataFrame(report["matched_columns"], columns=["data1_column","data2_source","success"]).to_string(index=False))
    else:
        print("None")
    print("\nUnmatched data1 columns")
    print(report["unmatched_data1_columns"] if report["unmatched_data1_columns"] else "None")
    print("\nUnmapped data2 columns")
    print(report["unmapped_data2_columns"] if report["unmapped_data2_columns"] else "None")
    if report.get("type_coercion_success"):
        successes = list(report["type_coercion_success"].values())
        overall = round(float(np.mean(successes)), 4) if successes else 0.0
        print(f"\nApproximate overall type coercion success: {overall}")

def _read_any(file_bytes: bytes, filename: str) -> pd.DataFrame:
    if filename.lower().endswith(".csv"):
        return pd.read_csv(io.BytesIO(file_bytes))
    if filename.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(io.BytesIO(file_bytes))
    raise ValueError("Unsupported file format. Use .csv or .xlsx")

def _write_output(df: pd.DataFrame, out_path: str) -> str:
    if out_path.lower().endswith(".csv"):
        df.to_csv(out_path, index=False)
    elif out_path.lower().endswith((".xlsx", ".xls")):
        df.to_excel(out_path, index=False)
    else:
        raise ValueError("Output file must be .csv or .xlsx")
    return os.path.abspath(out_path)

# -------------------------
# Jupyter UI
# -------------------------

print("If you see upload widgets below, use them. Otherwise, set file paths manually in fallback section.")

if IPYW_AVAILABLE:
    fu1 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data1")
    fu2 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data2")
    fmt = widgets.Dropdown(options=["CSV", "Excel"], value="CSV", description="Output")
    run_btn = widgets.Button(description="Run Pipeline", button_style="primary")
    out_area = widgets.Output()

    def on_run_clicked(_):
        with out_area:
            out_area.clear_output()
            if len(fu1.value) == 0 or len(fu2.value) == 0:
                print("Please upload both files.")
                return
            key1 = list(fu1.value.keys())[0]
            key2 = list(fu2.value.keys())[0]
            f1 = fu1.value[key1]
            f2 = fu2.value[key2]
            df1 = _read_any(f1["content"], key1)
            df2 = _read_any(f2["content"], key2)
            print("Files loaded")
            print(f"data1 shape: {df1.shape}")
            print(f"data2 shape: {df2.shape}")
            schema = infer_schema(df1)
            mapping = build_column_mapping(list(df1.columns), list(df2.columns), cutoff=0.75)
            aligned, rep = align_to_schema(df2, schema, mapping, fill_missing_with=np.nan)
            print_report(schema, mapping, rep)
            out_path = "outputdata.csv" if fmt.value=="CSV" else "outputdata.xlsx"
            abs_path = _write_output(aligned, out_path)
            print(f"\nOutput written to: {abs_path}")

    run_btn.on_click(on_run_clicked)
    display(fu1, fu2, fmt, run_btn, out_area)

# -------------------------
# Fallback (no widgets)
# -------------------------
# Uncomment and set paths if not using widgets
# data1_path = "data1.xlsx"
# data2_path = "data2.xlsx"
# output_path = "outputdata.csv"
# df1 = pd.read_excel(data1_path)
# df2 = pd.read_excel(data2_path)
# schema = infer_schema(df1)
# mapping = build_column_mapping(list(df1.columns), list(df2.columns), cutoff=0.75)
# aligned, rep = align_to_schema(df2, schema, mapping, fill_missing_with=np.nan)
# print_report(schema, mapping, rep)
# abs_path = _write_output(aligned, output_path)
# print(f"\nOutput written to: {abs_path}")


If you see upload widgets below, use them. Otherwise, set file paths manually in fallback section.


FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data1')

FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data2')

Dropdown(description='Output', options=('CSV', 'Excel'), value='CSV')

Button(button_style='primary', description='Run Pipeline', style=ButtonStyle())

Output()

# -------------------------------------------------------------

In [16]:
# AI/ML Data Format Alignment Pipeline (Simplified & Strict Schema Based)
# Author: You
# Purpose: 
#   - User uploads data1 (schema) and data2 (content)
#   - Program maps data2 into data1's schema
#   - Generates outputdata.csv or outputdata.xlsx

import pandas as pd
import numpy as np
import difflib
import re
import io
import os

# -------------------------
# Helpers
# -------------------------

def _normalize_colname(s: str) -> str:
    """Lowercase + strip + remove special chars for fuzzy matching."""
    if s is None:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"[\s\-_]+", " ", s)
    s = re.sub(r"[^a-z0-9 ]", "", s)
    return s.strip()

def best_column_match(target_col: str, candidates: list, threshold: float = 0.6) -> str | None:
    """Find closest matching column from candidates for target_col."""
    norm_target = _normalize_colname(target_col)
    norm_map = {_normalize_colname(c): c for c in candidates}
    if norm_target in norm_map:
        return norm_map[norm_target]
    best = difflib.get_close_matches(norm_target, list(norm_map.keys()), n=1, cutoff=threshold)
    if best:
        return norm_map[best[0]]
    return None

def build_column_mapping(schema_cols: list, data2_cols: list, cutoff: float = 0.6):
    """Map schema columns (data1) -> closest data2 columns."""
    mapping = {}
    for col in schema_cols:
        mapping[col] = best_column_match(col, data2_cols, cutoff)
    return mapping

def align_to_schema(df_data2: pd.DataFrame, df_data1: pd.DataFrame):
    """Rearrange df_data2 content into schema of df_data1."""
    schema_cols = list(df_data1.columns)
    mapping = build_column_mapping(schema_cols, list(df_data2.columns))
    
    output = pd.DataFrame(columns=schema_cols)
    for col in schema_cols:
        source_col = mapping.get(col)
        if source_col and source_col in df_data2.columns:
            output[col] = df_data2[source_col]
        else:
            output[col] = np.nan  # fill empty if no match
    
    return output, mapping

def _read_any(file_bytes: bytes, filename: str) -> pd.DataFrame:
    if filename.lower().endswith(".csv"):
        return pd.read_csv(io.BytesIO(file_bytes))
    if filename.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(io.BytesIO(file_bytes))
    raise ValueError("Unsupported file format")

def _write_output(df: pd.DataFrame, out_path: str):
    if out_path.lower().endswith(".csv"):
        df.to_csv(out_path, index=False)
    elif out_path.lower().endswith((".xlsx", ".xls")):
        df.to_excel(out_path, index=False)
    else:
        raise ValueError("Output file must be .csv or .xlsx")
    return os.path.abspath(out_path)

# -------------------------
# Example Usage (No Widgets)
# -------------------------
# Replace with your actual files
# data1_path = "data1.xlsx"
# data2_path = "data2.xlsx"
# output_path = "outputdata.xlsx"

# df1 = pd.read_excel(data1_path)  # schema
# df2 = pd.read_excel(data2_path)  # content
# aligned, mapping = align_to_schema(df2, df1)
# print("Column Mapping:", mapping)
# abs_path = _write_output(aligned, output_path)
# print(f"Output written to: {abs_path}")


In [19]:
import pandas as pd
import difflib
import numpy as np

# ===========================================================
# AI/ML Data Format Alignment Pipeline (Simplified & Strict Schema Based)
# Author: You
# Purpose: 
#   - User uploads data1 (schema) and data2 (content)
#   - Program maps data2 into data1's schema
#   - Generates outputdata.csv or outputdata.xlsx
# ===========================================================

# Load schema (data1) and content (data2)
data1 = pd.read_excel("format.xlsx")     # Schema file (defines column names)
data2 = pd.read_excel("Infor_file.xlsx") # Content file

# Extract schema columns
schema_columns = list(data1.columns)

# Create empty DataFrame with schema columns
aligned_data = pd.DataFrame(columns=schema_columns)

# Map columns from data2 to schema using fuzzy matching
for col in schema_columns:
    match = difflib.get_close_matches(col, data2.columns, n=1, cutoff=0.6)
    if match:
        aligned_data[col] = data2[match[0]]
    else:
        aligned_data[col] = np.nan  # Fill missing schema columns with NaN

# ===========================================================
# PREVIEW STEP
# ===========================================================
print("\n===== Preview of Aligned Data (first 5 rows) =====\n")
print(aligned_data.head())
print("\n=================================================\n")
user_input = input("Do you want to save the aligned data? (yes/no): ").strip().lower()

# ===========================================================
# SAVE STEP
# ===========================================================
if user_input in ["yes", "y"]:
    # Save as both CSV and Excel
    aligned_data.to_csv("outputdata.csv", index=False)
    aligned_data.to_excel("outputdata.xlsx", index=False)
    print("Aligned data saved as 'outputdata.csv' and 'outputdata.xlsx'")
else:
    print("Save cancelled. Data not written to file.")


===== Preview of Aligned Data (first 5 rows) =====

  Student_Name Student_Surename Student_Category             Student_Address  \
0        Mohit        Janbandhu               SC  Patil Nagar, Bavdhan, Pune   

  Student_Batch Student_Specilization  Student_Cell_No  \
0       2024-25     Business Analysis       8806818081   

                Stu_EmailID  
0  mohitjanbandhu@gmail.com  


Do you want to save the aligned data? (yes/no): yes
Aligned data saved as 'outputdata.csv' and 'outputdata.xlsx'


# ==========================================================

In [21]:
import io
import difflib
import numpy as np
import pandas as pd
import ipywidgets as widgets
from IPython.display import display

# -------------------------
# Helpers
# -------------------------

def align_data(df_schema, df_data):
    schema_columns = list(df_schema.columns)
    aligned = pd.DataFrame(columns=schema_columns)
    mapping = {}
    
    for col in schema_columns:
        match = difflib.get_close_matches(col, df_data.columns, n=1, cutoff=0.6)
        if match:
            aligned[col] = df_data[match[0]]
            mapping[col] = match[0]
        else:
            aligned[col] = np.nan
            mapping[col] = None
    return aligned, mapping


def preview_and_download(aligned, mapping, fmt):
    # Show mapping table
    print("\n===== Column Mapping (data1 -> data2) =====")
    for k, v in mapping.items():
        print(f"{k:25s} <-- {v if v else 'None'}")

    # Show preview of aligned data
    print("\n===== Preview (first 5 rows) =====")
    display(aligned.head())

    # Save output
    out_path = "outputdata.csv" if fmt == "CSV" else "outputdata.xlsx"
    if fmt == "CSV":
        aligned.to_csv(out_path, index=False)
    else:
        aligned.to_excel(out_path, index=False)

    # Download link
    with open(out_path, "rb") as f:
        btn = widgets.FileDownload(data=f.read(), 
                                   filename=out_path, 
                                   description=f" Download {out_path}")
    display(btn)


# -------------------------
# Jupyter UI
# -------------------------

fu1 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data1")
fu2 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data2")
fmt = widgets.Dropdown(options=["CSV", "Excel"], value="CSV", description="Output")
run_btn = widgets.Button(description="Run Pipeline", button_style="primary")
out_area = widgets.Output()

def on_run_clicked(_):
    with out_area:
        out_area.clear_output()
        if len(fu1.value) == 0 or len(fu2.value) == 0:
            print("Please upload both files.")
            return
        
        # Read uploaded files
        key1 = list(fu1.value.keys())[0]
        key2 = list(fu2.value.keys())[0]
        f1 = fu1.value[key1]
        f2 = fu2.value[key2]

        # Auto-detect format
        def read_any(file_bytes, filename):
            if filename.lower().endswith(".csv"):
                return pd.read_csv(io.BytesIO(file_bytes))
            else:
                return pd.read_excel(io.BytesIO(file_bytes))
        
        df1 = read_any(f1["content"], key1)
        df2 = read_any(f2["content"], key2)

        print("Files loaded")
        print(f"data1 (schema) shape: {df1.shape}")
        print(f"data2 (content) shape: {df2.shape}")

        # Align data
        aligned, mapping = align_data(df1, df2)
        preview_and_download(aligned, mapping, fmt.value)

run_btn.on_click(on_run_clicked)
display(fu1, fu2, fmt, run_btn, out_area)


FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data1')

FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data2')

Dropdown(description='Output', options=('CSV', 'Excel'), value='CSV')

Button(button_style='primary', description='Run Pipeline', style=ButtonStyle())

Output()

In [24]:
# AI/ML Data Format Alignment Pipeline for Jupyter
# Author: You
# Purpose: Map data2 into data1's schema and export outputdata as CSV or Excel

import io
import os
import re
import difflib
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any, Optional

try:
    import ipywidgets as widgets
    from IPython.display import display
    IPYW_AVAILABLE = True
except Exception:
    IPYW_AVAILABLE = False

# -------------------------
# Helpers
# -------------------------

def _normalize_colname(s: str) -> str:
    if s is None:
        return ""
    s = s.strip().lower()
    s = re.sub(r"[\s\-_]+", " ", s)
    s = re.sub(r"[^a-z0-9 %]", "", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def _infer_dtype_series(s: pd.Series) -> str:
    # Try boolean
    unique_lower = set(str(x).strip().lower() for x in s.dropna().unique()[:100])
    bool_set = {"true","false","yes","no","y","n","0","1"}
    if unique_lower and unique_lower.issubset(bool_set):
        return "boolean"

    # Try datetime
    try:
        pd.to_datetime(s.dropna().sample(min(len(s.dropna()), 200), random_state=0), errors="raise", infer_datetime_format=True)
        return "datetime"
    except Exception:
        pass

    # Try integer
    try:
        x = pd.to_numeric(s.dropna(), errors="raise")
        if np.all(np.mod(x, 1) == 0):
            return "integer"
        return "float"
    except Exception:
        pass

    return "string"

def infer_schema(df: pd.DataFrame) -> Dict[str, str]:
    schema = {}
    for c in df.columns:
        schema[c] = _infer_dtype_series(df[c])
    return schema

def _coerce_to_dtype(series: pd.Series, target: str) -> Tuple[pd.Series, float]:
    s = series.copy()
    success_ratio = 1.0

    if target == "string":
        s = s.astype(str)
        return s, success_ratio

    if target == "boolean":
        mapping = {
            "true": True, "false": False,
            "yes": True, "no": False,
            "y": True, "n": False,
            "1": True, "0": False,
            "True": True, "False": False
        }
        original = s.copy()
        s = s.astype(str).str.strip()
        s = s.map(lambda x: mapping.get(x, np.nan))
        success_ratio = 1.0 - (s.isna() & ~original.isna()).mean()
        return s, success_ratio

    if target in ("integer", "float"):
        coerced = pd.to_numeric(s, errors="coerce")
        success_ratio = 1.0 - coerced.isna().mean()
        if target == "integer":
            # keep as Int64 to allow NA
            s = coerced.round().astype("Int64")
        else:
            s = coerced.astype(float)
        return s, success_ratio

    if target == "datetime":
        coerced = pd.to_datetime(s, errors="coerce", infer_datetime_format=True)
        success_ratio = 1.0 - coerced.isna().mean()
        return coerced, success_ratio

    # default
    return s, 1.0

def best_column_match(target_col: str, candidates: List[str], threshold: float = 0.75) -> Optional[str]:
    # exact normalized match first
    norm_target = _normalize_colname(target_col)
    norm_map = {_normalize_colname(c): c for c in candidates}
    if norm_target in norm_map:
        return norm_map[norm_target]
    # difflib similarity
    best = difflib.get_close_matches(norm_target, list(norm_map.keys()), n=1, cutoff=threshold)
    if best:
        return norm_map[best[0]]
    return None

def build_column_mapping(
    data1_cols: List[str],
    data2_cols: List[str],
    cutoff: float = 0.75
) -> Dict[str, Optional[str]]:
    mapping = {}
    unmatched_data2 = set(data2_cols)
    # first pass: exact normalized matches
    norm_to_data2 = {_normalize_colname(c): c for c in data2_cols}
    for c in data1_cols:
        norm = _normalize_colname(c)
        if norm in norm_to_data2:
            mapping[c] = norm_to_data2[norm]
            unmatched_data2.discard(norm_to_data2[norm])
        else:
            mapping[c] = None
    # second pass: fuzzy for those still None
    for c in data1_cols:
        if mapping[c] is None:
            match = best_column_match(c, list(unmatched_data2), threshold=cutoff)
            if match is not None:
                mapping[c] = match
                unmatched_data2.discard(match)
    return mapping

def align_to_schema(
    df_data2: pd.DataFrame,
    schema_data1: Dict[str, str],
    mapping: Dict[str, Optional[str]],
    fill_missing_with: Any = np.nan
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    report = {
        "matched_columns": [],
        "unmatched_data1_columns": [],
        "unmapped_data2_columns": [],
        "type_coercion_success": {},
    }

    # Track unmapped data2
    mapped_targets = set([v for v in mapping.values() if v is not None])
    report["unmapped_data2_columns"] = [c for c in df_data2.columns if c not in mapped_targets]

    aligned = pd.DataFrame()
    for col_data1, dtype in schema_data1.items():
        source_col = mapping.get(col_data1, None)
        if source_col is not None and source_col in df_data2.columns:
            series = df_data2[source_col]
            # basic string cleanup if target string
            if dtype == "string":
                series = series.astype(str).str.strip()
            coerced, success = _coerce_to_dtype(series, dtype)
            aligned[col_data1] = coerced
            report["matched_columns"].append((col_data1, source_col, round(success, 4)))
            report["type_coercion_success"][col_data1] = round(success, 4)
        else:
            aligned[col_data1] = fill_missing_with
            report["unmatched_data1_columns"].append(col_data1)

    # reorder columns to data1 order
    aligned = aligned[list(schema_data1.keys())]
    return aligned, report

def print_report(schema: Dict[str, str], mapping: Dict[str, Optional[str]], report: Dict[str, Any]) -> None:
    print("Data1 schema")
    print(pd.DataFrame(list(schema.items()), columns=["column", "dtype"]).to_string(index=False))
    print("\nColumn mapping data1 -> data2")
    mapping_rows = []
    for k, v in mapping.items():
        mapping_rows.append((k, v if v is not None else "None"))
    print(pd.DataFrame(mapping_rows, columns=["data1_column", "data2_source"]).to_string(index=False))

    print("\nMatched columns with coercion success")
    if report["matched_columns"]:
        print(pd.DataFrame(report["matched_columns"], columns=["data1_column", "data2_source", "success"]).to_string(index=False))
    else:
        print("None")

    print("\nUnmatched data1 columns")
    print(report["unmatched_data1_columns"] if report["unmatched_data1_columns"] else "None")

    print("\nUnmapped data2 columns")
    print(report["unmapped_data2_columns"] if report["unmapped_data2_columns"] else "None")

    if report.get("type_coercion_success"):
        successes = list(report["type_coercion_success"].values())
        overall = round(float(np.mean(successes)), 4) if successes else 0.0
        print(f"\nApproximate overall type coercion success: {overall}")

def _read_any(file_bytes: bytes, filename: str) -> pd.DataFrame:
    name = filename.lower()
    if name.endswith(".csv"):
        return pd.read_csv(io.BytesIO(file_bytes))
    if name.endswith(".xlsx") or name.endswith(".xls"):
        return pd.read_excel(io.BytesIO(file_bytes))
    raise ValueError("Unsupported file format. Use .csv or .xlsx")

def _read_path(path: str) -> pd.DataFrame:
    if path.lower().endswith(".csv"):
        return pd.read_csv(path)
    if path.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(path)
    raise ValueError("Unsupported file format. Use .csv or .xlsx")

def _write_output(df: pd.DataFrame, out_path: str) -> str:
    if out_path.lower().endswith(".csv"):
        df.to_csv(out_path, index=False)
    elif out_path.lower().endswith((".xlsx", ".xls")):
        df.to_excel(out_path, index=False)
    else:
        raise ValueError("Output file must be .csv or .xlsx")
    return os.path.abspath(out_path)

# -------------------------
# UI: either widgets upload or direct paths
# -------------------------

print("If you see upload widgets below, use them. Otherwise, set file paths in the fallback section.")

upload_data1 = None
upload_data2 = None
out_format = None

if IPYW_AVAILABLE:
    fu1 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data1")
    fu2 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data2")
    fmt = widgets.Dropdown(options=["CSV", "Excel"], value="CSV", description="Output")
    run_btn = widgets.Button(description="Run Pipeline", button_style="primary")
    out_area = widgets.Output()

    def on_run_clicked(_):
        with out_area:
            out_area.clear_output()
            if len(fu1.value) == 0 or len(fu2.value) == 0:
                print("Please upload both files.")
                return
            # read
            key1 = list(fu1.value.keys())[0]
            key2 = list(fu2.value.keys())[0]
            f1 = fu1.value[key1]
            f2 = fu2.value[key2]
            df1 = _read_any(f1["content"], key1)
            df2 = _read_any(f2["content"], key2)

            print("Files loaded")
            print(f"data1 shape: {df1.shape}")
            print(f"data2 shape: {df2.shape}")

            schema = infer_schema(df1)
            mapping = build_column_mapping(list(df1.columns), list(df2.columns), cutoff=0.75)
            aligned, rep = align_to_schema(df2, schema, mapping, fill_missing_with=np.nan)

            print_report(schema, mapping, rep)

            # choose output path in working dir
            if fmt.value == "CSV":
                out_path = "outputdata.csv"
            else:
                out_path = "outputdata.xlsx"

            abs_path = _write_output(aligned, out_path)
            print(f"\nOutput written to: {abs_path}")

    run_btn.on_click(on_run_clicked)

    display(fu1, fu2, fmt, run_btn, out_area)

# -------------------------
# Fallback: use local file paths if widgets are not available
# -------------------------

# Uncomment and set these paths if running in a non-widget environment:
# data1_path = "path/to/your/data1.xlsx"  # or .csv
# data2_path = "path/to/your/data2.csv"   # or .xlsx
# output_path = "outputdata.csv"          # or .xlsx

# If you use fallback, run this block after setting paths above:
# df1 = _read_path(data1_path)
# df2 = _read_path(data2_path)
# schema = infer_schema(df1)
# mapping = build_column_mapping(list(df1.columns), list(df2.columns), cutoff=0.75)
# aligned, rep = align_to_schema(df2, schema, mapping, fill_missing_with=np.nan)
# print_report(schema, mapping, rep)
# abs_path = _write_output(aligned, output_path)
# print(f"\nOutput written to: {abs_path}")

If you see upload widgets below, use them. Otherwise, set file paths in the fallback section.


FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data1')

FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data2')

Dropdown(description='Output', options=('CSV', 'Excel'), value='CSV')

Button(button_style='primary', description='Run Pipeline', style=ButtonStyle())

Output()

# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

In [23]:
import io
import difflib
import numpy as np
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, FileLink

# -------------------------
# Helpers
# -------------------------

def align_data(df_schema, df_data):
    schema_columns = list(df_schema.columns)
    aligned = pd.DataFrame(columns=schema_columns)
    mapping = {}
    
    for col in schema_columns:
        match = difflib.get_close_matches(col, df_data.columns, n=1, cutoff=0.6)
        if match:
            aligned[col] = df_data[match[0]]
            mapping[col] = match[0]
        else:
            aligned[col] = np.nan
            mapping[col] = None
    return aligned, mapping


def preview_and_download(aligned, mapping, fmt):
    # Show mapping table
    print("\n===== Column Mapping (data1 -> data2) =====")
    for k, v in mapping.items():
        print(f"{k:25s} <-- {v if v else 'None'}")

    # Show preview of aligned data
    print("\n===== Preview (first 5 rows) =====")
    display(aligned.head())

    # Save output
    out_path = "outputdata.csv" if fmt == "CSV" else "outputdata.xlsx"
    if fmt == "CSV":
        aligned.to_csv(out_path, index=False)
    else:
        aligned.to_excel(out_path, index=False)

    # Provide download link
    print(f"\nFile saved as {out_path}. Click below to download:")
    display(FileLink(out_path))


# -------------------------
# Jupyter UI
# -------------------------

fu1 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data1")
fu2 = widgets.FileUpload(accept=".csv,.xlsx,.xls", multiple=False, description="Upload data2")
fmt = widgets.Dropdown(options=["CSV", "Excel"], value="CSV", description="Output")
run_btn = widgets.Button(description="Run Pipeline", button_style="primary")
out_area = widgets.Output()

def on_run_clicked(_):
    with out_area:
        out_area.clear_output()
        if len(fu1.value) == 0 or len(fu2.value) == 0:
            print("Please upload both files.")
            return
        
        # Read uploaded files
        key1 = list(fu1.value.keys())[0]
        key2 = list(fu2.value.keys())[0]
        f1 = fu1.value[key1]
        f2 = fu2.value[key2]

        # Auto-detect format
        def read_any(file_bytes, filename):
            if filename.lower().endswith(".csv"):
                return pd.read_csv(io.BytesIO(file_bytes))
            else:
                return pd.read_excel(io.BytesIO(file_bytes))
        
        df1 = read_any(f1["content"], key1)
        df2 = read_any(f2["content"], key2)

        print("Files loaded")
        print(f"data1 (schema) shape: {df1.shape}")
        print(f"data2 (content) shape: {df2.shape}")

        # Align data
        aligned, mapping = align_data(df1, df2)
        preview_and_download(aligned, mapping, fmt.value)

run_btn.on_click(on_run_clicked)
display(fu1, fu2, fmt, run_btn, out_area)


FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data1')

FileUpload(value={}, accept='.csv,.xlsx,.xls', description='Upload data2')

Dropdown(description='Output', options=('CSV', 'Excel'), value='CSV')

Button(button_style='primary', description='Run Pipeline', style=ButtonStyle())

Output()