In [None]:
import sys
sys.path.append("..")
%reload_ext autoreload
%autoreload 2
import os
import pandas as pd
import pyreadstat as pyr
import json
import numpy as np
from spss_import import read_sav
from lxml import etree
from xml_functions import remove_empty_elements, add_cdi_element, add_identifier, add_ddiref
# Display settings: render up to 2500 rows and every column of a DataFrame
pd.set_option('display.max_rows', 2500)
pd.set_option('display.max_columns', None)
# Turn off pandas' chained-assignment warning for the whole session
pd.options.mode.chained_assignment = None

In [None]:
#help(pyr.read_sav)

In [None]:
# Input file, relative to the notebook's working directory
spssfile = "files/SPSS_Example2.sav"
# read_sav (project helper) returns the data frame and its metadata container
df, df_meta = read_sav(spssfile)
df.head()

In [None]:
# Short aliases for the metadata dictionaries used while exploring the file
Label, Values, Missing, Format, Measure = (
    df_meta.column_names_to_labels,
    df_meta.variable_value_labels,
    df_meta.missing_ranges,
    df_meta.original_variable_types,
    df_meta.variable_measure,
)

In [None]:
# DataStore
def generate_DataStore(df_meta):
    """Add the single ``DataStore`` element to the notebook-level ``root``.

    The element receives a fixed ``allowsDuplicates`` value of "false", the
    identifier ``#dataStore``, a ``recordCount`` taken from
    ``df_meta.number_rows`` and a reference to ``#logicalRecord``.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    data_store = add_cdi_element(root, 'DataStore')
    add_cdi_element(data_store, 'allowsDuplicates', "false")
    add_identifier(data_store, "#dataStore")
    # NOTE(review): number_rows is passed through as-is; presumably the helper
    # converts it to text - confirm in xml_functions.
    add_cdi_element(data_store, 'recordCount', df_meta.number_rows)
    record_link = add_cdi_element(data_store, 'DataStore_has_LogicalRecord')
    add_ddiref(record_link, "#logicalRecord", agency, "LogicalRecord")

In [None]:
# logicalRecord
def generate_LogicalRecord(df_meta):
    """Add the ``LogicalRecord`` element to the notebook-level ``root``.

    The record is identified as ``#logicalRecord``, references the
    ``#wideDataSet`` data set and then references one
    ``#instanceVariable-<name>`` per entry of ``df_meta.column_names``,
    in column order.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    logical_record = add_cdi_element(root, 'LogicalRecord')
    add_identifier(logical_record, "#logicalRecord")

    dataset_link = add_cdi_element(logical_record, 'LogicalRecord_organizes_DataSet')
    add_ddiref(dataset_link, "#wideDataSet", agency, "WideDataSet")

    for variable in df_meta.column_names:
        variable_link = add_cdi_element(logical_record, 'LogicalRecord_has_InstanceVariable')
        add_ddiref(variable_link, f"#instanceVariable-{variable}", agency, "InstanceVariable")

In [None]:
# WideDataSet
def generate_WideDataSet(df_meta):
    """Add the ``WideDataSet`` element to the notebook-level ``root``.

    The data set is identified as ``#wideDataSet`` and references the
    ``#wideDataStructure`` structure. ``df_meta`` is accepted for a uniform
    generator signature but is not read here.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    wide_data_set = add_cdi_element(root, 'WideDataSet')
    add_identifier(wide_data_set, "#wideDataSet")
    structure_link = add_cdi_element(wide_data_set, 'DataSet_isStructuredBy_DataStructure')
    add_ddiref(structure_link, "#wideDataStructure", agency, "WideDataStructure")

In [None]:
# WideDataStructure
def generate_WideDataStructure(df_meta):
    """Add the ``WideDataStructure`` element to the notebook-level ``root``.

    Child order is: the identifier, one component reference for the first
    column (as ``IdentifierComponent``), one component reference per remaining
    column (as ``MeasureComponent``), and finally the ``#primaryKey`` reference.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    structure = add_cdi_element(root, 'WideDataStructure')
    add_identifier(structure, "#wideDataStructure")

    # The first column is referenced as the identifier component
    identifier_link = add_cdi_element(structure, 'DataStructure_has_DataStructureComponent')
    add_ddiref(identifier_link, f"#identifierComponent-{df_meta.column_names[0]}", agency, "IdentifierComponent")

    # Every other column is referenced as a measure component
    for variable in df_meta.column_names[1:]:
        measure_link = add_cdi_element(structure, 'DataStructure_has_DataStructureComponent')
        add_ddiref(measure_link, f"#measureComponent-{variable}", agency, "MeasureComponent")

    key_link = add_cdi_element(structure, 'DataStructure_has_PrimaryKey')
    add_ddiref(key_link, "#primaryKey", agency, "PrimaryKey")

In [None]:
# IdentifierComponent
def generate_IdentifierComponent(df_meta):
    """Add the ``IdentifierComponent`` for the first column to ``root``.

    Both the component identifier and the referenced instance variable are
    derived from ``df_meta.column_names[0]``.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    component = add_cdi_element(root, 'IdentifierComponent')
    add_identifier(component, f"#identifierComponent-{df_meta.column_names[0]}")
    defined_by = add_cdi_element(component, 'DataStructureComponent_isDefinedBy_RepresentedVariable')
    add_ddiref(defined_by, f"#instanceVariable-{df_meta.column_names[0]}", agency, "InstanceVariable")

In [None]:
# MeasureComponent
def generate_MeasureComponent(df_meta):
    """Add one ``MeasureComponent`` per column after the first to ``root``.

    Each component is identified as ``#measureComponent-<variable>`` and is
    defined by the instance variable of the same column,
    ``#instanceVariable-<variable>``. The first column is skipped
    (``column_names[1:]``).

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    for variable in df_meta.column_names[1:]:
        measure_component = add_cdi_element(root, 'MeasureComponent')
        add_identifier(measure_component, f"#measureComponent-{variable}")
        defined_by = add_cdi_element(measure_component, 'DataStructureComponent_isDefinedBy_RepresentedVariable')
        # Reference the component's own variable; the earlier code pointed every
        # measure at column_names[0], i.e. at the identifier column.
        add_ddiref(defined_by, f"#instanceVariable-{variable}", agency, "InstanceVariable")

In [None]:
# PrimaryKey
def generate_PrimaryKey(df_meta):
    """Add the ``PrimaryKey`` element to the notebook-level ``root``.

    The key is identified as ``#primaryKey`` and references the single
    ``#primaryKeyComponent``. ``df_meta`` is accepted for a uniform generator
    signature but is not read here.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    primary_key = add_cdi_element(root, 'PrimaryKey')
    add_identifier(primary_key, "#primaryKey")
    component_link = add_cdi_element(primary_key, 'PrimaryKey_isComposedOf_PrimaryKeyComponent')
    add_ddiref(component_link, "#primaryKeyComponent", agency, "PrimaryKeyComponent")

In [None]:
# PrimaryKeyComponent
def generate_PrimaryKeyComponent(df_meta):
    """Add the ``PrimaryKeyComponent`` element to the notebook-level ``root``.

    The component is identified as ``#primaryKeyComponent`` and corresponds to
    the identifier component of the first column
    (``#identifierComponent-<column_names[0]>``).

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    key_component = add_cdi_element(root, 'PrimaryKeyComponent')
    add_identifier(key_component, "#primaryKeyComponent")
    corresponds_to = add_cdi_element(key_component, 'PrimaryKeyComponent_correspondsTo_DataStructureComponent')
    add_ddiref(corresponds_to, f"#identifierComponent-{df_meta.column_names[0]}", agency, "IdentifierComponent")

In [None]:
def generate_InstanceVariable(df_meta):
    """Add one ``InstanceVariable`` element per column to ``root``.

    For every column the element gets, in this order: a display label taken
    from ``df_meta.column_labels`` at the same position, the identifier
    ``#instanceVariable-<variable>``, the variable name, the intended data type
    from ``df_meta.original_variable_types``, an optional sentinel value domain
    reference and a substantive value domain reference.

    The sentinel reference is added when the variable has an entry in
    ``missing_ranges``, or - only when ``missing_ranges`` is empty - an entry
    in ``missing_user_values``.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    for position, variable in enumerate(df_meta.column_names):
        instance_variable = add_cdi_element(root, 'InstanceVariable')

        # displayLabel / languageSpecificString / content
        label_wrapper = add_cdi_element(instance_variable, 'displayLabel')
        label_string = add_cdi_element(label_wrapper, 'languageSpecificString')
        add_cdi_element(label_string, 'content', f"{df_meta.column_labels[position]}")

        add_identifier(instance_variable, f"#instanceVariable-{variable}")

        name_wrapper = add_cdi_element(instance_variable, 'name')
        add_cdi_element(name_wrapper, 'name', f"{variable}")

        data_type = add_cdi_element(instance_variable, 'hasIntendedDataType')
        add_cdi_element(data_type, 'name', f"{df_meta.original_variable_types[variable]}")

        # missing_user_values is consulted only when no missing ranges exist at all
        if variable in df_meta.missing_ranges:
            has_sentinels = True
        else:
            has_sentinels = len(df_meta.missing_ranges) == 0 and variable in df_meta.missing_user_values

        if has_sentinels:
            sentinel_link = add_cdi_element(instance_variable, 'RepresentedVariable_takesSentinelValuesFrom_SentinelValueDomain')
            add_ddiref(sentinel_link, f"#sentinelValueDomain-{variable}", agency, "SentinelValueDomain")

        substantive_link = add_cdi_element(instance_variable, 'RepresentedVariable_takesSubstantiveValuesFrom_SubstantiveValueDomain')
        add_ddiref(substantive_link, f"#substantiveValueDomain-{variable}", agency, 'SubstantiveValueDomain')

In [None]:
# SubstantiveValueDomain
def generate_SubstantiveValueDomain(df_meta):
    """Add one ``SubstantiveValueDomain`` element per column to ``root``.

    Each domain is identified as ``#substantiveValueDomain-<variable>``.
    Variables that have value labels additionally reference the code list
    ``#substantiveCodelist-<variable>``; every domain references
    ``#substantiveValueAndConceptDescription-<variable>``.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    labelled_variables = df_meta.variable_value_labels
    for variable in df_meta.column_names:
        domain = add_cdi_element(root, 'SubstantiveValueDomain')
        add_identifier(domain, f"#substantiveValueDomain-{variable}")

        # Only labelled variables have a code list to point at
        if variable in labelled_variables:
            enumeration_link = add_cdi_element(domain, 'SubstantiveValueDomain_takesValuesFrom_EnumerationDomain')
            add_ddiref(enumeration_link, f"#substantiveCodelist-{variable}", agency, 'CodeList')

        description_link = add_cdi_element(domain, 'SubstantiveValueDomain_isDescribedBy_ValueAndConceptDescription')
        add_ddiref(description_link, f"#substantiveValueAndConceptDescription-{variable}", agency, "ValueAndConceptDescription")


In [None]:
# SubstantiveCodeList
def _is_numeric_bound(bound):
    """Return True when a missing-range bound is a number or a digit-only string."""
    return isinstance(bound, (int, float)) or (isinstance(bound, str) and bound.isnumeric())


def _as_number(candidate):
    """Return ``candidate`` as a float, or None when it cannot be read as a number."""
    if isinstance(candidate, (int, float)):
        return float(candidate)
    if isinstance(candidate, str):
        try:
            return float(candidate)
        except ValueError:
            return None
    return None


def _is_missing_code(value, discrete_values, discrete_strings, numeric_ranges):
    """Return True when ``value`` is covered by the declared missing values.

    A value is covered when it equals a discrete missing value (directly or by
    string form), or when it reads as a number lying inside one of the
    inclusive ``(low, high)`` pairs in ``numeric_ranges``.
    """
    if value in discrete_values or str(value) in discrete_strings:
        return True
    number = _as_number(value)
    if number is None:
        return False
    return any(low <= number <= high for low, high in numeric_ranges)


def generate_CodeList(df_meta):
    """Add one substantive ``CodeList`` per labelled variable to ``root``.

    Each code list is identified (and named) ``#substantiveCodelist-<variable>``
    and references ``#code-<value>-<variable>`` for every labelled value that
    is NOT declared missing. Missing declarations come from
    ``df_meta.missing_ranges`` when it is non-empty, otherwise from
    ``df_meta.missing_user_values``.

    Numeric missing ranges are applied as inclusive intervals rather than being
    expanded into a set of integers. This keeps non-integer codes and bounds
    correct and copes with open-ended (very large or infinite) bounds.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    # Determine the relevant variables based on the presence of missing values
    relevant_variables = df_meta.missing_ranges if len(df_meta.missing_ranges) > 0 else df_meta.missing_user_values

    for variable_name, values_dict in df_meta.variable_value_labels.items():
        element = add_cdi_element(root, 'CodeList')
        add_identifier(element, f"#substantiveCodelist-{variable_name}")

        name = add_cdi_element(element, 'name')
        add_cdi_element(name, 'name', f"#substantiveCodelist-{variable_name}")

        add_cdi_element(element, 'allowsDuplicates', "false")

        excluded_values = set()  # discrete missing values
        excluded_ranges = []     # inclusive (low, high) float pairs

        if variable_name in relevant_variables:
            missing_spec = relevant_variables[variable_name]

            # Range-style declaration: a list of {'lo': ..., 'hi': ...} dicts
            if isinstance(missing_spec, list) and all(isinstance(item, dict) for item in missing_spec):
                for dict_range in missing_spec:
                    lo, hi = dict_range['lo'], dict_range['hi']
                    if _is_numeric_bound(lo) and _is_numeric_bound(hi):
                        excluded_ranges.append((float(lo), float(hi)))
                    elif isinstance(lo, str):
                        excluded_values.add(lo)
                    else:
                        print(f"Warning: Unsupported 'lo' value: {lo}")

            # Plain list of user-defined missing values, compared by string form
            elif isinstance(missing_spec, list):
                excluded_values.update(map(str, missing_spec))

        # Built once per variable; it does not change while iterating the codes
        excluded_values_str = {str(item) for item in excluded_values}

        # Reference every labelled value that is not declared missing
        for value in values_dict:
            if _is_missing_code(value, excluded_values, excluded_values_str, excluded_ranges):
                continue
            CodeList_has_Code = add_cdi_element(element, 'CodeList_has_Code')
            add_ddiref(CodeList_has_Code, f"#code-{value}-{variable_name}", agency, "Code")

In [None]:
# Code
def generate_Code(df_meta):
    """Add one ``Code`` element per labelled value of every variable to ``root``.

    A code is identified as ``#code-<value>-<variable>``, denotes the category
    ``#category-<label>`` and uses the notation ``#notation-<value>``.

    Uses the globals ``root`` and ``agency`` and the ``xml_functions`` helpers.
    """
    for variable_name, value_labels in df_meta.variable_value_labels.items():
        for code_value, label in value_labels.items():
            code = add_cdi_element(root, 'Code')
            add_identifier(code, f"#code-{code_value}-{variable_name}")

            category_link = add_cdi_element(code, 'Code_denotes_Category')
            add_ddiref(category_link, f"#category-{label}", agency, "Category")

            notation_link = add_cdi_element(code, 'Code_uses_Notation')
            add_ddiref(notation_link, f"#notation-{code_value}", agency, "Notation")

In [None]:
# Category and Notation
def generate_Category_Notation(df_meta):
    """Add the shared ``Category`` and ``Notation`` elements to ``root``.

    Categories are the distinct value labels across all variables; notations
    are the distinct labelled values. Duplicates are dropped while keeping
    first-seen order, so the generated document is the same on every run
    (iterating a plain ``set`` gives an arbitrary order).

    All categories are written first, then all notations.

    Uses the global ``root`` and the ``xml_functions`` helpers.
    """
    label_maps = list(df_meta.variable_value_labels.values())
    # dict.fromkeys de-duplicates like a set but preserves insertion order
    notations = list(dict.fromkeys(key for values_dict in label_maps for key in values_dict.keys()))
    cats = list(dict.fromkeys(value for values_dict in label_maps for value in values_dict.values()))

    for cat in cats:
        element = add_cdi_element(root, 'Category')
        displayLabel = add_cdi_element(element, 'displayLabel')
        languageSpecificString = add_cdi_element(displayLabel, 'languageSpecificString')
        add_cdi_element(languageSpecificString, 'content', f"{cat}")
        add_identifier(element, f"#category-{cat}")
        name = add_cdi_element(element, 'name')
        add_cdi_element(name, 'name', f"{cat}")

    for note in notations:
        element = add_cdi_element(root, 'Notation')
        content = add_cdi_element(element, 'content')
        add_cdi_element(content, 'content', f"{note}")
        add_identifier(element, f"#notation-{note}")

In [None]:

# ValueAndConceptDescription
def generate_ValueAndConceptDescription(df_meta):
    """Add ``ValueAndConceptDescription`` elements for every column to ``root``.

    For each column a substantive description is written, identified as
    ``#substantiveValueAndConceptDescription-<variable>``. Its
    ``classificationLevel`` is mapped from ``df_meta.variable_measure``; the
    element is left out when the measure is not one of nominal / scale /
    ordinal, instead of failing with a KeyError.

    For each column with a non-empty missing-value declaration, a second
    description ``#sentinelValueAndConceptDescription-<variable>`` is written.
    It carries the declaration's string form and the smallest / largest
    declared value. Declarations come from ``df_meta.missing_ranges`` when
    present, otherwise from ``df_meta.missing_user_values``.

    Uses the global ``root`` and the ``xml_functions`` helpers.
    """
    # Determine the relevant variables based on the presence of missing values
    relevant_variables = df_meta.missing_ranges or df_meta.missing_user_values or {}

    # recode classification level
    class_level = {'nominal': 'Nominal', 'scale': 'Continuous', 'ordinal': 'Ordinal'}

    for variable in df_meta.column_names:
        element = add_cdi_element(root, 'ValueAndConceptDescription')
        level = class_level.get(df_meta.variable_measure.get(variable))
        if level is not None:
            add_cdi_element(element, 'classificationLevel', level)
        add_identifier(element, f"#substantiveValueAndConceptDescription-{variable}")

        # Add sentinelValueAndConceptDescription only if the condition is met
        if variable not in relevant_variables:
            continue
        values = relevant_variables[variable]
        if not values:
            # An empty declaration has no bounds to report
            continue

        if isinstance(values[0], dict):  # range-style declaration
            min_val = min(d['lo'] for d in values)
            max_val = max(d['hi'] for d in values)
        else:
            min_val, max_val = min(values), max(values)

        element = add_cdi_element(root, 'ValueAndConceptDescription')
        description = add_cdi_element(element, 'description')
        languageSpecificString = add_cdi_element(description, 'languageSpecificString')
        add_cdi_element(languageSpecificString, 'content', str(values))
        add_identifier(element, f"#sentinelValueAndConceptDescription-{variable}")
        # NOTE(review): the bounds written here are themselves declared missing
        # values, yet the element names say "Exclusive" - confirm the intended
        # DDI-CDI semantics.
        add_cdi_element(element, "maximumValueExclusive", str(max_val))
        add_cdi_element(element, "minimumValueExclusive", str(min_val))


In [None]:
# Define the namespace
# Namespace prefix map handed to the root element
nsmap = {'cdi': 'http://ddialliance.org/Specification/DDI-CDI/1.0/XMLSchema/'}

# Root element, qualified in the cdi namespace (Clark notation: "{uri}tag")
root = etree.Element(f"{{{nsmap['cdi']}}}DDICDIModels", nsmap=nsmap)

# Schema location hint: "<namespace> <xsd url>"
root.set(
    '{http://www.w3.org/2001/XMLSchema-instance}schemaLocation',
    'http://ddialliance.org/Specification/DDI-CDI/1.0/XMLSchema/ https://ddi-cdi-resources.bitbucket.io/2023-11-12/encoding/xml-schema/ddi-cdi.xsd',
)

# Agency string passed to every add_ddiref call
agency = 'int.esseric'

In [None]:
# Run every generator in document order; each one adds its elements to `root`
for generate in (
    generate_DataStore,
    generate_LogicalRecord,
    generate_WideDataSet,
    generate_WideDataStructure,
    generate_IdentifierComponent,
    generate_MeasureComponent,
    generate_PrimaryKey,
    generate_PrimaryKeyComponent,
    generate_InstanceVariable,
    generate_SubstantiveValueDomain,
    generate_ValueAndConceptDescription,
    generate_CodeList,
    generate_Code,
    generate_Category_Notation,
):
    generate(df_meta)

In [None]:
# Add XML declaration and write XML file
# Serialise the tree as pretty-printed UTF-8 bytes with an XML declaration
xml_string = etree.tostring(root, pretty_print=True, xml_declaration=True, encoding='UTF-8')

# Insert the version comment on its own line right after the declaration
declaration, terminator, document_body = xml_string.partition(b'?>')
if terminator:
    xml_string_with_comment = declaration + terminator + b'\n<!-- CDI, version 1, 2024.01.20 -->' + document_body
else:
    xml_string_with_comment = xml_string

with open(r'files/CDI.xml', 'wb') as xml_file:
    xml_file.write(xml_string_with_comment)

##### 