# Python Generators

This Jupyter notebook explores how we can read very large S3 buckets — buckets with many, many
files — using Python generators and elegant, composable data pipelines.

In [None]:
# Install PyFunctional for our data pipeline and boto3 for AWS S3 bucket access
!pip install pyfunctional boto3

In [None]:
# All the boring imports
import gzip
import json
from typing import Any, Generator
from dataclasses import dataclass

# Import the very nice PyFunctional library: https://github.com/EntilZha/PyFunctional
# See also: https://pyfunctional.pedro.ai/#transformations-and-actions-apis
from functional import seq  # type: ignore[reportMissingTypeStubs]

# Import the AWS library boto3: https://pypi.org/project/boto3/
# We want to connect to S3 without any authentication, so we also grab the other two imports
import boto3  # type: ignore[reportMissingTypeStubs]
from botocore import UNSIGNED  # type: ignore[reportMissingTypeStubs]
from botocore.client import Config  # type: ignore[reportMissingTypeStubs]

## Configuration

In [None]:
# Where the sample data lives: the (anonymously readable) S3 bucket, and the
# key prefix - the "folder" - holding the gzipped JSON colour files.
BUCKET_NAME: str = "mccarter-blog"
FILE_PREFIX: str = "2023-02-18/small-files/"

## Connect to S3

In [None]:
# Build an S3 client that does not sign its requests (UNSIGNED), so no AWS
# credentials are needed - the bucket's permissions allow anonymous reads.
s3 = boto3.client(
    "s3",
    config=Config(signature_version=UNSIGNED),
)  # type: ignore

## Traditional code

In [None]:
def traditional_approach(
        s3,  # type: ignore
        bucket_name: str,
        prefix: str,
    ):
    """Print every reddish colour entry in the gzipped JSON files under ``prefix``.

    This is the "traditional" version: listing, downloading, decompressing,
    parsing and filtering are all interleaved in one nested loop.

    Args:
        s3: The boto3 S3 client object
        bucket_name (str): The name of the S3 bucket to read from
        prefix (str): Only objects whose key starts with this prefix are processed

    Note:
        A single ``list_objects_v2`` call returns at most 1,000 keys, so the
        listing is paginated to make sure every matching file is processed.
    """
    # List the objects in our S3 bucket one page (<= 1,000 keys) at a time
    paginator = s3.get_paginator("list_objects_v2")  # type: ignore
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):  # type: ignore
        # Iterate through each object in this page of the response
        for obj in page.get("Contents", []):  # type: ignore
            # Get the object key (i.e., file path)
            key: str = obj.get("Key")

            # Skip directories - we only process files directly where we're told to
            if key.endswith("/"):
                continue

            # Download the given file
            print("Downloading file: ", key)
            obj_response = s3.get_object(Bucket=bucket_name, Key=key)  # type: ignore
            content: bytes = obj_response["Body"].read()  # type: ignore

            # Decompress and parse the gzipped JSON file: it holds a list of
            # colour records (dicts with at least an "rgb" entry)
            json_data: list[dict[str, Any]] = json.loads(gzip.decompress(content))

            # Process our JSON data
            for entry in json_data:
                rgb = entry["rgb"]

                # Determine if the colour is reddish
                is_reddish = rgb[0] > rgb[1] and rgb[0] > rgb[2]
                if is_reddish:
                    print(entry)

# Run the traditional version against the configured bucket and prefix
traditional_approach(s3=s3, bucket_name=BUCKET_NAME, prefix=FILE_PREFIX)

## Our business / domain layer

Here we define our simple domain model: a single immutable `ColourId` data class.

In [19]:
@dataclass(frozen=True)
class ColourId:
    """A very small, immutable data class that holds a colour name and details.

    ``rgb`` is always stored as a tuple, even when a list is passed in (as
    ``json.loads`` produces). This keeps instances hashable and truly
    immutable, as ``frozen=True`` intends.
    """

    id: str  # identifier of the colour record (a UUID string in the sample data)
    name: str  # human-readable colour name, e.g. "Asphalt"
    base: str  # base colour family, e.g. "black"
    hex: str  # hex colour code, e.g. "#0C0404"
    rgb: tuple[int, int, int]  # (red, green, blue) channel values

    def __post_init__(self) -> None:
        # Frozen dataclasses forbid normal attribute assignment, so use
        # object.__setattr__ - the documented pattern for __post_init__.
        object.__setattr__(self, "rgb", tuple(self.rgb))

### Some simple questions we can ask of our colour

In [20]:
def is_colour_reddish(colour: ColourId) -> bool:
    """Return True when the red channel is strictly larger than both green and blue."""
    red = colour.rgb[0]
    if not red > colour.rgb[1]:
        return False
    return red > colour.rgb[2]


def is_colour_muted(colour: ColourId) -> bool:
    """Return True when each of the first three RGB channels is below 128.

    NOTE(review): assuming 0-255 channels, this effectively tests for a dark
    colour; saturation is not considered.
    """
    return all(colour.rgb[channel] < 128 for channel in range(3))

## Helper functions

These are small helper functions.

Each function does exactly one task, following the Single Responsibility Principle.

We can then easily compose these functions into a simple data pipeline.

In [21]:
def list_s3_bucket_file_keys(
    s3,  # type: ignore
    bucket_name: str,
    prefix: str,
) -> Generator[str, None, None]:
    """Returns a generator of file names (technically S3 bucket keys) from the given S3 bucket
    that have the given path prefix.

    All matching keys are returned, however many there are. A single
    ``list_objects_v2`` call returns at most 1,000 keys, so the listing is
    paginated.

    Args:
        s3: The boto3 S3 client object
        bucket_name (str): The name of the S3 bucket to retrieve data from
        prefix (str): The "file path" prefix of the files (S3 bucket keys) to be returned.
            So all file paths returned from this function will have this prefix.

    Yields:
        Generator[str, None, None]: A sequence of matching file names (S3 bucket keys).
    """

    # List all objects in the bucket with the specified "file/folder" prefix.
    # The paginator follows continuation tokens for us. Because this function
    # is a generator, the next page is only requested once the consumer has
    # used up the keys from the previous one.
    # See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_objects_v2.html
    paginator = s3.get_paginator("list_objects_v2")  # type: ignore
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):  # type: ignore
        # An empty page has no "Contents" entry at all
        for obj in page.get("Contents", []):  # type: ignore
            # Get the object key (i.e., file path)
            key: str = obj["Key"]

            # Skip directories - this function only returns file paths
            if key.endswith("/"):
                continue

            yield key

# Smoke test: the first three keys under the prefix. S3 lists keys in
# ascending UTF-8 binary order, so this result is deterministic.
test_keys = list_s3_bucket_file_keys(s3, BUCKET_NAME, FILE_PREFIX)
# Materialise to a plain list, so that both the comparison and the failure
# message are ordinary list operations rather than relying on the lazy Sequence.
test_results = seq(test_keys).take(3).to_list()  # type: ignore
expected = [
    "2023-02-18/small-files/black.json.gz",
    "2023-02-18/small-files/blue.json.gz",
    "2023-02-18/small-files/gray.json.gz",
]
assert test_results == expected, f"Expected {expected} but got {test_results}"

In [22]:
def download_s3_file(s3, bucket_name: str, key: str) -> bytes:  # type: ignore
    """Fetch a single S3 object and hand back its complete contents as bytes.

    Args:
        s3: The boto3 S3 client object
        bucket_name (str): The bucket that holds the object
        key (str): The S3 key (the object's "file path") to fetch

    Returns:
        bytes: The whole object body, read into memory in one go
    """
    body = s3.get_object(Bucket=bucket_name, Key=key)["Body"]  # type: ignore
    return body.read()  # type: ignore


# Smoke test: download one known file and check that raw bytes come back
content = download_s3_file(s3, BUCKET_NAME, f"{FILE_PREFIX}blue.json.gz")
assert isinstance(content, bytes), f"Expected {bytes} but got {type(content)}"

In [23]:
def read_binary_file(file_name: str) -> bytes:
    """Return the entire contents of a local file as raw bytes.

    Args:
        file_name (str): Path of the file to read

    Returns:
        bytes: The file's contents, unmodified (no decoding or newline translation)
    """
    # "rb" = read-only, binary mode; leaving the with-block closes the handle
    with open(file_name, mode="rb") as handle:
        data = handle.read()
    return data

## Data access layer

Our data access layer code is very simple — just a helper function to convert a JSON object
into an instance of our `ColourId` domain object.

In [24]:
def create_colour_from_json(data: dict[str, Any]) -> ColourId:
    """Build a ColourId domain object from one parsed JSON colour record.

    Args:
        data: A dict providing the "id", "name", "base", "hex" and "rgb" keys

    Returns:
        ColourId: The corresponding domain object
    """
    return ColourId(
        id=data["id"],
        name=data["name"],
        base=data["base"],
        hex=data["hex"],
        rgb=data["rgb"],
    )

## The usual imperative approach, improved

In [25]:
result: list[ColourId] = []

for s3_key in list_s3_bucket_file_keys(s3, BUCKET_NAME, FILE_PREFIX):  # type: ignore
    downloaded_file_bytes = download_s3_file(s3, BUCKET_NAME, s3_key)  # type: ignore
    # gzip.decompress returns bytes, which json.loads accepts directly
    decompressed_bytes = gzip.decompress(downloaded_file_bytes)
    json_data = json.loads(decompressed_bytes)

    for colour_dict in json_data:
        colour = create_colour_from_json(colour_dict)

        # Apply our filters
        if not is_colour_reddish(colour):
            continue
        if not is_colour_muted(colour):
            continue

        result.append(colour)

print(f"Found {len(result)} colours")
# Only show a sample when there is one - result[0] raises IndexError on an empty list
if result:
    print(result[0])

Found 76 colours
ColourId(id='b6907772-f286-4df1-b699-1d9dee8fdbb2', name='Asphalt', base='black', hex='#0C0404', rgb=[12, 4, 4])


## The data pipeline examples

Our first data pipeline just reads some test files from our local drive.

In [33]:
test_files = [
    "./small-data-files/purple.json.gz",
    "./small-data-files/gray.json.gz",
    "./small-data-files/green.json.gz",
]

# This pipeline stops after flattening, so each item is still the raw JSON
# dict for one colour - not a ColourId.
result: list[dict[str, Any]] = (  # type: ignore
    seq(test_files)  # type: ignore
    .map(read_binary_file)
    .map(gzip.decompress)  # type: ignore
    .map(json.loads)  # type: ignore
    .flatten()  # type: ignore
    .to_list()  # type: ignore
)

print(f"Found {len(result)} colours")
# Only show a sample when there is one - result[0] raises IndexError on an empty list
if result:
    print(result[0])

Found 185 colours
{'id': '1f1b8535-c724-4fd0-8b39-14af954a6642', 'name': 'Lavender', 'base': 'purple', 'hex': '#A689E1', 'rgb': [166, 137, 225]}


Our second data pipeline reads all of our .gz compressed JSON files from the S3 bucket.

We:
1. Get the list of all the files in our S3 bucket (prefix) folder 
2. Download the raw bytes of the file
3. Decompress the in-memory bytes to get the original .json file contents
4. Parse the in-memory JSON data into a Python list of dictionaries
5. Convert each dictionary into our Python `ColourId` domain object
6. Keep only the colours that are both reddish and muted
7. Collect the results into an in-memory list

In [32]:
result: list[ColourId] = (
    seq(list_s3_bucket_file_keys(s3, BUCKET_NAME, FILE_PREFIX))
    .map(lambda key: download_s3_file(s3, BUCKET_NAME, key))
    .map(gzip.decompress)  # type: ignore
    .map(json.loads)  # type: ignore
    .flatten()  # type: ignore
    .map(create_colour_from_json)  # type: ignore
    .filter(is_colour_reddish)  # type: ignore
    .filter(is_colour_muted)  # type: ignore
    .to_list()  # type: ignore
)

print(f"Found {len(result)} colours")
# Only show a sample when there is one - result[0] raises IndexError on an empty list
if result:
    print(result[0])

Found 76 colours
ColourId(id='b6907772-f286-4df1-b699-1d9dee8fdbb2', name='Asphalt', base='black', hex='#0C0404', rgb=[12, 4, 4])
