Move all image files from one S3 bucket to another S3 bucket, but only if the image has no transparent pixels.

Objective: Write a Python script that uses the Boto3 library to accomplish the following:

* List all the image files in a given S3 bucket
* Check if each image file has transparent pixels
* If an image file has no transparent pixels, copy it to a different S3 bucket
* If an image file has transparent pixels, log it in a separate file

Guidelines:

* Your script should take the name of the source and destination buckets as input
* You should use the Boto3 library to interact with S3
* You should use the Pillow library to check for transparent pixels in an image
* Your script should handle any errors that may occur while opening an image file, during the copy process, and anywhere else you deem necessary
* Your script should be well commented and easy to understand
* Your script should be executed from the command line
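The notebook below hard-codes the bucket names, but the guidelines ask for them as command-line input. Here is a minimal sketch of that entry point using the standard-library `argparse`. The function name `parse_args` and the `--log-file` option are hypothetical, not part of the original notebook.

```python
import argparse


def parse_args(argv=None):
    """Parse the source and destination bucket names from the command line."""
    parser = argparse.ArgumentParser(
        description="Copy images without transparent pixels from one S3 bucket to another."
    )
    # Positional arguments: both bucket names are required
    parser.add_argument("source_bucket", help="bucket to read images from")
    parser.add_argument("dest_bucket", help="bucket to copy fully opaque images into")
    # Optional: where to record the keys of images that have transparent pixels
    parser.add_argument(
        "--log-file",
        default="transparent_images.log",
        help="file that receives the keys of images with transparent pixels",
    )
    # argv=None makes argparse read sys.argv[1:]
    return parser.parse_args(argv)
```

Saved as a script (the name `copy_opaque_images.py` is only an example), it would be run as `python copy_opaque_images.py astro-source-wavebreak astro-dest-wavebreak`. The parsed `args.source_bucket` and `args.dest_bucket` then replace the hard-coded values.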

In [116]:
import boto3
import os
from PIL import Image
s3 = boto3.client("s3")

## Creating Buckets

In [117]:
source_bucket = "astro-source-wavebreak"
dest_bucket = "astro-dest-wavebreak"

In [118]:
s3.create_bucket(Bucket=source_bucket)  # Source bucket

{'ResponseMetadata': {'RequestId': 'PND58Z658NDTH6A0',
  'HostId': 'bA4hvEXPIgCOFWembTZfG2qzaSfg/DIknTUkUp/1MN8O0Jlq8cSkF+aVLcqS5w0pMC0WXJuCAFc=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'bA4hvEXPIgCOFWembTZfG2qzaSfg/DIknTUkUp/1MN8O0Jlq8cSkF+aVLcqS5w0pMC0WXJuCAFc=',
   'x-amz-request-id': 'PND58Z658NDTH6A0',
   'date': 'Thu, 02 Feb 2023 18:32:50 GMT',
   'location': '/astro-source-wavebreak',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': '/astro-source-wavebreak'}

In [119]:
s3.create_bucket(Bucket=dest_bucket)  # Destination bucket

{'ResponseMetadata': {'RequestId': 'PND6B568ATHAVF2Y',
  'HostId': 'ZLGflegW/OVJGTKa28azs/5j6l8Y7aY4Y7aMVO3ivsTl5TwIdshqig9uvi1KITZi12xjXUw1nxU=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ZLGflegW/OVJGTKa28azs/5j6l8Y7aY4Y7aMVO3ivsTl5TwIdshqig9uvi1KITZi12xjXUw1nxU=',
   'x-amz-request-id': 'PND6B568ATHAVF2Y',
   'date': 'Thu, 02 Feb 2023 18:32:50 GMT',
   'location': '/astro-dest-wavebreak',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': '/astro-dest-wavebreak'}
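The two `create_bucket` calls above succeed with only `Bucket=`. That is presumably because the client's region is `us-east-1`, which is what the `'Location': '/bucket-name'` form of the response suggests. In any other region, S3 requires a `CreateBucketConfiguration` with a `LocationConstraint`, and it rejects `us-east-1` as an explicit constraint. A minimal sketch that handles both cases follows. The helper names `create_bucket_kwargs` and `create_bucket` are hypothetical.

```python
def create_bucket_kwargs(bucket_name, region):
    """Build keyword arguments for an S3 client's create_bucket call in `region`."""
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the default location and must not be sent as a LocationConstraint
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


def create_bucket(s3_client, bucket_name, region):
    """Create `bucket_name`; `s3_client` should be configured for the same `region`."""
    return s3_client.create_bucket(**create_bucket_kwargs(bucket_name, region))
```

For example: `create_bucket(boto3.client("s3", region_name="eu-west-1"), source_bucket, "eu-west-1")`. The client is passed in rather than created inside the helper, so the notebook's existing `s3` client can be reused when its region matches.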

Uploading all local images to the source bucket:

In [120]:
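from boto3.exceptions import S3UploadFailedError

# The local folder that contains the images
local_folder = "C:/Users/astro/Documents/GitHub/Data-engineer-wavebreak/Images/Faces from (Nice Adults1)"

# Image extensions to upload (compared case-insensitively)
image_extensions = (".jpg", ".jpeg", ".png")

# Iterate through all the files in the local folder
for filename in os.listdir(local_folder):
    # Skip anything that is not an image
    if not filename.lower().endswith(image_extensions):
        continue
    # Construct the full path to the file
    file_path = os.path.join(local_folder, filename)
    try:
        # Upload the file to the source bucket, using the filename as the key
        s3.upload_file(file_path, source_bucket, filename)
    except (S3UploadFailedError, OSError) as err:
        # Report the failure and continue with the remaining files
        print(f"Could not upload {filename}: {err}")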
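`upload_file` does not guess a Content-Type from the filename, so uploaded images typically end up stored as `binary/octet-stream`. The sketch below derives the key and a Content-Type with the standard-library `mimetypes` module, assuming the same three extensions as above. The helper name `upload_args` and the constant `IMAGE_EXTENSIONS` are hypothetical.

```python
import mimetypes
import os

IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")


def upload_args(file_path):
    """Return (key, extra_args) for an image file, or None if it is not an image."""
    filename = os.path.basename(file_path)
    # Compare extensions case-insensitively so "FACE.PNG" is picked up too
    if not filename.lower().endswith(IMAGE_EXTENSIONS):
        return None
    # guess_type returns (type, encoding); type is None if the extension is unknown
    content_type, _ = mimetypes.guess_type(filename)
    extra_args = {"ContentType": content_type} if content_type else {}
    return filename, extra_args
```

Inside the upload loop this would be used as `key, extra = upload_args(file_path)` followed by `s3.upload_file(file_path, source_bucket, key, ExtraArgs=extra)`.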

## Checking Items in a Bucket

In [121]:
def check_bucket(bucket_name):
    """Print the key of every object in the bucket."""
    # Connect to S3 using the resource API
    s3 = boto3.resource("s3")

    # Get a reference to the bucket
    bucket = s3.Bucket(bucket_name)

    # Iterate over the objects in the bucket (the collection pages through results automatically)
    for obj in bucket.objects.all():
        print(obj.key)
    print("**Note:** Make sure to define s3 again.")

In [122]:
# check_bucket(source_bucket)
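`check_bucket` lists keys but does not count them. The sketch below returns the number of objects using the client's `list_objects_v2` paginator, which also covers buckets with more than 1,000 objects. The helper names `count_keys` and `count_objects` are hypothetical.

```python
def count_keys(pages):
    """Sum the objects across list_objects_v2 response pages."""
    # Each page holds at most 1,000 entries under "Contents"; empty pages omit the key
    return sum(len(page.get("Contents", [])) for page in pages)


def count_objects(s3_client, bucket_name):
    """Return the number of objects in `bucket_name`."""
    paginator = s3_client.get_paginator("list_objects_v2")
    return count_keys(paginator.paginate(Bucket=bucket_name))
```

Called as `count_objects(s3, dest_bucket)`, this takes the notebook's global client. Splitting out `count_keys` keeps the counting logic testable without AWS access.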

## Deleting Items in a Bucket

In [123]:
def delete_items_bucket(bucket_name):
    """Delete every object in `bucket_name`."""
    # Connect to S3 using the resource API
    s3 = boto3.resource("s3")

    # Get a reference to the bucket that was passed in
    bucket = s3.Bucket(bucket_name)

    # Batch-delete all objects; the collection action sends one
    # DeleteObjects request per page of results (at most 1,000 keys each)
    bucket.objects.all().delete()

In [124]:
# delete_items_bucket(source_bucket)
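With the low-level client, `delete_objects` accepts at most 1,000 keys per request. It also fails on an empty `Objects` list, which is what the original list-then-delete approach sends for an empty bucket. The sketch below splits the keys into batches first. The helper names `delete_batches` and `empty_bucket` are hypothetical. On a versioned bucket this removes only the current objects, leaving older versions and delete markers in place.

```python
def delete_batches(keys, batch_size=1000):
    """Yield Delete payloads holding at most `batch_size` keys each."""
    keys = list(keys)
    for start in range(0, len(keys), batch_size):
        chunk = keys[start:start + batch_size]
        # Quiet=True asks S3 to report only the keys that failed to delete
        yield {"Objects": [{"Key": key} for key in chunk], "Quiet": True}


def empty_bucket(s3_client, bucket_name):
    """Delete every (current) object in `bucket_name`; no request is sent if it is empty."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket_name)
        for obj in page.get("Contents", [])
    ]
    for payload in delete_batches(keys):
        s3_client.delete_objects(Bucket=bucket_name, Delete=payload)
```

Usage: `empty_bucket(s3, source_bucket)`.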

In [125]:
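def has_transparency(img_path):
    """Return True if the image has at least one pixel that is not fully opaque."""
    with Image.open(img_path) as img:
        # Palette and colour-key images keep transparency in img.info; converting to RGBA turns it into alpha
        if "transparency" in img.info:
            img = img.convert("RGBA")
        # The alpha band's minimum is below 255 only if some pixel is (partially) transparent
        return img.mode in ("RGBA", "LA", "PA") and img.getchannel("A").getextrema()[0] < 255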
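The deciding value is the minimum of the alpha band. `getextrema()` returns `(min, max)`, and a pixel is transparent when its alpha is below 255. Testing the maximum against 0 instead would flag every image that has at least one visible pixel. The sketch below applies the same check to image bytes held in memory, so it can be exercised without S3 or temporary files. The names `has_transparent_pixels` and `png_bytes` are hypothetical.

```python
import io

from PIL import Image


def has_transparent_pixels(data):
    """Return True if the encoded image in `data` has at least one non-opaque pixel."""
    with Image.open(io.BytesIO(data)) as img:
        # Colour-key / palette transparency lives in img.info; RGBA conversion turns it into alpha
        if "transparency" in img.info:
            img = img.convert("RGBA")
        if img.mode not in ("RGBA", "LA", "PA"):
            return False  # no alpha channel, so every pixel is opaque
        # getextrema() on the alpha band gives (min, max); min < 255 means some pixel is see-through
        return img.getchannel("A").getextrema()[0] < 255


def png_bytes(img):
    """Encode a Pillow image as PNG bytes (used here only to build example inputs)."""
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    return buffer.getvalue()
```

Against S3, the bytes could come straight from `s3.get_object(Bucket=source_bucket, Key=key)["Body"].read()`, which avoids writing each image to disk.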

In [126]:
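import tempfile

from botocore.exceptions import BotoCoreError, ClientError

# List the source bucket with a paginator so buckets with more than 1,000 objects are fully covered
paginator = s3.get_paginator("list_objects_v2")

# Log file for images with transparent pixels, plus a scratch folder for downloads
with open("transparent_images.log", "w") as log_file, tempfile.TemporaryDirectory() as tmp_dir:
    for page in paginator.paginate(Bucket=source_bucket):
        # "Contents" is missing when the bucket is empty
        for obj in page.get("Contents", []):
            # Get the key (i.e., the name) of the object
            key = obj["Key"]
            # Flatten "/" in keys so nested keys still map to a valid local filename
            local_path = os.path.join(tmp_dir, key.replace("/", "_"))
            try:
                # Download the object from the source bucket
                s3.download_file(source_bucket, key, local_path)

                # Check if the image has any transparent pixels
                if has_transparency(local_path):
                    # Transparent pixels: record the key in the log file
                    log_file.write(f"{key}\n")
                    print("Image has transparent pixels. Saving in Log")
                else:
                    # No transparent pixels: server-side copy to the destination bucket
                    s3.copy({"Bucket": source_bucket, "Key": key}, dest_bucket, key)
                    print("Image has no transparent pixels: Saving in another bucket.")
            except (BotoCoreError, ClientError, OSError, ValueError) as err:
                # Download, image opening (UnidentifiedImageError is an OSError) or copy failed
                print(f"Skipping {key}: {err}")
            finally:
                # Remove the downloaded file so the scratch folder does not grow
                if os.path.exists(local_path):
                    os.remove(local_path)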

Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
Image has no transparent pixels: Saving in another bucket.
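The guidelines ask for transparent images to be logged in a separate file. Writing plain lines works, but the standard `logging` module adds timestamps and can be shared across functions. Below is a minimal sketch, assuming a dedicated logger named `transparent_images`. That name and the helper `make_transparency_logger` are hypothetical.

```python
import logging


def make_transparency_logger(log_path="transparent_images.log"):
    """Return a logger that writes one line per transparent image to `log_path`."""
    logger = logging.getLogger("transparent_images")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep these entries out of the root logger and console
    # Attach a file handler only once, so re-running a notebook cell does not duplicate lines
    if not logger.handlers:
        handler = logging.FileHandler(log_path, mode="w", encoding="utf-8")
        handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
        logger.addHandler(handler)
    return logger
```

In the loop above, `transparency_log = make_transparency_logger()` would be created once, and `transparency_log.info(key)` would take the place of `log_file.write(f"{key}\n")`. Because the handler is attached only once, a later call with a different `log_path` keeps writing to the first file.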


In [127]:
check_bucket(dest_bucket)

face_1.png
face_2.png
face_3.png
face_4.png
face_5.png
face_6.png
face_7.png
face_8.png
face_9.png
**Note:** Make sure to define s3 again.
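Listing the destination bucket shows what arrived, but not whether it matches what should have arrived. The sketch below compares the source keys minus the logged (transparent) keys against the destination keys. The function name `reconcile` is hypothetical, and any objects already present in the destination before the run would show up as "unexpected".

```python
def reconcile(source_keys, logged_keys, dest_keys):
    """Compare the keys that should have been copied with those in the destination."""
    # Everything in the source that was not logged as transparent should have been copied
    expected = set(source_keys) - set(logged_keys)
    actual = set(dest_keys)
    return {
        "missing_from_dest": sorted(expected - actual),
        "unexpected_in_dest": sorted(actual - expected),
    }
```

The logged keys can be read back with `open("transparent_images.log").read().split()`. The source and destination keys can come from `[obj.key for obj in boto3.resource("s3").Bucket(name).objects.all()]`. An empty report on both sides means the copy step behaved as intended.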
