In [2]:
from src.helpers import json_to_owl, parse_rdf_constraints, extract_relevant_constraints
import json
import re
from rdflib import Graph, Namespace, Literal, RDF, RDFS, URIRef
from rdflib.namespace import XSD, OWL
import datetime
from logs_simulation.simulation_process import *
import sqlite3
import csv

with open("visuals/HSL248.json", "r", encoding="utf-8") as f:
    file_json = json.load(f)

graph = json_to_owl(file_json, namespace="urn:crawlercrane-ontology#")

g = Graph()
output_file = "ontologies/crawler_crane_ontology.ttl"
with open(output_file, "w", encoding="utf-8") as out_file:
    out_file.write(graph.serialize(format="turtle"))
g.parse("ontologies/crawler_crane_ontology.ttl", format="turtle")

<Graph identifier=N18d5f2157cbc4d8ebacb2eb20156bf49 (<class 'rdflib.graph.Graph'>)>

In [3]:
rdf_constraints = parse_rdf_constraints(output_file)

In [9]:
def check_constraints_for_row(row, constraints_dict):
    """
    Check if a single telemetry row meets the constraints.

    row is a tuple: (id, timestamp, machine_id, component, sensor_name, sensor_value, sensor_unit)

    Returns True if the row passes all known constraints, or False if it fails any constraint.
    """
    row_id, ts, machine_id, component, sensor_name, sensor_value, sensor_unit = row

    if component not in constraints_dict:
        # If we have no constraints for this component, assume it's valid
        return True

    # Retrieve constraints for the component
    comp_constraints = constraints_dict[component]

    # We might want to do something more sophisticated, but as an example:
    #   - check keys that begin with "max" as maximum values
    #   - check keys that begin with "min" as minimum values
    #   - check other custom constraint keys if relevant
    #
    # In this script, we do a naive approach:
    #   If sensor_name is "hydraulicpressure" and there's a "maxHydraulicPressure" constraint, check it
    #   If sensor_name is "maintenanceInterval" and there's "maintenanceIntervalCheck", etc.
    # A more general approach might parse all keys and see which one lines up with sensor_name or do
    # some naming convention matching.

    for constraint_key, constraint_info in comp_constraints.items():
        constraint_val = constraint_info['value']
        constraint_unit = constraint_info['unit']

        # 1) Check unit matching (simple version)
        #    If sensor_unit != constraint_unit, skip or do a conversion if you have logic for that
        if sensor_unit != constraint_unit and 'working hours' not in sensor_unit:
            # In a real scenario, you might handle conversions or partial matches differently
            continue

        # 2) Detect if it is a max or min type constraint (based on the name)
        if constraint_key.lower().startswith('max'):
            # e.g. "maxHydraulicPressure"
            if sensor_value > constraint_val:
                # This row fails the "max" constraint
                return False
        elif constraint_key.lower().startswith('min'):
            # e.g. "minHydraulicPressure"
            if sensor_value < constraint_val:
                # This row fails the "min" constraint
                return False
        elif 'interval' in constraint_key.lower():
            # Example: "maintenanceIntervalCheck"
            # Suppose that means "sensor_value must not exceed the interval"
            # or any other custom logic
            if sensor_value > constraint_val:
                # Exceeds recommended maintenance interval, treat as invalid
                return False
        # Add more condition checks if needed

    # If we reach here, no constraints were violated
    return True


def main():
    """
    Main workflow:
      3) Retrieve telemetry rows.
      4) Validate each row against constraints.
      5) Collect and display (or store) the valid rows.
    """

    # 1) Load constraints
    constraints_dict = rdf_constraints

    # 2) Connect to DB - adjust file name or connection string to your environment
    conn = sqlite3.connect('logs_simulation/machine_data.db')
    cursor = conn.cursor()

    # 3) Retrieve telemetry
    rows = cursor.execute("""
        SELECT id, timestamp, machine_id, component, sensor_name, sensor_value, sensor_unit 
        FROM telemetry
    """).fetchall()

    # 4) Validate each row
    valid_rows = []
    for row in rows:
        if check_constraints_for_row(row, constraints_dict):
            valid_rows.append(row)

    # 5) Use or store the valid data
    print("Valid (filtered) rows:")
    for r in valid_rows:
        print(r)

    conn.close()


if __name__ == "__main__":
    main()


Valid (filtered) rows:
(1, '2025-01-12 15:07:14', 1, 'urn:crawlercrane-ontology#Swing_System', 'swingspeed', 0.7255944417257613, 'rpm')
(2, '2025-01-12 15:08:14', 1, 'urn:crawlercrane-ontology#Luffing_Jib', 'luffingjiblength', 21.636292554415622, 'm')
(3, '2025-01-12 15:09:14', 1, 'urn:crawlercrane-ontology#Hydraulic_System', 'hydraulicpressure', 205.07369264538053, 'bar')
(4, '2025-01-12 15:10:14', 1, 'urn:crawlercrane-ontology#Load_Hoist_Drums', 'wireropediameter', 19.092855081801876, 'mm')
(5, '2025-01-12 15:11:14', 1, 'urn:crawlercrane-ontology#Luffing_Boom', 'luffingboomlength', 16.12588575204922, 'm')
(6, '2025-01-12 15:12:14', 1, 'urn:crawlercrane-ontology#Rated_Capacity_Limiter_System', 'misc_sensor', 0.7065892272708995, 'N/A')
(7, '2025-01-12 15:13:14', 1, 'urn:crawlercrane-ontology#Counterweight', 'counterweight', 6.6535313244765995, '470 kg')
(8, '2025-01-12 15:14:14', 1, 'urn:crawlercrane-ontology#Fuel_Tank', 'fueltanklevel', 223.8931618783332, 'L')
(9, '2025-01-12 15:15:14

In [12]:
import sqlite3

def check_constraints_for_row(row, constraints_dict):
    """
    Check if a single telemetry row meets the constraints.

    row is a tuple: (id, timestamp, machine_id, component, sensor_name, sensor_value, sensor_unit)

    Returns a tuple (is_valid, violated_constraint_key).
      - is_valid is True if the row violates no constraints, otherwise False.
      - violated_constraint_key is the name of the constraint that was violated (None if none violated).
    """
    row_id, ts, machine_id, component, sensor_name, sensor_value, sensor_unit = row

    # If this component has no known constraints, assume it's valid
    if component not in constraints_dict:
        return (True, None)

    comp_constraints = constraints_dict[component]

    for constraint_key, constraint_info in comp_constraints.items():
        constraint_val = constraint_info['value']
        constraint_unit = constraint_info['unit']

        # 1) Check if sensor unit matches the constraint unit
        #    For demonstration, skip if units do not match 
        #    (unless it's something special like "working hours").
        #    In a real scenario, you might handle conversions or partial matches.
        if sensor_unit != constraint_unit and 'working hours' not in sensor_unit:
            continue

        # 2) Simple approach: check if constraint_key starts with 'max', 'min', or contains 'interval'
        if constraint_key.lower().startswith('max'):
            if sensor_value > constraint_val:
                return (False, constraint_key)
        elif constraint_key.lower().startswith('min'):
            if sensor_value < constraint_val:
                return (False, constraint_key)
        elif 'interval' in constraint_key.lower():
            # Example usage: sensor_value must not exceed the interval
            if sensor_value > constraint_val:
                return (False, constraint_key)

    # If we reach here, no constraints were violated
    return (True, None)


def main():
    """
    Main workflow:
      1) Load constraints dict (parsed from RDF).
      2) Connect to DB (or read CSV).
      3) Retrieve telemetry rows.
      4) Validate each row against constraints.
      5) Separate and display valid rows and those that violated constraints.
    """

    # 1) Load constraints
    constraints_dict = rdf_constraints

    # 2) Connect to DB - adjust file name or connection string to your environment
    conn = sqlite3.connect('logs_simulation/machine_data.db')
    cursor = conn.cursor()

    # 3) Retrieve telemetry
    rows = cursor.execute("""
        SELECT id, timestamp, machine_id, component, sensor_name, sensor_value, sensor_unit 
        FROM telemetry
    """).fetchall()

    # 4) Validate each row
    valid_rows = []
    invalid_rows = []  # We'll store (row, constraint_key) for those that fail

    for row in rows:
        is_valid, violated_constraint_key = check_constraints_for_row(row, constraints_dict)
        if is_valid:
            valid_rows.append(row)
        else:
            invalid_rows.append((row, violated_constraint_key))

    # 5) Output valid rows and constraint violations separately
#    print("=== Valid (Filtered) Rows ===")
#    for r in valid_rows:
#        print(r)

    print("\n=== Rows That Violated Constraints ===")
    for row, ckey in invalid_rows:
        print(f"Row: {row} | Violated Constraint: {ckey}")

    conn.close()


if __name__ == "__main__":
    main()



=== Rows That Violated Constraints ===
Row: (404, '2025-01-12 21:50:14', 1, 'urn:crawlercrane-ontology#Carbody', 'carbodyclearance', 0.37, 'm') | Violated Constraint: minCarbodyClearance
Row: (412, '2025-01-12 21:58:14', 1, 'urn:crawlercrane-ontology#Counterweight', 'counterweight', 67.62842426798905, '470 kg') | Violated Constraint: maxCounterweight
Row: (665, '2025-01-13 02:11:14', 1, 'urn:crawlercrane-ontology#Tubular_Jib', 'jiblength', 32.42195191937523, 'm') | Violated Constraint: maxJibLength
Row: (675, '2025-01-13 02:21:14', 1, 'urn:crawlercrane-ontology#Carbody', 'carbodyclearance', 0.37, 'm') | Violated Constraint: minCarbodyClearance
Row: (974, '2025-01-13 07:20:14', 1, 'urn:crawlercrane-ontology#Boom_Hoist_Drum', 'boomangle', 90.15891922611104, '°') | Violated Constraint: maxBoomAngle
Row: (1088, '2025-01-13 09:14:14', 2, 'urn:crawlercrane-ontology#Tubular_Jib', 'jiblength', 32.8875157913517, 'm') | Violated Constraint: maxJibLength
Row: (1203, '2025-01-13 11:09:14', 2, 'ur