# Apache Beam Schemas

In [None]:
! pip install apache_beam apache-beam[gcp] --quiet

In [None]:
import IPython
from IPython.display import display

# Restart the kernel. NOTE(review): presumably done so the packages installed
# by the pip cell above are importable in a fresh process; the `True` argument
# is assumed to be ipykernel's `restart` flag -- confirm.
app = IPython.Application.instance()
kernel_to_restart = app.kernel
kernel_to_restart.do_shutdown(True)

# Create the Pet schema and some test data

In [None]:
import apache_beam as beam
from typing import NamedTuple

class Pet(NamedTuple):
    """One animal record with fields (name, pet_type, breed), in that order."""

    name: str      # the pet's given name
    pet_type: str  # species label; the sample data below uses "Dog" or "Cat"
    breed: str     # breed name

# Twenty sample pets, written as (name, pet_type, breed) rows and unpacked
# into Pet records in the same order.
pets = [
    Pet(name=name, pet_type=pet_type, breed=breed)
    for name, pet_type, breed in [
        ("Buddy", "Dog", "Golden Retriever"),
        ("Mittens", "Cat", "Siamese"),
        ("Max", "Dog", "Beagle"),
        ("Bella", "Dog", "Labrador"),
        ("Charlie", "Dog", "Poodle"),
        ("Lucy", "Cat", "Persian"),
        ("Daisy", "Dog", "Bulldog"),
        ("Luna", "Cat", "Maine Coon"),
        ("Rocky", "Dog", "Rottweiler"),
        ("Lola", "Cat", "Bengal"),
        ("Jack", "Dog", "Boxer"),
        ("Nala", "Cat", "Ragdoll"),
        ("Zeus", "Dog", "German Shepherd"),
        ("Chloe", "Cat", "British Shorthair"),
        ("Buster", "Dog", "Dachshund"),
        ("Simba", "Cat", "Sphynx"),
        ("Cooper", "Dog", "Cocker Spaniel"),
        ("Sasha", "Cat", "Scottish Fold"),
        ("Milo", "Dog", "Shih Tzu"),
        ("Oreo", "Cat", "Abyssinian"),
    ]
]

# Show the first 5 pets.
print(pets[:5])

## Simple Pipeline

Create a PCollection from the `pets` list and print each element.

In [None]:
# Beam pipeline: turn the in-memory `pets` list into a PCollection and print
# every element.
with beam.Pipeline() as p:
    create_pets = 'Create pets' >> beam.Create(pets)
    print_pets = 'Print pets' >> beam.Map(print)
    pet_collection = p | create_pets | print_pets


## Use Filter() to return only the dogs

In [None]:
# Beam pipeline: keep only the pets whose pet_type is 'Dog' and print them.
with beam.Pipeline() as p:
    all_pets = p | 'Create pets' >> beam.Create(pets)
    dogs_only = all_pets | 'Get only the Dogs' >> beam.Filter(
        lambda candidate: candidate.pet_type == 'Dog')
    pet_collection = dogs_only | 'Print pets' >> beam.Map(print)

## Filter with a DoFn (keep only the cats)

In [None]:
class OnlyCats(beam.DoFn):
    """Pass through only the pets whose pet_type is 'Cat'; emit nothing otherwise."""

    def process(self, pet):
        if pet.pet_type != 'Cat':
            return
        yield pet


# Beam pipeline: keep only the cats via the OnlyCats DoFn and print them.
with beam.Pipeline() as p:
    pet_collection = (
        p
        | 'Create pets' >> beam.Create(pets)
        # OnlyCats keeps pets whose pet_type is 'Cat', so the step label
        # says so (it is what shows up in the pipeline graph).
        | 'Get only the Cats' >> beam.ParDo(OnlyCats())
        | 'Print pets' >> beam.Map(print)
    )

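## Filter with FlatMap()

`FlatMap` emits every item of the iterable its function returns. A filter used with `FlatMap` must therefore yield each pet to keep, or return a list containing it. Returning the `Pet` itself would emit its individual fields instead.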



In [None]:
def NoCatsFilter(pet):
    """Yield ``pet`` only if it is a dog; cats (and any other type) yield nothing.

    Meant for ``beam.FlatMap``, which emits every item of the iterable the
    function returns. Returning the Pet itself would make FlatMap iterate
    over the NamedTuple and emit its fields (name, pet_type, breed) as
    separate strings. Written as a generator, it emits the whole record.

    Args:
        pet: A record with a ``pet_type`` attribute (e.g. a ``Pet``).

    Yields:
        The unchanged ``pet`` when ``pet.pet_type == 'Dog'``.
    """
    if pet.pet_type == 'Dog':
        yield pet


# Beam pipeline: run every pet through NoCatsFilter via FlatMap, then print
# whatever FlatMap emits.
with beam.Pipeline() as p:
    source_pets = p | 'Create pets' >> beam.Create(pets)
    flat_mapped = source_pets | 'Get only the Dogs' >> beam.FlatMap(NoCatsFilter)
    pet_collection = flat_mapped | 'Print pets' >> beam.Map(print)

# Branch the pets into separate dog and cat outputs

In [None]:
class FilterPetsDoFn(beam.DoFn):
    """Route dogs to the 'dogs' tagged output and cats to the 'cats' one.

    Pets of any other type produce no output at all.
    """

    def process(self, pet):
        kind = pet.pet_type
        if kind == 'Dog':
            tag = 'dogs'
        elif kind == 'Cat':
            tag = 'cats'
        else:
            return
        yield beam.pvalue.TaggedOutput(tag, pet)


with beam.Pipeline() as p:
    pet_collection = p | 'Create pets' >> beam.Create(pets)

    # Split the pets into the 'dogs' and 'cats' tagged outputs of FilterPetsDoFn.
    split_by_type = beam.ParDo(FilterPetsDoFn()).with_outputs('dogs', 'cats')
    filtered_pets = pet_collection | 'Filter pets' >> split_by_type

    # Only the dogs branch is printed. To inspect the other branch, apply
    # 'Print cats' >> beam.Map(print) to filtered_pets.cats instead.
    dogs = filtered_pets.dogs | 'Print dogs' >> beam.Map(print)

## Output the dogs and cats into separate CSV files

In [None]:
class FilterPetsDoFn(beam.DoFn):
    """Send dogs to the 'dogs' tagged output and cats to 'cats'; drop the rest.

    NOTE(review): identical to the FilterPetsDoFn in the previous cell;
    presumably repeated so this cell can run on its own.
    """

    def process(self, pet):
        if pet.pet_type == 'Dog':
            yield beam.pvalue.TaggedOutput('dogs', pet)
            return
        if pet.pet_type == 'Cat':
            yield beam.pvalue.TaggedOutput('cats', pet)

def format_csv(pet):
    """Render a pet as one CSV line: ``name,pet_type,breed``.

    Uses the :mod:`csv` module, so a field containing a comma, quote or
    newline is quoted and escaped instead of silently corrupting the row.
    Plain fields (like the sample data) come out unquoted, exactly as the
    previous f-string produced them.

    Args:
        pet: Any object with ``name``, ``pet_type`` and ``breed`` attributes
            (e.g. a ``Pet``).

    Returns:
        The CSV-encoded line, without a trailing line terminator.
    """
    import csv
    import io

    buffer = io.StringIO()
    # lineterminator='' keeps the result a single line with no trailing newline.
    csv.writer(buffer, lineterminator='').writerow([pet.name, pet.pet_type, pet.breed])
    return buffer.getvalue()

# Beam pipeline
with beam.Pipeline() as p:
    pet_collection = p | 'Create pets' >> beam.Create(pets)

    filtered_pets = (
        pet_collection
        | 'Filter pets' >> beam.ParDo(FilterPetsDoFn()).with_outputs('dogs', 'cats')
    )

    dog_collection = filtered_pets.dogs | 'Format dogs to CSV' >> beam.Map(format_csv)
    cat_collection = filtered_pets.cats | 'Format cats to CSV' >> beam.Map(format_csv)

    dog_collection | 'Write dogs to CSV' >> beam.io.WriteToText('./temp_files/only_dogs', file_name_suffix='.csv')
    cat_collection | 'Write cats to CSV' >> beam.io.WriteToText('./temp_files/only_cats',  file_name_suffix='.csv')




! cat ./temp_files/only_dogs-00000-of-00001.csv
! cat ./temp_files/only_cats-00000-of-00001.csv

# Nested data - Owners and Pets

In [None]:
import random
from ctypes import Array
import apache_beam as beam
from typing import NamedTuple, List

class Pet(NamedTuple):
    """One animal record with fields (name, pet_type, breed), in that order.

    NOTE(review): identical to the Pet declared in the first data cell;
    presumably redeclared so this section can run without it.
    """

    name: str      # the pet's given name
    pet_type: str  # species label; this notebook's data uses "Dog" or "Cat"
    breed: str     # breed name

class Owner(NamedTuple):
    """A person together with the pets they own (the nested part of the schema)."""

    name: str        # the owner's name
    pets: List[Pet]  # the owner's Pet records

# Sample pet data: the names, species and per-species breeds that
# random_pet() draws from.
pet_names = [
    "Buddy", "Mittens", "Max", "Bella", "Charlie",
    "Lucy", "Daisy", "Luna", "Rocky", "Lola",
    "Jack", "Nala", "Zeus", "Chloe", "Buster",
    "Simba", "Cooper", "Sasha", "Milo", "Oreo",
]
pet_types = ["Dog", "Cat"]
dog_breeds = [
    "Golden Retriever", "Beagle", "Labrador", "Poodle", "Bulldog",
    "Rottweiler", "Boxer", "German Shepherd", "Dachshund",
    "Cocker Spaniel", "Shih Tzu",
]
cat_breeds = [
    "Siamese", "Persian", "Maine Coon", "Bengal", "Ragdoll",
    "British Shorthair", "Sphynx", "Scottish Fold", "Abyssinian",
]

def random_pet() -> Pet:
    """Build one random Pet.

    Draws the species from ``pet_types`` first, then a breed from the matching
    list (``dog_breeds`` for "Dog", otherwise ``cat_breeds``), then a name
    from ``pet_names``, in that order.
    """
    species = random.choice(pet_types)
    breed_pool = dog_breeds if species == "Dog" else cat_breeds
    chosen_breed = random.choice(breed_pool)
    chosen_name = random.choice(pet_names)
    return Pet(name=chosen_name, pet_type=species, breed=chosen_breed)

def random_pets() -> List[Pet]:
    """Return a new list holding one or two freshly generated random pets."""
    pets_for_owner = []
    # The count is drawn before any pet is generated.
    for _ in range(random.randint(1, 2)):
        pets_for_owner.append(random_pet())
    return pets_for_owner

# Sample owner data
owner_names = ["Alice", "Bob", "Charlie", "Diana", "Doug", "Edward", "Fiona", "George", "Hannah", "Ivan", "Julia", "Michael", "Patrick"]

# Seed the RNG so random_pets() gives every owner the same pets on each
# Restart & Run All; without it the owners, pet counts and printed output
# change from run to run.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
owners = [Owner(name=name, pets=random_pets()) for name in owner_names]

# Just show one Owner
print(owners[0])

In [None]:
class PrintOwnerPets(beam.DoFn):
    """Print an owner's name followed by one indented line per pet.

    ``process`` returns None, so this DoFn emits no elements downstream; it is
    used only for its printed output.
    """

    def process(self, owner):
        print(f'Owner: {owner.name}')
        for animal in owner.pets:
            summary = f'Pet: {animal.name}, Type: {animal.pet_type}, Breed: {animal.breed}'
            print(f'  {summary}')

# Beam pipeline: hand every owner to PrintOwnerPets, which prints the owner
# and their pets.
with beam.Pipeline() as p:
    owner_collection = p | 'Create owners' >> beam.Create(owners)
    print_step = 'Print owners and pets' >> beam.ParDo(PrintOwnerPets())
    owner_collection | print_step

## Number of Pets by Owner

In [None]:
from typing import Iterable, List, NamedTuple, Tuple

class CountPetsDoFn(beam.DoFn):
    """Emit one ``(owner name, number of pets)`` pair per owner."""

    # Beam reads the return hint of ``process`` as the iterable it produces,
    # so a generator is annotated Iterable[...]. A bare Tuple[str, int] would
    # be read as an iterable of str/int elements rather than of pairs.
    def process(self, owner) -> Iterable[Tuple[str, int]]:
        yield (owner.name, len(owner.pets))


# Beam pipeline: count each owner's pets and print the (name, count) pairs.
with beam.Pipeline() as p:
    owner_collection = p | 'Create owners' >> beam.Create(owners)
    count_step = 'Count pets' >> beam.ParDo(CountPetsDoFn())
    pet_counts = owner_collection | count_step
    pet_counts | 'Print pet counts' >> beam.Map(print)