In [1]:
import os
import glob
import json
import tqdm
import zipfile
import io, json, re, zipfile, pickle, html
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
# Directory holding the notebook's inputs (the "*cleansed.jar" archives and
# titel_km_map.xlsx) and receiving the extracted_df_km.pkl output.
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
DATA_PATH = "/home/azureuser/cloudfiles/code/Users/a.melzer/content"

In [2]:
def as_list(x):
    """Normalise a value to a list-like result.

    * a list is returned unchanged (same object),
    * ``None`` becomes an empty list,
    * a dict carrying a "____listValues" key yields whatever is stored under
      that key (returned as-is, without further conversion),
    * anything else is wrapped in a one-element list.
    """
    if isinstance(x, list):
        return x
    if x is None:
        return []
    # Some lists are stored under "____listValues" instead of as a bare list.
    is_wrapped = isinstance(x, dict) and "____listValues" in x
    return x["____listValues"] if is_wrapped else [x]

def get_field_value_list(doc: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the field-value entries of the nl-NL dynamic entity.

    Reads doc["entityHolderMap"]["nl-NL"]["dynamicEntity"]["fieldValueList"]
    and unwraps its "____listValues" entry.

    Args:
        doc: Parsed JSON document.

    Returns:
        The list of field-value entries, or [] when any part of the path is
        missing, null or not of the expected container type.
    """
    try:
        container = doc["entityHolderMap"]["nl-NL"]["dynamicEntity"]["fieldValueList"]
    except (KeyError, TypeError):
        # TypeError: an intermediate node is null / not a dict. Treat it like a
        # missing key instead of aborting the whole extraction run.
        return []
    if isinstance(container, dict):
        if "____listValues" not in container:
            return []
        container = container["____listValues"]
    elif not isinstance(container, list):
        # Neither the wrapped form nor a bare list: nothing usable here.
        return []
    return as_list(container)

def find_field(field_values: List[Dict[str, Any]], wanted_prefix: str) -> Optional[Any]:
    """Return the value of the first entry whose "fieldId" starts with a prefix.

    Args:
        field_values: Field-value entries (dicts with "fieldId" and "value").
            ``None`` is treated as an empty list.
        wanted_prefix: Prefix the "fieldId" must start with, e.g. "title::::".

    Returns:
        The entry's "value"; when that value is itself a dict containing a
        "value" key, the nested value is returned instead. ``None`` when no
        entry matches (or the matching entry has no "value").
    """
    for fv in field_values or []:
        if not isinstance(fv, dict):
            # Malformed entry; skip it rather than fail on .get().
            continue
        fid = fv.get("fieldId", "")
        if not isinstance(fid, str):
            # e.g. an explicit null fieldId, which has no .startswith().
            continue
        if fid.startswith(wanted_prefix):
            val = fv.get("value")
            # ClobDynamicValue: nested value under value["value"]
            if isinstance(val, dict) and "value" in val:
                return val["value"]
            return val
    return None

# Inline link marker of the form
#   [[--ContentED.<content_id>||<link_title>||<km_id>||<type>--]]
# matched case-insensitively; the four parts are captured as groups 1-4.
TAG_ID = re.compile(
    r"\[\[--ContentED\.([a-z0-9]+)\|\|([^|]+)\|\|([^|]+)\|\|([^-\]]+)--\]\]",
    re.IGNORECASE,
)

def extract_content_links(html_text: str) -> List[Dict[str, str]]:
    """Collect every ContentED link marker found in a piece of text.

    Args:
        html_text: Text to scan; ``None`` or "" yields an empty list.

    Returns:
        One dict per marker, in order of appearance, with the keys
        "content_id", "link_title", "km_id" and "type".
    """
    field_names = ("content_id", "link_title", "km_id", "type")
    return [
        dict(zip(field_names, match.groups()))
        for match in TAG_ID.finditer(html_text or "")
    ]

def strip_html(html_text: Optional[str]) -> str:
    """Convert an HTML fragment to plain text.

    Line breaks (<br> in any spelling) and paragraph ends (</p>) become
    newlines, all remaining tags are removed, runs of three or more newlines
    are collapsed to two, HTML entities are unescaped and surrounding
    whitespace is stripped.

    Args:
        html_text: HTML fragment; ``None`` or "" yields "".

    Returns:
        The plain-text rendering of the fragment.
    """
    if not html_text:
        return ""
    # Any <br> variant, regardless of case, spacing, self-closing slash or
    # attributes (<BR>, <br/>, <br />, <br class="x">). \b keeps other tags
    # that merely start with "br" from matching.
    t = re.sub(r"<br\b[^>]*>", "\n", html_text, flags=re.I)
    t = re.sub(r"<\/p\s*>", "\n", t, flags=re.I)
    t = re.sub(r"<[^>]+>", "", t)
    return html.unescape(re.sub(r"\n{3,}", "\n\n", t)).strip()

def get_id(doc: Dict[str, Any]) -> Optional[str]:
    """Return the document id stored under the nl-NL entry.

    The id is read from entityHolderMap -> nl-NL -> dynamicEntity -> "id".
    When that key is not available, the "id" stored directly on the nl-NL
    object is used instead.

    Args:
        doc: Parsed JSON document.

    Returns:
        The id, or ``None`` when neither location provides one.
    """
    holder = doc.get("entityHolderMap") if isinstance(doc, dict) else None
    locale_entry = holder.get("nl-NL") if isinstance(holder, dict) else None
    if not isinstance(locale_entry, dict):
        # Missing or null nl-NL entry: there is nowhere left to look.
        return None
    entity = locale_entry.get("dynamicEntity")
    if isinstance(entity, dict) and "id" in entity:
        return entity["id"]
    # some files also mirror an "id" at top-level of the nl-NL object
    return locale_entry.get("id")

def get_tags(doc: Dict[str, Any], field_name: str) -> List[str]:
    """Return the selected tags of a tag-set field.

    Args:
        doc: Parsed JSON document.
        field_name: Field name without the "::::" suffix,
            e.g. "topic", "agentskill", "knowledgeBase".

    Returns:
        A copy of the list found at
        value -> tagSetSelectionList -> "____listValues" inside the field's
        value, or [] when the field is absent, is not a dict, the path cannot
        be followed, or the entry is not a list.
    """
    wrapped = find_field(get_field_value_list(doc), f"{field_name}::::")
    if not isinstance(wrapped, dict):
        return []
    # Tag sets come wrapped; walk down to the selection list.
    try:
        selection = wrapped["value"]["tagSetSelectionList"]["____listValues"]
    except Exception:
        return []
    if isinstance(selection, list):
        return list(selection)
    return []

def get_comment_list(doc: Dict[str, Any]) -> List[str]:
    """Return the comment entries stored on the nl-NL object.

    Reads doc["entityHolderMap"]["nl-NL"]["commentList"] and unwraps its
    "____listValues" entry.

    Args:
        doc: Parsed JSON document.

    Returns:
        The list of comments, or [] when any part of the path is missing,
        null or not of the expected container type.
    """
    try:
        container = doc["entityHolderMap"]["nl-NL"]["commentList"]
    except (KeyError, TypeError):
        # TypeError: an intermediate node is null / not a dict. Treat it like a
        # missing key instead of raising.
        return []
    if isinstance(container, dict):
        if "____listValues" not in container:
            return []
        container = container["____listValues"]
    elif not isinstance(container, list):
        # Neither the wrapped form nor a bare list: nothing usable here.
        return []
    return as_list(container)


In [3]:
def get_all_json_docs():
    """Load every ".json" member of the "*cleansed.jar" archives in DATA_PATH.

    Returns:
        dict mapping archive member name -> parsed JSON document. Members that
        cannot be read or parsed are reported on stdout and skipped. A member
        name that occurs in more than one archive keeps the document from the
        archive processed last.
    """
    # glob returns matches in no guaranteed order. Sorting makes the document
    # order reproducible, which matters because the later de-duplication on
    # title keeps the first occurrence.
    jar_paths = sorted(glob.glob(DATA_PATH + "/*cleansed.jar"))
    docs = {}
    for jar_path in jar_paths:
        with zipfile.ZipFile(jar_path, "r") as zf:
            members = [m for m in zf.namelist() if m.lower().endswith(".json")]
            # Iterating the list directly lets tqdm show a total; desc tells
            # the archives apart in the progress output.
            for name in tqdm.tqdm(members, desc=os.path.basename(jar_path)):
                try:
                    with zf.open(name, "r") as fh:
                        docs[name] = json.loads(fh.read())
                except Exception as e:
                    # Best effort: report which member failed and continue.
                    print(f"Skipping {name} in {jar_path}: {e}")
    return docs
    

In [4]:
def extract_json(filename,doc):
    """Flatten one parsed document into a flat record.

    Args:
        filename: Name of the source the document was read from; stored
            unchanged under "source_file".
        doc: Parsed JSON document.

    Returns:
        dict with the keys "source_file", "id", "title",
        "private_answer_html", "private_answer_text", "publicAnswer_html",
        "publicAnswer_text", "links_in_private_answer", "topic_tags",
        "agentskill_tags", "knowledgebase_tags", "must_read" and "full_text".
    """
    fvals = get_field_value_list(doc)

    def answer_html(field_name):
        # Prefer the dynamic field; some files mirror the answer at nl-NL
        # level too, so fall back to that when the field is empty or absent.
        found = find_field(fvals, f"{field_name}::::")
        if found:
            return found
        return doc.get("entityHolderMap", {}).get("nl-NL", {}).get(field_name)

    private_html = answer_html("privateAnswer")
    public_html = answer_html("publicAnswer")
    # Strip each answer once and reuse the text for both the *_text columns
    # and full_text.
    private_text = strip_html(private_html)
    public_text = strip_html(public_html)

    # Accept a JSON boolean true as well as the string "true" in any case.
    must_read_raw = find_field(fvals, "mustRead::::")
    must_read = must_read_raw is True or (
        isinstance(must_read_raw, str) and must_read_raw.strip().lower() == "true"
    )

    rec = {
        "source_file": filename,
        "id": get_id(doc),
        "title": find_field(fvals, "title::::"),
        "private_answer_html": private_html,
        "private_answer_text": private_text,
        "publicAnswer_html":  public_html,
        "publicAnswer_text":  public_text,
        "links_in_private_answer": extract_content_links(private_html or ""),
        "topic_tags": get_tags(doc, "topic"),
        "agentskill_tags": get_tags(doc, "agentskill"),
        "knowledgebase_tags": get_tags(doc, "knowledgeBase"),
        "must_read": must_read,
        'full_text' :  "Public Answer: " + public_text + ' Private Answer: ' + private_text
    }
    return rec

In [5]:
# Load every JSON document from the archives and flatten each into a record,
# keyed by the name it was loaded under.
json_docs = get_all_json_docs()
extracted = {name: extract_json(name, doc) for name, doc in json_docs.items()}
# One row per document, in the insertion order of `extracted`.
extracted_df = pd.DataFrame(extracted.values())

3477it [00:01, 2611.44it/s]
2672it [00:00, 2911.83it/s]


In [6]:
def get_duplicates_details(extracted_df):
    """List, for every title that occurs more than once, the rows sharing it.

    Args:
        extracted_df: DataFrame with at least the columns "title", "id" and
            "source_file".

    Returns:
        DataFrame with one row per duplicated title and the columns "title",
        "id" and "source_file", where the latter two hold lists of the values
        of all rows carrying that title. Rows without a title are not
        reported (groupby skips missing keys by default).
    """
    title_counts = extracted_df.groupby("title").size().reset_index(name="count")
    dupes = title_counts[title_counts["count"] > 1]
    dupe_details = (
        extracted_df[extracted_df["title"].isin(dupes["title"])]
        .groupby("title")[["id", "source_file"]]
        .agg(list)
        .reset_index()
    )
    # Hand the computed table back to the caller.
    return dupe_details

In [7]:
# Keep only the first row for every title; later rows with the same title are dropped.
extracted_df_nodupes = extracted_df.drop_duplicates(subset=["title"], keep="first")

### Koppeling met KM-nummer

In [8]:
# Title -> KM number lookup: one row per title (first occurrence wins),
# indexed by title.
km_title_map = (
    pd.read_excel(DATA_PATH + '/titel_km_map.xlsx')
    .rename(columns={'Content Title': 'title', "Article Identifier": 'km_nummer'})
    .drop_duplicates('title')
    .set_index('title')
)
# Attach the lookup columns by title and keep only rows that received a KM number.
extracted_df_km = (
    extracted_df_nodupes
    .set_index('title')
    .join(km_title_map)
    .dropna(subset='km_nummer')
)

In [9]:
# Persist the title-indexed frame (with km_nummer) next to the input data.
extracted_df_km.to_pickle(DATA_PATH + "/extracted_df_km.pkl")

### Lijst met duplicaten exporteren

In [10]:
# Export the rows that were removed when de-duplicating on title.
# The removed rows are selected by row label instead of by "id": an id-based
# set difference omits a dropped row whose id also belongs to a kept row.
# This relies on extracted_df_nodupes still carrying extracted_df's row labels,
# which drop_duplicates preserves.
# The result is indexed and sorted by id and written to the working directory.
(
    extracted_df
    .drop(index=extracted_df_nodupes.index)
    .set_index("id")
    .sort_index()
    .to_excel("dupelist.xlsx")
)