# ==================================================================================
# 1. Python code to dynamically create the Instagram script 
# ==================================================================================

In [None]:
import pandas as pd
import re

# --- Load schema ---
# Read the merged Instagram structure table from a hard-coded local path.
schema_df = pd.read_csv("/mnt/c/Users/arodilla/OneDrive - Universitat de Barcelona/BSC/WHAT-IF/SCHEMA_DATA/Merged_structures_IG_2_new.csv")
# Column headers may carry stray whitespace; normalise them before lookups.
schema_df.columns = schema_df.columns.str.strip()
# Keep only rows in which both "variable" and "value" are present.
schema_df = schema_df.dropna(subset=["variable", "value"])

# --- Helper to build access path for get_in/get_list ---
def build_access_path(row):
    """Collect the ``col_path_1`` .. ``col_path_5`` segments of one schema row.

    Missing/NaN levels are skipped. A segment whose companion
    ``col_path_<n>_LIST`` cell equals "LIST" (ignoring case and surrounding
    whitespace) is wrapped in square brackets; all other segments are
    returned as plain stripped strings.

    Args:
        row: Mapping-like schema row supporting ``.get`` and item access.

    Returns:
        List of path segments in level order (possibly empty).
    """
    segments = []
    for level in range(1, 6):
        path_col = f"col_path_{level}"
        if pd.isna(row.get(path_col)):
            continue
        segment = str(row[path_col]).strip()
        marker_col = f"{path_col}_LIST"
        is_list = (
            pd.notna(row.get(marker_col))
            and str(row[marker_col]).strip().upper() == "LIST"
        )
        segments.append(f"[{segment}]" if is_list else segment)
    return segments

# --- Generate extractor function per json_name ---
def generate_df_function_by_json(json_name: str, group: pd.DataFrame) -> str:
    """Render the source code of one Instagram extractor function.

    The emitted function is named after ``json_name`` (lower-cased, ``.json``
    removed, non-identifier characters replaced by ``_``, ``_df`` appended).
    It reads ``*/<json_name>``, takes the rows found under the first
    ``row_path`` value of ``group`` and fills one output column per distinct
    last path segment produced by ``build_access_path``.

    Args:
        json_name: File name the schema rows in ``group`` belong to.
        group: Schema rows for that file.

    Returns:
        The generated function as a single string ending in a newline.
    """
    identifier_rx = r"\W|^(?=\d)"
    stem = json_name.replace(".json", "")
    func_name = re.sub(identifier_rx, "_", stem.lower()) + "_df"
    row_path = group["row_path"].iloc[0]

    # Column name -> access path; a later row with the same last segment
    # replaces an earlier one.
    columns = {}
    for _, schema_row in group.iterrows():
        segments = build_access_path(schema_row)
        if segments:
            column = re.sub(identifier_rx, "_", segments[-1].replace(" ", "_"))
            columns[column] = segments

    def render_path(segments):
        # Bracketed (list) segments are emitted verbatim, others are quoted.
        rendered = [seg if seg.startswith("[") else f'"{seg}"' for seg in segments]
        return "[" + ", ".join(rendered) + "]"

    assignments = [
        f'        row["{column}"] = get_in(entry, {render_path(segments)})'
        for column, segments in columns.items()
    ]

    prologue = [
        f"def {func_name}(file_input: list[str]) -> pd.DataFrame:",
        f'    data = read_json(file_input, ["*/{json_name}"])',
        '    rows = get_list(data, ["' + row_path + '"])',
        "    if not rows:",
        "        return pd.DataFrame([])",
        "    out = []",
        "    for entry in rows:",
        "        row = {}",
    ]
    epilogue = [
        "        out.append(row)",
        "    df = pd.DataFrame(out)",
        '    if "time" in df.columns:',
        '        df["date"] = pd.to_datetime(df["time"], unit="s").dt.strftime("%Y-%m-%d %H:%M:%S")',
        '        df = df.sort_values("date")',
        "    return df\n",
    ]
    return "\n".join(prologue + assignments + epilogue)

# --- Generate explicit donation flow function ---
def generate_donation_flow_function_explicit(schema_df: pd.DataFrame) -> str:
    """Render the source of ``create_donation_flow`` for Instagram.

    One ``try``/``except`` block is emitted per distinct ``json_name`` in
    ``schema_df``; each block calls the matching ``<name>_df`` extractor and
    logs a warning when it fails.

    Args:
        schema_df: Schema table containing a ``json_name`` column.

    Returns:
        The generated function source as a single string.
    """
    header = [
        "def create_donation_flow(file_input: list[str]):",
        '    """Creates donation flow for Instagram JSON data."""',
        "    tables = []\n",
    ]
    footer = [
        "    if tables:",
        "        return donation_flow(",
        '            id="instagram",',
        "            tables=tables",
        "        )",
        "    else:",
        "        return None",
    ]

    body = []
    for json_name, _ in schema_df.groupby("json_name"):
        # Same sanitisation as the extractor generator so the names line up.
        table = re.sub(r"\W|^(?=\d)", "_", json_name.replace(".json", "").lower())
        extractor = table + "_df"
        body += [
            f"    # Extract table: {table}",
            "    try:",
            f"        {table}_table = donation_table(",
            f'            name="{table}",',
            f"            df={extractor}(file_input),",
            f'            title={{"en": "{table}", "nl": "{table}"}},',
            "        )",
            f"        tables.append({table}_table)",
            "    except Exception as e:",
            f'        logger.warning(f"Skipping {table}: {{e}}")\n',
        ]

    return "\n".join(header + body + footer)

# --- Generate all extractors ---
# One generated extractor source string per distinct json_name in the schema.
generated_functions = [
    generate_df_function_by_json(json_name, group)
    for json_name, group in schema_df.groupby("json_name")
]

# --- Save to output file ---
# Hard-coded destination of the generated module (overwritten on every run).
output_file = "/mnt/c/Users/arodilla/OneDrive - Universitat de Barcelona/BSC/WHAT-IF/SCHEMA_DATA/instagram_generated_extractors.py"
with open(output_file, "w", encoding="utf-8") as f:
    # Module header: imports and the logger that the generated code refers to.
    f.write("# Auto-generated Instagram extractors\n\n")
    f.write("import pandas as pd\n")
    f.write("import logging\n")
    f.write("from port.helpers.donation_flow import donation_table, donation_flow\n")
    f.write("from port.helpers.readers import read_json\n")
    f.write("from port.helpers.parsers import get_in, get_list\n\n")
    f.write("logger = logging.getLogger(__name__)\n\n")
    # Extractor functions first, then the create_donation_flow wrapper.
    f.write("\n\n".join(generated_functions))
    f.write("\n\n")
    f.write(generate_donation_flow_function_explicit(schema_df))

# ==================================================================================
# 2. Python code to dynamically create the Facebook script 
# ==================================================================================

In [None]:
import pandas as pd
import json
import re
import inspect
import keyword 

# Load schema       - Alt Path (/mnt/c/Users/arodilla/Downloads/Merged_structures_IG.csv)
# Read the merged Facebook structure table from a hard-coded local path.
schema_df = pd.read_csv('/mnt/c/Users/arodilla/OneDrive - Universitat de Barcelona/BSC/WHAT-IF/SCHEMA_DATA/Merged_structures_FB.csv')
# Column headers may carry stray whitespace; normalise them before lookups.
schema_df.columns = schema_df.columns.str.strip()
# Keep only rows in which both "variable" and "value" are present.
schema_df = schema_df.dropna(subset=["variable", "value"])

# Helper function to flatten the schema and map to simplified column names
def flatten_schema(schema: dict, parent_key: str = '') -> dict:
    """Flatten a nested dict into a single level keyed by dotted paths.

    Args:
        schema: Possibly nested mapping to flatten.
        parent_key: Dotted prefix prepended to every key (empty at the top).

    Returns:
        A dict mapping each dotted path to its non-dict leaf value, in the
        order the leaves are encountered.
    """
    flat = {}
    for key, value in schema.items():
        dotted = key if not parent_key else f"{parent_key}.{key}"
        if not isinstance(value, dict):
            flat[dotted] = value
            continue
        # Nested mapping: recurse with the extended prefix.
        flat.update(flatten_schema(value, dotted))
    return flat

def generate_df_function_by_json(json_name: str, group: pd.DataFrame) -> str:
    """Render the source code of one Facebook extractor function.

    The emitted function reads ``*/<name>.json`` and calls ``parse_json`` with
    the first ``row_path`` of ``group`` and one keyword per column. A column
    is named after the last non-null ``col_path_1`` .. ``col_path_5`` segment
    of a schema row (sanitised, with ``_`` appended to Python keywords) and
    maps to the dotted join of all its segments.

    Args:
        json_name: File name the schema rows in ``group`` belong to.
        group: Schema rows for that file.

    Returns:
        The generated function as a single string ending in a newline.
    """
    stem = json_name.replace(".json", "")
    first_row_path = group["row_path"].iloc[0]
    extractor_name = re.sub(r"\W|^(?=\d)", "_", stem.lower()) + "_df"

    # Identifier -> single-element list holding the dotted JSON path; a later
    # row with the same identifier replaces an earlier one.
    columns = {}
    for _, record in group.iterrows():
        segments = []
        for level in range(1, 6):
            field = f"col_path_{level}"
            if pd.notna(record.get(field)):
                segments.append(str(record[field]))
        if not segments:
            continue

        identifier = re.sub(r"\W|^(?=\d)", "_", segments[-1].strip().replace(" ", "_"))
        if keyword.iskeyword(identifier):
            # Keywords cannot be used as keyword-argument names.
            identifier = identifier + "_"
        columns[identifier] = [".".join(segments)]

    keyword_lines = [
        f'        {identifier} = {dotted},' for identifier, dotted in columns.items()
    ]

    return "\n".join([
        f"def {extractor_name}(file_input: list[str]) -> pd.DataFrame:",
        f'    data = read_json(file_input, ["*/{stem}.json"])',
        "",
        "    df = parse_json(data,",
        f'        row_path=["$.{first_row_path}"],',
        "        col_paths=dict(",
        *keyword_lines,
        "        )",
        "    )",
        "",
        '    if "time" in df.columns:',
        '        df["date"] = pd.to_datetime(df["time"], unit="s").dt.strftime("%Y-%m-%d %H:%M:%S")',
        '        df = df.sort_values("date")',
        "",
        "    return df\n",
    ])

def generate_donation_flow_function_explicit(schema_df: pd.DataFrame) -> str:
    """Render the source of an explicit ``create_donation_flow`` for Facebook.

    One ``try``/``except`` block is emitted per distinct ``json_name`` in
    ``schema_df``; each block calls the matching ``<name>_df`` extractor.
    A failing extractor is reported through ``logger.warning`` in the
    generated code instead of being silently ignored, matching the Instagram
    generator (the generated module defines ``logger`` in its header).

    Args:
        schema_df: Schema table containing a ``json_name`` column.

    Returns:
        The generated function source as a single string.
    """
    lines = [
        "def create_donation_flow(file_input: list[str]):",
        '    """',
        "    Creates a donation flow for Facebook data, explicitly trying each extractor function.",
        "    Only creates tables for data that's available in the provided files.",
        '    """',
        "    tables = []",
        "    #print(file_input)\n",
    ]

    for json_name, _ in schema_df.groupby("json_name"):
        # Same sanitisation as the extractor generator so the names line up.
        table_name = re.sub(r"\W|^(?=\d)", "_", json_name.replace(".json", "").lower())
        func_name = table_name + "_df"

        # Basic fallback titles: the table name is used for both languages.
        english_title = table_name
        dutch_title = table_name

        lines += [
            f"    # {english_title}",
            "    try:",
            f"        {table_name}_table = donation_table(",
            f'            name="{table_name}",',
            f"            df={func_name}(file_input),",
            f'            title={{"en": "{english_title}", "nl": "{dutch_title}"}},',
            "        )",
            f"        tables.append({table_name}_table)",
            "    except Exception as e:",
            # Log instead of `pass` so missing/broken tables stay diagnosable.
            f'        logger.warning(f"Skipping {table_name}: {{e}}")\n',
        ]

    lines += [
        "    # Only create the donation flow if we have at least one table",
        "    if tables:",
        "        return donation_flow(",
        '            id="facebook",',
        "            tables=tables",
        "        )",
        "    else:",
        '        #print("No tables could be generated from the provided files")',
        "        return None",
    ]

    return "\n".join(lines)

# Group by json_name (1 table per JSON file)
# One generated extractor source string per distinct json_name in the schema.
generated_functions = [
    generate_df_function_by_json(json_name, group)
    for json_name, group in schema_df.groupby("json_name")
]

# Function to create donation flow (remains the same)
def create_donation_flow(file_input: list[str]):
    """Build a Facebook donation flow by running every known extractor.

    Extractor functions are looked up by name in ``globals()``; the name is
    derived from the last ``/``-separated component of each schema row's
    ``variable`` value. Names with no matching function are skipped with a
    warning rather than crashing inside the error handler.

    NOTE(review): ``donation_table``, ``donation_flow`` and ``logger`` are not
    imported/defined in the visible part of this cell -- confirm they exist in
    the notebook session before calling this function.

    Args:
        file_input: Paths handed to every extractor function.

    Returns:
        The result of ``donation_flow(id="Facebook", tables=...)``; ``tables``
        may be empty.
    """
    tables = []

    # Map each schema file name to its extractor function (or None if the
    # function is not defined in this module's globals).
    extraction_functions = {}
    for _, row in schema_df.iterrows():
        filename = row["variable"].split("/")[-1]
        func_name = re.sub(r"\W|^(?=\d)", "_", filename.replace(".json", "").lower()) + "_df"
        extraction_functions[filename] = globals().get(func_name)

    # Run the extractors
    for filename, extractor_func in extraction_functions.items():
        if extractor_func is None:
            # Previously this fell into the except branch, where accessing
            # `extractor_func.__name__` raised AttributeError on None.
            logger.warning(f"No extractor function found for {filename}; skipping")
            continue
        try:
            df = pd.DataFrame(extractor_func(file_input))
            if not df.empty:
                table_name = filename.replace(".json", "").capitalize()
                tables.append(
                    donation_table(
                        name=table_name,
                        df=df,
                        title={"en": table_name}
                    )
                )
        except Exception as e:
            logger.error(f"Error running {extractor_func.__name__}: {e}")

    return donation_flow(
        id="Facebook",
        tables=tables
    )

# Assemble the generated module: every extractor function followed by the
# explicit create_donation_flow wrapper rendered from the schema.
all_code = "\n\n".join(generated_functions)
donation_flow_function_str = generate_donation_flow_function_explicit(schema_df)

# Save the generated functions to a file (path is relative to the working
# directory of the notebook; the file is overwritten on every run).
with open("src/framework/processing/py/port/helpers/facebook_generated_extractors.py", "w", encoding="utf-8") as f:
    # Module header: imports and the logger that the generated code refers to.
    f.write("# Auto-generated Facebook extractors\n\n")
    f.write("import pandas as pd\n")
    f.write("import logging\n")
    f.write("from port.helpers.donation_flow import donation_table, donation_flow\n")
    f.write("from port.helpers.readers import read_json\n")
    f.write("from port.helpers.parsers import parse_json\n\n")
    f.write("logger = logging.getLogger(__name__)\n\n")
    f.write(all_code)
    f.write("\n\n")
    f.write(donation_flow_function_str)

# ==================================================================================
# 3. Python code to dynamically create the TikTok script 
# ==================================================================================

In [None]:
import pandas as pd
import json
import inspect
from typing import List
from pathlib import Path

# -----------------------
# Utility Functions
# -----------------------
def get_in(d: dict, *keys):
    """Follow ``keys`` through nested dicts and return the value reached.

    Returns ``None`` as soon as a non-dict value is met while keys remain,
    or when a key is absent (``dict.get`` semantics). With no keys, ``d``
    itself is returned.
    """
    current = d
    for step in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(step)
    return current

def get_list(d: dict, *keys):
    """Return the list found at ``keys`` via ``get_in``, or ``[]`` otherwise."""
    found = get_in(d, *keys)
    if isinstance(found, list):
        return found
    return []

def snake_case(name: str) -> str:
    """Lower-case ``name``, drop ``.json``/``.js`` and turn ``-``/spaces into ``_``."""
    result = name.lower()
    # Order matters: ".json" must be removed before ".js".
    for old, new in (("-", "_"), (".json", ""), (".js", ""), (" ", "_")):
        result = result.replace(old, new)
    return result

# -----------------------
# Load Schema
# -----------------------
# Hard-coded local path of the merged TikTok structure table.
schema_path = '/mnt/c/Users/arodilla/OneDrive - Universitat de Barcelona/BSC/WHAT-IF/SCHEMA_DATA/Merged_structures_TT_new_2.csv'
schema_df = pd.read_csv(schema_path)
# Column headers may carry stray whitespace; normalise them before lookups.
schema_df.columns = schema_df.columns.str.strip()

# -----------------------
# Extract Field Paths
# -----------------------
def build_field_mappings(schema_group: pd.DataFrame):
    """Split the schema rows of one group into static fields and list blocks.

    For every row a path is built from ``row_path`` followed by the non-null
    ``col_path_1`` .. ``col_path_5`` values. If any level is flagged as a list
    in its ``col_path_<n>_LIST`` column, the path up to the *last* such level
    is recorded: its final element is stored as a field name under the tuple
    of the preceding elements. Otherwise the full path is kept as a static
    field (unless its last element is falsy).

    The list flag is compared after stripping whitespace and upper-casing,
    so values such as ``"list "`` are recognised, in line with the other
    generators in this notebook.

    Args:
        schema_group: Schema rows sharing one ``row_path``.

    Returns:
        ``(static_fields, list_blocks)`` where ``static_fields`` is a list of
        paths (lists) and ``list_blocks`` maps a path tuple to a set of field
        names.
    """
    static_fields = []
    list_blocks = {}

    for _, row in schema_group.iterrows():
        path = [row['row_path']]
        list_path = []

        for i in range(1, 6):
            col_val = row.get(f'col_path_{i}')
            if pd.isna(col_val):
                continue
            path.append(col_val)
            list_status = row.get(f'col_path_{i}_LIST', 'NO LIST')
            if str(list_status).strip().upper() == 'LIST':
                # Remember the path as it stands at this list level.
                list_path = path.copy()

        if list_path:
            list_blocks.setdefault(tuple(list_path[:-1]), set()).add(list_path[-1])
        elif path[-1]:
            static_fields.append(path)

    return static_fields, list_blocks

# -----------------------
# Generate Extractor Function
# -----------------------
def generate_df_function(file_folder_name: str, group: pd.DataFrame) -> str:
    """Render the source code of one TikTok extractor function.

    The emitted function opens ``file_input[0]`` as JSON, looks up
    ``Activity -> <file_folder_name>`` and builds a DataFrame from the static
    fields and list blocks returned by ``build_field_mappings(group)``. All of
    the emitted body runs inside a ``try`` whose handler prints the error and
    returns an empty DataFrame.

    Args:
        file_folder_name: Root key (the ``row_path`` value) of this group.
        group: Schema rows sharing that root key.

    Returns:
        The generated function source as a single string.
    """
    original_root_key = file_folder_name
    func_name = f"{snake_case(file_folder_name)}_df"

    # Fixed prologue of the generated function: load the JSON file and bail
    # out early when the root key holds no data.
    lines = [
        f"def {func_name}(file_input: List[str]) -> pd.DataFrame:",
        "    try:",
        "        with open(file_input[0], 'r', encoding='utf-8') as f:",
        "            data = json.load(f)",
        f"        root_data = get_in(data, 'Activity','{original_root_key}')",
        f"        if not root_data:",
        f"            print(f'\u26a0\ufe0f No data found at path: {original_root_key}')",
        "            return pd.DataFrame()",
        "",
        "        base_row = {}",
    ]

    static_fields, list_blocks = build_field_mappings(group)

    # Static fields
    for path in static_fields:
        field = path[-1]
        # path[0] is the row_path itself, which root_data already points at.
        path_str = ", ".join([f"'{p}'" for p in path[1:]])
        lines.append(f"        base_row['{field}'] = get_in(root_data, {path_str})")

    if not list_blocks:
        # No list-valued fields: the generated function returns one row.
        lines.append("        return pd.DataFrame([base_row])")
    else:
        lines.append("        all_records = []")

        for list_path, fields in list_blocks.items():
            path_str = ", ".join([f"'{p}'" for p in list_path[1:]])
            lines.append(f"        items = get_list(root_data, {path_str})")
            lines.append("        for item in items:")
            lines.append("            row = base_row.copy()")
            lines.append(f"            row['__source_list__'] = '{list_path[-1]}'")
            # sorted() keeps the emitted field order deterministic.
            for field in sorted(fields):
                # NOTE(review): the literal '.*?' is emitted as the default for
                # a missing key; it looks like a placeholder -- confirm intent.
                lines.append(f"            row['{field}'] = item.get('{field}', '.*?')")
            lines.append("            all_records.append(row)")

        lines.append("        return pd.DataFrame(all_records)")

    # Handler closing the generated try block.
    lines.extend([
        "    except Exception as e:",
        f"        print(f'\u274c Error in {func_name}:', e)",
        "        return pd.DataFrame()",
        ""
    ])

    return "\n".join(lines)

# -----------------------
# Generate Donation Flow
# -----------------------
def generate_donation_flow(schema_df: pd.DataFrame) -> str:
    """Render the source of ``create_donation_flow`` for TikTok.

    One ``try``/``except`` block is emitted per distinct ``row_path`` in
    ``schema_df``; each block calls ``<snake_case(row_path)>_df`` and adds a
    table when the resulting DataFrame is not empty.

    Args:
        schema_df: Schema table containing a ``row_path`` column.

    Returns:
        The generated function source as a single string.
    """
    def table_block(root_key):
        # Source lines for a single extractor call.
        extractor = f"{root_key}_df"
        return [
            "    try:",
            f"        df = {extractor}(file_input)",
            "        if not df.empty:",
            "            tables.append(",
            f"                donation_table(name='{root_key}', df=df, title={{'en': '{root_key}'}})",
            "            )",
            "    except Exception as e:",
            f"        print(f'Error in {extractor}:', e)",
            "",
        ]

    source = [
        "def create_donation_flow(file_input: List[str]):",
        '    """Create donation flow from TikTok JSON."""',
        "    tables = []",
        "",
    ]
    for file_name, _ in schema_df.groupby("row_path"):
        source += table_block(snake_case(file_name))
    source += [
        "    if tables:",
        "        return donation_flow(id='tiktok', tables=tables)",
        "    else:",
        "        return None",
    ]
    return "\n".join(source)

# -----------------------
# Generate & Write Output
# -----------------------
# One generated extractor source string per distinct row_path in the schema.
generated_functions = [
    generate_df_function(file_name, group)
    for file_name, group in schema_df.groupby("row_path")
]

donation_flow_code = generate_donation_flow(schema_df)

# Hard-coded destination of the generated module (overwritten on every run).
output_path = Path("/home/larodilla/BSC/WHATIF/what-if-data-donation/src/framework/processing/py/port/helpers/tiktok_generated_extractors.py")
with open(output_path, "w", encoding="utf-8") as f:
    # Module header: imports used by the generated code.
    f.write("# Auto-generated TikTok extractors\n\n")
    f.write("import pandas as pd\n")
    f.write("import json\n")
    f.write("import logging\n")
    f.write("from port.helpers.donation_flow import donation_table, donation_flow\n")
    f.write("from typing import List\n\n")
    # The helper functions defined in this cell are copied verbatim into the
    # generated module so it does not depend on this notebook.
    f.write(inspect.getsource(get_in))
    f.write("\n")
    f.write(inspect.getsource(get_list))
    f.write("\n\n")
    # Extractor functions first, then the create_donation_flow wrapper.
    f.write("\n\n".join(generated_functions))
    f.write("\n\n")
    f.write(donation_flow_code)

print(f"Extractors written to {output_path}")

Extractors written to /home/larodilla/BSC/WHATIF/what-if-data-donation/src/framework/processing/py/port/helpers/tiktok_generated_extractors.py


# ==================================================================================
# 4. Python code to dynamically create the Twitter script 
# ==================================================================================

In [None]:
import pandas as pd
import json
import re
import io
import zipfile
import logging
import inspect
from typing import List
from pathlib import Path

logger = logging.getLogger(__name__)

# -----------------------
# Utility Functions
# -----------------------
def get_in(d: dict, *keys):
    """Walk nested dicts along ``keys``; return ``None`` if the walk breaks.

    The walk stops with ``None`` when a non-dict value is reached while keys
    remain. Missing keys yield ``None`` through ``dict.get``. Without keys,
    ``d`` is returned unchanged.
    """
    node = d
    for depth in range(len(keys)):
        if not isinstance(node, dict):
            return None
        node = node.get(keys[depth])
    return node

def get_list(d: dict, *keys):
    """Return the value at ``keys`` (via ``get_in``) if it is a list, else ``[]``."""
    candidate = get_in(d, *keys)
    if not isinstance(candidate, list):
        return []
    return candidate

def get_dict(d: dict, *keys):
    """Return the value at ``keys`` (via ``get_in``) if it is a dict, else ``{}``."""
    candidate = get_in(d, *keys)
    if not isinstance(candidate, dict):
        return {}
    return candidate

def snake_case(name: str) -> str:
    """Lower-case ``name``, remove ``.json``/``.js`` and map ``-``/spaces to ``_``."""
    lowered = name.lower()
    dashed = lowered.replace("-", "_")
    # ".json" has to go before ".js", otherwise "n" would be left behind.
    without_ext = dashed.replace(".json", "").replace(".js", "")
    return without_ext.replace(" ", "_")

def extract_path(row):
    """Return the usable path segments of a schema row, in column order.

    Looks at ``row_path`` and ``col_path_1`` .. ``col_path_5``; null values
    and values equal to "MISSING" (ignoring case and surrounding whitespace)
    are left out. Remaining values are returned as stripped strings.
    """
    columns = ["row_path", *(f"col_path_{i}" for i in range(1, 6))]
    values = (row.get(column) for column in columns)
    return [
        str(value).strip()
        for value in values
        if pd.notna(value) and str(value).strip().upper() != "MISSING"
    ]

def get_field_name(row):
    """Return the deepest usable ``col_path_<n>`` value of a schema row.

    Levels are checked from ``col_path_5`` down to ``col_path_1``. Null values
    and "MISSING" placeholders are skipped; the placeholder is matched after
    stripping whitespace and upper-casing, the same way ``extract_path``
    does, so variants such as ``" missing "`` are not returned as field names.

    Args:
        row: Mapping-like schema row supporting ``.get``.

    Returns:
        The stripped field name, or ``None`` when no level holds a value.
    """
    for i in reversed(range(1, 6)):
        val = row.get(f"col_path_{i}")
        if pd.notna(val) and str(val).strip().upper() != "MISSING":
            return str(val).strip()
    return None

def is_list_index(val):
    """Return True when the string form of ``val`` consists only of digits."""
    text = str(val)
    return text.isdigit()

# -----------------------
# Load Schema
# -----------------------
# Read the merged Twitter/X structure table from a hard-coded local path.
schema_df = pd.read_csv("/mnt/c/Users/arodilla/OneDrive - Universitat de Barcelona/BSC/WHAT-IF/SCHEMA_DATA/Merged_structures_X_new_2.csv")
# Column headers may carry stray whitespace; normalise them before lookups.
schema_df.columns = schema_df.columns.str.strip()
# Rows without a file name or row path cannot be turned into extractors.
schema_df = schema_df.dropna(subset=["json_name", "row_path"])
# Drop rows whose row_path is the literal placeholder 'No data'.
schema_df = schema_df[schema_df['row_path'] != 'No data']

# -----------------------
# Read JS File from ZIP
# -----------------------
def read_js(file_input: list[str], target_files: list[str]) -> list[dict]:
    """Load JSON payloads from ``.js`` members of one or more ZIP archives.

    For every archive and every target, the first member whose name contains
    the target string is read as UTF-8. Everything on its first line up to and
    including the first ``" = "`` is removed (the JavaScript assignment
    prefix) and the remainder is parsed as JSON. A top-level list contributes
    its items, any other value is appended as a single item.

    Empty members are skipped instead of raising ``IndexError``; members that
    fail to parse are logged and skipped.

    Args:
        file_input: Paths of the ZIP archives to search.
        target_files: Substrings identifying the members to read.

    Returns:
        The collected items, in archive and target order.
    """
    extracted_data = []
    for zip_path in file_input:
        with zipfile.ZipFile(zip_path, "r") as z:
            member_names = z.namelist()
            for target_file in target_files:
                js_files = [f for f in member_names if target_file in f]
                if not js_files:
                    continue

                with z.open(js_files[0]) as raw_file:
                    with io.TextIOWrapper(raw_file, encoding="utf8") as text_file:
                        lines = text_file.readlines()

                if not lines:
                    # Empty member: there is no first line to strip or parse.
                    continue

                # Drop the "window.X = " style prefix in front of the JSON.
                lines[0] = re.sub(r"^.*? = ", "", lines[0])
                try:
                    data = json.loads("".join(lines))
                except json.JSONDecodeError as e:
                    logger.error(f"Error decoding {target_file} in {zip_path}: {e}")
                    continue

                if isinstance(data, list):
                    extracted_data.extend(data)
                else:
                    extracted_data.append(data)
    return extracted_data

# -----------------------
# Generate Extractor Function
# -----------------------
def generate_df_function(json_name: str, group: pd.DataFrame) -> str:
    """Render the source code of one Twitter extractor function.

    The emitted function reads ``/<json_name>`` through ``read_js`` and, for
    every item, fills a ``base_row`` from the static paths of ``group``. When
    list blocks were detected it emits one loop per list path that copies
    ``base_row`` for every list entry; otherwise it appends ``base_row`` as is.

    Args:
        json_name: File name the schema rows in ``group`` belong to.
        group: Schema rows for that file.

    Returns:
        The generated function source as a single string ending in a newline.
    """
    # NOTE(review): root_key is assigned but not used below.
    root_key = json_name
    func_name = f"{snake_case(json_name)}_df"

    lines = [
        f"def {func_name}(file_input: list[str]) -> pd.DataFrame:",
        f"    data = read_js(file_input, ['/{json_name}'])",
        "    records = []",
        "    for item in data:",
        "        base_row = {}",
    ]

    static_fields = []  # full paths of scalar fields, first occurrence per name
    list_blocks = {}    # list path (tuple) -> set of sub-paths (tuples) below it
    seen_static = set()

    for _, row in group.iterrows():
        path = extract_path(row)

        # NOTE(review): path[0] is row_path, yet position i is checked against
        # col_path_{i+1}_LIST, and extract_path drops MISSING/NaN levels, so
        # positions may not line up with column numbers -- confirm intent.
        for i, key in enumerate(path):
            if str(row.get(f"col_path_{i+1}_LIST", "")).strip().upper() == "LIST":
                list_path = tuple(path[:i+1])
                subfield_path = path[i+1:]
                if subfield_path:
                    list_blocks.setdefault(list_path, set()).add(tuple(subfield_path))
                break
        else:
            # No list level found (loop finished without break): static field.
            field = path[-1]
            if field not in seen_static:
                static_fields.append(path)
                seen_static.add(field)

    for path in static_fields:
        field = path[-1]
        path_str = "', '".join(path)
        lines.append(f"        base_row['{field}'] = get_in(item, '{path_str}')")

    if not list_blocks:
        lines.append("        records.append(base_row)")
    else:
        for list_path, fields in list_blocks.items():
            path_str = "', '".join(list_path)
            lines += [
                f"        for entry in get_list(item, '{path_str}'):",
                "            row = base_row.copy()",
                f"            row['__source_list__'] = '{list_path[-1]}'"
            ]
            # sorted() keeps the emitted field order deterministic.
            for field_path in sorted(fields):
                field_name = field_path[-1]
                path_str = "', '".join(field_path)
                lines.append(f"            row['{field_name}'] = get_in(entry, '{path_str}')")
            lines.append("            records.append(row)")

    lines.append("    return pd.DataFrame(records)\n")
    return "\n".join(lines)

# -----------------------
# Generate Donation Flow
# -----------------------
def generate_donation_flow(schema_df: pd.DataFrame) -> str:
    """Render the source of ``create_donation_flow`` for Twitter.

    One ``try``/``except`` block is emitted per distinct ``json_name`` in
    ``schema_df``; each block calls ``<snake_case(json_name)>_df``, adds a
    table when the DataFrame is not empty and logs an error on failure.

    Args:
        schema_df: Schema table containing a ``json_name`` column.

    Returns:
        The generated function source as a single string.
    """
    source = [
        "def create_donation_flow(file_input: list[str]):",
        '    """Create a donation flow for Twitter data."""',
        "    tables = []\n",
    ]

    for json_name, _ in schema_df.groupby("json_name"):
        table = snake_case(json_name)
        extractor = f"{table}_df"
        source.extend([
            "    try:",
            f"        df = {extractor}(file_input)",
            "        if not df.empty:",
            "            tables.append(",
            f"                donation_table(name='{table}', df=df, title={{'en': '{table}'}})",
            "            )",
            "    except Exception as e:",
            f"        logger.error(f'Skipping {extractor}: {{e}}')\n",
        ])

    source.extend([
        "    if tables:",
        "        return donation_flow(id='twitter', tables=tables)",
        "    else:",
        "        return None",
    ])
    return "\n".join(source)

# -----------------------
# Write Output File
# -----------------------
def _has_column_paths(group: pd.DataFrame) -> bool:
    """Return True unless every col_path_1..5 cell of ``group`` is null/"MISSING"."""
    cells = group[[f"col_path_{i}" for i in range(1, 6)]]
    # Series.map per column replaces the deprecated DataFrame.applymap and
    # behaves the same on old and new pandas versions.
    blank = cells.apply(
        lambda column: column.map(
            lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING"
        )
    )
    return not blank.all(axis=1).all()


# One generated extractor per json_name that has at least one usable column
# path. NOTE(review): generate_donation_flow below still receives the full
# schema, so it may reference extractors that were filtered out here; the
# generated try/except reports those as errors at run time.
generated_functions = [
    generate_df_function(json_name, group)
    for json_name, group in schema_df.groupby("json_name")
    if _has_column_paths(group)
]

donation_flow_function_str = generate_donation_flow(schema_df)

# Hard-coded destination of the generated module (overwritten on every run).
output_path = Path("/home/larodilla/BSC/WHATIF/what-if-data-donation/src/framework/processing/py/port/helpers/twitter_generated_extractors.py")
with open(output_path, "w", encoding="utf-8") as f:
    # Module header: imports and the logger that the generated code refers to.
    f.write("# Auto-generated Twitter extractors\n\n")
    f.write("import pandas as pd\n")
    f.write("import json\n")
    f.write("import logging\n")
    f.write("import io\n")
    f.write("import zipfile\n")
    f.write("import re\n")
    f.write("from port.helpers.donation_flow import donation_table, donation_flow\n\n")
    f.write("logger = logging.getLogger(__name__)\n\n")
    # Helper functions from this cell are copied verbatim into the module.
    for fn in [get_in, get_list, get_dict, snake_case, extract_path, get_field_name]:
        f.write(inspect.getsource(fn).strip() + "\n\n")
    f.write(inspect.getsource(read_js).strip() + "\n\n")
    # Extractor functions first, then the create_donation_flow wrapper.
    f.write("\n\n".join(generated_functions))
    f.write("\n\n")
    f.write(donation_flow_function_str)

print(f"Extractors written to {output_path}")


Extractors written to /home/larodilla/BSC/WHATIF/what-if-data-donation/src/framework/processing/py/port/helpers/twitter_generated_extractors.py


  if not group[[f"col_path_{i}" for i in range(1, 6)]].applymap(lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING").all(axis=1).all()
  if not group[[f"col_path_{i}" for i in range(1, 6)]].applymap(lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING").all(axis=1).all()
  if not group[[f"col_path_{i}" for i in range(1, 6)]].applymap(lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING").all(axis=1).all()
  if not group[[f"col_path_{i}" for i in range(1, 6)]].applymap(lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING").all(axis=1).all()
  if not group[[f"col_path_{i}" for i in range(1, 6)]].applymap(lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING").all(axis=1).all()
  if not group[[f"col_path_{i}" for i in range(1, 6)]].applymap(lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING").all(axis=1).all()
  if not group[[f"col_path_{i}" for i in range(1, 6)]].applymap(lambda v: pd.isna(v) or str(v).strip().upper() == "MISSING").all(axis=1).all()

# ==================================================================================
# 5. Python code to dynamically create the YouTube script
# ==================================================================================

In [None]:
import pandas as pd
import json
import inspect
from typing import List
from pathlib import Path
import zipfile
import fnmatch
import io
import os

# -----------------------
# Utility Functions
# -----------------------
def read_csv_from_file_input(file_input: list[str], csv_filename: str) -> pd.DataFrame:
    """Load the first matching CSV found inside the ZIP archives of ``file_input``.

    Only paths ending in ``.zip`` are opened. Within an archive the first
    member whose name ends with ``csv_filename`` is parsed as UTF-8; if that
    raises ``UnicodeDecodeError`` the member is rewound and parsed as latin1.

    Args:
        file_input: Candidate file paths; non-ZIP paths are ignored.
        csv_filename: Name suffix of the CSV member to load.

    Returns:
        The parsed DataFrame.

    Raises:
        FileNotFoundError: If no archive contains a matching member.
    """
    archive_paths = (candidate for candidate in file_input if candidate.endswith('.zip'))
    for archive_path in archive_paths:
        with zipfile.ZipFile(archive_path, 'r') as archive:
            # e.g. a member ending in 'channel.csv'
            matches = [member for member in archive.namelist() if member.endswith(csv_filename)]
            if not matches:
                continue
            with archive.open(matches[0]) as handle:
                try:
                    return pd.read_csv(handle, encoding='utf-8')
                except UnicodeDecodeError:
                    handle.seek(0)
                    return pd.read_csv(handle, encoding='latin1')  # fallback
    raise FileNotFoundError(f"{csv_filename} not found in ZIP files: {file_input}")

def get_in(d: dict, *keys):
    """Follow ``keys`` through nested dicts and lists; return the value reached.

    A dict level is traversed with ``dict.get``. A list level is traversed
    only with an ``int`` key that is a valid index for that list; negative
    indices are accepted when in range. Any other combination (including an
    out-of-range index in either direction) yields ``None`` instead of
    raising.

    Args:
        d: Root container.
        *keys: Dict keys and/or list indices, outermost first.

    Returns:
        The value at the end of the path, or ``None`` if the path breaks.
    """
    for key in keys:
        if isinstance(d, dict):
            d = d.get(key)
        elif isinstance(d, list) and isinstance(key, int) and -len(d) <= key < len(d):
            # Bounds are checked on both sides so that e.g. -1 on an empty
            # list returns None rather than raising IndexError.
            d = d[key]
        else:
            return None
    return d

def get_list(d: dict, *keys):
    """Return the list located at ``keys`` via ``get_in``; ``[]`` if it is not a list."""
    located = get_in(d, *keys)
    return [] if not isinstance(located, list) else located

def snake_case(name: str) -> str:
    """Lower-case ``name``, strip ``.json``/``.js`` and replace ``-``/spaces by ``_``."""
    # Applied in this exact order; ".json" must precede ".js".
    substitutions = [("-", "_"), (".json", ""), (".js", ""), (" ", "_")]
    text = name.lower()
    for needle, replacement in substitutions:
        text = text.replace(needle, replacement)
    return text

# -----------------------
# Read JSON from ZIP
# -----------------------
def read_json(file_input: List[str], pattern: List[str]) -> List[dict]:
    """Load every JSON member of the first archive that matches a glob pattern.

    Only ``file_input[0]`` is opened. Patterns are processed in order and,
    within a pattern, members in archive order (``fnmatch`` matching). A
    top-level list contributes its items; any other value is appended as one
    item.

    Args:
        file_input: Paths; the first one must be a ZIP archive.
        pattern: ``fnmatch``-style patterns for member names.

    Returns:
        The collected items.
    """
    archive_path = file_input[0]
    collected = []

    with zipfile.ZipFile(archive_path, 'r') as archive:
        for glob in pattern:
            for member in archive.namelist():
                if not fnmatch.fnmatch(member, glob):
                    continue
                with archive.open(member) as handle:
                    parsed = json.load(handle)
                    if isinstance(parsed, list):
                        collected.extend(parsed)
                    else:
                        collected.append(parsed)
    return collected

# -----------------------
# Path Utilities
# -----------------------
def extract_path(row):
    """Return the non-null ``row_path``/``col_path_1..5`` values as stripped strings."""
    columns = ["row_path"] + [f"col_path_{i}" for i in range(1, 6)]
    present = [row.get(column) for column in columns]
    return [str(value).strip() for value in present if pd.notna(value)]

# -----------------------
# Generate Extractor Function
# -----------------------
def _double_quoted(text: str) -> str:
    """Return *text* as a double-quoted Python string literal."""
    escaped = text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
    return f'"{escaped}"'


def generate_df_function(file_name: str, group: pd.DataFrame) -> str:
    """Generate the source of an extractor function for one JSON file.

    Args:
        file_name: Name of the JSON file; it is turned into the function name
            with ``snake_case`` and into the ``*/<file_name>`` lookup pattern.
        group: Schema rows describing the fields of that file. Each row is
            converted to a path with ``extract_path``.

    Returns:
        Python source text of a ``<snake_case(file_name)>_df`` function that
        reads the file with ``read_json`` and builds one record per entry.
    """
    func_name = f"{snake_case(file_name)}_df"
    json_path = f"*/{file_name}"

    lines = [
        f"def {func_name}(file_input: List[str]) -> pd.DataFrame:",
        f"    data = read_json(file_input, [{_double_quoted(json_path)}])\n",
        "    records = []",
        "    for entry in data:",
        "        records.append({",
    ]

    for _, row in group.iterrows():
        path = extract_path(row)
        if not path:
            # A row without any path component describes no field; skipping it
            # avoids an IndexError on path[-1] below.
            continue

        # Emit keys and path components as real string literals so that
        # quotes or backslashes in the schema cannot break the generated code.
        key = _double_quoted("_".join(path))
        quoted_path = ", ".join(repr(p) for p in path)

        # Handle list with .join
        if path[-1] in ("activityControls", "products"):
            lines.append(f"            {key}: \", \".join(get_list(entry, {quoted_path})),")
        # Handle nested fields: purely numeric components become list indices
        elif any(p.isdigit() for p in path):
            index_path_str = ", ".join(
                str(int(p)) if p.isdigit() else repr(p) for p in path
            )
            lines.append(f"            {key}: get_in(entry, {index_path_str}),")
        # Handle top-level fields
        elif len(path) == 1:
            lines.append(f"            {key}: entry.get({path[0]!r}),")
        else:
            lines.append(f"            {key}: get_in(entry, {quoted_path}),")

    lines += [
        "        })",
        "    df = pd.DataFrame(records)",
        "    return df\n",
    ]
    return "\n".join(lines)

# -----------------------
# Generate Donation Flow
# -----------------------
def generate_donation_flow(schema_df: pd.DataFrame, schema_csv: pd.DataFrame) -> str:
    """Generate the source of ``create_donation_flow`` for the extractor module.

    Args:
        schema_df: JSON schema; one table block is emitted per distinct value
            of its "json_name" column, calling ``<snake_case(name)>_df``.
        schema_csv: CSV schema; rows are grouped by "Key" (the CSV file name)
            and the distinct non-missing "Values" of each group become the
            columns kept from that CSV.

    Returns:
        Python source text of a ``create_donation_flow(file_input)`` function
        that collects the non-empty tables and returns
        ``donation_flow(id='youtube', tables=tables)`` or ``None``.
    """
    lines = [
        "def create_donation_flow(file_input: List[str]):",
        '    """Create donation flow from YouTube ZIP."""',
        "    tables = []",
        "",
    ]

    # Add JSON-based tables
    for file_name, _ in schema_df.groupby("json_name"):
        table = snake_case(file_name)
        func = f"{table}_df"
        lines += [
            "    try:",
            f"        df = {func}(file_input)",
            "        if not df.empty:",
            f"            tables.append(donation_table(name={table!r}, df=df, title={{'en': {table!r}}}))",
            "    except Exception:",
            "        pass",
            "",
        ]

    # Add CSV-based tables
    for csv_name, group in schema_csv.groupby("Key"):  # Key = CSV file name
        table_name = os.path.splitext(csv_name)[0]  # Strip .csv
        # Drop missing values: a NaN would be written as a bare `nan` into the
        # generated list and raise NameError there, which the generated
        # `except Exception: pass` would hide, silently losing the table.
        # str() keeps numpy scalars from leaking their repr into the source.
        expected_columns = [str(col) for col in group["Values"].dropna().unique()]
        csv_suffix = f"/{csv_name}"
        lines += [
            "    try:",
            f"        df = read_csv_from_file_input(file_input, {csv_suffix!r})",
            f"        expected_columns = {expected_columns!r}",
            "        existing_columns = [col for col in expected_columns if col in df.columns]",
            "        df = df[existing_columns]",
            "        if not df.empty:",
            f"            tables.append(donation_table(name={table_name!r}, df=df, title={{'en': {table_name!r}}}))",
            "    except Exception:",
            "        pass",
            "",
        ]

    lines += [
        "    if tables:",
        "        return donation_flow(id='youtube', tables=tables)",
        "    else:",
        "        return None",
    ]
    return "\n".join(lines)

# -----------------------
# Write Output to File
# -----------------------
def main():
    """Read both schema CSVs and write the generated YouTube extractor module.

    The output file receives, in order: a fixed import header, the source of
    the shared helper functions, one extractor per "json_name" group, and the
    ``create_donation_flow`` function. The paths are hard-coded below.
    """
    # Hard-coded locations; update these for your environment.
    schema_path = "/mnt/c/Users/arodilla/OneDrive - Universitat de Barcelona/BSC/WHAT-IF/SCHEMA_DATA/Merged_structures_YT.csv"
    schema_csv_path = "/mnt/c/Users/arodilla/OneDrive - Universitat de Barcelona/BSC/WHAT-IF/SCHEMA_DATA/Merged_Column_Names_YT.csv"
    output_path = Path("/home/larodilla/BSC/WHATIF/what-if-data-donation/src/framework/processing/py/port/helpers/youtube_generated_extractors.py")

    def load_schema(csv_path):
        # Column headers may carry stray whitespace; normalise them once.
        frame = pd.read_csv(csv_path)
        frame.columns = frame.columns.str.strip()
        return frame

    schema_df = load_schema(schema_path)
    schema_csv = load_schema(schema_csv_path)

    header = (
        "# Auto-generated YouTube extractors\n\n"
        "import pandas as pd\n"
        "import json\n"
        "from typing import List\n"
        "from port.helpers.donation_flow import donation_table, donation_flow\n"
        "import zipfile\n"
        "import fnmatch\n\n"
    )

    with open(output_path, "w", encoding="utf-8") as out:
        out.write(header)

        # Copy the runtime helpers verbatim so the generated module is self-contained.
        for helper in (read_csv_from_file_input, get_in, get_list, snake_case, read_json):
            out.write(inspect.getsource(helper) + "\n")

        for file_name, group in schema_df.groupby("json_name"):
            out.write("\n\n" + generate_df_function(file_name, group))

        out.write("\n\n" + generate_donation_flow(schema_df, schema_csv))

    print(f"Extractor functions written to: {output_path}")

# Run the generator only when this code is executed as a script, not on import.
if __name__ == "__main__":
    main()

Extractor functions written to: /home/larodilla/BSC/WHATIF/what-if-data-donation/src/framework/processing/py/port/helpers/youtube_generated_extractors.py
