222 changes: 190 additions & 32 deletions CASE/CASE-CTDL.py
@@ -7,7 +7,10 @@
from urllib.parse import urlparse
from urllib.error import URLError, HTTPError

CTDLASN_CONTEXT = "https://credreg.net/ctdlasn/schema/context/json"
# Contexts
CTDL_CONTEXT = "https://credreg.net/ctdl/schema/context/json" # Courses
CTDLASN_CONTEXT = "https://credreg.net/ctdlasn/schema/context/json" # Competency frameworks (old one)

DEFAULT_REG_BASE = "https://credentialengineregistry.org/resources/"

def _to_str(v):
@@ -45,13 +48,22 @@ def reg_id(base: str, ctid: str) -> str:
base += "/"
return base + ctid

def build_graphs(pkg, reg_base, output_dir):
def is_registry_ce_uri(value: str, reg_base: str = DEFAULT_REG_BASE) -> bool:
"""Check registry URI that ends with ce- GUID."""
if not isinstance(value, str):
return False
if not value.startswith("http"):
return False
return value.startswith(reg_base + "ce-")
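# A quick illustration of is_registry_ce_uri (the GUID below is hypothetical, not taken from any real package):
#   is_registry_ce_uri("https://credentialengineregistry.org/resources/ce-a1b2c3d4-0000-0000-0000-000000000000")  -> True
#   is_registry_ce_uri("https://example.org/resources/ce-a1b2c3d4-0000-0000-0000-000000000000")                   -> False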

def build_and_write(pkg, reg_base, courses_outdir, frameworks_outdir):
root = pkg.get("CFPackage") or pkg
cfdoc = root.get("CFDocument") or {}
items = root.get("CFItems") or []
assocs = root.get("CFAssociations") or []

os.makedirs(output_dir, exist_ok=True)
os.makedirs(courses_outdir, exist_ok=True)
os.makedirs(frameworks_outdir, exist_ok=True)

# Use CASE language exactly as provided (no normalization)
fw_lang = _to_str(cfdoc.get("language")).strip() if cfdoc.get("language") else None
@@ -85,21 +97,17 @@ def is_childof_type(t):

# Build hierarchy (destination = parent, origin = child) and capture sequenceNumber
children_of = OrderedDict() # parent -> [child ...] in input order
parents_of = {} # child -> [parent ...]
seq_for_child = {} # child -> first sequenceNumber seen
for a in assocs:
if not is_childof_type(a.get("associationType")):
continue
parent_ident = resolve_endpoint(a.get("destinationNodeURI") or a.get("destinationNodeIdentifier"))
child_ident = resolve_endpoint(a.get("originNodeURI") or a.get("originNodeIdentifier"))
if not parent_ident or not child_ident:
continue
children_of.setdefault(parent_ident, [])
if child_ident not in children_of[parent_ident]:
children_of[parent_ident].append(child_ident)
parents_of.setdefault(child_ident, [])
if parent_ident not in parents_of[child_ident]:
parents_of[child_ident].append(parent_ident)
if "sequenceNumber" in a and child_ident not in seq_for_child:
seq_for_child[child_ident] = str(a.get("sequenceNumber")).strip()
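# Note: the loop above only considers associations whose associationType is "isChildOf"; the origin
# endpoint (originNodeURI / originNodeIdentifier) is treated as the child, the destination endpoint
# (destinationNodeURI / destinationNodeIdentifier) as the parent, and the first sequenceNumber seen
# for each child is recorded.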

@@ -122,6 +130,8 @@ def is_childof_type(t):
"@type": "ceterms:Course",
"ceterms:ctid": ce_ctid,
}
if fw_lang:
course_node["ceterms:inLanguage"] = fw_lang
if name:
course_node["ceterms:name"] = {fw_lang: name} if fw_lang else name
if desc and desc != name:
@@ -139,7 +149,6 @@
}
if fw_lang:
comp_node["ceasn:inLanguage"] = fw_lang
# Language-mapped fields
fs = it.get("fullStatement")
if fs:
comp_node["ceasn:competencyText"] = {fw_lang: fs} if fw_lang else fs
@@ -195,14 +204,119 @@ def expand_competency_subtree(root_guid, seen):
ttext = comp["ceasn:competencyLabel"]
teaches.append({
"@type": "ceterms:CredentialAlignmentObject",
"ceterms:framework": None, # filled in the per-course framework pass
"ceterms:framework": None,
"ceterms:targetNode": comp["@id"],
"ceterms:frameworkName": ({fw_lang: ""} if fw_lang else ""),
"ceterms:targetNodeName": ({fw_lang: ttext} if fw_lang else ttext)
})
course_node["ceterms:teaches"] = teaches

# Create one framework JSON per course
# Validation report structures (only add entries when there are errors)
validation_report = {
"frameworks": [],
"competencies": [],
"courses": [],
"summary": {}
}

def add_fw_validation(fw_node, errors):
if errors:
validation_report["frameworks"].append({
"@id": fw_node.get("@id"),
"errors": errors
})

def add_comp_validation(comp_node, errors):
if errors:
validation_report["competencies"].append({
"@id": comp_node.get("@id"),
"errors": errors
})

def add_course_validation(course_node, errors):
if errors:
validation_report["courses"].append({
"@id": course_node.get("@id"),
"errors": errors
})

# Validators
def validate_framework(fw_node):
errs = []
if not fw_node.get("ceterms:ctid"):
errs.append("Missing ceterms:ctid")
if not fw_node.get("ceasn:name"):
errs.append("Missing ceasn:name")
if not fw_node.get("ceasn:description"):
errs.append("Missing ceasn:description")
il = fw_node.get("ceasn:inLanguage")
if not il or not isinstance(il, list) or not il[0]:
errs.append("Missing ceasn:inLanguage")
pub = fw_node.get("ceasn:publisher")
if not pub:
errs.append("Missing ceasn:publisher (must be CE registry URI)")
elif isinstance(pub, list):
if not all(is_registry_ce_uri(p, reg_base) for p in pub):
errs.append("ceasn:publisher must be CE registry URI(s)")
elif isinstance(pub, str):
if not is_registry_ce_uri(pub, reg_base):
errs.append("ceasn:publisher must be CE registry URI")
else:
errs.append("ceasn:publisher must be CE registry URI (string or list)")
return errs

def validate_competency(c):
errs = []
if not c.get("ceterms:ctid"):
errs.append("Missing ceterms:ctid")
ct = c.get("ceasn:competencyText")
if not ct or (isinstance(ct, dict) and not any(ct.values())):
errs.append("Missing ceasn:competencyText")
if not c.get("ceasn:isPartOf"):
errs.append("Missing ceasn:isPartOf")
return errs

def validate_course(crs):
errs = []
# Required identifiers and basic fields
if not crs.get("ceterms:ctid"):
errs.append("Missing ceterms:ctid")
nm = crs.get("ceterms:name")
if not nm or (isinstance(nm, dict) and not any(nm.values())):
errs.append("Missing ceterms:name")
desc = crs.get("ceterms:description")
if not desc or (isinstance(desc, dict) and not any(desc.values())):
errs.append("Missing ceterms:description")
if not crs.get("ceterms:inLanguage"):
errs.append("Missing ceterms:inLanguage")
if not crs.get("ceterms:lifeCycleStatusType"):
errs.append("Missing ceterms:lifeCycleStatusType")
owned = crs.get("ceterms:ownedBy")
offered = crs.get("ceterms:offeredBy")
if not owned and not offered:
errs.append("Missing ceterms:ownedBy or ceterms:offeredBy (one required)")

def _validate_org_field(val, field_name):
if val is None:
return
if isinstance(val, str):
if not is_registry_ce_uri(val, reg_base):
errs.append(f"{field_name} must be a CE registry URI")
elif isinstance(val, list):
bad = [x for x in val if not isinstance(x, str) or not is_registry_ce_uri(x, reg_base)]
if bad:
errs.append(f"{field_name} must be CE registry URI(s)")
else:
errs.append(f"{field_name} must be a CE registry URI (string or list)")

_validate_org_field(owned, "ceterms:ownedBy")
_validate_org_field(offered, "ceterms:offeredBy")

return errs
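# validate_course returns a plain list of error strings; for example, a course missing only its CTID
# and language would yield ["Missing ceterms:ctid", "Missing ceterms:inLanguage"].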

total_competencies = 0

# For each course: create a framework (+ write both files)
for course_ident, course_node in courses.items():
course_ctid = course_node["ceterms:ctid"]

@@ -213,7 +327,7 @@ def expand_competency_subtree(root_guid, seen):

# Framework name (avoid double-encoding if name already a language map)
course_name_field = course_node.get("ceterms:name", {})
if isinstance(course_name_field, dict) and fw_lang in course_name_field:
if isinstance(course_name_field, dict) and (fw_lang and fw_lang in course_name_field):
fw_name_val = course_name_field[fw_lang]
else:
fw_name_val = _to_str(course_name_field)
@@ -250,9 +364,7 @@ def expand_competency_subtree(root_guid, seen):
for guid in subtree:
base = competencies[guid]
node_copy = dict(base)
# ceasn:isPartOf (replace inFramework)
node_copy["ceasn:isPartOf"] = fw_atid
# ensure @id remains the registry id already set
comp_nodes[guid] = node_copy

# Add local hasChild and reciprocal isChildOf
@@ -274,37 +386,83 @@ def expand_competency_subtree(root_guid, seen):
aln["ceterms:framework"] = fw_atid
aln["ceterms:frameworkName"] = fw_node["ceasn:name"]

# Write one JSON per framework
graph = {
# Validate framework + its competencies now (only record if errors exist)
fw_errs = validate_framework(fw_node)
add_fw_validation(fw_node, fw_errs)

for comp in comp_nodes.values():
comp_errs = validate_competency(comp)
add_comp_validation(comp, comp_errs)

# --- Write one JSON per framework (CTDLASN context) ---
framework_graph = {
"@context": CTDLASN_CONTEXT,
"@id": fw_atid,
"@graph": [fw_node] + list(comp_nodes.values())
}
filename = os.path.join(output_dir, f"framework_{course_ctid}.json")
with open(filename, "w", encoding="utf-8") as f:
json.dump(graph, f, ensure_ascii=False, indent=2)

return courses
fw_filename = os.path.join(frameworks_outdir, f"framework_{course_ctid}.json")
with open(fw_filename, "w", encoding="utf-8") as f:
json.dump(framework_graph, f, ensure_ascii=False, indent=2)

# --- Write one JSON per course (CTDL context) ---
course_graph_single = {
"@context": CTDL_CONTEXT,
"@graph": [course_node]
}
course_filename = os.path.join(courses_outdir, f"course_{course_ctid}.json")
with open(course_filename, "w", encoding="utf-8") as f:
json.dump(course_graph_single, f, ensure_ascii=False, indent=2)

total_competencies += len(comp_nodes)

# Validate courses (after teaches filled)
for crs in courses.values():
errs = validate_course(crs)
add_course_validation(crs, errs)

# Summarize validations and totals
total_courses = len(courses)
fw_err_count = len(validation_report["frameworks"])
comp_err_count = len(validation_report["competencies"])
course_err_count = len(validation_report["courses"])

validation_report["summary"] = {
"framework_count": total_courses, # 1 framework per course
"framework_error_count": fw_err_count, # frameworks with errors
"competency_count": total_competencies, # total competencies across frameworks
"competency_error_count": comp_err_count, # competencies with errors
"course_count": total_courses, # total courses written
"course_error_count": course_err_count # courses with errors
}
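# For instance (hypothetical counts), a package with 12 courses and 340 competencies that all pass
# validation would produce a summary of framework_count 12, framework_error_count 0,
# competency_count 340, competency_error_count 0, course_count 12, course_error_count 0.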

return validation_report

def main():
try:
url = input("Enter CASE CFPackage URL: ").strip()
output_dir = input("Output folder for frameworks [frameworks_out]: ").strip() or "frameworks_out"
out_courses = input("Courses output [courses.json]: ").strip() or "courses.json"
courses_outdir = input("Output folder for COURSE files [courses_out]: ").strip() or "courses_out"
frameworks_outdir = input("Output folder for FRAMEWORK files [frameworks_out]: ").strip() or "frameworks_out"
out_valid = input("Validation report [validations.json]: ").strip() or "validations.json"
reg_base = input(f"Registry base URL [{DEFAULT_REG_BASE}]: ").strip() or DEFAULT_REG_BASE

print("Fetching CASE package...")
pkg = fetch_json(url)
print("Building individual course frameworks + courses...")
courses = build_graphs(pkg, reg_base, output_dir)

# Write combined courses.json
course_graph = {"@context": CTDLASN_CONTEXT, "@graph": list(courses.values())}
with open(out_courses, "w", encoding="utf-8") as f:
json.dump(course_graph, f, ensure_ascii=False, indent=2)
print("Creating individual course and framework JSON files...")
validation_report = build_and_write(pkg, reg_base, courses_outdir, frameworks_outdir)

with open(out_valid, "w", encoding="utf-8") as f:
json.dump(validation_report, f, ensure_ascii=False, indent=2)

summary = validation_report["summary"]
print(f"Created {summary['course_count']} course JSON files in '{courses_outdir}' (CTDL)")
print(f"Created {summary['framework_count']} framework JSON files in '{frameworks_outdir}' (CTDLASN)")
print("— Validation summary —")
print(f"Framework errors: {summary['framework_error_count']}")
print(f"Competency errors: {summary['competency_error_count']}")
print(f"Course errors: {summary['course_error_count']}")
if summary['framework_error_count'] or summary['competency_error_count'] or summary['course_error_count']:
print(f"Validation details saved to {out_valid}")

print(f"Created {len(courses)} framework JSON files in '{output_dir}'")
print(f"Wrote {len(courses)} courses to {out_courses}")
except (HTTPError, URLError) as e:
print(f"Network/HTTP error: {e}")
except json.JSONDecodeError as e: