<a href="https://colab.research.google.com/github/PrithiveenKumaarRamkumar/mlops_labs/blob/main/apache_beam/inventory_beam_executed.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🏠 Home Inventory Management System with Apache Beam

A powerful inventory management system built using Apache Beam for scalable data processing.

## Features:
- Chat-like command interface
- Add, remove, update, and query inventory items
- Category-based organization
- Inventory saved to a JSON file (`inventory_beam.json`) after each run
- Apache Beam pipeline processing

## Step 1: Install Apache Beam

In [1]:
!pip install apache-beam --quiet
print("✓ Apache Beam installed successfully!")

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.1/17.1 MB[0m [31m97.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m68.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m97.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m

## Step 2: Import Required Libraries

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
from datetime import datetime
from typing import Dict, List, Tuple
import os

print("✓ All libraries imported successfully!")

✓ All libraries imported successfully!


## Step 3: Define Inventory Item Class

In [3]:
class InventoryItem:
    """A single stocked item.

    The name is stored lower-cased. When no ``last_updated`` value is
    supplied (or it is empty), the current local time in ISO format is used.
    """

    def __init__(self, name: str, quantity: int, unit: str, category: str, last_updated: str = None):
        self.name = name.lower()
        self.quantity = quantity
        self.unit = unit
        self.category = category
        if not last_updated:
            # Stamp the item at creation time when the caller gave no timestamp.
            last_updated = datetime.now().isoformat()
        self.last_updated = last_updated

    def to_dict(self):
        """Return the item's fields as a plain dict."""
        field_names = ('name', 'quantity', 'unit', 'category', 'last_updated')
        return {field: getattr(self, field) for field in field_names}

    @staticmethod
    def from_dict(data):
        """Build an item from a dict; 'last_updated' is optional."""
        return InventoryItem(
            name=data['name'],
            quantity=data['quantity'],
            unit=data['unit'],
            category=data['category'],
            last_updated=data.get('last_updated'),
        )

    def __repr__(self):
        return f"{self.name}: {self.quantity} {self.unit} ({self.category})"

print("✓ InventoryItem class defined!")

✓ InventoryItem class defined!


## Step 4: Define Apache Beam Transforms

In [4]:
class ParseCommand(beam.DoFn):
    """Parse one chat-style command string into a structured operation.

    Successfully parsed commands are emitted on the main output as dicts with
    an 'action' key ('add', 'remove', 'update' or 'query'). Anything that
    cannot be parsed is emitted as a message string on the 'errors' tagged
    output.
    """

    @staticmethod
    def _parse_quantity(token):
        """Convert ``token`` to a non-negative int; raise ValueError otherwise."""
        quantity = int(token)
        if quantity < 0:
            # A negative amount would turn 'remove' into an addition and let
            # 'add'/'update' create negative stock, so treat it as invalid.
            raise ValueError(f"negative quantity: {token}")
        return quantity

    def process(self, command: str):
        """Yield one operation dict, or one 'errors' TaggedOutput, for ``command``.

        The command is stripped and lower-cased before being split on
        whitespace. Recognised forms:
            add <item> <quantity> <unit> <category>
            remove <item> <quantity>
            update <item> <quantity>
            query <item> | query all
        Extra trailing tokens are ignored.
        """
        command = command.strip().lower()
        parts = command.split()

        # Every valid command needs at least an action and an item.
        if len(parts) < 2:
            yield beam.pvalue.TaggedOutput('errors', f"Invalid command: {command}")
            return

        action = parts[0]

        if action == 'add':
            if len(parts) < 5:
                yield beam.pvalue.TaggedOutput('errors', "Format: add <item> <quantity> <unit> <category>")
                return
            try:
                quantity = self._parse_quantity(parts[2])
            except ValueError:
                yield beam.pvalue.TaggedOutput('errors', f"Invalid quantity: {parts[2]}")
                return
            yield {
                'action': 'add',
                'item': InventoryItem(parts[1], quantity, parts[3], parts[4]).to_dict()
            }

        elif action in ('remove', 'update'):
            # Both actions share the same "<action> <item> <quantity>" shape.
            if len(parts) < 3:
                yield beam.pvalue.TaggedOutput('errors', f"Format: {action} <item> <quantity>")
                return
            try:
                quantity = self._parse_quantity(parts[2])
            except ValueError:
                yield beam.pvalue.TaggedOutput('errors', f"Invalid quantity: {parts[2]}")
                return
            yield {
                'action': action,
                'item': parts[1],
                'quantity': quantity
            }

        elif action == 'query':
            # parts[1] is either an item name or the literal 'all'; the length
            # guard above already guarantees it exists.
            yield {
                'action': 'query',
                'item': parts[1]
            }

        else:
            yield beam.pvalue.TaggedOutput('errors', f"Unknown action: {action}")


class CombineInventory(beam.CombineFn):
    """Fold parsed inventory operations into an inventory state and a message log.

    The accumulator is a dict with two keys:
        'inventory': maps item name -> item dict with 'name', 'quantity',
            'unit', 'category' and 'last_updated'.
        'messages': list of result strings, in the order operations were applied.

    Operations are order-dependent (a 'remove' or 'update' only succeeds once
    the item has been added), so ``add_input`` must receive them in command
    order.
    """

    def create_accumulator(self):
        """Return an empty state: no items and no messages."""
        return {'inventory': {}, 'messages': []}

    def add_input(self, accumulator, operation):
        """Apply one parsed operation to ``accumulator`` in place and return it.

        Args:
            accumulator: state dict shaped like ``create_accumulator()``'s result.
            operation: dict with an 'action' key of 'add', 'remove', 'update'
                or 'query', plus 'item' (and 'quantity' for remove/update).
                Any other action leaves the accumulator unchanged.

        Returns:
            The same accumulator object, with one message appended for each
            recognised action.
        """
        inventory = accumulator['inventory']
        messages = accumulator['messages']
        action = operation['action']

        if action == 'add':
            messages.append(self._apply_add(inventory, operation['item']))
        elif action == 'remove':
            messages.append(self._apply_remove(inventory, operation['item'], operation['quantity']))
        elif action == 'update':
            messages.append(self._apply_update(inventory, operation['item'], operation['quantity']))
        elif action == 'query':
            messages.append(self._describe(inventory, operation['item']))

        return accumulator

    @staticmethod
    def _apply_add(inventory, item_dict):
        """Add a new item or increase an existing item's quantity; return the message."""
        item_name = item_dict['name']

        if item_name in inventory:
            entry = inventory[item_name]
            entry['quantity'] += item_dict['quantity']
            entry['last_updated'] = datetime.now().isoformat()
            return f"✓ Updated {item_name}: {entry['quantity']} {entry['unit']}"

        # Store a copy: later remove/update operations mutate the stored entry
        # and must not modify the input operation's own dict.
        inventory[item_name] = dict(item_dict)
        return f"✓ Added {item_name}: {item_dict['quantity']} {item_dict['unit']} ({item_dict['category']})"

    @staticmethod
    def _apply_remove(inventory, item_name, quantity):
        """Subtract ``quantity``; drop the item once it reaches zero or below."""
        if item_name not in inventory:
            return f"⚠ Item not found: {item_name}"

        entry = inventory[item_name]
        entry['quantity'] -= quantity
        entry['last_updated'] = datetime.now().isoformat()

        if entry['quantity'] <= 0:
            del inventory[item_name]
            return f"✓ Removed {item_name} from inventory (quantity reached 0)"
        return f"✓ Removed {quantity} from {item_name}. Remaining: {entry['quantity']} {entry['unit']}"

    @staticmethod
    def _apply_update(inventory, item_name, quantity):
        """Overwrite an existing item's quantity; return the message."""
        if item_name not in inventory:
            return f"⚠ Item not found: {item_name}"

        entry = inventory[item_name]
        old_qty = entry['quantity']
        entry['quantity'] = quantity
        entry['last_updated'] = datetime.now().isoformat()
        return f"✓ Updated {item_name}: {old_qty} → {quantity} {entry['unit']}"

    @staticmethod
    def _describe(inventory, item_name):
        """Return the report text for one item, or for everything when ``item_name`` is 'all'."""
        if item_name != 'all':
            if item_name in inventory:
                item = inventory[item_name]
                return f"\n{item_name.upper()}:\n  Quantity: {item['quantity']} {item['unit']}\n  Category: {item['category']}\n  Last Updated: {item['last_updated']}"
            return f"⚠ Item not found: {item_name}"

        if not inventory:
            return "⚠ Inventory is empty"

        msg = "\n" + "="*60 + "\nCURRENT INVENTORY\n" + "="*60 + "\n\n"

        # Group items by category so the report lists one section per category.
        by_category = {}
        for name, item in inventory.items():
            by_category.setdefault(item['category'], []).append((name, item))

        for category in sorted(by_category):
            msg += f"📦 {category.upper()}:\n"
            # Sort on the name only, so item dicts are never compared.
            for name, item in sorted(by_category[category], key=lambda pair: pair[0]):
                msg += f"   • {name}: {item['quantity']} {item['unit']}\n"
            msg += "\n"

        msg += f"Total items: {len(inventory)}\n" + "="*60
        return msg

    def merge_accumulators(self, accumulators):
        """Merge several accumulators into a fresh one.

        Inventories are merged with ``dict.update``, so for an item present in
        more than one accumulator the later accumulator's entry wins;
        quantities are not summed. Messages are concatenated in iteration order.
        """
        merged = self.create_accumulator()
        for acc in accumulators:
            merged['inventory'].update(acc['inventory'])
            merged['messages'].extend(acc['messages'])
        return merged

    def extract_output(self, accumulator):
        """Return the accumulator unchanged as the final result."""
        return accumulator


class FormatResults(beam.DoFn):
    """Emit the result messages and persist the inventory as JSON."""

    def process(self, result):
        # Forward every message produced while applying the operations.
        yield from result['messages']

        # Write the final inventory next to the notebook, then report it.
        with open('inventory_beam.json', 'w') as handle:
            json.dump(result['inventory'], handle, indent=2)
        yield "\n💾 Inventory saved to inventory_beam.json"


print("✓ Apache Beam transforms defined!")

✓ Apache Beam transforms defined!


## Step 5: Define Pipeline Function

In [5]:
def run_inventory_pipeline(commands: List[str], verbose=True):
    """Run the inventory commands through an Apache Beam pipeline.

    Each command is parsed with ``ParseCommand``; valid operations are applied
    in their original command order with ``CombineInventory`` and the outcome
    is formatted (and saved to JSON) by ``FormatResults``.

    Args:
        commands: command strings, in the order they should be applied.
        verbose: when true, print every result message. Parse errors are
            always printed.
    """

    def parse_indexed(indexed_command):
        """Parse one (position, command) pair, tagging operations with the position."""
        position, command = indexed_command
        for output in ParseCommand().process(command):
            if isinstance(output, beam.pvalue.TaggedOutput):
                # Errors go to the 'errors' output untouched.
                yield output
            else:
                yield (position, output)

    def apply_in_order(keyed_group):
        """Fold the grouped (position, operation) pairs into the final state."""
        _, indexed_ops = keyed_group
        combiner = CombineInventory()
        accumulator = combiner.create_accumulator()
        # Grouped values arrive in no guaranteed order, but the operations are
        # order-dependent, so restore the original command order first.
        for _, operation in sorted(indexed_ops, key=lambda pair: pair[0]):
            accumulator = combiner.add_input(accumulator, operation)
        return combiner.extract_output(accumulator)

    pipeline_options = PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Create command stream, remembering each command's position
        commands_pcoll = pipeline | 'Create Commands' >> beam.Create(list(enumerate(commands)))

        # Parse commands
        parsed = commands_pcoll | 'Parse Commands' >> beam.FlatMap(parse_indexed).with_outputs('errors', main='operations')

        # Combine all operations into final inventory state. A single dummy
        # key gathers every operation into one group so they can be applied
        # sequentially.
        inventory_state = (
            parsed.operations
            | 'Add Dummy Key' >> beam.Map(lambda indexed_op: (1, indexed_op))
            | 'Group By Key' >> beam.GroupByKey()
            | 'Process All Operations' >> beam.Map(apply_in_order)
        )

        # Format and display results
        results = inventory_state | 'Format Results' >> beam.ParDo(FormatResults())

        # Collect results
        if verbose:
            results | 'Print Results' >> beam.Map(print)

        # Handle errors
        parsed.errors | 'Print Errors' >> beam.Map(lambda x: print(f"❌ {x}"))

    print("\n✓ Pipeline execution completed!")


# Simpler sequential processing version
def run_inventory_pipeline_simple(commands: List[str]):
    """Apply the commands one after another without building a Beam pipeline.

    Starts from an empty inventory, prints parse errors as they occur, prints
    all result messages afterwards, writes the final inventory to
    inventory_beam.json and returns it.
    """
    parser = ParseCommand()
    combiner = CombineInventory()
    state = combiner.create_accumulator()
    rule = "="*60

    print("\n🚀 Processing commands...\n")
    print(rule)

    for raw_command in commands:
        for outcome in list(parser.process(raw_command)):
            if not isinstance(outcome, beam.pvalue.TaggedOutput):
                # A parsed operation: fold it into the running state.
                state = combiner.add_input(state, outcome)
                continue
            # A parse error: report it immediately.
            print(f"❌ {outcome.value}")

    print(rule)

    # Display results
    print("\n📋 Results:\n")
    for message in state['messages']:
        print(message)

    # Save to file
    with open('inventory_beam.json', 'w') as handle:
        json.dump(state['inventory'], handle, indent=2)
    print("\n💾 Inventory saved to inventory_beam.json")

    return state['inventory']

print("✓ Pipeline functions defined!")

✓ Pipeline functions defined!


## Step 6: Example Usage - Basic Commands

In [6]:
# Example commands: four 'add <item> <quantity> <unit> <category>' entries
# followed by 'query all', which appends a full inventory report.
basic_commands = [
    'add milk 2 liters dairy',
    'add eggs 12 count protein',
    'add bread 1 loaf grains',
    'add chicken 500 grams protein',
    'query all'
]

print("📝 Running basic inventory commands...\n")
# Returns the final inventory dict and also writes it to inventory_beam.json.
inventory = run_inventory_pipeline_simple(basic_commands)

📝 Running basic inventory commands...


🚀 Processing commands...


📋 Results:

✓ Added milk: 2 liters (dairy)
✓ Added eggs: 12 count (protein)
✓ Added bread: 1 loaf (grains)
✓ Added chicken: 500 grams (protein)

CURRENT INVENTORY

📦 DAIRY:
   • milk: 2 liters

📦 GRAINS:
   • bread: 1 loaf

📦 PROTEIN:
   • chicken: 500 grams
   • eggs: 12 count

Total items: 4

💾 Inventory saved to inventory_beam.json


## Step 7: Example Usage - Update and Remove Operations

In [7]:
# More commands with updates and removals.
# run_inventory_pipeline_simple starts from an empty inventory on every call,
# so milk and eggs are added again here before being removed/updated.
update_commands = [
    'add milk 2 liters dairy',
    'add eggs 12 count protein',
    'remove milk 1',
    'update eggs 6',
    'add rice 5 kg grains',
    'query all'
]

print("📝 Running update and remove operations...\n")
# Rebinds `inventory` to this run's result and overwrites inventory_beam.json.
inventory = run_inventory_pipeline_simple(update_commands)

📝 Running update and remove operations...


🚀 Processing commands...


📋 Results:

✓ Added milk: 2 liters (dairy)
✓ Added eggs: 12 count (protein)
✓ Removed 1 from milk. Remaining: 1 liters
✓ Updated eggs: 12 → 6 count
✓ Added rice: 5 kg (grains)

CURRENT INVENTORY

📦 DAIRY:
   • milk: 1 liters

📦 GRAINS:
   • rice: 5 kg

📦 PROTEIN:
   • eggs: 6 count

Total items: 3

💾 Inventory saved to inventory_beam.json


## Step 8: Interactive Command Interface

In [8]:
# Interactive mode
def interactive_inventory():
    """
    Queue commands typed by the user and process them once 'done' is entered.

    Blank input is ignored. Entering 'done' (any letter case) runs the queued
    commands through run_inventory_pipeline_simple, or prints a warning when
    nothing was queued, and then ends the session.
    """
    rule = "="*60
    banner = (
        "\n" + rule,
        "🏠 INTERACTIVE INVENTORY MANAGEMENT",
        rule,
        "\nAvailable commands:",
        "  • add <item> <quantity> <unit> <category>",
        "  • remove <item> <quantity>",
        "  • update <item> <quantity>",
        "  • query <item> or query all",
        "  • done - Process all commands",
        "\nExamples:",
        "  add apples 5 count fruits",
        "  remove apples 2",
        "  query all",
        rule + "\n",
    )
    for line in banner:
        print(line)

    queued = []

    while True:
        entry = input("💬 Enter command (or 'done' to process): ").strip()

        if not entry:
            # Nothing typed: ask again.
            continue

        if entry.lower() != 'done':
            queued.append(entry)
            print(f"✓ Command queued ({len(queued)} total)")
            continue

        # 'done' always ends the session, whether or not anything was queued.
        if not queued:
            print("⚠ No commands to process!")
        else:
            print(f"\n🔄 Processing {len(queued)} commands...\n")
            run_inventory_pipeline_simple(queued)
        break

# Uncomment the line below to run interactive mode
# interactive_inventory()

## Step 9: View Saved Inventory

In [9]:
# Load and display current inventory
def display_inventory():
    """Print the inventory stored in inventory_beam.json, grouped by category.

    Prints a warning instead when the file does not exist or holds an empty
    inventory.
    """
    if not os.path.exists('inventory_beam.json'):
        print("⚠ No inventory file found. Run some commands first!")
        return

    with open('inventory_beam.json', 'r') as handle:
        stored = json.load(handle)

    if not stored:
        print("⚠ Inventory is empty")
        return

    rule = "="*60
    print("\n" + rule)
    print("📦 CURRENT INVENTORY FROM FILE")
    print(rule + "\n")

    # Bucket the (name, item) pairs under their category.
    grouped = {}
    for item_name, details in stored.items():
        grouped.setdefault(details['category'], []).append((item_name, details))

    # Categories, and the items inside each one, are listed in sorted order.
    for category in sorted(grouped):
        print(f"📦 {category.upper()}:")
        for item_name, details in sorted(grouped[category]):
            print(f"   • {item_name}: {details['quantity']} {details['unit']}")
        print()

    print(f"Total items: {len(stored)}")
    print(rule)

display_inventory()


📦 CURRENT INVENTORY FROM FILE

📦 DAIRY:
   • milk: 1 liters

📦 GRAINS:
   • rice: 5 kg

📦 PROTEIN:
   • eggs: 6 count

Total items: 3


## Step 10: Custom Commands - Your Turn!

In [10]:
# Add your custom commands here.
# Each run starts from an empty inventory and rewrites inventory_beam.json,
# so the saved file will contain only the items added below.
my_commands = [
    'add tomatoes 4 count vegetables',
    'add cheese 250 grams dairy',
    'add pasta 500 grams grains',
    'query all'
]

print("📝 Running your custom commands...\n")
inventory = run_inventory_pipeline_simple(my_commands)

📝 Running your custom commands...


🚀 Processing commands...


📋 Results:

✓ Added tomatoes: 4 count (vegetables)
✓ Added cheese: 250 grams (dairy)
✓ Added pasta: 500 grams (grains)

CURRENT INVENTORY

📦 DAIRY:
   • cheese: 250 grams

📦 GRAINS:
   • pasta: 500 grams

📦 VEGETABLES:
   • tomatoes: 4 count

Total items: 3

💾 Inventory saved to inventory_beam.json
