# Data Prep


The following code converts all SAMM 2.0.0 to 2.1.0.

In [1]:
import os

def replace_in_ttl_files(directory):
    """
    Recursively find all .ttl files in the given directory, replace a string, rewrite the files
    
    Args:
    directory (str): The root directory to start searching from.
    
    """
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(".ttl"):
                file_path = os.path.join(root, file)
                
                # Read file content
#                 print(file_path)
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.readlines()  # Read file as a list of lines
                
                # Process each line
                new_content = []
                for line in content:
                    if line.strip().startswith('@'):
                        # Replace content
                        line = line.replace("urn:samm:org.eclipse.esmf.samm:characteristic:2.0.0#",
                                                        "urn:samm:org.eclipse.esmf.samm:characteristic:2.1.0#")

                        line = line.replace("urn:samm:org.eclipse.esmf.samm:entity:2.0.0#",
                                                        "urn:samm:org.eclipse.esmf.samm:entity:2.1.0#")

                        line = line.replace("urn:samm:org.eclipse.esmf.samm:meta-model:2.0.0#",
                                                        "urn:samm:org.eclipse.esmf.samm:meta-model:2.1.0#")

                        line = line.replace("urn:samm:org.eclipse.esmf.samm:unit:2.0.0#",
                                                        "urn:samm:org.eclipse.esmf.samm:unit:2.1.0#")
                    if line.strip().startswith('#'):
                        # Replace content
                        continue
                    new_content.append(line)
                # Write the updated content back to the file
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.writelines(new_content)


directory = 'D:/NMT Thesis/Dataset/content/sldt-semantic-models'

replace_in_ttl_files(directory)

Perform data split based on model name.

In [2]:
import os
from pathlib import Path

json_paths = []
ttl_paths = []
base_path = set()

clone_dir = 'D:\\NMT Thesis\\Dataset\\sldt-semantic-models'
for root, dirs, files in os.walk(clone_dir):
    for file in files:
          if file.endswith(".ttl"):
            ttl_path = os.path.relpath(os.path.join(root, file), clone_dir)
            ttl_paths.append(ttl_path)
            base_path.add(Path(ttl_path).parts[0])
            

In [42]:
ttl_paths

['io.catenax.asset_tracker_links\\2.0.0\\AssetTrackerLinks.ttl',
 'io.catenax.asset_tracker_links\\2.1.0\\AssetTrackerLinks.ttl',
 'io.catenax.asset_tracker_links\\2.2.0\\AssetTrackerLinks.ttl',
 'io.catenax.batch\\2.0.0\\Batch.ttl',
 'io.catenax.batch\\2.0.1\\Batch.ttl',
 'io.catenax.batch\\2.1.0\\Batch.ttl',
 'io.catenax.batch\\2.1.1\\Batch.ttl',
 'io.catenax.batch\\2.2.0\\Batch.ttl',
 'io.catenax.batch\\2.2.1\\Batch.ttl',
 'io.catenax.batch\\3.0.0\\Batch.ttl',
 'io.catenax.batch\\3.1.0\\Batch.ttl',
 'io.catenax.batch\\3.2.0\\Batch.ttl',
 'io.catenax.battery.battery_pass\\4.0.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\4.1.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\4.2.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\5.0.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\5.1.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\5.2.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\6.0.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\

In [3]:
from sklearn.model_selection import train_test_split

base_path_list = list(base_path)
# Split into train and test (80% train, 20% test)
train_list, test_list = train_test_split(base_path_list, test_size=0.2, random_state=42)



In [4]:
filtered_list_train = [item for item in ttl_paths if any(substring in item for substring in train_list)]
print(len(filtered_list_train))
filtered_list_train


367


['io.catenax.asset_tracker_links\\2.0.0\\AssetTrackerLinks.ttl',
 'io.catenax.asset_tracker_links\\2.1.0\\AssetTrackerLinks.ttl',
 'io.catenax.asset_tracker_links\\2.2.0\\AssetTrackerLinks.ttl',
 'io.catenax.batch\\2.0.0\\Batch.ttl',
 'io.catenax.batch\\2.0.1\\Batch.ttl',
 'io.catenax.batch\\2.1.0\\Batch.ttl',
 'io.catenax.batch\\2.1.1\\Batch.ttl',
 'io.catenax.batch\\2.2.0\\Batch.ttl',
 'io.catenax.batch\\2.2.1\\Batch.ttl',
 'io.catenax.batch\\3.0.0\\Batch.ttl',
 'io.catenax.batch\\3.1.0\\Batch.ttl',
 'io.catenax.batch\\3.2.0\\Batch.ttl',
 'io.catenax.battery.battery_pass\\4.0.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\4.1.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\4.2.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\5.0.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\5.1.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\5.2.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\\6.0.0\\BatteryPass.ttl',
 'io.catenax.battery.battery_pass\

In [5]:
filtered_list_test = [item for item in ttl_paths if any(substring in item for substring in test_list)]
print(len(filtered_list_test))
filtered_list_test

86


['io.catenax.certificate_of_dismantler\\1.0.1\\CertificateOfDismantler.ttl',
 'io.catenax.certificate_of_dismantler\\1.1.1\\CertificateOfDismantler.ttl',
 'io.catenax.certificate_of_dismantler\\1.2.1\\CertificateOfDismantler.ttl',
 'io.catenax.demand_and_capacity_notification\\1.0.0\\DemandAndCapacityNotification.ttl',
 'io.catenax.demand_and_capacity_notification\\1.1.0\\DemandAndCapacityNotification.ttl',
 'io.catenax.demand_and_capacity_notification\\1.2.0\\DemandAndCapacityNotification.ttl',
 'io.catenax.demand_and_capacity_notification\\2.0.0\\DemandAndCapacityNotification.ttl',
 'io.catenax.demand_and_capacity_notification\\2.1.0\\DemandAndCapacityNotification.ttl',
 'io.catenax.demand_and_capacity_notification\\2.2.0\\DemandAndCapacityNotification.ttl',
 'io.catenax.eol_story\\2.0.0\\EndOfLife.ttl',
 'io.catenax.eol_story\\2.1.0\\EndOfLife.ttl',
 'io.catenax.eol_story\\2.2.0\\EndOfLife.ttl',
 'io.catenax.global_transport_label\\1.0.0\\GlobalTransportLabel.ttl',
 'io.catenax.glob

Construct CSV files from splited file path.

In [8]:
import csv

csv_file = "train_2024-09-05.csv"

with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    
    writer.writerow(["JSON_paths", "TTL_paths"])
    
    for ttl in filtered_list_train:
        ttl_path = Path(ttl)
        file_name = ttl_path.stem  
        json = ttl_path.parent / "gen" / f"{file_name}.json"
        writer.writerow([json, ttl])

print(f"Data has been written to {csv_file}")

Data has been written to train_2024-09-05.csv


In [9]:
def create_csv(csv_file, data_list):
    with open(csv_file, mode='w', newline='') as file:
        writer = csv.writer(file)

        writer.writerow(["JSON_paths", "TTL_paths"])

        for ttl in data_list:
            ttl_path = Path(ttl)
            file_name = ttl_path.stem  
            json = ttl_path.parent / "gen" / f"{file_name}.json"
            writer.writerow([json, ttl])

    print(f"Data has been written to {csv_file}")
    
    
csv_file = "test_2024-09-05.csv"
create_csv(csv_file, filtered_list_test)

Data has been written to test_2024-09-05.csv


In [41]:
test_list

['io.catenax.item_stock',
 'io.catenax.reuse_certificate',
 'io.catenax.part_as_planned',
 'io.catenax.shared.bill_of_process',
 'io.catenax.manufactured_parts_quality_information',
 'io.catenax.fleet.diagnostic_data',
 'io.catenax.material.chemical_material_passport',
 'io.catenax.transmission.transmission_pass',
 'io.catenax.certificate_of_destruction',
 'io.catenax.shared.quantity']

In [10]:
base_path

{'io.catenax.asset_tracker_links',
 'io.catenax.batch',
 'io.catenax.battery.battery_pass',
 'io.catenax.battery.product_description',
 'io.catenax.bom_as_specified',
 'io.catenax.certificate_of_destruction',
 'io.catenax.certificate_of_dismantler',
 'io.catenax.certificate_signing_requests',
 'io.catenax.classified_load_spectrum',
 'io.catenax.customs_information',
 'io.catenax.days_of_supply',
 'io.catenax.decomissioning_certificate',
 'io.catenax.delivery_information',
 'io.catenax.demand_and_capacity_notification',
 'io.catenax.electric_drive.electric_drive_passport',
 'io.catenax.eol_story',
 'io.catenax.essincident',
 'io.catenax.failure_pattern',
 'io.catenax.fleet.claim_data',
 'io.catenax.fleet.diagnostic_data',
 'io.catenax.fleet.vehicles',
 'io.catenax.generic.digital_product_passport',
 'io.catenax.global_transport_label',
 'io.catenax.id_based_comment',
 'io.catenax.id_based_request_for_update',
 'io.catenax.individual_asset_definition',
 'io.catenax.iot_sensor_data',
 'io

In [26]:
len(ttl_paths)

453

In [25]:
len(json_paths)

453

# Aggregate
After running the experiments using the inference module, this code will collect and aggregate the results

## Version 1
This script performs the following tasks:

1. **Read and Filter Text Files**:
   - It traverses a specified folder structure (`root_folder`) to locate `summary.txt` files.
   - Filters the files based on matching `model_name`, `inference_type`, and `experiment_name`.

2. **Aggregate Data**:
   - Reads data from the filtered files, ensuring valid entries with exactly 4 columns.
   - Appends each entry with the file's path for context.

3. **Write to CSV**:
   - Writes the aggregated data, along with column headers, into a CSV file (`output_csv`), named based on the experiment, model, and inference type.


In [1]:
import os
import csv
from pathlib import Path

model_name = "azuregpt4o20240806SemanticAspectMetaModelV1"
inference_type = "FewShotPromptTemplate1"
experiment_name = "Test2"

def read_and_aggregate_txt_files(root_folder, output_csv):
    # List to hold the aggregated data
    aggregated_data = []
    csv_header = ['model', 'isTurtle', 'isSAMM', 'isJSON', 'filePath']

    # Traverse the folder structure
    for subdir, _, files in os.walk(root_folder):
        for file in files:
            if file.endswith('summary.txt'):
                file_path = os.path.join(subdir, file)
                path = Path(file_path)
                inference_part = path.parent.name
                # Extract experiment's name (two levels up from the file)
                experiment_part = path.parent.parent.parent.name
    
                # Extract model name (one level up from the file)
                model_part = path.parent.parent.name
                if (model_part != model_name or inference_part != inference_type or experiment_part != experiment_name):
                    continue
                with open(file_path, 'r') as f:
                    lines = f.readlines()
                    for line in lines[1:]:  # Skip the header line
                        # Split by commas to extract the data
                        data = line.strip().split(',')
                        if len(data) == 4:  # Ensure that there are exactly 4 columns
                            # Append file path to the data
                            data.append(file_path)
                            aggregated_data.append(data)

    # Write the aggregated data into a CSV file
    with open(output_csv, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(csv_header)
        writer.writerows(aggregated_data)

    print(f"Aggregated data written to {output_csv}")


root_folder = 'D:\\NMT Thesis\\Results'
output_csv = f'aggregated_summary_data_{experiment_name}_{model_name}_{inference_type}.csv'

read_and_aggregate_txt_files(root_folder, output_csv)


Aggregated data written to aggregated_summary_data_Test2_azuregpt4o20240806SemanticAspectMetaModelV1_FewShotPromptTemplate1.csv


In [2]:
import pandas as pd 
import matplotlib as mp
df = pd.read_csv(f"aggregated_summary_data_{experiment_name}_{model_name}_{inference_type}.csv")
df = df[['isTurtle','isSAMM','isJSON']]
counts = df.apply(pd.Series.value_counts)

# Filling NaN values with 0 (in case a column has no True or False)
counts = counts.fillna(0).astype(int)

In [3]:
counts


Unnamed: 0,isTurtle,isSAMM,isJSON
False,14,63,72
True,82,33,24


In [43]:
import pandas as pd

# Load CSV file
df = pd.read_csv(f"aggregated_summary_data_{experiment_name}_{model_name}_{inference_type}.csv")
# Filter rows where isJSON is True and isSAMM is False
filtered_df = df[(df['isJSON'] == True) & (df['isSAMM'] == False)]



In [None]:
filtered_df

In [128]:
df

Unnamed: 0,model,isTurtle,isSAMM,isJSON,passAt,filePath
0,io.catenax.days_of_supply/1.0.0/gen/DaysOfSupply,True,True,True,4,D:\NMT Thesis\Results\io.catenax.days_of_suppl...
1,io.catenax.days_of_supply/1.1.0/gen/DaysOfSupply,True,False,False,6,D:\NMT Thesis\Results\io.catenax.days_of_suppl...
2,io.catenax.days_of_supply/1.2.0/gen/DaysOfSupply,True,False,False,6,D:\NMT Thesis\Results\io.catenax.days_of_suppl...
3,io.catenax.days_of_supply/2.0.0/gen/DaysOfSupply,True,False,False,6,D:\NMT Thesis\Results\io.catenax.days_of_suppl...
4,io.catenax.days_of_supply/2.1.0/gen/DaysOfSupply,True,True,True,1,D:\NMT Thesis\Results\io.catenax.days_of_suppl...
...,...,...,...,...,...,...
91,io.catenax.week_based_capacity_group/2.1.0/gen...,True,True,False,6,D:\NMT Thesis\Results\io.catenax.week_based_ca...
92,io.catenax.week_based_capacity_group/2.2.0/gen...,True,True,True,2,D:\NMT Thesis\Results\io.catenax.week_based_ca...
93,io.catenax.week_based_capacity_group/3.0.0/gen...,True,False,False,6,D:\NMT Thesis\Results\io.catenax.week_based_ca...
94,io.catenax.week_based_capacity_group/3.1.0/gen...,True,True,True,2,D:\NMT Thesis\Results\io.catenax.week_based_ca...


## Version 2
- Works with new version that contains `passAt` information.
- Aggregated data saved to `output_csv`.
- Displays counts for `isTurtle`, `isSAMM`, `isJSON`, and `passAt` summary.

In [9]:
import os
import csv
from pathlib import Path
import pandas as pd

model_name = "qwen25codersammv2latest"
inference_type = "ZeroShotPromptTemplate0"
experiment_name = "T07-FineTunedQwenv2"

def read_and_aggregate_txt_files(root_folder, output_csv):
    # List to hold the aggregated data
    aggregated_data = []
    csv_header = ['model', 'isTurtle', 'isSAMM', 'isJSON','passAt', 'filePath']

    # Traverse the folder structure
    for subdir, _, files in os.walk(root_folder):
        for file in files:
            if file=='summary.txt':
                file_path = os.path.join(subdir, file)
                path = Path(file_path)
                inference_part = path.parent.name
                # Extract experiment's name (two levels up from the file)
                experiment_part = path.parent.parent.parent.name
    
                # Extract model name (one level up from the file)
                model_part = path.parent.parent.name
                if (model_part != model_name or inference_part != inference_type or experiment_part != experiment_name):
                    continue
                with open(file_path, 'r') as f:
                    lines = f.readlines()
                    for line in lines[1:]:  # Skip the header line
                        # Split by commas to extract the data
                        data = line.strip().split(',')
                        if len(data) == 5:  # Ensure that there are exactly 4 columns
                            # Append file path to the data
                            data.append(file_path)
                            aggregated_data.append(data)
                        else:
                            print('wrong')

    # Write the aggregated data into a CSV file
    with open(os.path.join(root_folder,output_csv), 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(csv_header)
        writer.writerows(aggregated_data)

    print(f"Aggregated data written to {output_csv}")


root_folder = 'D:\\NMT Thesis\\Results'
output_csv = f'aggregated_summary_data_{experiment_name}_{model_name}_{inference_type}.csv'

read_and_aggregate_txt_files(root_folder, output_csv)

df = pd.read_csv(os.path.join(root_folder,output_csv))
counts = df[['isTurtle','isSAMM','isJSON']].apply(pd.Series.value_counts)

# Filling NaN values with 0 (in case a column has no True or False)
counts = counts.fillna(0).astype(int).sort_index()
print(model_name)
display(counts)
result_passAt = pd.DataFrame(df[df['isJSON']==True]['passAt'].value_counts()).sort_index()
display(result_passAt)

Aggregated data written to aggregated_summary_data_T07-FineTunedQwenv2_qwen25codersammv2latest_ZeroShotPromptTemplate0.csv
qwen25codersammv2latest


Unnamed: 0,isTurtle,isSAMM,isJSON
False,43,91,95
True,53,5,1


Unnamed: 0,passAt
1,1
