## 0. Description
- 우선 기능 단위로 만들고 하나의 모듈(클래스)로 만들어서 작동시켜보자

## 1. Load Library

In [12]:
import os
import sys
import pickle
import numpy as np
import subprocess
from tqdm import tqdm

from safetensors.torch import load_file

## 2. Huggingface 모델 다운로드 로직 구성
- 시나리오는 다음과 같다.
    1. 모델 파일 요청(특정 user, 특정 모델)
        - 특정 유저의 token은 어떻게 가지고 있지? 시스템에서 발급을 진행해줘야 하나..
        - 유저에게 개인 token 파일을 업로드하라 할까... 우리가 수기로 발급하기에는 많은 어려움이 있어 보인다.
    2. 모델 파일 탐색
    3. 모델 파일 취약성 점검 여부 확인(protect AI Knowledge Base or Huggingface)
    4. token 발급 or 기존 데이터 사용
    5. 다운로드 to blob(blob설계 필요. azcopy 특성을 고려한)
    6. p-region copy 수행(MSP의 허가가 필요)

In [None]:
'''
/api/v1/downloadModel
'''

from huggingface_hub import snapshot_download

# token을 사용해서 다운로드 해야하는 경우와, 그렇지 않은 경우가 나뉘는 것으로 보인다.
# 이 둘을 구분하여 다운로드 할 수 있도록 해야할 것으로 보인다.

# path를 지정하기 위해서는 모델의 확장자를 확인해볼 필요가 있음. safetensors 이외에 다른 확장자(점검이 가능한 확장자)가 있다면 common으로 이동
# 아니라면 safetensors 경로로 다운로드
# 일단 확장자 검사 로직 패스하고 원하는 경로에 정상적으로 다운로드 되는지 확인

# API Body
path = "/Users/dataeng/modelscan-test/modelscan/unscaned/3901296/request-01/model/common/yolos-tiny"

# 다음과 같은 request가 왔다고 가정.
request = {"user":3901296
           ,"request_id":"request-02"
           ,"model_name":"detr-resnet-50"
           ,"hugging_face":True
           ,"repo_id":"facebook/detr-resnet-50"}

BASE_PATH = "/Users/dataeng/modelscan-test/modelscan/unscaned" # root directory
# 나중에 임시 공간에 저장한 다음 이동 시키는 로직을 구성해야할 것 같다. 지금은 일단 이렇게 진행.
path = os.path.join(BASE_PATH,str(request['user']),request['request_id'],'model/safetensors',request['model_name'])

if request['hugging_face']:
    if not os.path.exists(path):
        os.makedirs(path)

        model = snapshot_download(repo_id=request['repo_id']
                                ,local_dir=path # 모델이 저장될 경로 지정
                                # ,allow_patterns="" # 다운로드 받을 패턴 지정
                                # ,ignore_patterns="" # 다운로드 시 무시할 패턴 지정
                                ) 
else:
    print("Pass download")

Fetching 6 files: 100%|██████████| 6/6 [01:17<00:00, 12.89s/it]


## 2. Convert safetensors -> pkl

## Summary

| Approach | Popularity | Risk of Model Serialization Attack Exploitability |
| --- | --- | --- |
| Pickle Variants | Very high | Very high |
| Tensorflow SavedModel | High | Medium |
| H5 (Keras) | High | Low (except Keras Lambda layer) |
| Inference Only | Medium | Low |
| Vector/Tensor Only | Low | Very low |

In [None]:
'''
modelscan module
'''

class ModelScan():
    def __init__(self,base_path:str,model_name:str,ext_in:str,ext_out:str):
        self.base_path:str = base_path
        self.model_name:str = model_name
        self.ext_in:str = ext_in
        self.ext_out:str = ext_out
        self.tensors:dict = {}

    def __check_folder_exists(self,path:str)-> None:
        '''
        Checking folder is exists. If folder is not exists, then create folder.
        '''
        if not os.path.exists(path):
            print(f"Not existst model folder in {self.ext_out} : {path}")
            os.makedirs(path)
        else:
            print("Exists folder")

    def __model_file_list(self,isLoad=True) -> list:
        '''
        load model file list
        '''
        model_weight_file_list = None

        # true -> load, false -> store
        extension = self.ext_in if isLoad else self.ext_out

        model_file_list = os.listdir(os.path.join(self.base_path,extension,self.model_name))
        model_weight_file_list = [fn for fn in model_file_list if fn.endswith('.'+extension) and not 'consolidated' in fn]
    
        return model_weight_file_list
    
    def __load_model(self,path:str):
        '''
        다양한 extension input을 지원할 수 있도록 해야함.
        '''
        tensor = None

        if self.ext_in == 'safetensors':
            print("execute load_file")
            tensor = load_file(path)
        elif self.ext_in == 'bin':
            print("execute load_file")
            tensor = load_file(path)
        
        return tensor
    
    def __store_model(self,path:str,model_name:str,tensors=None):
        '''
        tensor는 어떤 library를 사용하여 load 하는지에 따라 형식이 달라질 수 있다.
        추후 다양한 type을 지원할 수 있도록 모듈을 생성해야할 것이다.

        1. torch : collections.OrderedDict
        '''

        self.__check_folder_exists(path=path)
        
        if self.ext_out == 'pkl':
            # pickle을 사용하여 state_dict 저장
            store_path = os.path.join(path,model_name+'.'+self.ext_out)
            with open(store_path, 'wb') as f:
                pickle.dump(tensors, f)
        elif self.ext_out == 'npy':
            for name, array in tensors.items():
                store_path = os.path.join(path,name+'.npy')
                np.save(store_path, array)

    def __convert(self):
        '''
        description:
            Scaning All Model files(like .safetensor,,)
            이 convert 함수를 멀티 thread 혹은 processing으로 구현하면 좋을 것 같은데...
        return:

        '''
        # 1. load model weight with ext_in extension file
        load_model_file_list = self.__model_file_list(isLoad=True)
        
        for f in load_model_file_list:
            load_model_file_path = os.path.join(self.base_path,self.ext_in,self.model_name,f)
            print(f"Source Model File Path : {load_model_file_path}")

            # 각 확장자 별로 load함수를 달리 해야한다.
            model_weight_file_name = f.split('.')[0]
            print(model_weight_file_name)
            self.tensors[model_weight_file_name] = self.__load_model(path=load_model_file_path)

        # 2. store model weight with ext_out extension file
        # .pkl 파일 경로 지정
        for i,f in enumerate(load_model_file_list):
            store_model_file_name = f.split('.')[0]
            store_model_file_path = os.path.join(self.base_path,self.ext_out,self.model_name)
            print(f"Store File Model File Path : {store_model_file_path}")
            self.__store_model(path=store_model_file_path,model_name=store_model_file_name,tensors=self.tensors[store_model_file_name]) 
    
    def scan(self):
        # convert
        # check해야 할 필요가 없는 파일은 pass 하도록 구현
        # convert가 필요한 확장자들을 미리 정의해놓자.
        self.__convert()
        
        # 명령 실행
        store_model_folder_path = os.path.join(self.base_path,self.ext_out,self.model_name)

        # 검사 결과 저장
        report_path = os.path.join(self.base_path.replace("unscaned","scaned"),self.model_name)

        # check
        self.__check_folder_exists(report_path)

        command = f"modelscan -p {store_model_folder_path} -r json -o {report_path}/{self.model_name}-report.json"
        commands = command.split(" ")
        result = subprocess.run(commands, capture_output=True, text=True)

        # 각종 메타 결과는 나중에..
        # with open(path,'w') as f:
        #     f.write

        # 결과 출력
        # print("Return Code:", result.returncode)
        # print("Standard Output:", result.stdout)
        # print("Standard Error:", result.stderr)

In [32]:
'''
/api/v1/modelscan
'''

BASE_PATH = os.path.join(BASE_PATH,str(request['user']),request['request_id'],'model')

ext_in = 'safetensors'
ext_out = 'npy'

MODEL_NAME=request['model_name']

if __name__=="__main__":
    scanner = ModelScan(
                base_path=BASE_PATH,
                model_name=MODEL_NAME,
                ext_in=ext_in,
                ext_out=ext_out
            )

    scanner.scan()

Source Model File Path : /Users/dataeng/modelscan-test/modelscan/unscaned/3901296/request-02/model/safetensors/detr-resnet-50/model.safetensors
model
execute load_file
Store File Model File Path : /Users/dataeng/modelscan-test/modelscan/unscaned/3901296/request-02/model/npy/detr-resnet-50
Not existst model folder in npy : /Users/dataeng/modelscan-test/modelscan/unscaned/3901296/request-02/model/npy/detr-resnet-50
Not existst model folder in npy : /Users/dataeng/modelscan-test/modelscan/scaned/3901296/request-02/model/detr-resnet-50
