FRFconv-TDS ONNX 모델을 정적 양자화(static quantization)하는 노트북

In [1]:
import onnx
import onnxruntime
from onnxruntime.quantization import QuantType, quantize_static, CalibrationDataReader, QuantFormat
from onnxruntime.quantization.shape_inference import quant_pre_process

import funs

import os

import pandas as pd 
import numpy as np  

In [2]:
#### Name Parameters ####

# Directory that holds the original, pre-processed and quantized ONNX files.
model_dir = 'quant_models'

# Original FP32 model; the other model names below are derived from this one
# so that renaming the base model only requires changing a single line.
model_fp32_name = 'FRFconv-TDS_onnx'
model_fp32 = f'{model_dir}/{model_fp32_name}.onnx'

# Pre-processed model (output path passed to preprocess_model).
model_fp32_prep_name = f'{model_fp32_name}_prep'
model_fp32_prep = f'{model_dir}/{model_fp32_prep_name}.onnx'

# Quantized model (output path passed to quantize_static).
model_quant_name = f'{model_fp32_name}v2.quant'
model_quant = f'{model_dir}/{model_quant_name}.onnx'


### Other parameters ###
# Project configuration, read from ./config.yaml relative to the working directory.
config = funs.load_yaml('./config.yaml')

In [3]:
def check_model(model_path):
    """Run the ONNX checker on the model at ``model_path`` and report success.

    Any exception raised by ``onnx.checker.check_model`` propagates to the
    caller; the confirmation line is printed only after the check returns.
    """
    onnx.checker.check_model(model_path)
    message = f"{model_path} is valid!"
    print(message)

def preprocess_model(model_path, model_prep_path):
    """Pre-process ``model_path`` for quantization and save it to ``model_prep_path``.

    Delegates to ``quant_pre_process`` and then prints where the result was
    written. Exceptions from ``quant_pre_process`` propagate to the caller.
    """
    quant_pre_process(model_path, model_prep_path)
    message = f"Preprocessed model saved to {model_prep_path}"
    print(message)

In [4]:
# Validate the original FP32 model.
check_model(model_fp32)

# Pre-process the model, then validate the pre-processed file.
preprocess_model(model_fp32, model_fp32_prep)
check_model(model_fp32_prep)

quant_models/FRFconv-TDS_onnx.onnx is valid!
Preprocessed model saved to quant_models/FRFconv-TDS_onnx_prep.onnx
quant_models/FRFconv-TDS_onnx_prep.onnx is valid!


In [5]:
# Build the calibration dataset.
funs.set_seed(config.seed)

data_root_dirs = os.path.join(config.dataset_root)

# One path collection per bearing type, for the RPM / sampling-rate settings in the config.
Cylindrical_dirs = funs.get_bearing_paths(data_root_dirs, 'CylindricalRoller', config.rpm, config.sampling_rate)
DepGroove_dirs = funs.get_bearing_paths(data_root_dirs, 'DeepGrooveBall', config.rpm, config.sampling_rate)
Tapered_dirs = funs.get_bearing_paths(data_root_dirs, 'TaperedRoller', config.rpm, config.sampling_rate)

print("Making dataframes...")

Cylindrical_df = funs.make_dataframe(config, Cylindrical_dirs)
DepGroove_df = funs.make_dataframe(config, DepGroove_dirs)
Tapered_df = funs.make_dataframe(config, Tapered_dirs)

print("concat dataframes...")
all_df = pd.concat([Cylindrical_df, DepGroove_df, Tapered_df], ignore_index=True)

# NOTE(review): 0.6 / 0.2 are presumably the train / validation fractions — confirm in funs.split_dataframe.
train_df, val_df, test_df = funs.split_dataframe(all_df, 0.6, 0.2)

# `cali_data` is consumed when the calibration data reader is created, so it has
# to be defined here for the notebook to run top-to-bottom on a fresh kernel.
# The samples are cast to float32 up front.
cali_data, cali_label = funs.build_from_dataframe(val_df, config.sample_size, config.overlap, False)
cali_data = cali_data.astype(np.float32)

Making dataframes...
concat dataframes...


In [None]:
# Write one single-column CSV per fault type, pooling the 'data' arrays of
# every row in `all_df` that shares that fault type.
grouped_data = all_df.groupby('fault_type')

for fault_type, fault_rows in grouped_data:
    # Join the per-row 'data' arrays of this fault type into one array.
    merged_signal = np.concatenate(fault_rows['data'].to_numpy())

    # header=False / index=False: write the values only, without a column
    # name or row numbers.
    csv_name = f'data_{fault_type}.csv'
    pd.Series(merged_signal).to_csv(csv_name, header=False, index=False)

    print(f'Successfully saved data for fault_type {fault_type} to {csv_name}')

Successfully saved data for fault_type H to data_H.csv
Successfully saved data for fault_type L to data_L.csv
Successfully saved data for fault_type M1 to data_M1.csv
Successfully saved data for fault_type M2 to data_M2.csv
Successfully saved data for fault_type M3 to data_M3.csv
Successfully saved data for fault_type U1 to data_U1.csv
Successfully saved data for fault_type U2 to data_U2.csv
Successfully saved data for fault_type U3 to data_U3.csv


In [15]:
# Inspect the DataFrame built for the 'DeepGrooveBall' bearing type.
DepGroove_df

Unnamed: 0,data,fault_type,label,RPM,bearing_type
0,"[-0.017151908651472068, -0.017903773140303718,...",U2,6,600,DeepGrooveBall
1,"[0.001237443637868761, 0.021612971285206537, 0...",L,1,600,DeepGrooveBall
2,"[0.013420781125644992, 0.02163490066613079, 0....",M2,3,600,DeepGrooveBall
3,"[-0.10739444392349112, -0.12554257302266514, -...",U1,5,600,DeepGrooveBall
4,"[-0.014955837790342949, -0.013943953499123684,...",U3,7,600,DeepGrooveBall
5,"[0.02400014103724703, 0.03109586215059575, 0.0...",H,0,600,DeepGrooveBall
6,"[0.025366028191957865, 0.02000586094032888, 0....",M1,2,600,DeepGrooveBall
7,"[-0.030300138899915584, -0.03137781133390762, ...",M3,4,600,DeepGrooveBall
8,"[0.00445479709632754, 0.009843159266287713, 0....",M3,4,800,DeepGrooveBall
9,"[0.02574822597378062, 0.03418790486091592, 0.0...",H,0,800,DeepGrooveBall


In [20]:
# Rebuild the DeepGrooveBall frame, then export one single-column CSV per fault type.
# NOTE(review): the output names (data_<fault_type>.csv) are the same ones the
# `all_df` export cell writes, so running this cell overwrites those files —
# confirm that this is intended.
DepGroove_df = funs.make_dataframe(config, DepGroove_dirs)

grouped_data = DepGroove_df.groupby('fault_type')

for fault_type, group in grouped_data:
    output_filename = f'data_{fault_type}.csv'

    # Join every 'data' array of this fault type into one array.
    combined_data = np.concatenate(group['data'].values)

    # Values only: no header row and no index column.
    pd.Series(combined_data).to_csv(output_filename, header=False, index=False)

    print(f'Successfully saved data for fault_type {fault_type} to {output_filename}')

KeyboardInterrupt: 

In [32]:
import csv
import os


def split_into_buffers(samples, buffer_size):
    """Split ``samples`` into consecutive lists of exactly ``buffer_size`` items.

    The last chunk is right-padded with 0 when it is shorter than
    ``buffer_size``. An empty ``samples`` list yields an empty result.
    """
    buffers = []
    for start in range(0, len(samples), buffer_size):
        chunk = list(samples[start:start + buffer_size])
        # Pad with integer 0 so padded cells are written as "0", exactly as before.
        chunk += [0] * (buffer_size - len(chunk))
        buffers.append(chunk)
    return buffers


def buffer_csv_file(input_path, output_path, buffer_size):
    """Rewrite a single-column CSV as rows of ``buffer_size`` values each.

    Reads the first column of ``input_path`` as floats, splits the values into
    zero-padded buffers and writes one buffer per row to ``output_path``.

    Returns:
        The number of buffers (rows) written.

    Raises:
        ValueError: if a non-empty row's first cell is not a valid float.
    """
    # newline='' is what the csv module expects for files it reads or writes.
    with open(input_path, 'r', newline='') as f:
        # csv.reader returns blank lines as empty rows; skip them instead of
        # failing with IndexError on row[0].
        data = [float(row[0]) for row in csv.reader(f) if row]

    buffers = split_into_buffers(data, buffer_size)

    with open(output_path, 'w', newline='') as f:
        csv.writer(f).writerows(buffers)

    return len(buffers)


# 1. Folder that holds the source CSV files
input_folder = "data_dg_1200"
# 2. Folder that receives the converted CSV files
output_folder = "data_dg_1200_buffered"
os.makedirs(output_folder, exist_ok=True)

# 3. Buffer size
buffer_size = 2048

# 4. Convert every CSV file in the folder; sorted so the processing order is
#    deterministic (os.listdir returns entries in arbitrary order).
for filename in sorted(os.listdir(input_folder)):
    if not filename.endswith(".csv"):
        continue

    input_path = os.path.join(input_folder, filename)
    output_path = os.path.join(output_folder, filename)  # saved under the same name

    n_buffers = buffer_csv_file(input_path, output_path, buffer_size)
    print(f"{filename} → {n_buffers}개의 버퍼 생성 완료")


data_U2_1200RPM.csv → 625개의 버퍼 생성 완료
data_H_1200RPM.csv → 625개의 버퍼 생성 완료
data_L_1200RPM.csv → 625개의 버퍼 생성 완료
data_M1_1200RPM.csv → 625개의 버퍼 생성 완료
data_U1_1200RPM.csv → 625개의 버퍼 생성 완료
data_U3_1200RPM.csv → 625개의 버퍼 생성 완료
data_M3_1200RPM.csv → 625개의 버퍼 생성 완료
data_M2_1200RPM.csv → 625개의 버퍼 생성 완료


In [None]:
import csv

# Source CSV file
input_file = "original.csv"
# Converted CSV file
output_file = "buffered.csv"
# Buffer size
buffer_size = 2048

# 1. Read the first column of the source CSV as floats.
#    newline='' is what the csv module expects for files it reads or writes;
#    blank lines come back as empty rows and are skipped instead of failing
#    with IndexError on row[0].
with open(input_file, 'r', newline='') as f:
    data = [float(row[0]) for row in csv.reader(f) if row]

# 2. Split into buffers of exactly `buffer_size` values.
buffers = []
for i in range(0, len(data), buffer_size):
    buffer = data[i:i + buffer_size]
    # Right-pad a short final buffer with 0 (adds nothing when already full).
    buffer += [0] * (buffer_size - len(buffer))
    buffers.append(buffer)

# 3. Write one buffer per row to the new CSV.
with open(output_file, 'w', newline='') as f:
    csv.writer(f).writerows(buffers)

print(f"총 {len(buffers)}개의 버퍼가 생성되어 {output_file}에 저장되었습니다.")


In [30]:
# Keep only the rows whose RPM equals the string '1200' (the comparison is
# against a string, not an integer).
rpm_is_1200 = DepGroove_df['RPM'] == '1200'
df_1200_rpm = DepGroove_df[rpm_is_1200]
print(df_1200_rpm)

                                                 data fault_type  label   RPM  \
24  [-0.011111930591191126, 0.00013784182295246958...         M2      3  1200   
25  [-0.14379721625775696, -0.15139731313236357, -...         U1      5  1200   
26  [0.01648149614893051, 0.015084281307185022, 0....         M1      2  1200   
27  [0.7894514477358279, 0.7823087350919272, 0.778...         U3      7  1200   
28  [-0.13537633398284243, -0.11220011111460677, -...          H      0  1200   
29  [-0.2528238326757535, -0.28032014358606766, -0...         M3      4  1200   
30  [0.21619550099483814, 0.17186369107210184, 0.1...         U2      6  1200   
31  [-0.16740262843836737, -0.14425460048846286, -...          L      1  1200   

      bearing_type  
24  DeepGrooveBall  
25  DeepGrooveBall  
26  DeepGrooveBall  
27  DeepGrooveBall  
28  DeepGrooveBall  
29  DeepGrooveBall  
30  DeepGrooveBall  
31  DeepGrooveBall  


In [29]:
# List the distinct values present in the RPM column.
print(DepGroove_df['RPM'].unique())

['600' '800' '1000' '1200' '1400' '1600']


In [25]:
# Display the current contents of `df_1200_rpm`.
df_1200_rpm

Unnamed: 0,data,fault_type,label,RPM,bearing_type


In [6]:
class MyCalibrationDataReader(CalibrationDataReader):
    """Feed calibration samples to the quantizer one at a time.

    Each item of ``data`` gets two leading axes added and is cast to float32
    before being returned under the name of the model's first input.
    """

    def __init__(self, data, model_path):
        """Store the samples and look up the model's input name.

        Args:
            data: Sized iterable of calibration samples. Each sample must
                support NumPy-style indexing and ``astype``
                (assumed to be 1-D arrays — TODO confirm against the caller).
            model_path: Path of the ONNX model whose first input is fed.

        Raises:
            ValueError: If the model's first input is not rank 3
                (batch, channel, length).
        """
        self.enum_data = None
        self.data = data

        # Only input metadata is read here, so a CPU session is sufficient.
        # Naming the provider explicitly keeps session creation independent of
        # which execution providers the installed onnxruntime build ships.
        session = onnxruntime.InferenceSession(
            model_path, None, providers=["CPUExecutionProvider"]
        )
        model_input = session.get_inputs()[0]
        if len(model_input.shape) != 3:
            raise ValueError(
                f"Expected a rank-3 model input (batch, channel, length), "
                f"got shape {model_input.shape}"
            )
        self.input_name = model_input.name
        self.datasize = len(data)

    def get_next(self):
        """Return the next ``{input_name: array}`` feed, or ``None`` when exhausted."""
        if self.enum_data is None:
            # Generator expression: samples are converted one by one on demand
            # instead of materialising every feed dict up front.
            # For a 1-D sample the indexing turns (length,) into (1, 1, length).
            self.enum_data = (
                {self.input_name: sample[np.newaxis, np.newaxis, :].astype(np.float32)}
                for sample in self.data
            )
        return next(self.enum_data, None)

    def rewind(self):
        """Restart iteration; the next ``get_next`` call begins at the first sample."""
        self.enum_data = None


In [7]:
# Wrap the calibration samples in a reader bound to the pre-processed model's first input.
dr = MyCalibrationDataReader(cali_data, model_fp32_prep)

In [8]:
# Statically quantize the pre-processed model and write the result to `model_quant`.
# Weights are quantized symmetrically, activations asymmetrically.
quant_extra_options = {
    'WeightSymmetric': True,
    'ActivationSymmetric': False,
}

quantize_static(
    model_fp32_prep,
    model_quant,
    dr,
    quant_format=QuantFormat.QDQ,
    # int8 for both weights and activations
    weight_type=QuantType.QInt8,
    activation_type=QuantType.QInt8,
    per_channel=False,
    reduce_range=False,
    extra_options=quant_extra_options,
)

In [9]:
# Validate the quantized model file.
check_model(model_quant)

quant_models/FRFconv-TDS_onnxv2.quant.onnx is valid!


In [10]:
# Load the original FP32 model.
original_model = onnx.load(model_fp32)
# Load the quantized model.
quant_model = onnx.load(model_quant)

def get_weights_size(model):
    """Return the total number of bytes held by the model's initializers.

    Initializers that carry ``raw_data`` contribute its length. Initializers
    whose values live in a typed field instead (``float_data``,
    ``int32_data``, ...) are decoded with ``onnx.numpy_helper.to_array`` and
    contribute the array's ``nbytes``; previously they were counted as 0.

    Args:
        model: Object exposing ``graph.initializer`` (e.g. a loaded ONNX model).

    Returns:
        Sum of the initializer sizes in bytes; 0 when there are none.
    """
    total_size = 0
    for initializer in model.graph.initializer:
        if initializer.raw_data:
            # Serialized tensor bytes: the length is the stored size.
            total_size += len(initializer.raw_data)
        else:
            # Imported lazily so the raw_data path has no extra dependency.
            from onnx import numpy_helper

            total_size += numpy_helper.to_array(initializer).nbytes
    return total_size

# Report the initializer byte totals of the original and the quantized model.
print(f"원본 가중치 크기: {get_weights_size(original_model)} bytes")
print(f"양자화 가중치 크기: {get_weights_size(quant_model)} bytes")

원본 가중치 크기: 35296 bytes
양자화 가중치 크기: 9344 bytes
