# 1. SageMaker Processing Job

In [8]:
# Flag read by the dependency-install cell below: when True, that cell
# pip-installs the required packages and restarts the kernel.
# Switch to the commented-out False line to skip the install step.
install_needed = True
# install_needed = False

In [9]:
%%bash
#!/bin/bash
# Point Docker's data-root at the SageMaker volume and set default-shm-size,
# unless daemon.json already declares a data-root.
#
# Abort on the first failing command, unset variable, or failing pipeline stage
# so a broken step can never leave Docker stopped with a half-written config.
set -euo pipefail

DAEMON_PATH="/etc/docker"
DAEMON_JSON="$DAEMON_PATH/daemon.json"
MEMORY_SIZE=10G

# Fail early with a clear message instead of falling through to the rewrite
# branch and producing an empty daemon.json.
if [ ! -f "$DAEMON_JSON" ]; then
    echo "$DAEMON_JSON not found; cannot revise Docker configuration" >&2
    exit 1
fi

FLAG=$(jq 'has("data-root")' "$DAEMON_JSON")
# echo $FLAG

if [ "$FLAG" == true ]; then
    echo "Already revised"
else
    # Render the new configuration into a temp file BEFORE stopping Docker or
    # touching the live file: if jq fails, nothing has been modified yet.
    NEW_JSON=$(mktemp)
    trap 'rm -f "$NEW_JSON"' EXIT
    jq --arg shm "$MEMORY_SIZE" \
        '. += {"data-root":"/home/ec2-user/SageMaker/.container/docker","default-shm-size":$shm}' \
        "$DAEMON_JSON" > "$NEW_JSON"

    sudo service docker stop
    echo "Add data-root and default-shm-size=$MEMORY_SIZE"
    # Keep a backup of the previous configuration next to the live file.
    sudo cp "$DAEMON_JSON" "$DAEMON_JSON.bak"
    sudo tee "$DAEMON_JSON" < "$NEW_JSON" > /dev/null
    # Copy the existing Docker data into the parent of the new data-root.
    sudo rsync -aP /var/lib/docker /home/ec2-user/SageMaker/.container
    sudo service docker start
    echo "Docker Restart"
fi

# sudo curl -L "https://github.com/docker/compose/releases/download/v2.7.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# sudo chmod +x /usr/local/bin/docker-compose

Already revised


In [10]:
import sys
import IPython

# One-time dependency bootstrap, gated by the `install_needed` flag set in an earlier cell.
if install_needed:
    print("installing deps and restarting kernel")
    # Run pip through the kernel's own interpreter (sys.executable) rather than
    # whatever `pip` happens to be first on PATH.
    !{sys.executable} -m pip install --upgrade pip --quiet
    !{sys.executable} -m pip install -U sagemaker huggingface_hub transformers --quiet
    # Shut the kernel down with restart=True; cells after this one must be run
    # again once the kernel is back.
    IPython.Application.instance().kernel.do_shutdown(True)

installing deps and restarting kernel


![image](./imgs/processing-job-image.png)

In [1]:
import os
import sagemaker
import huggingface_hub
from pathlib import Path
from time import strftime

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer
)
import torch


# Local folder holding the scripts that are later passed to the processor as `source_dir`.
source_dir = f"{Path.cwd()}/src"
Path(source_dir).mkdir(parents=True, exist_ok=True)

# Key prefix under which this notebook stores its objects in the bucket.
prefix = "240929-owl-vit"

# SageMaker session, its default bucket, and the execution role used by the job.
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [2]:
# Point the Hugging Face cache-related environment variables at the SageMaker volume.
# NOTE(review): these are set after `huggingface_hub` / `transformers` were imported
# in an earlier cell; those libraries presumably read cache settings at import
# time, so this may have no effect in the current kernel — confirm.
hf_cache_dir = '/home/ec2-user/SageMaker/.cache'
for hf_env_name in ('HF_DATASETS_CACHE', 'HF_CACHE_HOME', 'HUGGINGFACE_HUB_CACHE'):
    os.environ[hf_env_name] = hf_cache_dir
# os.environ['TRANSFORMERS_HOME'] = '/home/ec2-user/SageMaker/.cache'
# os.environ['HF_HOME'] = '/home/ec2-user/SageMaker/.cache'

In [3]:
# Hugging Face Hub repo id of the model whose files are downloaded and uploaded to S3 below.
test_model_id = 'google/owlvit-base-patch32'

In [4]:
# Authenticate against the Hugging Face Hub; in the notebook this renders an interactive login widget.
huggingface_hub.login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [5]:
# Folder name derived from the repo id: last path component, lower-cased, dots replaced by dashes.
repo_basename = test_model_id.rsplit("/", 1)[-1]
registered_model = repo_basename.lower().replace(".", "-")
print(f"registered_model : {registered_model}")
Path(registered_model).mkdir(parents=True, exist_ok=True)

# Download the `main` revision of the repo into that folder; the call is the
# cell's last expression, so its return value is shown as the cell output.
huggingface_hub.snapshot_download(repo_id=test_model_id, revision="main", local_dir=registered_model)

registered_model : owlvit-base-patch32


Fetching 10 files:   0%|          | 0/10 [00:00<?, ?it/s]

model.safetensors:   0%|          | 0.00/613M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/392 [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/613M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.42k [00:00<?, ?B/s]

.gitattributes:   0%|          | 0.00/1.23k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/460 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/775 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.06M [00:00<?, ?B/s]

README.md:   0%|          | 0.00/5.15k [00:00<?, ?B/s]

'/home/ec2-user/SageMaker/owl-vit-on-sagemaker/owlvit-base-patch32'

In [6]:
# Same folder-name derivation as the download cell: last component of the repo id,
# lower-cased, with dots replaced by dashes.
model_basename = test_model_id.rsplit("/", 1)[-1]
local_model_weight = model_basename.lower().replace(".", "-")
local_model_weight

'owlvit-base-patch32'

In [7]:
# Upload the downloaded model folder to the bucket under `<prefix>/<folder name>`.
s3_model_weight_path = sagemaker_session.upload_data(
    path=f'./{local_model_weight}',
    bucket=bucket,
    key_prefix=f"{prefix}/{local_model_weight}",
)
print(f'Model weight spec (in this case, just an S3 path): {s3_model_weight_path}')

Model weight spec (in this case, just an S3 path): s3://sagemaker-us-east-1-714932599119/240929-owl-vit/owlvit-base-patch32


In [None]:
# Upload the local ./ecommerce-products folder to the bucket under `<prefix>/ecommerce-products`.
s3_input_data_path = sagemaker_session.upload_data(
    path='./ecommerce-products',
    bucket=bucket,
    key_prefix=f"{prefix}/ecommerce-products",
)
print(f'input spec (in this case, just an S3 path): {s3_input_data_path}')

In [None]:
import glob

# Number of entries directly inside the local tv folder.
tv_entries = glob.glob("./ecommerce-products/tv/*")
len(tv_entries)

In [None]:
%%writefile src/requirements.txt
# Extra package imported by src/evaluation.py.
# NOTE(review): the version is unpinned, so runs may pick up different releases — consider pinning.
transformers

In [None]:
%%writefile src/evaluation.py
import os
import csv
import glob
import requests
from PIL import Image
import torch
from time import strftime
from pathlib import Path

from transformers import OwlViTProcessor, OwlViTForObjectDetection

current_time = strftime("%m%d-%H%M%s")
hostname = os.environ.get('HOSTNAME').split(".")[0]

weights_path = "/opt/ml/processing/weights"
input_path = "/opt/ml/processing/data"
output_path = Path(f"/opt/ml/processing/output/{current_time}-test_result-{hostname}.csv")


# processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
# model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
processor = OwlViTProcessor.from_pretrained(weights_path)
model = OwlViTForObjectDetection.from_pretrained(weights_path)

# url = "http://images.cocodataset.org/val2017/000000039769.jpg"
# image = Image.open(requests.get(url, stream=True).raw)

res = []
for image_path in glob.glob(f"{input_path}/*"):
    print(f"image_path : {image_path}")
    image = Image.open(image_path)
    image = image.convert("RGB")  # RGB로 변환
    
    texts = [["a photo of a tv", "a photo of a dog"]]
    inputs = processor(text=texts, images=image, return_tensors="pt")
    outputs = model(**inputs)

    # Target image sizes (height, width) to rescale box predictions [batch_size, 2]
    target_sizes = torch.Tensor([image.size[::-1]])
    # Convert outputs (bounding boxes and class logits) to Pascal VOC format (xmin, ymin, xmax, ymax)
    results = processor.post_process_object_detection(outputs=outputs, target_sizes=target_sizes, threshold=0.1)
    i = 0  # Retrieve predictions for the first image for the corresponding text queries
    text = texts[i]
    boxes, scores, labels = results[i]["boxes"], results[i]["scores"], results[i]["labels"]
        
    for box, score, label in zip(boxes, scores, labels):
        box = [round(i, 2) for i in box.tolist()]
        confidence = round(score.item(), 3)
        label_name = texts[0][label]
        print(f"Detected {label_name} in {image_path} with confidence {confidence} at location {box}")
        res.append([str(image_path), label_name, confidence, box])

print(f"num of results : {len(res)}")

fields = ['image_name', 'label_name', 'confidence', 'location']
with output_path.open('w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(fields)
    writer.writerows(res)

In [None]:
# %%writefile src/evaluation.py
# import os
# import csv
# import numpy as np
# import glob
# import argparse
# from pathlib import Path
# from PIL import Image
# import torch
# from transformers import OwlViTProcessor, OwlViTForObjectDetection
# from datetime import datetime
# from time import strftime
# import multiprocessing as mp

# def process_image(image_path, texts, threshold, weights_path):
#     processor = OwlViTProcessor.from_pretrained(weights_path)
#     model = OwlViTForObjectDetection.from_pretrained(weights_path)
    
#     print(f"image_path : {image_path}")
#     image = Image.open(image_path).convert("RGB")
#     image_size = image.size
#     image = np.array(image)
    
#     if image.ndim == 2:
#         image = np.stack((image,)*3, axis=-1)

#     inputs = processor(text=texts, images=image, return_tensors="pt")
#     outputs = model(**inputs)

#     target_sizes = torch.Tensor([image_size[::-1]])
#     results = processor.post_process_object_detection(outputs=outputs, target_sizes=target_sizes, threshold=threshold)[0]

#     boxes, scores, labels = results["boxes"], results["scores"], results["labels"]
#     detections = []
#     for box, score, label in zip(boxes, scores, labels):
#         label_name = texts[0][label]
#         confidence = round(score.item(), 3)
#         box_coords = [round(i, 2) for i in box.tolist()]
#         print(f"Detected {label_name} in {image_path} with confidence {confidence} at location {box_coords}")
#         detections.append((str(image_path), label_name, confidence, box_coords)) 
    
#     return detections

# current_time = strftime("%m%d-%H%M%s")
# hostname = os.environ.get('HOSTNAME').split(".")[0]

# weights_path = "/opt/ml/processing/weights"
# input_path = "/opt/ml/processing/data"
# output_path = Path(f"/opt/ml/processing/output/{current_time}-test_result-{hostname}.csv")

# start_time = datetime.now()
# print(f"Job started at: {start_time}")

# parser = argparse.ArgumentParser()
# parser.add_argument("--threshold", type=float, default=0.1, help="confidence threshold")
# args = parser.parse_args()

# texts = [["a photo of a tv", "a photo of a dog"]]

# image_paths = glob.glob(f"{input_path}/*")
# print(f"num of image_paths : {len(image_paths)}")

# with mp.Pool(processes=mp.cpu_count()) as pool:
#     results = pool.starmap(process_image, [(path, texts, args.threshold, weights_path) for path in image_paths])

# # Flatten the results list
# res = [item for sublist in results if sublist for item in sublist]
# print(f"num of results : {len(res)}")
# end_time = datetime.now()
# total_time = end_time - start_time

# fields = ['image_name', 'label_name', 'confidence', 'location']

# # Check if file exists to determine whether to write headers
# file_exists = output_path.exists()

# with output_path.open('w', newline='') as f:
#     writer = csv.writer(f)
#     if not file_exists:
#         writer.writerow(fields)
#     writer.writerows(res)

# print(f"Job started at: {start_time}")
# print(f"Job ended at: {end_time}")
# print(f"Total execution time: {total_time}")

In [None]:
# Instance type and count for the processing job. Switch to the commented-out
# "local" value to take the local-mode branch in the next cell.
instance_type = "ml.m5.2xlarge"
# instance_type = "local"
instance_count = 1

In [None]:
# Choose the session, input/weight/output locations and the data distribution
# type depending on whether the job runs in local mode or on a SageMaker instance.
run_locally = instance_type == 'local'

if run_locally:
    import os
    from sagemaker.local import LocalSession

    workdir = Path.cwd()
    sagemaker_session = LocalSession()
    # Local mode reads and writes folders under the notebook's working directory.
    input_image_path = f"{workdir}/ecommerce-products/tv"
    model_weight_path = f"{workdir}/{local_model_weight}"
    output_path = f"{workdir}/output"
    s3_data_distribution_type = "FullyReplicated"
else:
    sagemaker_session = sagemaker.Session()
    # Remote mode uses the S3 locations produced by the upload cells.
    input_image_path = f"{s3_input_data_path}/tv"
    model_weight_path = s3_model_weight_path
    output_path = f"s3://{bucket}/{prefix}/output"
    s3_data_distribution_type = "ShardedByS3Key"

(input_image_path, model_weight_path, output_path)

In [None]:
# Recursively delete everything under the output location before launching a new job.
# NOTE(review): `output_path` is a local folder when instance_type == 'local';
# this S3 command presumably only makes sense for the remote case — confirm.
!aws s3 rm $output_path --recursive

In [None]:
from sagemaker.pytorch import PyTorch
from sagemaker.processing import Processor, ScriptProcessor, FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

from time import strftime

In [None]:
# Build the job name from the instance type, instance count and a timestamp.
# %S (seconds 00-59) is used; the lowercase %s is a non-portable platform extension.
current_time = strftime("%m%d-%H%M%S")
# Dots in the instance type are replaced by dashes before it goes into the job name.
i_type = instance_type.replace('.','-')
job_name = f'owl-vit-{i_type}-{instance_count}-{current_time}'

# Processor configured with the PyTorch framework (2.3, py311) and the session
# selected in the local/remote cell.
eval_processor = FrameworkProcessor(
    PyTorch,
    framework_version="2.3",
    py_version="py311",
    role=role, 
    instance_count=instance_count,
    instance_type=instance_type,
    sagemaker_session=sagemaker_session
    )


# Start the job without blocking (wait=False).
# - test_data: images, made available at /opt/ml/processing/data using the
#   distribution type chosen in the local/remote cell.
# - model_weight: model files, made available at /opt/ml/processing/weights.
# - Everything the script writes to /opt/ml/processing/output goes to output_path.
# - `arguments` are passed on the script's command line.
eval_processor.run(
    code="evaluation.py",
    source_dir=source_dir,
    wait=False,
    inputs=[ProcessingInput(source=input_image_path, 
                            input_name="test_data", 
                            destination="/opt/ml/processing/data", 
                            s3_data_distribution_type=s3_data_distribution_type),
            ProcessingInput(source=model_weight_path, 
                            input_name="model_weight", 
                            destination="/opt/ml/processing/weights")
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output", destination=output_path),
    ],
    arguments=["--threshold", "0.1"],
    job_name=job_name
)

In [None]:
# Fetch the logs of the processing job started above; wait=True keeps the cell
# running until the job is done.
eval_processor.sagemaker_session.logs_for_processing_job(job_name, wait=True)

In [None]:
# Start from an empty local ./output folder, then copy the job's result files
# from `output_path` into it.
!rm -rf ./output && mkdir ./output
!aws s3 cp $output_path ./output --recursive

In [None]:
import pandas as pd
import os

# Read every CSV in ./output into its own DataFrame; a file that fails to
# parse is reported and skipped.
folder_path = "./output"
csv_filenames = [entry for entry in os.listdir(folder_path) if entry.endswith('.csv')]

dfs = []
for filename in csv_filenames:
    file_path = os.path.join(folder_path, filename)
    try:
        df = pd.read_csv(file_path)
        dfs.append(df)
    except Exception as e:
        print(f"Error reading {filename}: {str(e)}")
    else:
        print(f"Successfully read: {filename}")

if not dfs:
    print("No CSV files found in the output folder.")
else:
    # Stack all per-file frames into one, renumbering the index.
    combined_df = pd.concat(dfs, ignore_index=True)

    # Print the result
    print("\nCombined DataFrame:")
    print(combined_df)

    # Print additional information (row count and column names)
    print(f"\n총 행 수: {len(combined_df)}")
    print(f"컬럼: {combined_df.columns.tolist()}")

    # Show the first 5 rows
    print("\n처음 5행:")
    print(combined_df.head())