# AI Tinkerers Demo: Streaming and Async Calls with LLMs

## 0. Load env vars and set client

In [5]:
from ai21 import AI21Client
from ai21.models.chat import ChatMessage
import dotenv
import time

# Model identifier used by the chat completion calls in this notebook
model = "jamba-mini-1.6-2025-03"

# Read the .env file into the process environment before building the client
# (presumably this is where the AI21 API key comes from -- confirm)
dotenv.load_dotenv()

# Synchronous AI21 client, constructed with default arguments
client = AI21Client()

## 1. Typical calls to API LLMs

In [None]:
# Define the system and user messages
system = "You're a data feature enrichment engineer."
messages = [
    ChatMessage(content=system, role="system"),
    ChatMessage(content="Hello, I need help with enriching CSVs.", role="user"),
]

# Start timing the request
start_time = time.time()

# Make the (blocking, non-streaming) API call
chat_completions = client.chat.completions.create(
    messages=messages,
    model=model,
)

# Calculate the elapsed time
elapsed_time = time.time() - start_time

# Print the first choice of the chat completion response
first_message = chat_completions.choices[0].message
print("Chat Completion Response:")
print("-" * 50)
print(f"Role: {first_message.role}")
print(f"Content: {first_message.content}")
print("-" * 50)

# Display additional metadata about the response
print("\nResponse Metadata:")
print(f"Request Time: {elapsed_time:.2f} seconds")

# Token usage is treated as optional: skip the report when the attribute is
# missing or None instead of failing on attribute access
usage = getattr(chat_completions, "usage", None)
if usage is not None:
    print(f"Usage - Prompt Tokens: {usage.prompt_tokens}")
    print(f"Usage - Completion Tokens: {usage.completion_tokens}")
    print(f"Usage - Total Tokens: {usage.total_tokens}")

# Report the model that was requested; printing the variable keeps this line
# correct if `model` is changed
print(f"Model: {model}")

# Print the response type for debugging (optional)
print("\nFull Response Structure:")
print(f"Response type: {type(chat_completions)}")

### Now use this to loop through the rows of drug_reviews.csv and label each review "Positive" or "Negative" based on the text in its "review" column

In [None]:
# Import necessary libraries for data processing and sentiment analysis
import pandas as pd
import time
from ai21 import AI21Client
from ai21.models.chat import ChatMessage
import csv
from tqdm import tqdm

# Load the drug reviews dataset and label each review, one blocking call per row
try:
    # Limiting to 20 rows for demonstration purposes
    # Remove the nrows parameter to process the entire file
    df = pd.read_csv('drug_reviews.csv', nrows=20)
    print("Successfully loaded sample from drug_reviews.csv")
    print(f"Columns: {df.columns.tolist()}")
    print(f"\nLoaded {len(df)} rows for processing")

    # Create a new column to store sentiment analysis results
    df['sentiment'] = ""

    # Re-create the client and model name so this cell does not depend on
    # state left behind by earlier cells
    client = AI21Client()
    model = "jamba-mini-1.6-2025-03"

    # Define the system prompt for sentiment analysis
    system_prompt = "You are a sentiment analysis expert. Analyze the drug review and respond 'Positive' or 'Negative' with reasoning."

    # Process each review and extract sentiment
    print("\nProcessing reviews for sentiment analysis...")

    # Track time for performance analysis
    start_time = time.time()

    for idx, row in tqdm(df.iterrows(), total=len(df)):
        # Create messages for the API call
        messages = [
            ChatMessage(content=system_prompt, role="system"),
            ChatMessage(content=f"Analyze the sentiment of this drug review: {row['review']}", role="user"),
        ]

        # Make the API call; a failure on one row is recorded and the loop continues
        try:
            response = client.chat.completions.create(
                messages=messages,
                model=model,
            )

            # Extract the sentiment (verdict plus reasoning) from the response
            sentiment = response.choices[0].message.content.strip()

            # Store the sentiment in the dataframe
            df.at[idx, 'sentiment'] = sentiment

            # Add a small delay to avoid rate limiting (if needed)
            time.sleep(0.1)

        except Exception as e:
            print(f"Error processing row {idx}: {e}")
            df.at[idx, 'sentiment'] = "Error"

    # Calculate elapsed time
    elapsed_time = time.time() - start_time
    print(f"\nProcessing completed in {elapsed_time:.2f} seconds")

    # Display the results
    print("\nSentiment Analysis Results:")
    print(df[['review', 'sentiment']].head(10))

    # Save the results to a new CSV file
    output_file = 'drug_reviews_with_sentiment.csv'
    df.to_csv(output_file, index=False)
    print(f"\nResults saved to {output_file}")

    # Calculate some statistics
    sentiment_counts = df['sentiment'].value_counts()
    print("\nSentiment Distribution:")
    print(sentiment_counts)

    # Calculate percentage of positive reviews.
    # The prompt asks for a verdict *with reasoning*, so each stored value is a
    # full sentence; an exact match on 'Positive' would never hit. Count the
    # responses that start with the verdict instead.
    if len(df) > 0:
        positive_count = sum(1 for s in df['sentiment'] if s.lower().startswith('positive'))
        positive_percentage = (positive_count / len(df)) * 100
        print(f"\nPercentage of Positive Reviews: {positive_percentage:.2f}%")

except FileNotFoundError:
    print("Error: drug_reviews.csv file not found. Please check the file path.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

## 2. Now with streaming

In [None]:
# Stream a chat completion to demonstrate real-time responses from the LLM.
# This builds on the previous section by printing the response chunk by chunk
# as it arrives rather than waiting for the complete response.

import time
from ai21 import AI21Client
from ai21.models.chat import ChatMessage

# Conversation for the streaming demo: same persona, a longer-form request
system = "You're a data feature enrichment engineer."
messages = [
    ChatMessage(role="system", content=system),
    ChatMessage(
        role="user",
        content="Explain how to enrich a CSV with reviews with sentiment data in 5 steps.",
    ),
]

# The clock starts before the banner is printed
start_time = time.time()

print("Streaming Response:")
print("-" * 50)

# Text fragments in arrival order; one entry per non-empty chunk
received = []

# stream=True makes the call yield chunks instead of one final response
stream = client.chat.completions.create(
    messages=messages,
    model=model,
    stream=True,
)
for chunk in stream:
    # Ignore chunks that carry no choices or no text
    if not chunk.choices:
        continue
    fragment = chunk.choices[0].delta.content
    if not fragment:
        continue
    # Echo immediately so the text appears as it is generated
    print(fragment, end="", flush=True)
    received.append(fragment)

# Full text, and the number of text-bearing chunks as a rough token estimate
response_content = "".join(received)
total_tokens = len(received)

# Stop the clock once the stream is exhausted
elapsed_time = time.time() - start_time
print("\n" + "-" * 50)

# Summary of the streamed request
print("\nStreaming Response Metadata:")
print(f"Request Time: {elapsed_time:.2f} seconds")
print(f"Approximate Completion Tokens: {total_tokens}")
print(f"Model: {model}")

# Talking points: streaming vs non-streaming
print("\nAdvantages of Streaming:")
for advantage in (
    "1. Immediate feedback to users",
    "2. Better perceived performance",
    "3. Ability to process partial responses",
    "4. Can cancel long-running requests early",
):
    print(advantage)

### Loop through the rows of drug_reviews.csv and label each review "Positive" or "Negative" based on the text in its "review" column, streaming each response

In [None]:
# Import necessary libraries for streaming sentiment analysis
import pandas as pd
import time
from ai21 import AI21Client
from ai21.models.chat import ChatMessage
from tqdm import tqdm

# Label each review sequentially, echoing every response as it streams in
try:
    # Demo-sized sample: only the first 20 rows are read
    print("Loading drug reviews dataset...")
    df = pd.read_csv('drug_reviews.csv', nrows=20)
    print(f"Loaded {len(df)} rows for processing")

    # Destination column for the model's answers
    df['sentiment'] = ""

    # A new client and model name are created here, so the cell runs on its own
    client = AI21Client()
    model = "jamba-mini-1.6-2025-03"

    # Instructions shared by every request
    system_prompt = "You are a sentiment analysis expert. Analyze the drug review and respond with 'Positive' or 'Negative' with reasoning."

    print("\nProcessing reviews with streaming sentiment analysis...")

    # Wall-clock start for the whole loop
    start_time = time.time()

    for idx, row in tqdm(df.iterrows(), total=len(df)):
        review_text = row['review']
        messages = [
            ChatMessage(role="system", content=system_prompt),
            ChatMessage(role="user", content=f"Analyze the sentiment of this drug review: {review_text}"),
        ]

        # Any failure for this row is reported and recorded as "Error"
        try:
            print(f"\nAnalyzing review {idx+1}/{len(df)}: ", end="", flush=True)

            # Fragments of the answer in arrival order
            fragments = []

            stream = client.chat.completions.create(
                messages=messages,
                model=model,
                stream=True,
            )
            for chunk in stream:
                # Skip chunks without choices or without text
                if not chunk.choices:
                    continue
                piece = chunk.choices[0].delta.content
                if not piece:
                    continue
                print(piece, end="", flush=True)
                fragments.append(piece)

            # Persist the complete answer for this row
            full_response = "".join(fragments)
            df.at[idx, 'sentiment'] = full_response.strip()

            # Brief pause between requests to stay clear of rate limits
            time.sleep(0.1)

        except Exception as e:
            print(f"\nError processing row {idx}: {e}")
            df.at[idx, 'sentiment'] = "Error"

    # Total duration of the loop
    elapsed_time = time.time() - start_time
    print(f"\n\nProcessing completed in {elapsed_time:.2f} seconds")

    # Preview the first rows
    print("\nSentiment Analysis Results:")
    print(df[['review', 'sentiment']].head(10))

    # Write the labeled data next to the input
    output_file = 'drug_reviews_with_streaming_sentiment.csv'
    df.to_csv(output_file, index=False)
    print(f"\nResults saved to {output_file}")

    # Frequency of each distinct answer
    sentiment_counts = df['sentiment'].value_counts()
    print("\nSentiment Distribution:")
    print(sentiment_counts)

    # Share of answers that open with the word "positive" (case-insensitive)
    if len(df) > 0:
        positive_count = sum(s.lower().startswith('positive') for s in df['sentiment'])
        positive_percentage = (positive_count / len(df)) * 100
        print(f"\nPercentage of Positive Reviews: {positive_percentage:.2f}%")

except FileNotFoundError:
    print("Error: drug_reviews.csv file not found. Please check the file path.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

## 3. Now with Concurrent Calls with Async Client

In [None]:
# Import necessary libraries for async operations
import asyncio
import time
from ai21 import AsyncAI21Client
from ai21.models.chat import ChatMessage

# Define a function to make async API calls to AI21
async def make_async_call(client, messages, model_name):
    """Send one chat completion request and measure how long it takes.

    Args:
        client: Async client whose ``chat.completions.create`` can be awaited.
        messages: Chat messages, forwarded unchanged as ``messages``.
        model_name: Model identifier, forwarded as ``model``.

    Returns:
        A ``(response, elapsed_seconds)`` tuple, where ``elapsed_seconds`` is
        the time spent awaiting the request.
    """
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # (or go negative) if the system clock is adjusted mid-request
    started = time.perf_counter()

    # Make the async API call
    response = await client.chat.completions.create(
        messages=messages,
        model=model_name,
    )

    elapsed_time = time.perf_counter() - started

    # Return both the response and timing information
    return response, elapsed_time

# Async counterpart of the client, constructed with default arguments
# (presumably it picks up credentials from the environment loaded earlier -- confirm)
async_client = AsyncAI21Client()

# Three personas that will each answer the same user request
system_prompt1 = "You're a data feature enrichment engineer."
system_prompt2 = "You're a customer support specialist."
system_prompt3 = "You're a technical documentation writer."

# One [system, user] conversation per persona; only the system message differs
messages1, messages2, messages3 = (
    [
        ChatMessage(content=persona, role="system"),
        ChatMessage(content="Hello, I need help with enriching CSVs.", role="user"),
    ]
    for persona in (system_prompt1, system_prompt2, system_prompt3)
)

# Define the main async function to run multiple calls concurrently
async def run_concurrent_calls():
    """Fire the three demo conversations concurrently and print a timing report.

    Uses the module-level ``async_client``, ``messages1``..``messages3`` and
    ``model``. Prints each response (role, content, per-request time and token
    usage when available) followed by a summary comparing the overall elapsed
    time with the sum of the individual request times.
    """
    # Monotonic clock for interval measurement
    total_start_time = time.perf_counter()

    # Run all API calls concurrently; gather returns results in argument order
    results = await asyncio.gather(
        make_async_call(async_client, messages1, model),
        make_async_call(async_client, messages2, model),
        make_async_call(async_client, messages3, model),
    )

    total_elapsed_time = time.perf_counter() - total_start_time

    # Print results from each call
    for i, (response, call_time) in enumerate(results, 1):
        print(f"\nResponse {i}:")
        print("-" * 50)
        print(f"Role: {response.choices[0].message.role}")
        print(f"Content: {response.choices[0].message.content}")
        print(f"Request Time: {call_time:.2f} seconds")

        # Token usage is treated as optional: skip it when missing or None
        usage = getattr(response, 'usage', None)
        if usage is not None:
            print(f"Usage - Prompt Tokens: {usage.prompt_tokens}")
            print(f"Usage - Completion Tokens: {usage.completion_tokens}")
            print(f"Usage - Total Tokens: {usage.total_tokens}")

    # Sum of the individual durations: an estimate of what the same requests
    # would cost if issued one after another
    sequential_estimate = sum(call_time for _, call_time in results)

    print("\nConcurrent Execution Summary:")
    print(f"Total time for all {len(results)} requests: {total_elapsed_time:.2f} seconds")
    print(f"Average time per request if sequential: {sequential_estimate / len(results):.2f} seconds")
    print(f"Time saved with async: {sequential_estimate - total_elapsed_time:.2f} seconds")

# Run the async function.
# NOTE: a bare top-level `await` is only valid where top-level await is
# supported (e.g. an IPython/Jupyter cell, which the cell markers in this file
# suggest); a plain Python script would reject this line.
await run_concurrent_calls()

### Concurrently label each review in drug_reviews.csv as "Positive" or "Negative" (with reasoning) based on the text in its "review" column

In [None]:
# Import necessary libraries for async sentiment analysis
import pandas as pd
import asyncio
import time
from ai21 import AsyncAI21Client
from ai21.models.chat import ChatMessage
from tqdm.asyncio import tqdm_asyncio

# Define an async function to process a single review
async def process_review(client, model, review, idx, total):
    """
    Stream a sentiment verdict for one drug review and return the full text.

    Args:
        client: Async client used for the streaming chat completion call
        model: Model identifier passed to the API
        review: Review text inserted into the user message
        idx: Index of this review; shown as idx+1 in progress output
        total: Total number of reviews; shown in progress output

    Returns:
        The concatenated streamed text with surrounding whitespace removed,
        or the string "Error" if anything inside the request/stream step raises
    """
    # System instructions followed by the review to classify
    conversation = [
        ChatMessage(
            content="You are a sentiment analysis expert. Analyze the drug review and respond 'Positive' or 'Negative' with reasoning.",
            role="system",
        ),
        ChatMessage(content=f"Analyze the sentiment of this drug review: {review}", role="user"),
    ]

    try:
        print(f"\nAnalyzing review {idx+1}/{total}: ", end="", flush=True)

        # Awaiting the call yields the stream; iterating it yields the chunks
        stream = await client.chat.completions.create(
            messages=conversation,
            model=model,
            stream=True,
        )

        fragments = []
        async for chunk in stream:
            # Skip chunks without choices or without text
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if not piece:
                continue
            # Echo each fragment as soon as it arrives
            print(piece, end="", flush=True)
            fragments.append(piece)

        return "".join(fragments).strip()

    except Exception as err:
        print(f"\nError processing review {idx+1}: {err}")
        return "Error"

# Define the main async function to process all reviews concurrently
async def process_reviews_concurrently(df, batch_size=5):
    """
    Process multiple reviews concurrently in batches using async API calls.

    The input DataFrame is modified in place: a 'sentiment' column is added
    (or reset to empty strings) and then filled batch by batch.

    Args:
        df: DataFrame containing the reviews to process (reads the 'review' column)
        batch_size: Number of reviews to process concurrently in each batch;
            must be at least 1

    Returns:
        Tuple of (processed DataFrame, elapsed time in seconds)

    Raises:
        ValueError: If batch_size is smaller than 1
    """
    # Fail fast: a zero step makes range() raise an unrelated-looking error, and
    # a negative step yields an empty range, which would silently skip every row
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # Set up the async client
    async_client = AsyncAI21Client()
    model = "jamba-mini-1.6-2025-03"

    # Create a new column to store sentiment analysis results
    df['sentiment'] = ""

    total = len(df)

    # Track time for performance analysis (monotonic clock for intervals)
    start_time = time.perf_counter()

    # Process reviews in batches to avoid overwhelming the API
    for i in range(0, total, batch_size):
        batch_df = df.iloc[i:i+batch_size]

        # Create one coroutine per row of the batch
        tasks = [
            process_review(async_client, model, row['review'], idx, total)
            for idx, row in batch_df.iterrows()
        ]

        # Process the batch concurrently and get results
        # (relies on gather returning results in task order -- see tqdm docs)
        results = await tqdm_asyncio.gather(*tasks)

        # Store results in the dataframe, pairing each index label with its result
        for idx, sentiment in zip(batch_df.index, results):
            df.at[idx, 'sentiment'] = sentiment

    # Calculate elapsed time
    elapsed_time = time.perf_counter() - start_time
    return df, elapsed_time

# Main execution function
async def main():
    """
    Main function to load data, process reviews, and save results.

    Reads the first 20 rows of drug_reviews.csv, labels them concurrently via
    process_reviews_concurrently, prints a preview, writes the labeled data to
    drug_reviews_with_concurrent_sentiment.csv and prints summary statistics.
    A missing input file and any other exception are reported with a printed
    message instead of being raised.
    """
    # Hard-coded duration quoted for the earlier non-concurrent cell; it is not
    # measured here, so treat the comparison below as approximate
    baseline_seconds = 14

    try:
        # Load the drug reviews dataset (limited to 20 rows for demonstration)
        print("Loading drug reviews dataset...")
        df = pd.read_csv('drug_reviews.csv', nrows=20)
        print(f"Loaded {len(df)} rows for processing")

        # Process the reviews concurrently
        print("\nProcessing reviews with concurrent streaming sentiment analysis...")
        result_df, elapsed_time = await process_reviews_concurrently(df)

        print(f"\n\nProcessing completed in {elapsed_time:.2f} seconds")

        # Display the results
        print("\nSentiment Analysis Results:")
        print(result_df[['review', 'sentiment']].head(10))

        # Save the results to a new CSV file
        output_file = 'drug_reviews_with_concurrent_sentiment.csv'
        result_df.to_csv(output_file, index=False)
        print(f"\nResults saved to {output_file}")

        # Calculate some statistics.
        # Count responses that *start* with the verdict: a substring test would
        # also count negative verdicts whose reasoning mentions "positive".
        # The guard avoids dividing by zero on an empty dataset.
        if len(result_df) > 0:
            positive_count = sum(1 for s in result_df['sentiment'] if s.lower().startswith('positive'))
            positive_percentage = (positive_count / len(result_df)) * 100
            print(f"\nPercentage of Positive Reviews: {positive_percentage:.2f}%")

        # Compare with previous non-concurrent approach
        print("\nPerformance Comparison:")
        print(f"Concurrent processing time: {elapsed_time:.2f} seconds")
        print(f"Non-concurrent processing time from previous cell: ~{baseline_seconds} seconds")
        # Skip the ratio when no measurable time elapsed (e.g. nothing was processed)
        if elapsed_time > 0:
            print(f"Speed improvement: ~{baseline_seconds/elapsed_time:.1f}x faster with concurrent processing")

    except FileNotFoundError:
        print("Error: drug_reviews.csv file not found. Please check the file path.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

# Run the main async function.
# NOTE: a bare top-level `await` is only valid where top-level await is
# supported (e.g. an IPython/Jupyter cell, which the cell markers in this file
# suggest); a plain Python script would reject this line.
await main()