In [9]:
!pip install -q pipecat-ai transformers torch accelerate numpy

import asyncio
import logging
from typing import AsyncGenerator
import numpy as np

print("🔍 Checking available Pipecat frames...")

try:
    from pipecat.frames.frames import (
        Frame,
        TextFrame,
    )
except ImportError as e:
    # Retrying the identical import cannot succeed, so report the problem and
    # let the original ImportError propagate instead of masking it behind a
    # second, chained failure.
    print(f"⚠️  Import error: {e}")
    raise
else:
    print("✅ Basic frames imported successfully")

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

from transformers import pipeline as hf_pipeline
import torch

🔍 Checking available Pipecat frames...
✅ Basic frames imported successfully


In [10]:
class SimpleChatProcessor(FrameProcessor):
    """Answers user ``TextFrame``s with a HuggingFace text-generation model.

    Each non-empty ``TextFrame`` whose text does not start with ``"AI:"`` is
    treated as a user utterance: it is turned into a ``User: ... Bot:`` prompt
    (prefixed with the remembered conversation), sent to the model, and the
    reply is pushed on as a new ``TextFrame`` whose text starts with ``"AI: "``.
    The user frame itself is consumed. Every other frame is forwarded unchanged.

    Args:
        max_history_turns: Number of most recent user/bot exchanges kept in the
            prompt. Older exchanges are discarded so the prompt cannot grow
            without bound. Values below zero are treated as zero.
    """

    # Canned replies used when no usable answer can be produced.
    _EMPTY_REPLY = "That's interesting! Tell me more."
    _NO_MARKER_REPLY = "I'd love to hear more about that!"
    _ERROR_REPLY = "I'm having trouble processing that. Could you try rephrasing?"

    def __init__(self, max_history_turns: int = 4):
        super().__init__()
        print("🔄 Loading HuggingFace text generation model...")
        # Generation settings are supplied per call in _generate(); passing
        # max_length here as well would conflict with max_new_tokens there.
        self.chatbot = hf_pipeline(
            "text-generation",
            model="microsoft/DialoGPT-small",
        )
        self.max_history_turns = max(0, max_history_turns)
        # Each entry is one completed exchange: "User: <text> Bot: <reply>".
        self._turns = []
        print("✅ Chat model loaded successfully!")

    @property
    def conversation_history(self) -> str:
        """Remembered exchanges joined into the prefix used for the next prompt."""
        return " ".join(self._turns)

    @conversation_history.setter
    def conversation_history(self, value: str) -> None:
        # Keeps plain assignment working, e.g. ``conversation_history = ""`` to reset.
        self._turns = [value] if value else []

    def _generate(self, prompt: str) -> str:
        """Run the model synchronously and return the raw generated text."""
        response = self.chatbot(
            prompt,
            max_new_tokens=50,
            num_return_sequences=1,
            temperature=0.7,
            do_sample=True,
            pad_token_id=self.chatbot.tokenizer.eos_token_id,
        )
        return response[0]["generated_text"]

    @classmethod
    def _extract_reply(cls, prompt: str, generated_text: str) -> str:
        """Pull the bot's reply for ``prompt`` out of the model output."""
        if generated_text.startswith(prompt):
            # Take what directly follows the prompt, so a further turn invented
            # by the model is not mistaken for the answer.
            continuation = generated_text[len(prompt):]
        elif "Bot:" in generated_text:
            continuation = generated_text.split("Bot:")[-1]
        else:
            return cls._NO_MARKER_REPLY

        # Cut the reply off where the model starts writing another speaker turn.
        for marker in ("User:", "Bot:"):
            continuation = continuation.split(marker)[0]
        return continuation.strip() or cls._EMPTY_REPLY

    def _remember(self, user_text: str, ai_response: str) -> None:
        """Store a finished exchange and drop the oldest ones beyond the cap."""
        self._turns.append(f"User: {user_text} Bot: {ai_response}")
        overflow = len(self._turns) - self.max_history_turns
        if overflow > 0:
            del self._turns[:overflow]

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Reply to user text frames; forward everything else untouched."""
        await super().process_frame(frame, direction)

        if not isinstance(frame, TextFrame):
            await self.push_frame(frame, direction)
            return

        user_text = (getattr(frame, "text", "") or "").strip()
        if not user_text or user_text.startswith("AI:"):
            # Not a user utterance: pass it on rather than silently dropping it.
            await self.push_frame(frame, direction)
            return

        print(f"👤 USER: {user_text}")
        try:
            history = self.conversation_history
            if history:
                input_text = f"{history} User: {user_text} Bot:"
            else:
                input_text = f"User: {user_text} Bot:"

            # Inference is CPU-bound and slow, so run it in a worker thread to
            # keep the event loop free for the other pipeline tasks.
            generated_text = await asyncio.to_thread(self._generate, input_text)
            ai_response = self._extract_reply(input_text, generated_text)

            self._remember(user_text, ai_response)
            await self.push_frame(TextFrame(text=f"AI: {ai_response}"), direction)
        except Exception as e:
            # Best effort: keep the conversation going with a canned reply.
            print(f"⚠️  Chat error: {e}")
            await self.push_frame(
                TextFrame(text=f"AI: {self._ERROR_REPLY}"),
                direction
            )

In [11]:
class TextDisplayProcessor(FrameProcessor):
    """Prints AI replies as chat lines and passes every frame along unchanged."""

    def __init__(self):
        super().__init__()
        # Number of AI replies printed so far.
        self.conversation_count = 0

    def _show_exchange(self, reply: str) -> None:
        """Print one AI reply followed by the running exchange counter."""
        print(f"🤖 {reply}")
        self.conversation_count += 1
        print(f"    💭 Exchange {self.conversation_count} complete\n")

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Display text frames that start with "AI:", then forward the frame."""
        await super().process_frame(frame, direction)

        is_ai_reply = isinstance(frame, TextFrame) and getattr(frame, "text", "").startswith("AI:")
        if is_ai_reply:
            self._show_exchange(frame.text)

        await self.push_frame(frame, direction)


class ConversationInputGenerator:
    """Source of scripted user utterances for the demo conversation."""

    def __init__(self):
        # Scripted user side of the conversation, in the order it is emitted.
        self.demo_conversations = [
            "Hello! How are you doing today?",
            "What's your favorite thing to talk about?",
            "Can you tell me something interesting about AI?",
            "What makes conversation enjoyable for you?",
            "Thanks for the great chat!"
        ]

    async def generate_conversation(self) -> AsyncGenerator[TextFrame, None]:
        """Yield one ``TextFrame`` per scripted utterance.

        Pauses two seconds after each utterance except the last one.
        """
        print("🎭 Starting conversation simulation...\n")
        for turn_index, utterance in enumerate(self.demo_conversations):
            yield TextFrame(text=utterance)
            is_last_turn = turn_index >= len(self.demo_conversations) - 1
            if not is_last_turn:
                await asyncio.sleep(2)

In [12]:
class SimpleAIAgent:
    """Wires the chat and display processors into a Pipecat pipeline and runs the demo."""

    def __init__(self):
        self.chat_processor = SimpleChatProcessor()
        self.display_processor = TextDisplayProcessor()
        self.input_generator = ConversationInputGenerator()

    def create_pipeline(self) -> Pipeline:
        """Build the pipeline: chat processor first, display processor second."""
        return Pipeline([self.chat_processor, self.display_processor])

    async def run_demo(self):
        """Run the scripted conversation through the pipeline.

        Failures are reported (printed and logged with traceback) rather than
        raised; the success message is printed only when the run finished
        without an exception.
        """
        print("🚀 Simple Pipecat AI Agent Demo")
        print("🎯 Conversational AI with HuggingFace")
        print("=" * 50)

        pipeline = self.create_pipeline()
        runner = PipelineRunner()
        task = PipelineTask(pipeline)

        async def produce_frames():
            # Feed the scripted utterances, then ask the task to stop once
            # everything already queued has been processed.
            async for frame in self.input_generator.generate_conversation():
                await task.queue_frame(frame)
            await task.stop_when_done()

        try:
            print("🎬 Running conversation demo...\n")
            # The runner and the producer must run concurrently: the producer
            # queues frames while the runner drives the pipeline.
            await asyncio.gather(
                runner.run(task),
                produce_frames(),
            )
        except Exception as e:
            print(f"❌ Demo error: {e}")
            # logging.exception records the traceback along with the message.
            logging.exception("Pipeline error: %s", e)
        else:
            # Only claim success when no error was reported above.
            print("✅ Demo completed successfully!")

In [13]:
async def main():
    """Print the tutorial banner, run the agent demo, and print a closing summary.

    Any exception raised while building or running the agent is reported with
    its traceback instead of propagating.
    """
    logging.basicConfig(level=logging.INFO)

    banner_lines = (
        "🎯 Pipecat AI Agent Tutorial",
        "📱 Google Colab Compatible",
        "🤖 Free HuggingFace Models",
        "🔧 Simple & Working Implementation",
        "=" * 60,
    )
    summary_lines = (
        "\n🎉 Tutorial Complete!",
        "\n📚 What You Just Saw:",
        "✓ Pipecat pipeline architecture in action",
        "✓ Custom FrameProcessor implementations",
        "✓ HuggingFace conversational AI integration",
        "✓ Real-time text processing pipeline",
        "✓ Modular, extensible design",
        "\n🚀 Next Steps:",
        "• Add real speech-to-text input",
        "• Integrate text-to-speech output",
        "• Connect to better language models",
        "• Add memory and context management",
        "• Deploy as a web service",
    )

    for line in banner_lines:
        print(line)

    try:
        demo_agent = SimpleAIAgent()
        await demo_agent.run_demo()
        for line in summary_lines:
            print(line)
    except Exception as e:
        print(f"❌ Tutorial failed: {e}")
        import traceback
        traceback.print_exc()


try:
    import google.colab
    print("🌐 Google Colab detected - Ready to run!")
    ENV = "colab"
except ImportError:
    print("💻 Local environment detected")
    ENV = "local"

print("\n" + "="*60)
print("🎬 READY TO RUN!")
print("Execute this cell to start the AI conversation demo")
print("="*60)

print("\n🚀 Starting the AI Agent Demo...")

await main()

🌐 Google Colab detected - Ready to run!

🎬 READY TO RUN!
Execute this cell to start the AI conversation demo

🚀 Starting the AI Agent Demo...
🎯 Pipecat AI Agent Tutorial
📱 Google Colab Compatible
🤖 Free HuggingFace Models
🔧 Simple & Working Implementation
🔄 Loading HuggingFace text generation model...


Device set to use cpu
[32m2025-08-13 08:34:31.372[0m | [34m[1mDEBUG   [0m | [36mpipecat.processors.frame_processor[0m:[36mlink[0m:[36m505[0m - [34m[1mLinking PipelineSource#3 -> SimpleChatProcessor#4[0m
[32m2025-08-13 08:34:31.375[0m | [34m[1mDEBUG   [0m | [36mpipecat.processors.frame_processor[0m:[36mlink[0m:[36m505[0m - [34m[1mLinking SimpleChatProcessor#4 -> TextDisplayProcessor#3[0m
[32m2025-08-13 08:34:31.377[0m | [34m[1mDEBUG   [0m | [36mpipecat.processors.frame_processor[0m:[36mlink[0m:[36m505[0m - [34m[1mLinking TextDisplayProcessor#3 -> PipelineSink#3[0m
[32m2025-08-13 08:34:31.381[0m | [34m[1mDEBUG   [0m | [36mpipecat.processors.frame_processor[0m:[36mlink[0m:[36m505[0m - [34m[1mLinking PipelineTaskSource#2 -> Pipeline#3[0m
[32m2025-08-13 08:34:31.382[0m | [34m[1mDEBUG   [0m | [36mpipecat.processors.frame_processor[0m:[36mlink[0m:[36m505[0m - [34m[1mLinking Pipeline#3 -> PipelineTaskSink#2[0m
[32m2025-08-1

✅ Chat model loaded successfully!
🚀 Simple Pipecat AI Agent Demo
🎯 Conversational AI with HuggingFace
🎬 Running conversation demo...

🎭 Starting conversation simulation...

👤 USER: Hello! How are you doing today?
🤖 AI: I don't know.
    💭 Exchange 1 complete

👤 USER: What's your favorite thing to talk about?
🤖 AI: I love you.
    💭 Exchange 2 complete

👤 USER: Can you tell me something interesting about AI?
🤖 AI: How are you doing today?
    💭 Exchange 3 complete

👤 USER: What makes conversation enjoyable for you?


[32m2025-08-13 08:36:09.801[0m | [34m[1mDEBUG   [0m | [36mpipecat.pipeline.task[0m:[36mstop_when_done[0m:[36m423[0m - [34m[1mTask PipelineTask#2 scheduled to stop when done[0m


🤖 AI: It's fun!
    💭 Exchange 4 complete

👤 USER: Thanks for the great chat!


[32m2025-08-13 08:36:51.737[0m | [34m[1mDEBUG   [0m | [36mpipecat.pipeline.runner[0m:[36mrun[0m:[36m88[0m - [34m[1mRunner PipelineRunner#3 finished running PipelineTask#2[0m


🤖 AI: It was a pleasure!
    💭 Exchange 5 complete

✅ Demo completed successfully!

🎉 Tutorial Complete!

📚 What You Just Saw:
✓ Pipecat pipeline architecture in action
✓ Custom FrameProcessor implementations
✓ HuggingFace conversational AI integration
✓ Real-time text processing pipeline
✓ Modular, extensible design

🚀 Next Steps:
• Add real speech-to-text input
• Integrate text-to-speech output
• Connect to better language models
• Add memory and context management
• Deploy as a web service
