# Module 1.1: Pipecat Core Concepts & Your First Pipeline

In this module, we introduce how to stream responses from a large language model (LLM) using Pipecat. The goal is to build foundational familiarity with Pipecat and NVIDIA's extension for it, `nvidia-pipecat`, which simplifies integration with NVIDIA's multimodal and avatar pipelines.

This notebook focuses on the text-only interaction path using the `NvidiaLLMService`, which wraps access to NIM chat-capable models such as Llama 3.3. In later modules, this will be extended into speech-to-speech pipelines, real-time animation, and digital human interfaces.

## What is Pipecat?

Pipecat is an asynchronous dataflow framework for building real-time pipelines. It lets developers define and connect streaming components like ASR, LLMs, TTS, and more using a modular and testable architecture.

Key Pipecat concepts include:

- **Processors**: building blocks that transform, generate, or route data.
- **Frames**: units of data (e.g., text, audio) passed between processors.
- **Pipelines**: ordered sequences of processors.
- **PipelineTasks**: runnable instances of pipelines with per-session metadata.
- **PipelineRunner**: orchestrates execution of multiple concurrent tasks.

`nvidia-pipecat` extends Pipecat with services and helpers for working with NVIDIA-hosted endpoints (e.g., NIMs), transcript synchronization, avatar controllers, and multi-modal transport layers (e.g., WebRTC or ACE Transport).

## Objective of This Notebook

You will:
- Understand Pipecat's core architectural principles: real-time streaming, frames, processors, and pipelines, based on the official Pipecat Core Concepts.
- Implement a basic `FrameProcessor`.
- Construct, run, and push data through a simple text-based Pipecat `Pipeline`.
- Define an LLM context and stream outputs from the `NvidiaLLMService` using `nvidia-pipecat`.
- Build a minimal streaming chatbot loop that maintains conversation history.

This sets the stage for integrating text responses with downstream services such as TTS and animation in future modules.



Welcome to your hands-on introduction to **Pipecat**! In Module 1.0, we discussed the high-level architecture of Digital Humans. Now, we'll explore the fundamental building blocks of Pipecat, the open-source Python framework designed for real-time, streaming AI applications. This notebook will guide you through Pipecat's core concepts and help you build your very first pipeline.

Understanding these base Pipecat mechanics is essential before we move on to using `nvidia-pipecat` to integrate powerful NVIDIA AI technologies.

## Prerequisites
- Python environment set up as per `0-0-Environment-Setup-Guide.md`.
- Jupyter kernel `nv-pipecat-env` selected.

Prior to getting started, you will need an NVIDIA API Key from the NVIDIA API Catalog to access the models used in this notebook.  

Need an API Key? It's Free!
  1. Navigate to **[NVIDIA API Catalog](https://build.nvidia.com/explore/discover)**.
  2. Select any model, such as `llama-3.3-70b-instruct`.
  3. On the right panel above the sample code snippet, click on "Get API Key". This will prompt you to log in if you have not already.

In [1]:
import asyncio
import nest_asyncio
import os
import getpass
from dotenv import load_dotenv

nest_asyncio.apply() # For running asyncio in Jupyter

load_dotenv() # Load environment variables from a .env file if available

# Try to get the API key from the environment
api_key = os.getenv("NVIDIA_API_KEY")

# Prompt if not set or invalid
if not api_key or not api_key.startswith("nvapi-"):
    print("NVIDIA API key not found or invalid.")
    api_key = getpass.getpass("🔐 Enter your NVIDIA API key: ").strip()
    if not api_key.startswith("nvapi-"):
        raise ValueError(f"{api_key[:5]}... is not a valid NVIDIA API key")
    # Set in environment for the current session
    os.environ["NVIDIA_API_KEY"] = api_key

## Pipecat Core Concepts: Frames, Processors, and Pipelines

Pipecat uses a pipeline-based architecture to handle real-time AI processing. Instead of waiting for complete responses at each step, Pipecat processes data in small units called **frames** that flow through the pipeline. This streaming approach is key to creating natural, responsive interactions.

*(The following explanations are adapted from the official Pipecat Core Concepts guide).*

### Real-time Processing in Action
Consider a voice assistant:
- Speech is transcribed in real-time as the user speaks.
- Transcription is sent to an LLM as it becomes available.
- LLM responses are processed as they stream in.
- Text-to-speech begins generating audio for early sentences while later ones are still being generated.

This creates a fluid, low-latency experience.
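The following toy sketch makes this overlap concrete. It uses plain `asyncio` and no Pipecat code. The `fake_llm` and `fake_tts` coroutines are hypothetical stand-ins. The first streams tokens with artificial delays. The second "speaks" each sentence as soon as its closing punctuation arrives, while later tokens are still being produced.

```python
import asyncio


async def fake_llm(queue, events):
    for token in ["Hi ", "there. ", "How ", "can ", "I ", "help?"]:
        await asyncio.sleep(0.01)  # simulate per-token generation latency
        events.append(f"llm:{token.strip()}")
        await queue.put(token)
    await queue.put(None)  # end-of-stream marker


async def fake_tts(queue, events):
    buffer = ""
    while (token := await queue.get()) is not None:
        buffer += token
        if buffer.rstrip().endswith((".", "!", "?")):
            events.append(f"tts:{buffer.strip()}")  # "speak" this sentence now
            buffer = ""
    if buffer.strip():
        events.append(f"tts:{buffer.strip()}")  # flush any trailing fragment


async def main():
    queue = asyncio.Queue()
    events = []
    await asyncio.gather(fake_llm(queue, events), fake_tts(queue, events))
    return events


events = asyncio.run(main())
print(events)
```

In the recorded events, `tts:Hi there.` appears before `llm:How`. Speech for the first sentence therefore starts before the "LLM" has finished generating. In a notebook you can also simply `await main()`.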

### Architecture Overview
Pipecat organizes these processes using three key components:

### 1. Frames: The Data Containers
Frames are chunks of data moving through your application. Examples include:

- Audio data from a microphone
- Text from transcription
- LLM responses
- Generated speech audio
- Images or video
- Control signals (e.g., `EndFrame`, `UserStartedSpeakingFrame`)

Frames can flow **downstream** (normal processing) or **upstream** (for errors and control signals).

**Key frame types we'll use initially (from `pipecat.frames.frames` and `nvidia_pipecat.frames.riva`):**

| Frame Type           | Description                                                  |
|----------------------|--------------------------------------------------------------|
| `TextFrame`          | A string of text.                                            |
| `TranscriptionFrame` | Contains speech-to-text results.                             |
| `TTSAudioRawFrame`   | Contains audio data from text-to-speech (bytes).             |
| `ActionFrame`        | Represents actions to be performed by an agent.              |
| `EndFrame`           | Control signal indicating the end of a stream or processing. |

*(We'll introduce more specialized frames as needed.)*
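A few plain dataclasses can model the frame idea. This is a simplified, hypothetical model for illustration only. Pipecat's real frame classes live in `pipecat.frames.frames` and carry additional metadata. The direction enum here only mirrors the downstream/upstream distinction described above.

```python
from dataclasses import dataclass
from enum import Enum, auto


class FrameDirection(Enum):
    DOWNSTREAM = auto()  # normal data flow, from input toward output
    UPSTREAM = auto()    # errors and control signals flowing back


@dataclass
class Frame:
    """Base class: every unit of data moving through the pipeline is a Frame."""


@dataclass
class TextFrame(Frame):
    text: str


@dataclass
class TTSAudioRawFrame(Frame):
    audio: bytes  # raw audio bytes produced by a TTS service
    sample_rate: int
    num_channels: int


@dataclass
class EndFrame(Frame):
    """Control frame: carries no payload, just signals the end of the stream."""


frames = [TextFrame("Hello"), TTSAudioRawFrame(b"\x00\x00", 16000, 1), EndFrame()]
kinds = [type(f).__name__ for f in frames]
print(kinds)  # ['TextFrame', 'TTSAudioRawFrame', 'EndFrame']
```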

### Initial Imports
Let's import the necessary Pipecat components.

In [2]:
import asyncio
from pipecat.frames.frames import Frame, TextFrame, EndFrame
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

### 2. Processors (Services)
**Frame processors** are the building blocks of a pipeline. They transform or respond to the data (frames) flowing through the system. Typical tasks include:
- Simple operations, such as joining partial transcripts into full sentences
- Invoking AI services, such as an LLM that generates a response from a message array
- Converting text into other modalities, such as audio or images

A frame processor may receive a frame it doesn't need to handle. In that case it passes the frame along the pipeline unchanged. Downstream processors can then operate on it, and frames keep flowing through the pipeline without interruption.
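This pass-through rule is what lets processors be chained freely. Below is a toy, dependency-free processor. The `ToySentenceAggregator` class and its `process_frame`/`push_frame` methods are hypothetical look-alikes, not Pipecat's actual classes. The processor buffers text until sentence-ending punctuation arrives and forwards every other frame unchanged. Flushing leftover text on `EndFrame` is a design choice of this sketch.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Frame:
    pass


@dataclass
class TextFrame(Frame):
    text: str


@dataclass
class EndFrame(Frame):
    pass


class ToySentenceAggregator:
    """Buffers TextFrames until sentence-ending punctuation, then emits one."""

    def __init__(self):
        self._buffer = ""
        self.out = []  # frames "pushed downstream"

    async def push_frame(self, frame):
        self.out.append(frame)

    async def process_frame(self, frame):
        if isinstance(frame, TextFrame):
            self._buffer += frame.text
            if self._buffer.rstrip().endswith((".", "!", "?")):
                await self.push_frame(TextFrame(self._buffer))
                self._buffer = ""
            return
        if isinstance(frame, EndFrame) and self._buffer:
            # Sketch's choice: flush unterminated text before the stream ends.
            await self.push_frame(TextFrame(self._buffer))
            self._buffer = ""
        # Any frame we don't transform is forwarded unchanged.
        await self.push_frame(frame)


async def demo():
    agg = ToySentenceAggregator()
    for frame in [TextFrame("Hello, "), TextFrame("world!"), EndFrame()]:
        await agg.process_frame(frame)
    return agg.out


out = asyncio.run(demo())
print(out)  # [TextFrame(text='Hello, world!'), EndFrame()]
```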

Instantiating a built-in processor such as `SentenceAggregator` looks like this:

```python
from pipecat.processors.aggregators.sentence import SentenceAggregator  # A built-in processor

# Buffers incoming TextFrames until it sees sentence-ending punctuation, then
# emits one TextFrame: "Hello, " + "world!" -> "Hello, world!"
aggregator = SentenceAggregator()
```

You can use predefined Processors, or create a custom one to fit your logic.

### 3. Pipelines - The Assembly Line
A `Pipeline` connects multiple processors and creates a path for frames to flow. Pipecat supports simple linear pipelines as well as more complex parallel ones. We'll start with linear.

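```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.sentence import SentenceAggregator

# Simple linear pipeline: frames flow through the processors in list order
pipeline = Pipeline([
    SentenceAggregator(),  # e.g., the aggregator from above
    # ... further processors (LLM, TTS, output transport, and so on)
])
```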
The pipeline often includes a **transport** at the beginning and/or end, which is the connection to the real world (microphone, speakers, WebSocket).
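You can model a linear pipeline as "feed each frame through the processors in list order." The sketch below is a toy: `ToyPipeline`, `Strip`, and `Upper` are hypothetical names. In the toy, each processor returns exactly one frame. Real Pipecat processors push zero, one, or many frames asynchronously, and may also push frames upstream.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TextFrame:
    text: str


class Strip:
    async def process(self, frame):
        # Handle text frames; pass anything else through unchanged.
        return TextFrame(frame.text.strip()) if isinstance(frame, TextFrame) else frame


class Upper:
    async def process(self, frame):
        return TextFrame(frame.text.upper()) if isinstance(frame, TextFrame) else frame


class ToyPipeline:
    """Sends a frame through each processor in list order, first to last."""

    def __init__(self, processors):
        self.processors = processors

    async def push(self, frame):
        for processor in self.processors:
            frame = await processor.process(frame)
        return frame


result = asyncio.run(ToyPipeline([Strip(), Upper()]).push(TextFrame("  hello  ")))
print(result)  # TextFrame(text='HELLO')
```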

## Running a Pipecat Pipeline: Tasks and Runners
Tasks and runners are the core components that manage the execution of a pipeline:

1.  **`Pipeline`**: An instance of your defined pipeline (list of processors).
2.  **`PipelineTask`**: Connects to a pipeline using source and sink processors, which allow it to push frames into the pipeline and receive frames from it.
3.  **`PipelineRunner`**: Manages the execution of one or more `PipelineTask` instances within an `asyncio` event loop.
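You can sketch the split of responsibilities with `asyncio` alone. A task is a pipeline plus a queue of frames to feed it. A runner drives tasks on the event loop. `ToyTask`, `ToyRunner`, and the `END` sentinel are hypothetical stand-ins for `PipelineTask`, `PipelineRunner`, and `EndFrame`. Only the shape of the interaction is meant to carry over: queue frames, run until an end marker, and run several tasks concurrently.

```python
import asyncio

END = object()  # hypothetical stand-in for Pipecat's EndFrame


class ToyTask:
    """Pairs a handler (standing in for a pipeline) with a queue of input frames."""

    def __init__(self, name, handler):
        self.name = name
        self.handler = handler
        self.queue = asyncio.Queue()
        self.outputs = []

    async def queue_frame(self, frame):
        await self.queue.put(frame)

    async def run(self):
        # Process frames in arrival order until the end marker shows up.
        while (frame := await self.queue.get()) is not END:
            self.outputs.append(self.handler(frame))


class ToyRunner:
    async def run(self, *tasks):
        # Drive several tasks concurrently on the same event loop.
        await asyncio.gather(*(task.run() for task in tasks))


async def main():
    upper = ToyTask("upper", str.upper)
    length = ToyTask("length", len)
    for task, items in ((upper, ["hi", "yo"]), (length, ["abc"])):
        for item in items:
            await task.queue_frame(item)
        await task.queue_frame(END)  # without this, run() would wait forever
    await ToyRunner().run(upper, length)
    return upper.outputs, length.outputs


result = asyncio.run(main())
print(result)  # (['HI', 'YO'], [3])
```

The queue buffers frames, so they can be queued before or while the task runs. The notebook below queues them after starting the runner.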

Let's create and run our first pipeline!

## Building a Simple Pipeline

We'll construct a pipeline: `String input -> Sentence Aggregator -> Output`.
We'll manually push `TextFrame`s to demonstrate the flow.

Let's piece everything together and create a simple pipeline that aggregates text:

### Setting Up the Pipeline - Create an Observer
First, we need a way to *see* the aggregated sentence. Here, we define a simple observer that is notified each time a frame is pushed from one processor to another. It prints only the `TextFrame`s emitted by the `SentenceAggregator`, that is, the aggregated sentences.

In [3]:

class FramePrinter(BaseObserver):
    async def on_push_frame(self, src: FrameProcessor, dst: FrameProcessor, frame: Frame, direction: FrameDirection, timestamp: int):
        # Only print text frames coming from the SentenceAggregator
        if isinstance(frame, TextFrame) and isinstance(src, SentenceAggregator):
            print(f"Aggregated sentence: {frame.text}")
            
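The toy model below shows why an observer can watch the flow without interfering with it. It is plain Python. The `Recorder` class and `run_chain` helper are hypothetical. The real `on_push_frame` hook used above also receives the frame direction and a timestamp.

```python
from dataclasses import dataclass


@dataclass
class TextFrame:
    text: str


class Recorder:
    """Toy observer: records every hop but never modifies or consumes frames."""

    def __init__(self):
        self.seen = []

    def on_push_frame(self, src, dst, frame):
        self.seen.append((src, dst, frame.text))


def run_chain(processors, frame, observers):
    """processors: list of (name, fn) pairs; each fn maps a TextFrame to a TextFrame."""
    names = [name for name, _ in processors] + ["sink"]
    for i, (name, fn) in enumerate(processors):
        frame = fn(frame)
        # Notify observers about the push from this processor to the next one.
        for obs in observers:
            obs.on_push_frame(name, names[i + 1], frame)
    return frame


procs = [
    ("strip", lambda f: TextFrame(f.text.strip())),
    ("upper", lambda f: TextFrame(f.text.upper())),
]
rec = Recorder()
out = run_chain(procs, TextFrame("  hi "), [rec])

# Observing changes nothing: the result is identical with no observers at all.
assert run_chain(procs, TextFrame("  hi "), []) == out
print(out)       # TextFrame(text='HI')
print(rec.seen)  # [('strip', 'upper', 'hi'), ('upper', 'sink', 'HI')]
print([hop for hop in rec.seen if hop[0] == "upper"])  # [('upper', 'sink', 'HI')]
```

Filtering hops by source name mirrors what `FramePrinter` does when it checks `isinstance(src, SentenceAggregator)`.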

### Define the Pipeline Task

Now we build a simple pipeline using a `SentenceAggregator`, which joins text fragments into full sentences. We attach our `FramePrinter` as an observer so we can see the output.


In [4]:
async def test_simple_pipeline():
    
    aggregator = SentenceAggregator() # Create a sentence aggregator processor

    
    pipeline = Pipeline([aggregator]) # Create a pipeline with our aggregator processor

    # Create a pipeline task with our Observer
    task = PipelineTask(
        pipeline,
        params=PipelineParams(observers=[FramePrinter()])
    )
    
    runner = PipelineRunner() # We use PipelineRunner to execute the pipeline.

    run_task = asyncio.create_task(runner.run(task)) # Start the pipeline by running it in the background

    await asyncio.sleep(0.1) # Give a little time for the pipeline to initialize

    # Queue a few text frames to simulate user input, and end the stream with an EndFrame.
    await task.queue_frame(TextFrame("Hello, "))
    await task.queue_frame(TextFrame("world!"))

    # End the pipeline
    await task.queue_frame(EndFrame())

    # Wait for the pipeline to complete. The result is printed by the Observer.
    await run_task
    print("Pipeline execution completed")


### Execute the Pipeline

We now run the function that initializes and runs our sample pipeline.


In [5]:
# Run our test pipeline
await test_simple_pipeline()

2025-05-14 14:14:33.153 | DEBUG    | pipecat.processors.frame_processor:link:177 - Linking PipelineSource#0 -> SentenceAggregator#0
2025-05-14 14:14:33.154 | DEBUG    | pipecat.processors.frame_processor:link:177 - Linking SentenceAggregator#0 -> PipelineSink#0
2025-05-14 14:14:33.154 | DEBUG    | pipecat.processors.frame_processor:link:177 - Linking PipelineTaskSource#0 -> Pipeline#0
2025-05-14 14:14:33.154 | DEBUG    | pipecat.processors.frame_processor:link:177 - Linking Pipeline#0 -> PipelineTaskSink#0
2025-05-14 14:14:33.154 | DEBUG    | pipecat.pipeline.runner:run:39 - Runner PipelineRunner#0 started running PipelineTask#0
2025-05-14 14:14:33.257 | DEBUG

Aggregated sentence: Hello, world!
Pipeline execution completed


Let's visualize the data flow:  
![viz](../../../docs/images/aggregator-nvidia-pipecat-flow.png)

| Component                    | Purpose                                               |
|-----------------------------|--------------------------------------------------------|
| `TextFrame("Hello, ")`      | Raw text input to the pipeline (a frame)                         |
| `TextFrame("world!")`       | Another input frame                                 |
| `SentenceAggregator`        | Processor that joins fragments into one sentence                      |
| `TextFrame("Hello, world!")`| The processed output frame                             |
| `FramePrinter` (observer)   | Monitors the output and logs it to the console         |
| Console Output              | Where you see the final result                         |

This pattern is fundamental in Pipecat: data flows through processors, and observers can monitor the flow at any point without interfering with it. You’ll see this same logic apply in more advanced pipelines that include ASR, TTS, LLMs, and avatar animation.

We strongly recommend getting comfortable with Pipecat itself first, using the official Pipecat documentation and example pipelines to get started.


## Next: What are ACE Controller and `nvidia-pipecat`?

`nvidia-pipecat` is an extension of the open-source Pipecat framework. It adds NVIDIA-specific capabilities for creating real-time, multimodal conversational AI agents, with a focus on avatar interactions. It is part of the ACE Controller SDK. The SDK allows developers to build controllers and services that use NVIDIA ACE to manage multimodal, real-time interactions with voice bots and avatars.

`nvidia-pipecat` extends Pipecat in three main areas:
1. **Additional frame processors and services**: services such as the NVIDIA LLM service (`NvidiaLLMService`), Riva, and Audio2Face.
2. **Multimodal frames**: new frame types for avatar interactions.
3. **HTTP and WebSocket server implementation**: a FastAPI-based implementation for communicating with ACE (Avatar Cloud Engine).

Starting in **Module 2**, we will use these `nvidia-pipecat` services to build a functional voice agent.

### Building a Basic LLM NIM Chat with Llama
Now, let's build a simple chat application using `nvidia-pipecat` and Llama. We'll use the `NvidiaLLMService` class. It provides an interface to NVIDIA's language model endpoints, with support for streaming responses and parameter customization.

The `NvidiaLLMService` class allows us to connect to NVIDIA NIM LLM endpoints. For our example, we'll use the Llama 3.3 70B Instruct model:  
>*If you are unfamiliar with NIM endpoints, available NIMs, or advanced configurations, see our [NIM-Intro](../0-Setup/NIM-Intro.ipynb) in Module 0.*

In [6]:
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext

from nvidia_pipecat.services.nvidia_llm import NvidiaLLMService

In [7]:
# Initialize the NVIDIA LLM service with Llama 3.3 70B
llm = NvidiaLLMService(  
    model="meta/llama-3.3-70b-instruct",
    api_key=os.getenv("NVIDIA_API_KEY"),
    base_url=None
)

We use `nvidia-pipecat`'s `NvidiaLLMService` as our LLM provider to interface easily with NIM endpoints. If you are following along with a locally run NIM, you can set `base_url` to your local endpoint.

Next, let's define a simple system prompt that instructs the LLM how to behave during the conversation.

In [8]:
messages = [{
    "role": "system",
    "content": "You are a helpful assistant. Keep responses short and polite.",
}]

context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)

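The `OpenAILLMContext` manages the conversation history between the system, user, and assistant, acting like a chat memory manager. `llm.create_context_aggregator(context)` builds the user and assistant context aggregators. These keep this context up to date when the LLM runs inside a `Pipeline`. The manual chat loop below does not use them. Instead, it adds messages to `context` directly and passes the full message history to the LLM on every turn.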
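Conceptually, the context is an ordered list of OpenAI-style role/content messages. The full list is resent on every request. The `ToyContext` class below is a hypothetical stand-in that mirrors only the `add_message` and `get_messages` calls used in this notebook. The real `OpenAILLMContext` also handles things like tool definitions.

```python
class ToyContext:
    """Hypothetical stand-in for a chat context: an ordered list of messages."""

    def __init__(self, messages=None):
        self._messages = list(messages or [])

    def add_message(self, message: dict) -> None:
        self._messages.append(message)

    def get_messages(self) -> list:
        return list(self._messages)  # return a copy so callers can't edit history


ctx = ToyContext([{"role": "system", "content": "You are a helpful assistant."}])
ctx.add_message({"role": "user", "content": "Hey there!"})
ctx.add_message({"role": "assistant", "content": "Hello! How can I help you today?"})
ctx.add_message({"role": "user", "content": "What's the first word I said?"})

# The whole history goes to the model on each turn, which is how it can
# answer questions about earlier messages.
print([m["role"] for m in ctx.get_messages()])  # ['system', 'user', 'assistant', 'user']
```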

### Create a Simple Chat Function
This function creates a simple streaming chat loop. It takes user input, sends it to the LLM with the full message history, streams the response token by token, prints it in real time, and saves the assistant’s reply to the conversation context. The loop continues until the user types “exit”.

In [9]:
async def chat():
    print("Streaming LLM Chat (type 'exit' to quit)\n")

    while True:
        user_input = input("You: ")
        if user_input.lower() == "exit":
            print("Goodbye!")
            break

        context.add_message({"role": "user", "content": user_input})

        print("Assistant: ", end="", flush=True)
        response = ""

        # Pipecat handles streaming
        stream = await llm.get_chat_completions(context, context.get_messages())

        async for chunk in stream:
            if chunk.text():
                print(chunk.text(), end="", flush=True)
                response += chunk.text()

        print()
        context.add_message({"role": "assistant", "content": response})
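You can exercise the streaming-accumulation part of `chat()` offline with a fake stream. `FakeChunk` and `fake_stream` are hypothetical. Like `chat()`, they assume only that each chunk exposes a `text()` method. The sketch makes no network call and does not use `NvidiaLLMService`.

```python
import asyncio


class FakeChunk:
    """Hypothetical chunk type exposing text(), as chat() above assumes."""

    def __init__(self, text: str):
        self._text = text

    def text(self) -> str:
        return self._text


async def fake_stream(parts):
    for part in parts:
        await asyncio.sleep(0)  # yield to the event loop, as a network stream would
        yield FakeChunk(part)


async def one_turn(history, user_input, parts):
    history.append({"role": "user", "content": user_input})
    response = ""
    async for chunk in fake_stream(parts):
        if chunk.text():  # skip empty chunks
            print(chunk.text(), end="", flush=True)
            response += chunk.text()
    print()
    history.append({"role": "assistant", "content": response})
    return response


history = [{"role": "system", "content": "Keep responses short."}]
reply = asyncio.run(one_turn(history, "Hey there!", ["Hello", "", "! How can I help?"]))
```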

In [10]:
# Run the interactive chat. Type exit to quit
await chat()

Streaming LLM Chat (type 'exit' to quit)

You:  Hey there!

A: Hello! How can I help you today?

You:  la la la

A: That's a happy tune! Is everything okay?

You:  What's the first word I said?

A: Your first word was "Hey".

You:  Thanks! Bye

A: Bye! Have a great day!

You:  exit

Goodbye!


In this example, we do **not** use a `Pipeline`. The chat function controls the flow of input and output manually by calling `NvidiaLLMService` directly. This works well for simple, linear use cases like basic text chat, where you just need to collect input, call the LLM, and print the response.

You would use a `Pipeline` when you want to connect multiple services or processors in a structured, real-time flow. Examples include chaining speech-to-text (ASR), an LLM, and text-to-speech (TTS), or adding processors for sentence aggregation, speculative generation, or streaming animation.

Pipelines are especially useful when building scalable or reusable systems like those used in digital human interfaces, where data needs to be transformed and routed through many processors in sequence. We will implement such pipelines with `nvidia-pipecat` in upcoming modules.

## Exercises & Further Exploration

1. **Build a Custom Frame Processor**  
   Create a new processor that uppercases all `TextFrame` contents.  
   *Hint: override `process_frame()`.*

2. **Chain Multiple Text Transforms**  
   Create a pipeline with two processors: one that trims whitespace, and one that adds punctuation.  
   Try it with a sentence like `"hello there "`.

3. **Use Observers to Log Debug Info**  
   Attach an observer to print every frame that passes between two processors.  
   Can you filter logs to show only frames containing a keyword?

4. **Implement a Simple Echo Bot Pipeline**  
   Write a Pipecat pipeline that returns every `TextFrame` input as a response.  
   Extend it to store previous messages in memory.

5. **Swap in the NVIDIA LLM**  
   Replace the echo logic with `NvidiaLLMService` using `OpenAILLMContext`.  
   Prompt it with custom system instructions and experiment with tone changes.

---

### Key Documentation to Review

- **Pipecat Core Concepts:** [Pipecat GitHub Docs - Core Concepts](https://github.com/pipecat-ai/pipecat/blob/main/docs/concepts.md)  
- **Pipecat Processors:** [Pipecat GitHub Docs - Processors](https://github.com/pipecat-ai/pipecat/blob/main/docs/processors.md)  
- **NVIDIA ACE Controller User Guide (for `nvidia-pipecat` overview):** [ACE Controller User Guide](https://docs.nvidia.com/ace/ace-controller-microservice/1.0/user-guide.html)

---

This lesson introduced you to Pipecat and NVIDIA's `nvidia-pipecat` extension. You built a simple text pipeline and streamed chat responses from the Llama 3.3 70B model through a NIM endpoint using `NvidiaLLMService`. The `nvidia-pipecat` extension makes it easy to integrate NVIDIA's AI services into your applications, enabling you to create sophisticated conversational agents.

The next module will focus on building a complete speech-to-speech voice agent using the services provided by `nvidia-pipecat`.