In [30]:
# app/backend/services/csv_to_ontology_service.py

from pathlib import Path
from datetime import datetime as dt
from typing import Optional, Dict, List
import polars as pl
from owlready2 import *

class CSVToOntologyService:
    """CSV 데이터를 mapping.owl 기반 온톨로지로 변환하는 서비스"""
    
    def __init__(self, ontology_path: str = "mapping.owl"):
        """
        Args:
            ontology_path: mapping.owl 파일 경로
        """
        self.ontology_path = Path(ontology_path)
        self.onto = None
        self.industry = None
        
    def load_mapping_ontology(self):
        """mapping.owl 파일을 로드"""
        if not self.ontology_path.exists():
            raise FileNotFoundError(f"온톨로지 파일을 찾을 수 없습니다: {self.ontology_path}")
        
        # 기존 온톨로지 로드
        self.onto = get_ontology(f"file://{self.ontology_path.absolute()}").load()
        
        # 네임스페이스 가져오기
        with self.onto:
            # 클래스 참조 가져오기
            self.Industry = self.onto.Industry
            self.Machine = self.onto.Machine
            self.Data = self.onto.Data
            self.MLModel = self.onto.MLModel
            self.MLMetricData = self.onto.MLMetricData
            
            # 속성 참조 가져오기
            self.has_machine = self.onto.has_machine
            self.has_data = self.onto.has_data
            self.has_ml_model = self.onto.has_ml_model
            self.has_ml_metric_data = self.onto.has_ml_metric_data
            
            # 데이터 속성 가져오기
            self.timestamp = self.onto.timestamp if hasattr(self.onto, 'timestamp') else None
            self.float_value = self.onto.float_value if hasattr(self.onto, 'float_value') else None
        
        return self.onto
    
    def process_csv_file(
        self, 
        csv_path: str | Path,
        industry_name: str = "manufacturing",
        machine_name: Optional[str] = None,
        time_column: str = "time",
        sample_size: Optional[int] = None
    ) -> Dict[str, int]:
        """
        CSV 파일을 처리하여 온톨로지 인스턴스 생성
        
        Args:
            csv_path: CSV 파일 경로
            industry_name: 산업 이름 (기본값: "manufacturing")
            machine_name: 기계 이름 (None이면 CSV 파일명 사용)
            time_column: 시간 컬럼명
            sample_size: 처리할 행 수 (None이면 전체)
            
        Returns:
            생성된 인스턴스 개수 딕셔너리
        """
        if self.onto is None:
            self.load_mapping_ontology()
        
        csv_path = Path(csv_path)
        if not csv_path.exists():
            raise FileNotFoundError(f"CSV 파일을 찾을 수 없습니다: {csv_path}")
        
        # CSV 파일 읽기
        df = pl.read_csv(csv_path)
        
        # 샘플링 (대용량 파일 처리용)
        if sample_size and len(df) > sample_size:
            df = df.head(sample_size)
        
        # 기계 이름 설정
        if machine_name is None:
            machine_name = csv_path.stem.replace("_", "_").lower()
        
        counts = {
            "industry": 0,
            "machine": 0,
            "data": 0,
            "ml_model": 0,
            "ml_metric_data": 0
        }
        
        with self.onto:
            # 1. Industry 인스턴스 생성 또는 가져오기
            industry_uri = f"#{industry_name}"
            industry_inst = self.onto.search_one(iri=industry_uri)
            
            if industry_inst is None:
                industry_inst = self.Industry(industry_name)
                counts["industry"] = 1
            else:
                industry_inst = industry_inst[0]
            
            # 2. Machine 인스턴스 생성
            machine_inst_name = machine_name.replace("-", "_").replace(" ", "_")
            machine_inst = self.Machine(machine_inst_name)
            industry_inst.has_machine.append(machine_inst)
            counts["machine"] = 1
            
            # 3. Data 인스턴스 생성 (CSV의 각 행)
            data_instances = []
            time_col = time_column if time_column in df.columns else df.columns[0]
            
            for idx, row in enumerate(df.iter_rows(named=True)):
                data_name = f"{machine_inst_name}_data_{idx}"
                data_inst = self.Data(data_name)
                
                # 시간 데이터 설정
                if time_col in row and row[time_col] is not None:
                    try:
                        # 시간 파싱
                        if isinstance(row[time_col], str):
                            # ISO 형식 또는 다른 형식 처리
                            time_str = row[time_col].replace("T", " ").split(".")[0]
                            time_obj = dt.strptime(time_str, "%Y-%m-%d %H:%M:%S")
                        else:
                            time_obj = row[time_col]
                        
                        if self.timestamp:
                            data_inst.timestamp = [time_obj]
                    except Exception as e:
                        print(f"시간 파싱 오류 (행 {idx}): {e}")
                
                # 센서 데이터를 float_value로 설정
                sensor_columns = [col for col in df.columns 
                                if col != time_col and df[col].dtype in [pl.Float64, pl.Float32, pl.Int64, pl.Int32]]
                
                for col in sensor_columns[:5]:  # 처음 5개 컬럼만 (예시)
                    if row[col] is not None:
                        try:
                            value = float(row[col])
                            if self.float_value:
                                # 여러 float_value를 리스트로 추가
                                if not hasattr(data_inst, 'float_value') or data_inst.float_value is None:
                                    data_inst.float_value = []
                                data_inst.float_value.append(value)
                        except (ValueError, TypeError):
                            pass
                
                machine_inst.has_data.append(data_inst)
                data_instances.append(data_inst)
                counts["data"] += 1
            
            # 4. MLModel 인스턴스 생성 (예시 - 실제로는 ML 모델 정보가 있어야 함)
            ml_model_name = f"{machine_inst_name}_ml_model"
            ml_model_inst = self.MLModel(ml_model_name)
            
            # 첫 번째 데이터에 ML 모델 연결
            if data_instances:
                data_instances[0].has_ml_model.append(ml_model_inst)
                counts["ml_model"] = 1
                
                # 5. MLMetricData 인스턴스 생성 (예시)
                ml_metric_name = f"{machine_inst_name}_ml_metric_data"
                ml_metric_inst = self.MLMetricData(ml_metric_name)
                
                # 예시 메트릭 값 설정
                if hasattr(self.onto, 'accuracy'):
                    ml_metric_inst.accuracy = [0.95]  # 예시 값
                if hasattr(self.onto, 'precision'):
                    ml_metric_inst.precision = [0.92]  # 예시 값
                if hasattr(self.onto, 'f1_score'):
                    ml_metric_inst.f1_score = [0.93]  # 예시 값
                
                ml_model_inst.has_ml_metric_data.append(ml_metric_inst)
                counts["ml_metric_data"] = 1
        
        return counts
    
    def save_ontology(self, output_path: Optional[str | Path] = None):
        """온톨로지를 파일로 저장"""
        if self.onto is None:
            raise ValueError("온톨로지가 로드되지 않았습니다.")
        
        output_path = Path(output_path) if output_path else self.ontology_path
        output_path.parent.mkdir(parents=True, exist_ok=True)
        
        self.onto.save(file=str(output_path), format="rdfxml")
        print(f"온톨로지가 저장되었습니다: {output_path}")
    
    def process_multiple_csvs(
        self,
        csv_directory: str | Path,
        industry_name: str = "manufacturing",
        pattern: str = "*.csv"
    ) -> Dict[str, Dict[str, int]]:
        """여러 CSV 파일을 일괄 처리"""
        csv_dir = Path(csv_directory)
        if not csv_dir.exists():
            raise FileNotFoundError(f"디렉토리를 찾을 수 없습니다: {csv_dir}")
        
        results = {}
        csv_files = list(csv_dir.glob(pattern))
        
        print(f"총 {len(csv_files)}개의 CSV 파일을 처리합니다...")
        
        for csv_file in csv_files:
            print(f"\n처리 중: {csv_file.name}")
            try:
                counts = self.process_csv_file(
                    csv_file,
                    industry_name=industry_name,
                    machine_name=None,  # 파일명 사용
                    sample_size=100  # 예시: 처음 100행만 처리
                )
                results[csv_file.name] = counts
                print(f"  완료: {counts}")
            except Exception as e:
                print(f"  오류: {e}")
                results[csv_file.name] = {"error": str(e)}
        
        return results

In [31]:
# 서비스 초기화
service = CSVToOntologyService(ontology_path="../owl/mapping.owl")

In [32]:
service.load_mapping_ontology()

get_ontology("http://pcnrnd.org/mapping.owl#")

In [33]:
# 단일 CSV 파일 처리
counts = service.process_csv_file(
    csv_path="../data/deoksan.csv",
    industry_name="manufacturing",
    machine_name="deoksan_machine",
    time_column="time",
    sample_size=100  # 처음 100행만 처리 (테스트용)
)

In [34]:
print("생성된 인스턴스:")
print(f"  Industry: {counts['industry']}")
print(f"  Machine: {counts['machine']}")
print(f"  Data: {counts['data']}")
print(f"  MLModel: {counts['ml_model']}")
print(f"  MLMetricData: {counts['ml_metric_data']}")


생성된 인스턴스:
  Industry: 1
  Machine: 1
  Data: 100
  MLModel: 1
  MLMetricData: 1


In [51]:
counts

{'industry': 1, 'machine': 1, 'data': 100, 'ml_model': 1, 'ml_metric_data': 1}

In [44]:
# 온톨로지 저장
service.save_ontology("mapping_with_data.owl")

온톨로지가 저장되었습니다: mapping_with_data.owl


In [50]:
results = service.process_multiple_csvs(
    csv_directory=".",
    industry_name="manufacturing",
    pattern="*.csv"
)

총 1개의 CSV 파일을 처리합니다...

처리 중: deoksan.csv
  완료: {'industry': 1, 'machine': 1, 'data': 100, 'ml_model': 1, 'ml_metric_data': 1}


In [52]:
print("\n전체 처리 결과:")
for filename, counts in results.items():
    print(f"  {filename}: {counts}")


전체 처리 결과:
  deoksan.csv: {'industry': 1, 'machine': 1, 'data': 100, 'ml_model': 1, 'ml_metric_data': 1}


In [None]:
import pandas as pd
from owlready2 import *
import types

def update_owl_from_csv(existing_owl_path, csv_file_path, output_owl_path):
    # 1. 기존 OWL 파일 로드
    # 로컬 파일 경로를 입력하면 자동으로 파싱해서 메모리에 올립니다.
    print(f">>> 기존 온톨로지 로딩 중: {existing_owl_path}")
    onto = get_ontology(existing_owl_path).load()
    
    # base_iri 확인 (URL 끝에 #이나 /가 붙어있는지 확인)
    print(f">>> Base IRI: {onto.base_iri}")

    # 2. CSV 데이터 로드
    df = pd.read_csv(csv_file_path)
    df = df.where(pd.notnull(df), None) # NaN 처리

    # 캐시 생성 (기존에 이미 존재하는 클래스들도 캐시에 넣어두면 좋음)
    # onto.classes()를 통해 이미 있는 모든 클래스를 {이름: 객체} 형태로 매핑
    classes_cache = {cls.name: cls for cls in onto.classes()}

    with onto:
        # --- [Pass 1] 클래스 찾기 또는 생성 ---
        print(">>> 데이터 업데이트 중...")
        for index, row in df.iterrows():
            class_id = str(row['Class_ID']) # CSV의 ID (예: Sonata)
            label = row['Label']
            description = row.get('Description') # 컬럼이 없을 수도 있으니 get 사용

            # 1. 이미 존재하는 클래스인지 확인
            if class_id in classes_cache:
                current_class = classes_cache[class_id]
                # print(f"UPDATE: {class_id} (기존 클래스 수정)")
            else:
                # 2. 없으면 새로 생성 (일단 Thing의 자식으로)
                current_class = types.new_class(class_id, (Thing,))
                classes_cache[class_id] = current_class
                # print(f"CREATE: {class_id} (신규 클래스 생성)")
            
            # 3. 속성 업데이트 (Label 등)
            # 주의: 기존 라벨을 유지할지, 덮어쓸지 결정해야 함. 여기선 추가(append) 방식
            if label and label not in current_class.label:
                current_class.label.append(label)
            
            # 주석(Comment)이나 사용자 정의 속성 추가 예시
            if description:
                current_class.comment.append(description)

        # --- [Pass 2] 계층 구조(Parent) 연결 ---
        # 기존 관계를 끊고 새로 연결할지, 추가할지 결정 필요 (여기선 추가)
        for index, row in df.iterrows():
            child_id = str(row['Class_ID'])
            parent_id = row['Parent_ID']

            if parent_id and parent_id in classes_cache:
                child_cls = classes_cache[child_id]
                parent_cls = classes_cache[parent_id]

                # 이미 상속 관계가 있는지 체크 후 없으면 추가
                if parent_cls not in child_cls.is_a:
                    child_cls.is_a.append(parent_cls)

    # 3. 변경된 내용을 파일로 저장
    onto.save(file=output_owl_path, format="rdfxml")
    print(f"OWL 업데이트 완료: {output_owl_path}")

# 실행 예시
# update_owl_from_csv('base_ontology.owl', 'update_data.csv', 'final_ontology.owl')

In [11]:
from owlready2 import *
import polars as pl
ONTO_PATH = "../owl/mapping.owl"
onto = get_ontology(ONTO_PATH).load()

In [13]:
pdf = pl.read_csv("./deoksan.csv")

In [24]:
pdf.describe()

statistic,time,curr,currR,currS,currT,Ground,PT100,Vibra,Volt,VoltR,VoltS,VoltT
str,str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""count""","""4925624""",4925624.0,4925624.0,4925624.0,4925624.0,4925624.0,4925624.0,4925624.0,4925624.0,4925624.0,4925624.0,4925624.0
"""null_count""","""0""",0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
"""mean""",,449.999188,224.07948,225.983931,227.467525,224.196732,225.407101,223.848204,221.278312,221.837921,220.032607,223.469311
"""std""",,287.228663,3.099336,2.896538,2.871692,3.015846,3.077511,2.162268,4.577975,3.134805,4.626281,2.933306
"""min""","""2021-04-01T00:00:00.200""",0.0,209.8,211.1,211.9,86.7,210.8,205.6,78.9,80.7,81.5,88.6
"""25%""",,200.0,221.9,224.0,225.6,222.1,223.4,222.5,218.5,219.8,217.2,221.8
"""50%""",,400.0,224.2,226.0,227.4,224.2,225.4,223.9,219.9,221.1,218.6,223.8
"""75%""",,700.0,226.3,228.2,229.7,226.3,227.4,225.5,221.6,222.7,220.5,225.7
"""max""","""2021-04-06T16:49:28.000""",900.0,230.7,231.7,233.3,230.3,231.9,228.2,232.0,229.9,231.1,228.6


In [20]:
with onto:
    machine_cache = {}
    machine_inst = onto.Machine("unique_01")
    machine_cache["unique_id"] = machine_inst

In [21]:
machine_cache

{'unique_id': mapping.unique_01}

In [34]:
pdf.iter_rows(named=True)


<generator object DataFrame.iter_rows at 0x00000233F6461540>