In [5]:
# Initialize flags to track the success of all steps, credential issues, and connectivity issues
all_steps_successful = True
credentials_failed = False
connectivity_failed = False

# Step 1: Import the necessary packages and check if 's3transfer' and 'boto3' are installed
try:
    import boto3
    from s3transfer import S3Transfer
    import os
    import botocore.exceptions
    print("Step 1: Importing 's3transfer', 'boto3', and dependencies passed.")
except ImportError as e:
    print(f"Step 1 failed: {e}")
    all_steps_successful = False

# Step 2: Set up AWS credentials and configure the S3 client
try:
    # Set up a session with explicit credentials
    session = boto3.Session(
        aws_access_key_id='your-access-key-id',
        aws_secret_access_key='your-secret-access-key',
        region_name='your-region'  # e.g., 'us-west-1'
    )

    s3 = session.client('s3')
    transfer = S3Transfer(s3)

    # Define the S3 bucket and file paths
    bucket_name = 'your-test-bucket'
    upload_file_path = 'test_upload.txt'
    download_file_path = 'test_download.txt'
    print("Step 2: AWS S3 client setup passed.")
except botocore.exceptions.NoCredentialsError as e:
    print(f"Step 2 failed: No AWS credentials found. {e}")
    credentials_failed = True
    all_steps_successful = False
except botocore.exceptions.PartialCredentialsError as e:
    print(f"Step 2 failed: Incomplete AWS credentials found. {e}")
    credentials_failed = True
    all_steps_successful = False
except botocore.exceptions.EndpointConnectionError as e:
    print(f"Step 2 failed: Could not connect to the endpoint. {e}")
    connectivity_failed = True
    all_steps_successful = False
except Exception as e:
    print(f"Step 2 failed: {e}")
    credentials_failed = True
    all_steps_successful = False

# Step 3: Test uploading a file to an S3 bucket
try:
    # Create a test file to upload
    with open(upload_file_path, 'w') as f:
        f.write('This is a test file for s3transfer upload.')

    # Upload the file to S3
    transfer.upload_file(upload_file_path, bucket_name, 'test_upload.txt')
    print("Step 3: File upload to S3 passed.")
except botocore.exceptions.ClientError as e:
    print(f"Step 3 failed: Upload failed due to credentials or other access issues. {e}")
    credentials_failed = True
    all_steps_successful = False
except botocore.exceptions.EndpointConnectionError as e:
    print(f"Step 3 failed: Could not connect to the endpoint. {e}")
    connectivity_failed = True
    all_steps_successful = False
except Exception as e:
    print(f"Step 3 failed: {e}")
    all_steps_successful = False

# Step 4: Test downloading a file from an S3 bucket
try:
    # Download the file from S3
    transfer.download_file(bucket_name, 'test_upload.txt', download_file_path)

    # Verify that the file was downloaded correctly
    with open(download_file_path, 'r') as f:
        content = f.read()
    assert content == 'This is a test file for s3transfer upload.', "Downloaded file content does not match."
    print("Step 4: File download from S3 passed.")
except botocore.exceptions.ClientError as e:
    print(f"Step 4 failed: Download failed due to credentials or other access issues. {e}")
    credentials_failed = True
    all_steps_successful = False
except botocore.exceptions.EndpointConnectionError as e:
    print(f"Step 4 failed: Could not connect to the endpoint. {e}")
    connectivity_failed = True
    all_steps_successful = False
except Exception as e:
    print(f"Step 4 failed: {e}")
    all_steps_successful = False

# Step 5: Test multipart uploads to an S3 bucket
try:
    # Create a larger test file for multipart upload
    large_upload_file_path = 'large_test_upload.txt'
    with open(large_upload_file_path, 'wb') as f:
        f.seek(1024 * 1024 * 10)  # 10 MB file
        f.write(b'\0')

    # Upload the file to S3 using multipart upload
    transfer.upload_file(large_upload_file_path, bucket_name, 'large_test_upload.txt')
    print("Step 5: Multipart file upload to S3 passed.")
except botocore.exceptions.ClientError as e:
    print(f"Step 5 failed: Upload failed due to credentials or other access issues. {e}")
    credentials_failed = True
    all_steps_successful = False
except botocore.exceptions.EndpointConnectionError as e:
    print(f"Step 5 failed: Could not connect to the endpoint. {e}")
    connectivity_failed = True
    all_steps_successful = False
except Exception as e:
    print(f"Step 5 failed: {e}")
    all_steps_successful = False

# Step 6: Test multipart downloads from an S3 bucket
try:
    # Download the large file from S3
    large_download_file_path = 'large_test_download.txt'
    transfer.download_file(bucket_name, 'large_test_upload.txt', large_download_file_path)

    # Verify that the file was downloaded correctly
    assert os.path.getsize(large_download_file_path) == 1024 * 1024 * 10, "Downloaded file size does not match."
    print("Step 6: Multipart file download from S3 passed.")
except botocore.exceptions.ClientError as e:
    print(f"Step 6 failed: Download failed due to credentials or other access issues. {e}")
    credentials_failed = True
    all_steps_successful = False
except botocore.exceptions.EndpointConnectionError as e:
    print(f"Step 6 failed: Could not connect to the endpoint. {e}")
    connectivity_failed = True
    all_steps_successful = False
except Exception as e:
    print(f"Step 6 failed: {e}")
    all_steps_successful = False

# Step 7: Clean up the uploaded files and verify operations
try:
    # Delete files from S3
    s3.delete_object(Bucket=bucket_name, Key='test_upload.txt')
    s3.delete_object(Bucket=bucket_name, Key='large_test_upload.txt')

    # Delete local files
    os.remove(upload_file_path)
    os.remove(download_file_path)
    os.remove(large_upload_file_path)
    os.remove(large_download_file_path)
    print("Step 7: Clean up operations passed.")
except Exception as e:
    print(f"Step 7 failed: {e}")
    all_steps_successful = False

# Final Confirmation
if all_steps_successful and not credentials_failed:
    print("All tests passed.")
elif credentials_failed:
    print("One or more tests failed due to credentials issues.")
else:
    print("One or more tests failed.")


Step 1: Importing 's3transfer', 'boto3', and dependencies passed.
Step 2: AWS S3 client setup passed.
Step 3 failed: Could not connect to the endpoint. Could not connect to the endpoint URL: "https://your-test-bucket.s3.your-region.amazonaws.com/test_upload.txt"
Step 4 failed: Could not connect to the endpoint. Could not connect to the endpoint URL: "https://your-test-bucket.s3.your-region.amazonaws.com/test_upload.txt"
Step 5 failed: Could not connect to the endpoint. Could not connect to the endpoint URL: "https://your-test-bucket.s3.your-region.amazonaws.com/large_test_upload.txt?uploads"
Step 6 failed: Could not connect to the endpoint. Could not connect to the endpoint URL: "https://your-test-bucket.s3.your-region.amazonaws.com/large_test_upload.txt"
Step 7 failed: Could not connect to the endpoint URL: "https://your-test-bucket.s3.your-region.amazonaws.com/test_upload.txt"
One or more tests failed.
