# Setup
Wir installieren Apache Beam mit Google-Support sowie die Python-Bibliothek Pillow, die zum Einlesen, Bearbeiten und Speichern von Bildern verwendet wird.

In [1]:
!pip install "apache-beam[gcp]"
!pip install "apache-beam[interactive]"
!pip install pillow

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [4]:
import apache_beam as beam
import requests

EnumDefinitionError: May only use integers in Enum definitions.  Found: __static_attributes__ = ()

In [None]:
movie_posters = [
    "https://image.tmdb.org/t/p/original/hzXSE66v6KthZ8nPoLZmsi2G05j.jpg",
    "https://image.tmdb.org/t/p/original/sizg1AU8f8JDZX4QIgE4pjUMBvx.jpg",
    "https://image.tmdb.org/t/p/original/6FfCtAuVAW8XJjZ7eWeLibRLWTw.jpg",
    "https://image.tmdb.org/t/p/original/sV3kIAmvJ9tPz4Lq5fuf9LLMxte.jpg",
    "https://image.tmdb.org/t/p/original/rMMB3v6jYHjsvXRNJYESacoTD7j.jpg",
    "https://image.tmdb.org/t/p/original/hQSdrXBYTbLGHYDIseHkBOPXTgL.jpg",
    "https://image.tmdb.org/t/p/original/t57NXzTrwp5rnPXiaX4BThrDxt1.jpg",
    "https://image.tmdb.org/t/p/original/5uD4dxNX8JKFjWKYMHyOsqhi5pN.jpg",
    "https://image.tmdb.org/t/p/original/z8onk7LV9Mmw6zKz4hT6pzzvmvl.jpg",
    "https://image.tmdb.org/t/p/original/3Z0oPHyLnk3Vx6ZMC1MiVwIrKhO.jpg",
    "https://image.tmdb.org/t/p/original/fToQDmkBtiXYGh9xfgAh3gpo6GZ.jpg",
    "https://image.tmdb.org/t/p/original/qfx2EENW1sOpKNVKLzr7VOhlxkt.jpg"
]

# Pipeline 1.0
Hier sehen wir die wohl einfachste denkbare Pipeline: Sie besteht lediglich aus einem Input und einem Output – keine Zwischenschritte, keine Transformationen. Damit lässt sich das Grundprinzip von Apache Beam ideal verstehen: Daten hinein, Verarbeitungsschritt ausführen, Ergebnis hinaus.

In [None]:
with beam.Pipeline() as p:
        (
            p
            | "CreateURLs" >> beam.Create(movie_posters)
            | "PrintURL"   >> beam.Map(print)
        )

# DoFn (Do function): Bilder download
Apache Beam bringt bereits eine Reihe von vorgefertigten Pipeline-Bausteinen mit, die sofort einsatzbereit sind. Wir können jedoch auch eigene Bausteine erstellen – sogenannte DoFn (kurz für Do Function).
Ein DoFn ist die kleinste ausführbare Einheit in einer Pipeline – hier passiert die eigentliche Verarbeitung eines Elements.
Darüber hinaus gibt es PTransform-Objekte: Sie fassen mehrere Schritte oder DoFn-Aufrufe zu einer Art „Mini-Pipeline“ zusammen und können wiederverwendet werden, um komplexere Verarbeitungsschritte sauber zu strukturieren.

In [None]:
class DownloadImageDoFn(beam.DoFn):
    """Downloads an image from a URL and outputs its bytes."""

    def __init__(self, timeout: int = 10, max_bytes: int = 8_000_000):
        self.timeout = timeout
        self.max_bytes = max_bytes

    def process(self, url: str):
        try:
            resp = requests.get(url, timeout=self.timeout, stream=True)
            resp.raise_for_status()
            content = resp.content
            if len(content) > self.max_bytes:
                print(f"Skipping {url}: too large ({len(content)} bytes)")
                return
            yield content  # return the image as bytes
        except Exception as e:
            print(f"Error downloading {url}: {e}")

# Pipeline 2.0
Hier verwenden wir unser eigenes DoFn, um jedes Bild aus dem Internet herunterzuladen und in einen Bytestream umzuwandeln – also eine Folge von Rohdaten, die wir später weiterverarbeiten oder in eine Datei schreiben können.

In [None]:
with beam.Pipeline() as p:
    (
        p
        | "CreateURLs" >> beam.Create(movie_posters)
        | "DownloadImages" >> beam.ParDo(DownloadImageDoFn())
        | "PrintLength" >> beam.Map(lambda b: print(f"Image bytes: {len(b)}"))
    )

# Pipeline 2.1
Wir erweitern die Pipeline um einen weiteren Schritt, der alle Einzelergebnisse sammelt und zu einer gemeinsamen Liste kombiniert. Dadurch stehen uns alle heruntergeladenen Bilder gebündelt für die nächste Verarbeitungsetappe zur Verfügung.

In [None]:
    with beam.Pipeline() as p:
        (
            p
            | "CreateURLs" >> beam.Create(movie_posters)
            | "DownloadImages" >> beam.ParDo(DownloadImageDoFn())
            | "CollectAll" >> beam.combiners.ToList()    # wait for all downloads
            | "NextStep" >> beam.Map(lambda b: print(f"Anzahl Bilder: {len(b)}"))
        )

# Movie Poster Collage Erstellen
Hier kommt ein neuer Verarbeitungsschritt hinzu: Aus dem Array der Bild-Bytes wird mit Hilfe der Pillow-Bibliothek automatisch eine Collage erzeugt.
Die Details des Codes müsst ihr an dieser Stelle nicht verstehen – wichtig ist nur, dass hier alle zuvor geladenen Bilder grafisch zusammengeführt werden.

In [None]:
import io
import random
from PIL import Image
import apache_beam as beam

class CreateCollageDoFn(beam.DoFn):
    """Creates a random collage from a list of image bytes and outputs the final image as bytes."""

    def __init__(self, width=1024, height=768, bg_color=(255, 255, 255)):
        self.width = width
        self.height = height
        self.bg_color = bg_color

    def _open_image(self, data):
        try:
            im = Image.open(io.BytesIO(data)).convert("RGBA")
            return im
        except Exception:
            return None

    def _place_randomly(self, canvas, poster):
        # random scale
        target_w = random.randint(120, 300)
        ratio = target_w / poster.width
        new_h = max(1, int(poster.height * ratio))
        poster = poster.resize((target_w, new_h), resample=Image.LANCZOS)

        # random rotation
        angle = random.uniform(-20, 20)
        poster = poster.rotate(angle, expand=True, resample=Image.BICUBIC)

        # random position
        x = random.randint(0, max(0, self.width - poster.width))
        y = random.randint(0, max(0, self.height - poster.height))

        # paste with alpha
        canvas.alpha_composite(poster, (x, y))

    def process(self, images_bytes_list):
        # images_bytes_list ist eine Liste aller geladenen Bytes (vom ToList-Schritt)
        canvas = Image.new("RGBA", (self.width, self.height), self.bg_color)

        # zufällig mischen und maximal 20 Bilder verwenden
        random.shuffle(images_bytes_list)
        for data in images_bytes_list[:20]:
            im = self._open_image(data)
            if im:
                self._place_randomly(canvas, im)

        # final in Bytes konvertieren
        out_buf = io.BytesIO()
        canvas.convert("RGB").save(out_buf, format="PNG")
        out_buf.seek(0)
        yield out_buf.getvalue()

# Pipeline 3.0
Jetzt setzen wir alle Bausteine zusammen – vom Herunterladen der Bilder bis zur Collage-Erstellung. So entsteht eine vollständige Pipeline, die aus mehreren Verarbeitungsschritten ein fertiges Bild erzeugt.

In [None]:
#bg_color =(30, 31, 34)
bg_color = (0,0,0,0)

with beam.Pipeline() as p:
    (
        p
        | "CreateURLs" >> beam.Create(movie_posters)
        | "DownloadImages" >> beam.ParDo(DownloadImageDoFn())
        | "CollectAll" >> beam.combiners.ToList()
        | "CreateCollage" >> beam.ParDo(CreateCollageDoFn(1024, 768, bg_color))
        | "WriteCollage" >> beam.Map(lambda b: open("collage.png", "wb").write(b))
    )