In [52]:
import boto3
import os
import cv2
import io
from botocore.exceptions import ClientError

In [53]:
# Connection settings are read from the environment so real credentials never
# have to be saved inside the notebook. The fallbacks are the local development
# values this cell originally hard-coded, so an unconfigured run is unchanged.
minio_client = boto3.client(
    "s3",
    endpoint_url=os.environ.get("MINIO_ENDPOINT_URL", "http://localhost:9000"),
    aws_access_key_id=os.environ.get("MINIO_ACCESS_KEY", "admin"),
    aws_secret_access_key=os.environ.get("MINIO_SECRET_KEY", "admin123"),
)

In [None]:
def upload_img(image_path, bucket_name="imgs"):
    """Re-encode a local image with OpenCV and upload it to a bucket.

    The bucket is created first when the existence check reports it missing.
    The object key is ``image_path`` with its first ``"imgs/"`` occurrence
    removed, and the image is re-encoded using the key's file extension.

    Args:
        image_path: Path of the image file, read with ``cv2.imread``.
        bucket_name: Destination bucket; created on demand.

    Raises:
        FileNotFoundError: If ``cv2.imread`` cannot load ``image_path``.
        ValueError: If the path has no file extension, or ``cv2.imencode``
            reports that encoding failed.
        ClientError: If the bucket check fails for any reason other than the
            bucket being absent (for example a permission error).
    """
    import mimetypes  # local import so this cell does not depend on another cell

    try:
        minio_client.head_bucket(Bucket=bucket_name)
    except ClientError as exc:
        error_code = str(exc.response.get("Error", {}).get("Code", ""))
        http_status = exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        # Only "bucket does not exist" should lead to creation. Any other
        # failure (forbidden, bad credentials, ...) is re-raised instead of
        # being hidden behind a create_bucket call.
        if error_code not in ("404", "NoSuchBucket") and http_status != 404:
            raise
        minio_client.create_bucket(Bucket=bucket_name)

    # NOTE(review): this removes the first "imgs/" found anywhere in the path,
    # not only a leading one - confirm that is the intended key layout.
    object_name = image_path.replace("imgs/", "", 1)

    _, extension = os.path.splitext(object_name)
    extension = extension.lstrip(".")

    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Imagem {image_path} não encontrada!")

    if not extension:
        # Without an extension there is no target format to encode to.
        raise ValueError(f"Imagem {image_path} sem extensão de arquivo.")

    encoded_ok, buffer = cv2.imencode(f".{extension}", image)
    if not encoded_ok:
        raise ValueError(f"Falha ao codificar a imagem {image_path} como '.{extension}'.")

    # "image/<extension>" is not always a real MIME type (".jpg" must be
    # "image/jpeg"), so ask the standard registry first and only fall back to
    # the extension-based guess for formats it does not know.
    content_type = mimetypes.guess_type(object_name)[0] or f"image/{extension}"

    minio_client.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=buffer.tobytes(),
        ContentType=content_type,
    )
    print(f"Imagem {object_name} enviada com sucesso para o bucket '{bucket_name}'!")

def upload_imgs(images_path, bucket_name="imgs"):
    """Upload every path in ``images_path`` to ``bucket_name``, in order.

    Each path is handed to ``upload_img`` one at a time, so an exception raised
    for one image propagates and the remaining paths are not processed.
    """
    for single_path in images_path:
        upload_img(single_path, bucket_name)

In [None]:
# Sample images to upload; no bucket is passed, so upload_imgs uses its default.
paths = [
    "imgs/animals/cat.jpeg",
    "imgs/animals/dog.jpg",
]
upload_imgs(paths)

In [59]:
import io # For handling the byte stream
# Bucket read by the download and listing helpers defined after this line.
bucket_name = 'dataset'

def get_single_object(object_name):
    """Download one object from the bucket named by the global ``bucket_name``.

    Args:
        object_name: Key of the object to fetch.

    Returns:
        Whatever the response body's ``read()`` returns, or ``None`` when the
        storage client raises ``ClientError`` (a message is printed in that
        case). Errors raised while reading the body are not caught here.
    """
    try:
        response = minio_client.get_object(
            Bucket=bucket_name,
            Key=object_name
        )

        body = response['Body']
        try:
            return body.read()
        finally:
            # Close the stream even if read() fails part-way, so the
            # underlying connection is not left open.
            body.close()

    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'NoSuchKey':
            print(f"Erro: Objeto '{object_name}' não encontrado no bucket '{bucket_name}'.")
        else:
            print(f"Erro ao baixar objeto: {e}")
        return None


In [None]:
import cv2
import numpy as np
# get_single_object returns None when the download fails, so stop with a clear
# error here instead of handing None to np.frombuffer.
image_bytes = get_single_object('0a958e0c02198e07ce2e86d47')
if image_bytes is None:
    raise RuntimeError("Não foi possível baixar o objeto '0a958e0c02198e07ce2e86d47'.")

# Wrap the downloaded bytes as a uint8 array and let OpenCV decode it as a
# colour image.
np_array = np.frombuffer(image_bytes, np.uint8)
img_original = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
img_original

array([[[255, 255, 255],
        [255, 255, 255],
        [255, 255, 255],
        ...,
        [255, 255, 255],
        [255, 255, 255],
        [255, 255, 255]],

       [[255, 255, 255],
        [255, 255, 255],
        [255, 255, 255],
        ...,
        [255, 255, 255],
        [255, 255, 255],
        [255, 255, 255]],

       [[255, 255, 255],
        [255, 255, 255],
        [255, 255, 255],
        ...,
        [255, 255, 255],
        [255, 255, 255],
        [255, 255, 255]],

       ...,

       [[255, 255, 255],
        [255, 255, 255],
        [255, 255, 255],
        ...,
        [255, 255, 255],
        [255, 255, 255],
        [255, 255, 255]],

       [[255, 255, 255],
        [255, 255, 255],
        [255, 255, 255],
        ...,
        [255, 255, 255],
        [255, 255, 255],
        [255, 255, 255]],

       [[255, 255, 255],
        [255, 255, 255],
        [255, 255, 255],
        ...,
        [255, 255, 255],
        [255, 255, 255],
        [255, 255, 255]]

In [62]:
def list_all_objects(prefix=''):
    """List the objects stored in the bucket named by the global ``bucket_name``.

    Walks every page returned by the ``list_objects_v2`` paginator and keeps
    only the ``Key``, ``Size`` and ``LastModified`` fields of each entry.

    Args:
        prefix: Passed as ``Prefix`` to the paginator; the default empty
            string applies no prefix filter.

    Returns:
        A list with one dict per object. When the list is empty a warning is
        printed and the empty list is still returned.
    """
    kept_fields = ('Key', 'Size', 'LastModified')
    page_iterator = minio_client.get_paginator('list_objects_v2').paginate(
        Bucket=bucket_name,
        Prefix=prefix,
    )

    # Pages without a 'Contents' entry contribute nothing.
    found = [
        {field: entry[field] for field in kept_fields}
        for page in page_iterator
        for entry in page.get('Contents', [])
    ]

    if len(found) == 0:
        print("Atenção: Nenhum objeto encontrado.")

    return found

# Show every object in the bucket named by `bucket_name`; the default empty
# prefix applies no filter.
list_all_objects()

[{'Key': '0a958e0c02198e07ce2e86d47',
  'Size': 4138,
  'LastModified': datetime.datetime(2025, 10, 28, 17, 33, 43, 539000, tzinfo=tzutc())},
 {'Key': 'object_name',
  'Size': 4138,
  'LastModified': datetime.datetime(2025, 10, 28, 17, 22, 25, 733000, tzinfo=tzutc())}]