In [1]:
from ragatouille import RAGPretrainedModel

In [8]:
import os

# Ask ColBERT to log details while it loads/compiles its torch C++ extensions
# (in the output below, that load happens inside RAGPretrainedModel.from_pretrained).
# NOTE(review): ragatouille is already imported in the first cell; confirm the
# variable is read at extension-load time rather than at import time.
os.environ.update({"COLBERT_LOAD_TORCH_EXTENSION_VERBOSE": "True"})

In [7]:
import torch

# Show which PyTorch build the kernel is running (e.g. "2.6.0+cu124").
print(str(torch.__version__))

2.6.0+cu124


In [11]:
# Print the arguments accepted by `from_pretrained`, so the loading cell
# below passes valid keywords.
import inspect
from ragatouille import RAGPretrainedModel

from_pretrained_signature = inspect.signature(RAGPretrainedModel.from_pretrained)
print(from_pretrained_signature)

(pretrained_model_name_or_path: Union[str, pathlib.Path], n_gpu: int = -1, verbose: int = 1, index_root: Optional[str] = None)


In [10]:
# Load the pretrained ColBERTv2 checkpoint. `verbose` is annotated as `int`
# (default 1) in the signature printed above; `True` compares equal to 1.
rag_model = RAGPretrainedModel.from_pretrained(
    pretrained_model_name_or_path="colbert-ir/colbertv2.0",
    verbose=True,
)

[May 21, 11:29:11] Loading segmented_maxsim_cpp extension (set COLBERT_LOAD_TORCH_EXTENSION_VERBOSE=True for more info)...


Using /home/julienrm/.cache/torch_extensions/py312_cu124 as PyTorch extensions root...


KeyboardInterrupt: 

In [3]:
import io
import json
import logging
import mimetypes
import os
from typing import List, Dict, Any, Optional, Union, BinaryIO

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError
from dotenv import load_dotenv

class S3Handler:
    """
    A class to handle common S3 operations using boto3.
    This includes listing, creating, deleting buckets and objects,
    uploading and downloading files, and generating presigned URLs.
    It also supports using environment variables for configuration.
    Environment variables:
        - S3_ENDPOINT_URL: The endpoint URL for the S3 service
        - S3_REGION_NAME: The region name for the S3 service
        - SCW_ACCESS_KEY: Access key ID for authentication
        - SCW_SECRET_KEY: Secret access key for authentication
    Example usage:
        s3_handler = S3Handler()
        buckets = s3_handler.list_buckets()
        print(buckets)
        s3_handler.create_bucket('my-new-bucket')
        s3_handler.upload_file('local_file.txt', 'my-new-bucket', 's3_file.txt')
        url = s3_handler.generate_presigned_url('my-new-bucket', 's3_file.txt')
        print(url)
        s3_handler.download_file('my-new-bucket', 's3_file.txt', 'downloaded_file.txt')
    This class requires the `boto3` and `python-dotenv` packages.
    Install them using:
        pip install boto3 python-dotenv
    Ensure to set the environment variables or pass them as arguments.
    """

    # The S3 DeleteObjects API accepts at most 1000 keys per request.
    MAX_DELETE_BATCH = 1000

    def __init__(self, endpoint_url=None, region_name=None,
                 access_key_id=None, secret_access_key=None):
        """
        Initialize the S3 handler with optional credentials.
        If not provided, will use environment variables.
        """
        # Load environment variables if not done already
        load_dotenv()

        # Use provided credentials or fall back to environment variables
        self.endpoint_url = endpoint_url or os.getenv("S3_ENDPOINT_URL")
        self.region_name = region_name or os.getenv("S3_REGION_NAME")
        self.access_key_id = access_key_id or os.getenv("SCW_ACCESS_KEY")
        self.secret_access_key = secret_access_key or os.getenv("SCW_SECRET_KEY")

        # Initialize S3 resource and client
        self.s3 = boto3.resource(
            service_name='s3',
            endpoint_url=self.endpoint_url,
            region_name=self.region_name,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key
        )

        self.client = boto3.client(
            service_name='s3',
            endpoint_url=self.endpoint_url,
            region_name=self.region_name,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key
        )

        # Set up logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _is_within(base_dir: str, path: str) -> bool:
        """Return True if `path` resolves (after normalisation) to a location inside `base_dir`."""
        base = os.path.abspath(base_dir)
        target = os.path.abspath(path)
        return os.path.commonpath([base, target]) == base

    def list_buckets(self) -> List[str]:
        """
        List all available buckets.

        Returns:
            List of bucket names

        Raises:
            Any exception raised while listing (logged, then re-raised).
        """
        try:
            buckets = [bucket.name for bucket in self.s3.buckets.all()]
            return buckets
        except Exception as e:
            self.logger.error(f"Error listing buckets: {e}")
            raise

    def create_bucket(self, bucket_name: str, region: Optional[str] = None) -> bool:
        """
        Create a new bucket.

        Args:
            bucket_name: Name of the bucket to create
            region: Region to create the bucket in (optional)

        Returns:
            True if successful, False otherwise
        """
        try:
            create_bucket_config = {}
            # us-east-1 must not be sent as a LocationConstraint.
            if region and region != 'us-east-1':
                create_bucket_config['LocationConstraint'] = region

            if create_bucket_config:
                self.s3.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration=create_bucket_config
                )
            else:
                self.s3.create_bucket(Bucket=bucket_name)

            self.logger.info(f"Bucket {bucket_name} created successfully")
            return True
        except ClientError as e:
            self.logger.error(f"Error creating bucket {bucket_name}: {e}")
            return False

    def delete_bucket(self, bucket_name: str, force: bool = False) -> bool:
        """
        Delete a bucket. If force=True, will delete all objects first.

        Args:
            bucket_name: Name of the bucket to delete
            force: If True, delete all objects in the bucket first

        Returns:
            True if successful, False otherwise
        """
        try:
            bucket = self.s3.Bucket(bucket_name)

            if force:
                bucket.objects.all().delete()

            bucket.delete()
            self.logger.info(f"Bucket {bucket_name} deleted successfully")
            return True
        except ClientError as e:
            self.logger.error(f"Error deleting bucket {bucket_name}: {e}")
            return False

    def list_objects(self, bucket_name: str, prefix: str = '') -> List[Dict[str, Any]]:
        """
        List objects in a bucket with optional prefix filter.

        Args:
            bucket_name: Name of the bucket
            prefix: Prefix to filter objects by

        Returns:
            List of objects with key, size, last_modified

        Raises:
            ClientError: if listing fails (logged, then re-raised).
        """
        try:
            bucket = self.s3.Bucket(bucket_name)
            objects = []

            for obj in bucket.objects.filter(Prefix=prefix):
                objects.append({
                    'key': obj.key,
                    'size': obj.size,
                    'last_modified': obj.last_modified
                })

            return objects
        except ClientError as e:
            self.logger.error(f"Error listing objects in bucket {bucket_name}: {e}")
            raise

    def upload_file(self, file_path: str, bucket_name: str,
                   object_key: Optional[str] = None,
                   extra_args: Optional[Dict[str, Any]] = None) -> bool:
        """
        Upload a file to S3.

        Args:
            file_path: Path to the local file
            bucket_name: Name of the bucket
            object_key: Key to use in S3 (defaults to filename if not provided)
            extra_args: Additional arguments for upload (ContentType, ACL, etc.).
                The caller's dict is never modified.

        Returns:
            True if successful, False otherwise (S3 error or unreadable local file)
        """
        if not object_key:
            object_key = os.path.basename(file_path)

        # Work on a copy so the ContentType guess below never leaks into the caller's dict.
        upload_args = dict(extra_args) if extra_args else {}

        if 'ContentType' not in upload_args:
            content_type, _ = mimetypes.guess_type(file_path)
            if content_type:
                upload_args['ContentType'] = content_type

        try:
            self.s3.meta.client.upload_file(
                file_path, bucket_name, object_key, ExtraArgs=upload_args
            )
            self.logger.info(f"File {file_path} uploaded to {bucket_name}/{object_key}")
            return True
        # The managed upload wraps S3 ClientErrors in S3UploadFailedError;
        # a missing or unreadable local file surfaces as OSError.
        except (ClientError, S3UploadFailedError, OSError) as e:
            self.logger.error(f"Error uploading file {file_path}: {e}")
            return False

    def upload_directory(self, local_dir, bucket_name, s3_prefix) -> bool:
        """
        Upload a directory and all its contents to S3, preserving the folder structure.

        Args:
            local_dir: Path to local directory
            bucket_name: Name of the S3 bucket
            s3_prefix: Prefix in S3 where files should be uploaded ("" = bucket root)

        Returns:
            True if every file was uploaded; False if no files were found or any upload failed
        """
        # First, collect all files to determine total size
        all_files = []
        total_size = 0

        for root, _dirs, files in os.walk(local_dir):
            for file in files:
                local_file_path = os.path.join(root, file)
                file_size = os.path.getsize(local_file_path)
                relative_path = os.path.relpath(local_file_path, local_dir)
                # S3 keys always use '/', whatever the local OS separator is.
                s3_key = os.path.join(s3_prefix, relative_path).replace("\\", "/")
                all_files.append((local_file_path, s3_key, file_size))
                total_size += file_size

        if not all_files:
            print("No files found to upload.")
            return False

        uploaded_size = 0

        def progress_callback(bytes_transferred):
            nonlocal uploaded_size
            uploaded_size += bytes_transferred
            # Guard against a directory that only contains empty files.
            percent = min(100, int(uploaded_size * 100 / total_size)) if total_size > 0 else 0
            progress_bar = f"[{'#' * (percent // 2)}{' ' * (50 - (percent // 2))}]"
            print(f"\rUploading: {progress_bar} {percent}% ({uploaded_size/1024/1024:.2f} MB / {total_size/1024/1024:.2f} MB)", end="")

        print(f"Uploading {len(all_files)} files ({total_size/1024/1024:.2f} MB) to {bucket_name}/{s3_prefix}")

        success_count = 0
        for idx, (local_file_path, s3_key, _) in enumerate(all_files, 1):
            try:
                # `Callback` is a keyword argument of upload_file, not an ExtraArgs entry.
                self.s3.meta.client.upload_file(
                    local_file_path, bucket_name, s3_key,
                    Callback=progress_callback
                )
                success_count += 1
                print(f"\rUploaded [{idx}/{len(all_files)}]: {local_file_path} to {bucket_name}/{s3_key}")
            except Exception as e:
                self.logger.error(f"Error uploading {local_file_path}: {e}")

        print(f"\nUpload complete! {success_count}/{len(all_files)} files ({total_size/1024/1024:.2f} MB) uploaded to {bucket_name}/{s3_prefix}")
        return success_count == len(all_files)

    def upload_fileobj(self, file_obj: BinaryIO, bucket_name: str,
                      object_key: str,
                      extra_args: Optional[Dict[str, Any]] = None) -> bool:
        """
        Upload a file-like object to S3.

        Args:
            file_obj: File-like object to upload
            bucket_name: Name of the bucket
            object_key: Key to use in S3
            extra_args: Additional arguments for upload (ContentType, ACL, etc.)

        Returns:
            True if successful, False otherwise
        """
        try:
            self.s3.meta.client.upload_fileobj(
                file_obj, bucket_name, object_key, ExtraArgs=extra_args or {}
            )
            self.logger.info(f"File object uploaded to {bucket_name}/{object_key}")
            return True
        except ClientError as e:
            self.logger.error(f"Error uploading file object: {e}")
            return False

    def download_file(self, bucket_name: str, object_key: str,
                     file_path: str) -> bool:
        """
        Download a file from S3.

        Args:
            bucket_name: Name of the bucket
            object_key: Key of the object in S3
            file_path: Path to save the file locally (parent directories are created)

        Returns:
            True if successful, False otherwise
        """
        try:
            # A bare filename has no parent directory; os.makedirs('') would raise.
            parent_dir = os.path.dirname(file_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            self.s3.meta.client.download_file(
                bucket_name, object_key, file_path
            )
            self.logger.info(f"File {bucket_name}/{object_key} downloaded to {file_path}")
            return True
        except (ClientError, OSError) as e:
            self.logger.error(f"Error downloading file {bucket_name}/{object_key}: {e}")
            return False

    def download_directory(self, bucket_name: str, s3_prefix: str, local_dir: str) -> bool:
        """
        Download all files with a specific prefix (S3 folder) to a local directory.

        Keys ending in '/' (zero-byte "folder" placeholders) are skipped. Keys that
        would resolve outside `local_dir` (e.g. containing '..') are refused and
        counted as failures.

        Args:
            bucket_name: Name of the bucket
            s3_prefix: Prefix/folder path in S3 to download
            local_dir: Local directory to save files

        Returns:
            True if all files downloaded successfully, False otherwise
        """
        from boto3.s3.transfer import S3Transfer, TransferConfig

        try:
            objects = [
                obj for obj in self.list_objects(bucket_name, s3_prefix)
                if not obj['key'].endswith('/')
            ]

            if not objects:
                self.logger.warning(f"No objects found with prefix {s3_prefix} in bucket {bucket_name}")
                return False

            total_size = sum(obj['size'] for obj in objects)
            downloaded_size = 0

            print(f"Downloading {len(objects)} files ({total_size/1024/1024:.2f} MB) from {bucket_name}/{s3_prefix}")

            def progress_callback(bytes_transferred):
                nonlocal downloaded_size
                downloaded_size += bytes_transferred
                percent = min(100, int(downloaded_size * 100 / total_size)) if total_size > 0 else 0
                progress_bar = f"[{'#' * (percent // 2)}{' ' * (50 - (percent // 2))}]"
                print(f"\rDownloading: {progress_bar} {percent}% ({downloaded_size/1024/1024:.2f} MB / {total_size/1024/1024:.2f} MB)", end="")

            # The transfer manager is loop-invariant: build it once, not per object.
            transfer = S3Transfer(
                self.client,
                config=TransferConfig(use_threads=True, max_concurrency=10)
            )

            success_count = 0
            for idx, obj in enumerate(objects, 1):
                key = obj['key']
                try:
                    # Strip the common prefix to keep the directory structure below it.
                    relative_path = key
                    if s3_prefix and relative_path.startswith(s3_prefix):
                        relative_path = relative_path[len(s3_prefix):].lstrip('/')
                    if not relative_path:
                        # The prefix named a single object exactly; keep its file name.
                        relative_path = key.rsplit('/', 1)[-1]

                    local_file_path = os.path.join(local_dir, relative_path)
                    if not self._is_within(local_dir, local_file_path):
                        raise ValueError(f"refusing to write outside {local_dir}: {key}")

                    parent_dir = os.path.dirname(local_file_path)
                    if parent_dir:
                        os.makedirs(parent_dir, exist_ok=True)

                    transfer.download_file(
                        bucket_name, key, local_file_path,
                        callback=progress_callback
                    )

                    success_count += 1
                    print(f"\rDownloaded [{idx}/{len(objects)}]: {key} to {local_file_path}")

                except Exception as e:
                    self.logger.error(f"Error downloading {key}: {e}")

            print(f"\nDownload complete! {success_count}/{len(objects)} files ({downloaded_size/1024/1024:.2f} MB) downloaded to {local_dir}")
            return success_count == len(objects)

        except Exception as e:
            self.logger.error(f"Error downloading directory {s3_prefix} from {bucket_name}: {e}")
            return False

    def get_object(self, bucket_name: str, object_key: str) -> Dict[str, Any]:
        """
        Get an object and its metadata from S3.

        Args:
            bucket_name: Name of the bucket
            object_key: Key of the object in S3

        Returns:
            Dictionary with object content (fully read into memory) and metadata

        Raises:
            ClientError: if the request fails (logged, then re-raised).
        """
        try:
            response = self.client.get_object(Bucket=bucket_name, Key=object_key)
            return {
                'Body': response['Body'].read(),
                'ContentType': response.get('ContentType'),
                'ContentLength': response.get('ContentLength'),
                'LastModified': response.get('LastModified'),
                'Metadata': response.get('Metadata', {})
            }
        except ClientError as e:
            self.logger.error(f"Error getting object {bucket_name}/{object_key}: {e}")
            raise

    def delete_object(self, bucket_name: str, object_key: str) -> bool:
        """
        Delete an object from S3.

        Args:
            bucket_name: Name of the bucket
            object_key: Key of the object in S3

        Returns:
            True if successful, False otherwise
        """
        try:
            self.s3.Object(bucket_name, object_key).delete()
            self.logger.info(f"Object {bucket_name}/{object_key} deleted successfully")
            return True
        except ClientError as e:
            self.logger.error(f"Error deleting object {bucket_name}/{object_key}: {e}")
            return False

    def delete_objects(self, bucket_name: str, object_keys: List[str]) -> Dict[str, List[str]]:
        """
        Delete multiple objects from S3, batching requests to the API's key limit.

        Args:
            bucket_name: Name of the bucket
            object_keys: List of object keys to delete (any length)

        Returns:
            Dictionary with 'Deleted' (keys) and 'Errors' ("key: message") lists

        Raises:
            ClientError: if a batch request fails (logged, then re-raised; earlier
            batches stay deleted).
        """
        if not object_keys:
            return {'Deleted': [], 'Errors': []}

        deleted: List[str] = []
        errors: List[str] = []
        try:
            # DeleteObjects rejects requests with more than MAX_DELETE_BATCH keys.
            for start in range(0, len(object_keys), self.MAX_DELETE_BATCH):
                batch = object_keys[start:start + self.MAX_DELETE_BATCH]
                response = self.client.delete_objects(
                    Bucket=bucket_name,
                    Delete={'Objects': [{'Key': key} for key in batch]}
                )
                deleted.extend(obj.get('Key') for obj in response.get('Deleted', []))
                errors.extend(
                    f"{err.get('Key')}: {err.get('Message')}"
                    for err in response.get('Errors', [])
                )
        except ClientError as e:
            self.logger.error(f"Error batch deleting objects from {bucket_name}: {e}")
            raise

        if deleted:
            self.logger.info(f"Deleted {len(deleted)} objects from {bucket_name}")
        if errors:
            self.logger.warning(f"Failed to delete {len(errors)} objects from {bucket_name}")

        return {
            'Deleted': deleted,
            'Errors': errors
        }

    def copy_object(self, source_bucket: str, source_key: str,
                   dest_bucket: str, dest_key: str) -> bool:
        """
        Copy an object within S3.

        Args:
            source_bucket: Source bucket name
            source_key: Source object key
            dest_bucket: Destination bucket name
            dest_key: Destination object key

        Returns:
            True if successful, False otherwise
        """
        try:
            copy_source = {
                'Bucket': source_bucket,
                'Key': source_key
            }
            self.s3.meta.client.copy(copy_source, dest_bucket, dest_key)
            self.logger.info(f"Object {source_bucket}/{source_key} copied to {dest_bucket}/{dest_key}")
            return True
        except ClientError as e:
            self.logger.error(f"Error copying object: {e}")
            return False

    def generate_presigned_url(self, bucket_name: str, object_key: str,
                              expiration: int = 3600, http_method: str = 'GET') -> Optional[str]:
        """
        Generate a presigned URL for an S3 object.

        Args:
            bucket_name: Name of the bucket
            object_key: Key of the object in S3
            expiration: Time in seconds until the URL expires
            http_method: HTTP method to allow, 'GET' or 'PUT' (case-insensitive)

        Returns:
            Presigned URL, or None if the method is unsupported or signing fails
        """
        operations = {'GET': 'get_object', 'PUT': 'put_object'}
        operation = operations.get(str(http_method).upper())
        if operation is None:
            # Previously any non-'GET' value silently produced a PUT URL.
            self.logger.error(f"Unsupported HTTP method for presigned URL: {http_method}")
            return None

        try:
            url = self.client.generate_presigned_url(
                operation,
                Params={'Bucket': bucket_name, 'Key': object_key},
                ExpiresIn=expiration
            )
            return url
        except ClientError as e:
            self.logger.error(f"Error generating presigned URL: {e}")
            return None

In [1]:
# Example usage (commented out, so running all cells does not trigger an upload):
# upload a local directory and all its contents to S3, preserving the folder
# structure. Arguments of S3Handler.upload_directory:
#     local_dir:   path to the local directory
#     bucket_name: name of the S3 bucket
#     s3_prefix:   prefix in S3 under which files are uploaded ("" = bucket root)

# s3_handler = S3Handler()
# local_directory = "data/Projects/Projet Demo/Boîte mail de Céline/processed/celine.guyon"
# bucket_name = "olkoa-celine"
# s3_prefix = ""

# s3_handler.upload_directory(local_directory, bucket_name, s3_prefix)

In [7]:
# List the available buckets, then every object in the project bucket.
BUCKET_NAME = "olkoa-celine"

s3_handler = S3Handler()
buckets = s3_handler.list_buckets()
for bucket in buckets:
    print(f"- {bucket}")

# Listing walks every object in the bucket (19k+ here), so do it only once;
# the previous version also made a second, discarded list_objects call.
objects = s3_handler.list_objects(BUCKET_NAME)
for obj in objects:
    print(f"- {obj['key']} ({obj['size']} bytes, modified: {obj['last_modified']})")


- highco
- olkoa-celine
- olkoa-joel


KeyboardInterrupt: 

In [8]:
# Mirror the whole bucket (empty prefix) into the local processed-mailbox folder.
# The result is a success flag; keep it separate from `buckets` (the list of
# bucket names from the previous cell) instead of overwriting that variable.
s3_handler = S3Handler()
download_ok = s3_handler.download_directory(
    bucket_name="olkoa-celine",
    s3_prefix="",
    local_dir="data/Projects/Projet Demo/Boîte mail de Céline/processed/celine.guyon",
)
download_ok

Downloading 19327 files (3518.13 MB) from olkoa-celine/
Downloaded [1/19327]: Archive/1.eml to data/Projects/Projet Demo/Boîte mail de Céline/processed/celine.guyon/Archive/1.eml
Downloaded [2/19327]: Archive/10.eml to data/Projects/Projet Demo/Boîte mail de Céline/processed/celine.guyon/Archive/10.eml
Downloading: [                                                  ] 0% (1.19 MB / 3518.13 MB)

KeyboardInterrupt: 