# Put this Jupyter notebook (code) in the same folder as the input CSV files
# Change the settings below

In [19]:
import json
import pandas as pd
from typing import Any, Dict, List, Tuple, Callable

# Edit these for each run.
# Input: CSV export whose first row holds the survey schema JSON.
SCHEMA_CSV      = "887_Schema_mp_cs.dim_survey_df__sg_cx_view_20250916-134323.csv"
# Input: CSV export with one row per survey submission.
SUBMISSIONS_CSV = "887_Submissions_mp_cs.dwd_survey_submission_di__sg_cx_view_20250924-155511.csv"
# Output: path the flattened result is written to.
OUT_CSV         = "887_survey_parsed.csv"

# Column names inside your files (check these before running).
# Column of SCHEMA_CSV that contains the schema as a JSON string.
SCHEMA_COL     = "survey_schema"
# Column of SUBMISSIONS_CSV that contains each submission's answers as a JSON string.
RESP_JSON_COL  = "submitted_data"
# Column of SUBMISSIONS_CSV that identifies a response; copied into the output.
ID_COL         = "response_id"

- Note: emojis cannot be processed; they appear in the output CSV in their encoded form.
- Some output columns are redundant. They are created because the code deliberately captures every possible question as a safeguard. Please clean up the output CSV by deleting the unnecessary columns.

In [20]:
# ---------- helpers ----------
def walk_schema(schema: Dict[str, Any]) -> List[Tuple[str, Dict[str, Any]]]:
    """Collect every (code, node) pair found under nested 'properties'.

    A node is descended into only when it is a dict with ``type == "object"``
    and a dict-valued ``properties`` entry. Each dict-valued property is
    reported before its own descendants (depth-first, in property order);
    non-dict property values are ignored.
    """
    def _iter_questions(node: Any):
        if not (isinstance(node, dict) and node.get("type") == "object"):
            return
        properties = node.get("properties")
        if not isinstance(properties, dict):
            return
        # Keys of "properties" are question codes; values are question nodes.
        for question_code, question_node in properties.items():
            if not isinstance(question_node, dict):
                continue
            yield question_code, question_node
            yield from _iter_questions(question_node)

    return list(_iter_questions(schema))

def classify_kind(node: Dict[str, Any]) -> str:
    """Classify a schema question node from the shape of its definition.

    Returns one of 'rating', 'ranking', 'single_select', 'single_text':

    - 'rating'        -- ``scoreMarks`` is a list with at least two entries;
    - 'ranking'       -- ``options`` is a non-empty list and the title (or
                         ``questionLabel``) contains 'rank' or 'preference',
                         case-insensitively;
    - 'single_select' -- ``options`` is a non-empty list otherwise;
    - 'single_text'   -- everything else, whatever the node's ``type`` is.

    This function never returns 'multiselect'; nothing in the schema node is
    used to tell a multiselect apart from a single select.
    """
    score_marks = node.get("scoreMarks")
    if isinstance(score_marks, list) and len(score_marks) >= 2:
        return "rating"

    options = node.get("options")
    if not (isinstance(options, list) and options):
        return "single_text"

    # Ranking questions are recognised purely by a keyword in their title.
    title = node.get("title") or node.get("questionLabel") or ""
    title_lc = title.lower() if isinstance(title, str) else ""
    if "rank" in title_lc or "preference" in title_lc:
        return "ranking"
    return "single_select"

def build_specs(schema: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build one spec dict per question node found in the schema.

    Each spec has the keys "code" (the property key), "title" (the node's
    ``title`` or ``questionLabel``, stripped when it is a string, otherwise
    its ``str()``), and "kind" (the result of ``classify_kind``), e.g.
    {"code": "af94dcb2e4", "title": "Description Area", "kind": "rating"}.
    """
    def _clean_title(question_node: Dict[str, Any]) -> str:
        raw = question_node.get("title") or question_node.get("questionLabel") or ""
        if isinstance(raw, str):
            return raw.strip()
        return str(raw)

    return [
        {
            "code": question_code,
            "title": _clean_title(question_node),
            "kind": classify_kind(question_node),
        }
        for question_code, question_node in walk_schema(schema)
    ]

def make_title_columns_only(specs: List[Dict[str, Any]]) -> Dict[str, str]:
    """Map each question code to a unique column name derived from its title.

    Titles are stripped; blank or non-string titles become 'no title'.
    The first occurrence of a title keeps it unchanged; repeats get
    ' (2)', ' (3)', ... appended. If a suffixed name is already taken
    (for example a question literally titled 'Q (2)' next to two questions
    titled 'Q'), the counter keeps advancing until the name is free, so no
    two codes ever share a column name.

    Args:
        specs: dicts carrying at least "code" and "title".

    Returns:
        Dict of question code -> column name, in spec order.
    """
    occurrences: Dict[str, int] = {}
    taken = set()
    names_by_code: Dict[str, str] = {}

    for spec in specs:
        raw_title = spec["title"]
        base = raw_title.strip() if isinstance(raw_title, str) else ""
        if not base:
            base = "no title"

        count = occurrences.get(base, 0) + 1
        candidate = base if count == 1 else f"{base} ({count})"
        # Skip over names that an earlier title already claimed.
        while candidate in taken:
            count += 1
            candidate = f"{base} ({count})"

        occurrences[base] = count
        taken.add(candidate)
        names_by_code[spec["code"]] = candidate

    return names_by_code

# helper: index schema nodes by question code (used to read a ranking question's options)
def nodes_by_code(schema: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Return a code -> node dict; when a code repeats, the last node wins."""
    return dict(walk_schema(schema))

# ---------- extractors for non-ranking questions ----------
def ext_single_text(v: Any) -> str:
    """Render a free-text style answer as a string.

    For a dict answer, a non-None 'label' is preferred, then a non-None
    scalar 'value' (dict/list values are rejected). Plain str/int/float
    answers are stringified. Anything else yields ''.
    """
    if isinstance(v, dict):
        label = v.get("label")
        if label is not None:
            return str(label)
        value = v.get("value")
        if value is None or isinstance(value, (dict, list)):
            return ""
        return str(value)

    return str(v) if isinstance(v, (str, int, float)) else ""

def ext_single_select(v: Any) -> str:
    """Render a single-choice answer: a dict's non-None 'label', or a plain scalar.

    Dicts without a usable label, and any other type, yield ''.
    """
    if isinstance(v, dict):
        label = v.get("label")
        return "" if label is None else str(label)
    if isinstance(v, (str, int, float)):
        return str(v)
    return ""

def ext_multiselect(v: Any) -> str:
    """Format a multiselect answer as 'label(value)' pairs joined by ';'.

    Only list items that are dicts carrying both a non-None 'label' and a
    non-None 'value' contribute; label and value are stringified and
    stripped. Other list items are skipped.

    Returns '' for an empty list and for any input that is not a list
    (None, a bare string, a single dict, ...).
    """
    if not isinstance(v, list):
        # Without this guard the join below ran with `pairs` unbound and
        # raised UnboundLocalError for every non-list answer.
        return ""

    pairs: List[Tuple[str, str]] = []
    for item in v:
        if not isinstance(item, dict):
            continue
        lab = item.get("label")
        val = item.get("value")
        if lab is not None and val is not None:
            pairs.append((str(lab).strip(), str(val).strip()))

    return ";".join(f"{lab}({val})" for lab, val in pairs)
    

def ms_labels_and_other(v: Any) -> Tuple[str, str]:
    """Split a list-valued answer into its labels and the 'other' free text.

    Returns ``(labels_joined, other_text)``:

    - labels_joined: '; '.join of every selected label, in order. Non-dict,
      non-None list items are included as their stripped ``str()``. The
      'other' option is included too (under its own label, or as 'other'
      when it has none). Non-'other' dicts with an empty label are skipped.
    - other_text: the stripped ``value`` of the first 'other' option whose
      value is a str/int/float and non-empty after stripping; '' when there
      is none.

    An item counts as 'other' when ``isOther`` is truthy, when its ``key``
    equals 'isother' (case-insensitive), or when its label contains the
    substring 'other' (case-insensitive).

    Any input that is not a list yields ``("", "")``.
    """
    labels: List[str] = []
    other_text = ""

    if not isinstance(v, list):
        return "", other_text

    for item in v:
        if not isinstance(item, dict):
            if item is not None:
                labels.append(str(item).strip())
            continue

        # Coerce to str so a numeric label no longer breaks on .strip().
        label = str(item.get("label") or "").strip()
        value = item.get("value")
        is_other = (
            bool(item.get("isOther"))
            or str(item.get("key", "")).lower() == "isother"
            or "other" in label.lower()
        )

        if not is_other:
            if label:
                labels.append(label)
            continue

        labels.append(label if label else "other")
        # Keep only the first non-empty free text found.
        if not other_text and isinstance(value, (str, int, float)):
            other_text = str(value).strip()

    return "; ".join(labels), other_text


def ext_rating(v: Any) -> str:
    """Render a rating answer given as a number or as a dict with a numeric 'value'.

    Whole-number floats are printed without the trailing '.0'. Anything
    that is not numeric (including numeric-looking strings) yields ''.
    """
    number = v.get("value") if isinstance(v, dict) else v
    if not isinstance(number, (int, float)):
        return ""
    if isinstance(number, float) and number.is_integer():
        return str(int(number))
    return str(number)

# Dispatch table: question kind -> function turning a raw answer into a cell string.
# There is no 'ranking' entry; ranking answers are written into per-position
# columns by the run section instead of a single cell.
EXTRACTORS: Dict[str, Callable[[Any], str]] = dict(
    single_text=ext_single_text,
    single_select=ext_single_select,
    multiselect=ext_multiselect,
    rating=ext_rating,
)

# infer ranking length from responses if schema options are absent
def infer_rank_len_from_responses(resp_df: pd.DataFrame, rank_code: str) -> int:
    """Return the longest list answer seen for ``rank_code`` across all submissions.

    Each cell of ``resp_df[RESP_JSON_COL]`` is expected to be a JSON string
    (or an already-decoded dict). Cells that are empty (NaN), fail to parse,
    or do not decode to a JSON object are skipped rather than raising.

    Args:
        resp_df: submissions frame containing the ``RESP_JSON_COL`` column.
        rank_code: question code whose answer is looked up in each payload.

    Returns:
        The maximum list length found, or 0 when no submission holds a list
        under ``rank_code``.
    """
    max_len = 0
    for raw in resp_df[RESP_JSON_COL].tolist():
        if isinstance(raw, str):
            try:
                payload = json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
        else:
            payload = raw
        # An empty CSV cell arrives as float NaN, which is truthy, so a plain
        # `raw or {}` fallback lets it through; check the type explicitly.
        if not isinstance(payload, dict):
            continue
        answer = payload.get(rank_code)
        if isinstance(answer, list):
            max_len = max(max_len, len(answer))
    return max_len

# ---------- run ----------
# 1) load the schema export and the submissions export
schema_df = pd.read_csv(SCHEMA_CSV, encoding="utf-8", low_memory=False)
resp_df = pd.read_csv(SUBMISSIONS_CSV, encoding="utf-8", low_memory=False)

# 2) the schema is a JSON string taken from the first row of SCHEMA_COL
schema = json.loads(schema_df[SCHEMA_COL].iloc[0])

# 3) question specs, unique column titles per code, kind per code, node per code
specs = build_specs(schema)
code_to_col = make_title_columns_only(specs)
kind_by_code = {spec["code"]: spec["kind"] for spec in specs}
node_map = nodes_by_code(schema)

# 3b) each ranking question expands to "<title> [1]" .. "<title> [N]"
ranking_codes = [spec["code"] for spec in specs if spec["kind"] == "ranking"]
rank_cols_by_code: Dict[str, List[str]] = {}
rank_len_by_code: Dict[str, int] = {}

for code in ranking_codes:
    node = node_map.get(code, {})
    options = node.get("options")
    # N comes from the schema's option list when there is one ...
    n = len(options) if isinstance(options, list) else 0
    if n == 0:
        # ... otherwise from the longest answer found in the submissions.
        n = infer_rank_len_from_responses(resp_df, code)
    rank_len_by_code[code] = n
    base_title = code_to_col[code]
    rank_cols_by_code[code] = [f"{base_title} [{i}]" for i in range(1, n + 1)]

# 4) transform each submission row into one flat record (dict of column -> cell)
rows: List[Dict[str, Any]] = []
other_cols_by_code: Dict[str, str] = {}  # code -> "<title> - Other"

# Metadata is copied through only when the response id column exists.
# "user_id" and "response_submitted_date" are optional: they are copied when
# present instead of raising KeyError when the export lacks them.
meta_cols: List[str] = []
if ID_COL in resp_df.columns:
    meta_cols = [ID_COL] + [
        c for c in ("user_id", "response_submitted_date") if c in resp_df.columns
    ]

for _, row in resp_df.iterrows():  # iterrows yields (index, row Series) pairs
    raw = row.get(RESP_JSON_COL)

    # Decode the answers. Anything that is not a JSON object -- an empty cell
    # (read by pandas as NaN, which is truthy), malformed JSON, or JSON that
    # decodes to a list/scalar -- is treated as "no answers" for this row.
    payload = {}
    if isinstance(raw, str):
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, dict):
            payload = decoded
    elif isinstance(raw, dict):
        payload = raw

    rec: Dict[str, Any] = {}
    for meta in meta_cols:
        rec[meta] = row[meta]

    # initialize all ranking position columns as blank
    for code in ranking_codes:
        for colname in rank_cols_by_code[code]:
            rec[colname] = ""

    # fill values
    for s in specs:
        code = s["code"]
        v = payload.get(code)

        if s["kind"] == "ranking":
            # One column per rank position instead of a single aggregated cell.
            labels = []
            if isinstance(v, list):
                for item in v:
                    if isinstance(item, dict) and item.get("label") is not None:
                        labels.append(str(item["label"]))
                    elif item is not None:
                        labels.append(str(item))
            # Positions beyond the submitted answers stay blank; answers
            # beyond the known positions are dropped.
            for i, colname in enumerate(rank_cols_by_code.get(code, [])):
                rec[colname] = labels[i] if i < len(labels) else ""
            continue

        colname = code_to_col[code]

        if isinstance(v, list):
            # Any list answer is treated as a multiselect, with the 'Other'
            # free text split out into a companion column.
            labels_joined, other_text = ms_labels_and_other(v)
            rec[colname] = labels_joined

            if other_text != "":
                other_col = f"{colname} - Other"
                rec[other_col] = other_text
                other_cols_by_code.setdefault(code, other_col)
        else:
            # Dispatch on the schema-derived kind (this covers 'rating' too);
            # unknown kinds fall back to the plain-text extractor.
            runtime_kind = kind_by_code.get(code, "single_text")
            rec[colname] = EXTRACTORS.get(runtime_kind, ext_single_text)(v)

    rows.append(rec)

# 5) assemble + save
# Column order: metadata first, then per question (in spec order) either its
# ranking position columns or its single column followed by its "Other" column.
out_df = pd.DataFrame.from_records(rows)  # one dict per row; dict keys become columns

ordered_cols: List[str] = [
    meta_col
    for meta_col in (ID_COL, "user_id", "response_submitted_date")
    if meta_col in out_df.columns
]

for s in specs:
    question_code = s["code"]
    if s["kind"] == "ranking":
        ordered_cols.extend(rank_cols_by_code.get(question_code, []))
        continue
    ordered_cols.append(code_to_col[question_code])
    # The companion column exists only if some row supplied 'Other' free text.
    oc = other_cols_by_code.get(question_code)
    if oc:
        ordered_cols.append(oc)

# reindex also creates (empty) columns for questions no row answered
out_df = out_df.reindex(columns=ordered_cols)
out_df.to_csv(OUT_CSV, index=False, encoding="utf-8-sig")
print(f"Wrote: {OUT_CSV}")


Wrote: 887_survey_parsed.csv
