## Credentials

In [4]:
!mkdir ~/.aws

In [5]:
%%writefile ~/.aws/credentials

[default]
aws_access_key_id=
aws_secret_access_key=

Writing /home/cyprien/.aws/credentials


In [6]:
%%writefile ~/.aws/config

[default]
region=eu-west-3

Writing /home/cyprien/.aws/config


## Export to s3

In [17]:
import os

# Set the directory you want to search
folder_path = '../../basalt_neurips_data/MineRLBasaltMakeWaterfall-v0/'

# Set the file extensions you want to search for
extensions = ["_preprocessed.mp4", "_preprocessed.jsonl", "_preprocessed_annotations.jsonl"]

files = []

# Iterate through the files in the directory
for file in os.listdir(folder_path):
    # Check if the file name ends with any of the specified extensions
    if any(file.endswith(ext) for ext in extensions):
        # If the file name ends with any of the extensions, print the file name
        files.append(file)

len(files), len(set(files))

(12654, 12654)

In [None]:
from tqdm import tqdm
import sys
sys.path.append("../")
from utils.invalid_files import pp_video, pp_actions, pp_annotations

from concurrent.futures import ThreadPoolExecutor
import boto3
import os


def export_files_to_s3(bucket_name, s3_prefix, local_folder, file_extensions):
    # Create an S3 client
    s3 = boto3.client('s3')

    files = []

    for file_name in os.listdir(local_folder):
        if any([file_name.endswith(file_extension) for file_extension in file_extensions]):
            files.append(file_name)

    # Create a ThreadPoolExecutor with a certain number of threads
    with ThreadPoolExecutor(max_workers=50) as executor:
      # Iterate through all the files in the folder
        for file_name in tqdm(files[7500:]):
        # Check if the file has the desired extension
            if any([file_name.endswith(ext) for ext in file_extensions]):
              # Construct the full path to the file
              file_path = os.path.join(folder_path, file_name)
              # Construct the full S3 object key (including the path within the bucket)
              s3_key = os.path.join(s3_prefix, file_name)
              # Check if the file already exists in the S3 bucket
            try:
                s3_object = s3.head_object(Bucket=bucket_name, Key=s3_key)
                # If the file exists, check if the size of the local file is the same as the size of the S3 object
                if os.path.getsize(file_path) == s3_object['ContentLength']:
                # If the sizes are the same, skip the file
                    continue
            except s3.exceptions.ClientError as e:
              # If the file does not exist, or if there is any other error, catch the exception and proceed with the upload
                pass # TODO

            # Submit a task to the executor to upload the file to S3
            executor.submit(s3.upload_file, file_path, bucket_name, s3_key)
            # try:
            #     future.result()
            # except Exception as e:
            #     print(f'Warning: failed to upload file {file_name} with function call export_files_to_s3({bucket_name}, {s3_prefix}, {local_folder}, {file_extensions})')



bucket_name = 'basalt-neurips'
folder_path = '../../basalt_neurips_data/MineRLBasaltMakeWaterfall-v0/'
s3_prefix = 'basalt_neurips_data/MineRLBasaltMakeWaterfall-v0/'

export_files_to_s3(bucket_name, s3_prefix, folder_path, [pp_video, pp_actions, pp_annotations])

 75%|███████▍  | 3854/5154 [07:05<04:45,  4.56it/s]  

## Import from s3

In [None]:
from tqdm import tqdm
import sys
sys.path.append("../")

from utils.invalid_files import pp_video, pp_actions, pp_annotations
import boto3
import os


import boto3
import os
from concurrent.futures import ThreadPoolExecutor

def download_files_from_s3(bucket_name, s3_prefix, local_folder, file_extensions):
    # Create an S3 client
    s3 = boto3.client('s3')

    continuation_token = None
    objects = []

    # Loop until all objects have been retrieved
    while True:
        # List the objects in the specified S3 prefix
        response = s3.list_objects(
            Bucket=bucket_name, Prefix=s3_prefix, ContinuationToken=continuation_token
        )
        # Append the objects to the list
        objects.extend(response['Contents'])
        # Check if there are more objects to retrieve
        if 'NextContinuationToken' in response:
            # If there are more objects, set the continuation token and continue the loop
            continuation_token = response['NextContinuationToken']
        else:
            # If there are no more objects, break out of the loop
            break

    # Create a ThreadPoolExecutor with a certain number of threads
    with ThreadPoolExecutor(max_workers=30) as executor:
        # Iterate through the objects
        for obj in objects['Contents']:
            # Get the object key (i.e., the file name)
            key = obj['Key']
            # Check if the file has the desired extension
            if any([key.endswith(file_extension) for file_extension in file_extensions]):
                # Construct the full path to the local file
                local_file_path = os.path.join(local_folder, key)
                # Check if the file already exists locally
                try:
                    local_file_size = os.path.getsize(local_file_path)
                    # If the file exists, check if the size of the local file is the same as the size of the S3 object
                    if local_file_size == obj['Size']:
                        # If the sizes are the same, skip the file
                        continue
                except FileNotFoundError:
                    # If the file does not exist, proceed with the download
                    pass
                # Submit a task to the executor to download the file from S3
                future = executor.submit(s3.download_file, bucket_name, key, local_file_path)


download_files_from_s3('basalt-neurips', 'basalt_neurips_data/MineRLBasaltMakeWaterfall-v0/', '../../', [pp_video, pp_actions, pp_annotations])