### Install packages: Kafka, Postgres, NLP models, PyTorch, async tools, sentence embeddings, LLM access, vector search, progress bars.

In [None]:
!pip -q install kafka-python psycopg2-binary transformers torch asyncio aiokafka sentence-transformers litellm nest_asyncio pgvector tqdm


### This section imports the necessary Python modules and performs initial setup for our data science tasks. These include modules for interacting with Kafka, PostgreSQL (with vector support), generating text embeddings, performing sentiment analysis, and general asynchronous operations.

**Purpose:** Prepare the environment for data ingestion, processing, and storage.

In [3]:
# nest_asyncio patches asyncio so that event loops can be nested.
import nest_asyncio
nest_asyncio.apply()

import asyncio
import json
import random
import os
import logging
import traceback

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from kafka.errors import KafkaError
import psycopg2
from pgvector.psycopg2 import register_vector  # Registers the pgvector `vector` type with psycopg2
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import requests



### This section defines the configuration parameters for the system's components (Kafka, PostgreSQL, and vLLM) along with other data-processing settings. Environment variables are used where applicable for sensitive information such as database credentials and API keys.

**Purpose:**  Configure external services and application parameters.

In [None]:
import os
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Kafka configuration
KAFKA_TOPIC = "traffic_data"
KAFKA_BROKER = "kafka-kafka-brokers.smart-city-traffic-management.svc.cluster.local:9092"

# PostgreSQL configuration (use environment variables or replace with your actual values)
PG_HOST = os.getenv('PG_HOST', 'postgresql.pgvector.svc.cluster.local')
PG_PORT = os.getenv('PG_PORT', '5432')
PG_USER = os.getenv('PG_USER', 'vectordb')
PG_PASSWORD = os.getenv('PG_PASSWORD', 'vectordb')
PG_DATABASE = os.getenv('PG_DATABASE', 'traffic_data')

# vLLM configuration
VLLM_MODEL = "mistralai/Mistral-7B-Instruct-v0.2"  # Replace with your model name

# setdefault keeps a key/endpoint that is already exported in the environment
# (for example, injected from a Secret) and only falls back to the placeholder
# values below when nothing is set. Plain assignment would silently overwrite them.
os.environ.setdefault("VLLM_API_KEY", "your-vllm-api-key")  # Update as needed
os.environ.setdefault("VLLM_API_BASE", "http://vllm.vllm-gpu.svc.cluster.local:8000")  # Adjust the endpoint as necessary
VLLM_ENDPOINT = os.environ["VLLM_API_BASE"]

# Number of iterations for the main loop
ITERATIONS = 3  # Adjust as needed

INTERVAL = 1  # Time interval between iterations in seconds


### Explanation of `generate_traffic_data`

 This function creates a dictionary representing a snapshot of traffic conditions:

 *   **Location:** Randomly selects a location type (Downtown, Suburbs, Airport, Train Station).
 *   **Road Transportation:**
     *   `TrafficVolume:`  A random integer between 0 and 100.
     *   `TrafficSpeed:`  A random float of at most 60 whose lower bound drops as `TrafficVolume` rises (an inverse relationship: heavier traffic allows lower speeds).
     *   `PublicTransitPassengers:`  Randomly generated with more passengers during "Peak" hours.
 *   **Time of Day:**  Randomly choose between "Peak," "Off-Peak," and "Night."
 *   **Weather Conditions:**  Randomly selects a weather condition.

 The simulated data is returned as a Python dictionary.

In [5]:
def generate_traffic_data(rng=None):
    """Build one simulated snapshot of traffic conditions.

    Args:
        rng: Optional object exposing ``choice``, ``randint`` and ``uniform``
            (for example ``random.Random(seed)``). Pass a seeded instance to get
            reproducible samples. Defaults to the module-level ``random``
            generator, which is what the function always used before.

    Returns:
        dict: A snapshot with the keys ``"Location"``, ``"Road Transportation"``
        (a nested dict holding ``"TrafficVolume"``, ``"TrafficSpeed"`` and
        ``"PublicTransitPassengers"``), ``"Time of Day"`` and
        ``"Weather Conditions"``.
    """
    rng = random if rng is None else rng

    location_types = ["Downtown", "Suburbs", "Airport", "Train Station"]
    weather_conditions = ["Sunny", "Cloudy", "Raining", "Snowing"]

    # The draws below happen in the same order as before, so seeding the global
    # generator yields the same sequence of snapshots as the previous version.
    time_of_day = rng.choice(["Peak", "Off-Peak", "Night"])
    traffic_volume = rng.randint(0, 100)

    # Heavier traffic lowers the minimum possible speed; 60 is the ceiling.
    # The 0.0 floor guards against rounding and keeps the value a float.
    slowest_speed = 60 - (traffic_volume * 0.6)
    traffic_speed = max(0.0, rng.uniform(slowest_speed, 60))

    # Peak hours draw from the upper half of the passenger range.
    if time_of_day == "Peak":
        public_transit_passengers = rng.randint(50, 100)
    else:
        public_transit_passengers = rng.randint(0, 50)

    return {
        "Location": rng.choice(location_types),
        "Road Transportation": {
            "TrafficVolume": traffic_volume,
            "TrafficSpeed": traffic_speed,
            "PublicTransitPassengers": public_transit_passengers,
        },
        "Time of Day": time_of_day,
        "Weather Conditions": rng.choice(weather_conditions)
    }


### The vllm_generate_insights function generates insights from traffic data using the vLLM model. It takes a list of traffic data summaries and a custom prompt, concatenates them, and sends the prompt to the vLLM model endpoint to generate insights. The generated insights are then logged and printed.

In [None]:
def vllm_generate_insights(similar_data_texts, custom_prompt):
    """Ask the vLLM server for insights about a set of traffic summaries.

    The summaries are joined with newlines, prefixed with a fixed instruction,
    followed by ``custom_prompt``, and sent as a single text-completion request
    to vLLM's OpenAI-compatible ``/v1/completions`` route.

    Note: a later cell in this notebook defines a function with the same name;
    whichever cell ran last is the one that gets called.

    Args:
        similar_data_texts (list of str): Text summaries of traffic records.
        custom_prompt (str): Extra instruction appended after the summaries.

    Returns:
        str or None: The generated text (possibly empty), or ``None`` when the
        request failed. Failures are logged and reported, never raised.
    """
    # Concatenate the texts
    concatenated_text = '\n'.join(similar_data_texts)
    prompt = f"Based on the following traffic data summaries:\n{concatenated_text}\n{custom_prompt}"

    # VLLM_ENDPOINT is configured as a bare base URL, while the completion route
    # lives under /v1/completions. A URL that already names a completions route
    # is used unchanged.
    url = VLLM_ENDPOINT.rstrip('/')
    if not url.endswith('/completions'):
        url = f"{url}/completions" if url.endswith('/v1') else f"{url}/v1/completions"

    # Send the configured key as a bearer token when one is set.
    api_key = os.getenv("VLLM_API_KEY")
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

    try:
        response = requests.post(
            url,
            headers=headers,
            json={
                "model": VLLM_MODEL,
                "prompt": prompt,
                "max_tokens": 500,
                "n": 1
            },
            timeout=120,  # never hang the notebook on an unresponsive server
        )
        response.raise_for_status()
        body = response.json()
        choices = body.get("choices") or []
        if choices:
            generated_text = choices[0].get("text", "")
        else:
            # Fall back to the key the previous implementation looked for.
            generated_text = body.get("generated_text", "")
    except Exception as e:
        logger.error(f"Error querying vllm endpoint: {e}")
        print("Error generating insight with VLLM model.")
        return None

    logger.info("VLLM Model Generated Insight:")
    logger.info(generated_text)
    print("VLLM Model Generated Insight:")
    print(generated_text)
    return generated_text


### Kafka Producer

#### This section contains the function responsible for asynchronously producing messages to a Kafka topic.

**Environment:**  OpenShift AI (assumed)
**Purpose:**  Send traffic data messages to a Kafka topic for stream processing.

In [8]:
async def produce_to_kafka(producer, data):
    """Publish one traffic record to ``KAFKA_TOPIC`` and wait for the send to finish.

    The record is serialized as JSON and keyed by its ``"Time of Day"`` value.
    Any failure is logged and swallowed, so the coroutine always returns ``None``.

    Args:
        producer: Object exposing an awaitable ``send_and_wait(topic, value, key=...)``.
        data (dict): Traffic record; must contain the ``"Time of Day"`` key.
    """
    try:
        message_key = str(data["Time of Day"]).encode()
        message_value = json.dumps(data).encode()
        await producer.send_and_wait(KAFKA_TOPIC, message_value, key=message_key)
        logger.info(f"Data sent to Kafka: {data}")
    except Exception as exc:
        # Best-effort send: report the problem and let the caller carry on.
        logger.error(f"Error sending data to Kafka: {exc}")


### Data Processing and Storage

#### This section focuses on processing incoming traffic data, generating embeddings, and storing both the raw data and embeddings in a PostgreSQL database.

  **Environment:** OpenShift AI (assumed)
  **Purpose:**

      *   Transform raw traffic data into a text format.
      *   Generate text embeddings using a Sentence Transformer model.
      *   Store the raw data and embeddings in a PostgreSQL database for analysis and similarity search.
      
  **Assumptions:**
  
      *   A PostgreSQL database is deployed and accessible within the OpenShift cluster, and the necessary table (`traffic_data`) has been created with columns for `data` (JSONB) and `embedding` (VECTOR).
      *   An embedding model from the `sentence-transformers` library is initialized and available.

In [9]:
async def process_traffic_data(data, cur, conn, embedding_model):
    """Embed one traffic record and store it in the ``traffic_data`` table.

    The record is rendered as a one-line text summary, encoded with
    ``embedding_model``, and inserted together with its JSON form. The insert is
    committed immediately.

    Note: although this is a coroutine, nothing in the body awaits, so the
    encoding and the database calls run synchronously.

    Args:
        data (dict): Traffic record with the keys ``"Location"``,
            ``"Road Transportation"`` (holding ``"TrafficVolume"``,
            ``"TrafficSpeed"``, ``"PublicTransitPassengers"``),
            ``"Time of Day"`` and ``"Weather Conditions"``.
        cur: Database cursor used for the INSERT.
        conn: Connection owning ``cur``; committed on success, rolled back on failure.
        embedding_model: Object whose ``encode(text)`` result has a ``tolist()`` method.

    Raises:
        Exception: Whatever the INSERT or commit raised, re-raised after the rollback.
    """
    road = data['Road Transportation']
    text = (
        f"Location: {data['Location']}, "
        f"Traffic volume: {road['TrafficVolume']}, "
        f"Speed: {road['TrafficSpeed']:.2f}, "
        f"Public transit passengers: {road['PublicTransitPassengers']}, "
        f"Time: {data['Time of Day']}, "
        f"Weather: {data['Weather Conditions']}"
    )

    # Generate embedding
    embedding = embedding_model.encode(text).tolist()

    # Store data and embedding in PostgreSQL
    try:
        cur.execute(
            """
            INSERT INTO traffic_data (data, embedding)
            VALUES (%s, %s)
            """,
            (json.dumps(data), embedding)
        )
        conn.commit()
    except Exception:
        # A failed statement leaves the connection's transaction aborted, and
        # every later statement would fail until it is rolled back. Reset the
        # shared connection before handing the error to the caller.
        conn.rollback()
        raise

    logger.info(f"Processed and stored data: {data}")


### Similarity Search in PostgreSQL

This section defines a function to query the PostgreSQL database for similar traffic data based on vector similarity search using embeddings.

**Environment:** OpenShift AI (assumed)
**Purpose:** Retrieve the top *k* most similar traffic data entries from the database based on a given embedding vector. This leverages PostgreSQL's vector similarity search capabilities.

**Assumptions:**
  *   A PostgreSQL database is running and accessible, with a `traffic_data` table containing `data` (JSONB) and `embedding` (VECTOR) columns.
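  *   The `embedding` column is a `pgvector` vector type. The query orders by pgvector's `<#>` operator, which returns the *negative inner product*; this ranks rows the same way as cosine similarity only when the embeddings are unit-normalized (pgvector's dedicated cosine-distance operator is `<=>`).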


In [10]:
def query_similar_traffic_data(conn, embedding, top_k=5):
    """Return the ``top_k`` rows of ``traffic_data`` closest to ``embedding``.

    Rows are ordered ascending by pgvector's ``<#>`` operator (negative inner
    product), so the best matches come first.

    Args:
        conn: Open database connection; a cursor is created and closed here.
        embedding: Query vector, passed as a SQL parameter and cast to ``vector``.
        top_k (int): Maximum number of rows to return.

    Returns:
        list: ``(data, distance)`` rows as returned by ``fetchall()``.

    Raises:
        Exception: Any error raised by the query; the cursor is closed first.
    """
    cur = conn.cursor()
    try:
        # Execute vector similarity search
        cur.execute(
            """
            SELECT data, embedding <#> %s::vector AS distance
            FROM traffic_data
            ORDER BY embedding <#> %s::vector ASC
            LIMIT %s
            """,
            (embedding, embedding, top_k)
        )
        return cur.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cur.close()


###   This function takes a list of traffic data summaries and a custom prompt, concatenates them, and sends the prompt to the vLLM model to generate insights.

In [None]:
# The vllm package does not export a `Completion` client, and this notebook does
# not install vllm. The running vLLM server is reached over its OpenAI-compatible
# HTTP API instead, using the `requests` module imported at the top of the notebook.

def vllm_generate_insights(similar_data_texts, custom_prompt):
    """
    Generate insights from traffic data using the vLLM model.

    This function takes a list of traffic data summaries and a custom prompt,
    concatenates them, and sends them as a single user message to the vLLM
    server's OpenAI-compatible ``/v1/chat/completions`` route.

    Args:
        similar_data_texts (list of str): A list of text summaries related to traffic data.
        custom_prompt (str): A custom prompt provided by the user to guide the generation.

    Returns:
        str or None: The generated insight, which is also printed to the console,
        or None if the request failed (the error is printed, not raised).

    Example:
        similar_data = [
            "Traffic was heavier than usual during rush hour.",
            "Multiple accidents were reported on the main highway."
        ]
        prompt = "What are the potential impacts on traffic flow?"
        vllm_generate_insights(similar_data, prompt)
    """
    # Concatenate the texts
    concatenated_text = '\n'.join(similar_data_texts)

    # VLLM_ENDPOINT is a bare base URL; the API routes live under /v1.
    base_url = VLLM_ENDPOINT.rstrip('/')
    if not base_url.endswith('/v1'):
        base_url = f"{base_url}/v1"

    # Send the configured key as a bearer token when one is set.
    api_key = os.getenv("VLLM_API_KEY")
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json={
                "model": VLLM_MODEL,
                "messages": [{"role": "user", "content": f"{custom_prompt}\n{concatenated_text}"}],
                "max_tokens": 500,
                "n": 1
            },
            timeout=120,  # never hang the notebook on an unresponsive server
        )
        response.raise_for_status()
        generated_text = response.json()['choices'][0]['message']['content']
    except Exception as e:
        print(f"Error generating insight with vLLM model: {e}")
        return None

    print("vLLM Model Generated Insight:")
    print(generated_text)
    return generated_text

INFO:httpx:HTTP Request: GET https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json "HTTP/1.1 200 OK"


### Kafka Consumer and Data Processing Loop
This section defines the asynchronous function that consumes messages from a Kafka topic, processes them, and stores the data and embeddings in PostgreSQL.

**Purpose:** Consume a fixed number of traffic data messages from Kafka and pass each one to `process_traffic_data`, which generates an embedding and stores the record in PostgreSQL. Similarity search and LLM insight generation are performed separately, in a later cell. The function is asynchronous so that it fits the streaming consumer API.

**Dependencies:** Requires the `aiokafka` library for Kafka interaction, `psycopg2` for PostgreSQL interaction, `sentence-transformers` for embedding generation, and a vLLM-served LLM (or another model via `litellm`).

**Assumptions:**
  *   Kafka broker, PostgreSQL database, and vllm LLM are running and configured.
  *   The `traffic_data` table exists in PostgreSQL.
  *   `KAFKA_TOPIC`, `VLLM_MODEL`, and the other configuration values defined in the configuration cell are set (the embedding model name, `all-MiniLM-L6-v2`, is hard-coded where the model is loaded).



In [12]:
async def consume_messages(consumer, cur, conn, embedding_model, num_messages):
    """Consume up to ``num_messages`` Kafka messages and store each one.

    Every message value is decoded as UTF-8 JSON and handed to
    ``process_traffic_data``. The loop stops as soon as ``num_messages`` records
    have been processed, and otherwise keeps waiting on the consumer.

    Args:
        consumer: Async-iterable consumer yielding messages with a bytes ``value``.
        cur: Database cursor forwarded to ``process_traffic_data``.
        conn: Database connection forwarded to ``process_traffic_data``.
        embedding_model: Embedding model forwarded to ``process_traffic_data``.
        num_messages (int): Number of messages to process; values <= 0 process none.

    Returns:
        int: The number of messages processed.
    """
    # Without this guard a non-positive limit would still consume (and store)
    # one message before the loop's exit check ran.
    if num_messages <= 0:
        return 0

    count = 0
    async for msg in consumer:
        data = json.loads(msg.value.decode('utf-8'))
        await process_traffic_data(data, cur, conn, embedding_model)
        count += 1
        if count >= num_messages:
            break
    return count


### Kafka Topic Creation

This section defines a function to create a Kafka topic if it doesn't already exist. This is useful for setting up the necessary Kafka infrastructure for your data pipeline.

**Purpose:**  Ensures that the required Kafka topic exists before starting the data streaming process. This simplifies deployment and avoids errors related to missing topics.
**Dependencies:**  `kafka-python` library. Make sure it's installed (`pip install kafka-python`).

**Assumptions:**
  *   Kafka broker is running and accessible.
  *   `KAFKA_BROKER` (defined in the configuration cell) contains the correct broker address(es).


In [13]:
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1, bootstrap_servers=KAFKA_BROKER):
    """Create ``topic_name`` on the Kafka cluster, tolerating an existing topic.

    Prints whether the topic was created or was already present. The admin
    client is always closed once the creation attempt has finished.

    Args:
        topic_name (str): Name of the topic to create.
        num_partitions (int): Partition count for the new topic.
        replication_factor (int): Replication factor for the new topic.
        bootstrap_servers: Broker address(es) handed to ``KafkaAdminClient``.
    """
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers, client_id='test')
    requested_topic = NewTopic(
        name=topic_name,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
    )
    try:
        admin.create_topics(new_topics=[requested_topic], validate_only=False)
    except TopicAlreadyExistsError:
        print(f"Topic '{topic_name}' already exists.")
    else:
        print(f"Topic '{topic_name}' created successfully.")
    finally:
        admin.close()


### Main Application Logic

This section defines the main asynchronous function that orchestrates the entire data pipeline.


**Purpose:** This function sets up the necessary connections (PostgreSQL, Kafka), initializes the embedding model, creates the Kafka topic, produces sample traffic data, and consumes/processes it. It serves as the entry point for the complete data pipeline.

**Dependencies:**
*   `aiokafka`: For asynchronous Kafka interaction.
*   `psycopg2`:  For PostgreSQL database interaction.
*   `sentence-transformers`: For embedding generation.
*   Other functions defined earlier:  `create_kafka_topic`, `produce_to_kafka`, `consume_messages`, `generate_traffic_data`, `process_traffic_data`, `query_similar_traffic_data`, `vllm_generate_insights`.

In [None]:
def _ensure_traffic_schema(conn, cur):
    """Create the pgvector extension and the traffic_data table, then register the vector type."""
    # Create the pgvector extension
    cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
    conn.commit()

    # register_vector looks up the `vector` type in the database, so it must run
    # after the extension exists; on a fresh database the reverse order fails.
    register_vector(conn)

    # Create table for traffic data if it doesn't exist
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS traffic_data (
            id SERIAL PRIMARY KEY,
            data JSONB,
            embedding vector(384)
        )
        """
    )
    conn.commit()


async def _produce_then_consume(producer, cur, conn, embedding_model):
    """Send ITERATIONS generated records to Kafka, then consume and store the same number."""
    for _ in range(ITERATIONS):
        # Generate and produce traffic data
        traffic_data = generate_traffic_data()
        await produce_to_kafka(producer, traffic_data)
        await asyncio.sleep(INTERVAL)

    # Consume and process traffic data
    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_BROKER],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='traffic-consumer-group'
    )
    await consumer.start()
    try:
        # Adjust the number of messages to consume
        await consume_messages(consumer, cur, conn, embedding_model, ITERATIONS)
    finally:
        await consumer.stop()


async def main():
    """Run the whole pipeline: set up storage and Kafka, produce samples, consume and store them.

    Steps, in order: connect to PostgreSQL, make sure the pgvector extension and
    the ``traffic_data`` table exist, load the embedding model, create the Kafka
    topic, start a producer, send ``ITERATIONS`` generated records, then consume
    the same number of messages and store them with their embeddings.

    Errors during the produce/consume phase are logged with a traceback and
    swallowed. Errors during setup propagate to the caller. In both cases the
    database cursor and connection are closed before returning.
    """
    # Connect to PostgreSQL
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        user=PG_USER,
        password=PG_PASSWORD,
        database=PG_DATABASE
    )
    try:
        cur = conn.cursor()
        try:
            _ensure_traffic_schema(conn, cur)

            # Initialize embedding model
            embedding_model = SentenceTransformer('all-MiniLM-L6-v2')  # Dimension: 384

            # Create Kafka topic if it doesn't exist
            create_kafka_topic(KAFKA_TOPIC)

            # Create Kafka producer
            producer = AIOKafkaProducer(bootstrap_servers=[KAFKA_BROKER])
            await producer.start()
            try:
                await _produce_then_consume(producer, cur, conn, embedding_model)
            except Exception as e:
                logger.error(f"An error occurred in main: {e}")
                traceback.print_exc()
            finally:
                await producer.stop()
        finally:
            cur.close()
    finally:
        # Reached on every path, so a failure during setup no longer leaks the
        # database connection.
        conn.close()
        logger.info("Main function execution completed.")


### Run the main function to start the data pipeline.

In [None]:
# Run the main function.
# Top-level `await` is valid in a notebook cell because IPython runs cells with
# autoawait; in a plain Python script use `asyncio.run(main())` instead.
await main()


### Standalone Traffic Data Query and Analysis

This section provides a standalone script to query the PostgreSQL database for similar traffic data based on a sample text input and then generate insights using the vllm LLM.


**Purpose:**  This script allows users to manually input a description of traffic conditions, find similar historical data in the database, and get insights from an LLM (vllm via `litellm`). It's useful for ad-hoc analysis and testing the similarity search and insight generation components.

In [None]:
from tqdm import tqdm
import time  # For simulating processing time

# After data processing, perform the vllm model query

# Sample text describing traffic conditions
# City managers can modify this text to test different traffic scenarios, such as:
# - "Heavy congestion on Main Street during morning rush hour due to construction."
# - "Increased pedestrian activity near Central Park on weekends."
# - "Accident on Highway 50 causing significant delays in both directions."
# - "Road closures downtown for the annual marathon event."
# - "Snowstorm expected to impact traffic flow in the northern districts."
#sample_text = "High traffic volume in downtown during peak hours with raining conditions."
sample_text = "Accident on Highway 50 causing significant delays in both directions."
custom_prompt=""

# Generate an embedding for the sample query
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# Display progress bar for embedding generation
with tqdm(total=1, desc="Generating embedding") as pbar:
    sample_embedding = embedding_model.encode(sample_text).tolist()
    pbar.update(1)

# Connect to PostgreSQL only for the duration of the lookup. The try/finally
# guarantees the connection is closed even if the query fails, and it is not
# held open while waiting on the LLM.
conn = psycopg2.connect(
    host=PG_HOST,
    port=PG_PORT,
    user=PG_USER,
    password=PG_PASSWORD,
    database=PG_DATABASE
)
try:
    # Query similar traffic data
    similar_data = query_similar_traffic_data(conn, sample_embedding, top_k=5)
finally:
    # Close the database connection
    conn.close()

# Extract the data texts
similar_data_texts = []
for row in similar_data:
    data = row[0]
    data_text = (
        f"Location: {data['Location']}, "
        f"Traffic volume: {data['Road Transportation']['TrafficVolume']}, "
        f"Speed: {data['Road Transportation']['TrafficSpeed']:.2f}, "
        f"Public transit passengers: {data['Road Transportation']['PublicTransitPassengers']}, "
        f"Time: {data['Time of Day']}, Weather: {data['Weather Conditions']}"
    )
    similar_data_texts.append(data_text)

# Use vllm model to generate insights
if similar_data_texts:
    # Display progress bar while waiting for response
    with tqdm(total=1, desc="Generating insights with vllm") as pbar:
        vllm_generate_insights(similar_data_texts,custom_prompt)
        pbar.update(1)
else:
    # With no stored rows there is nothing to summarise, so skip the LLM call.
    print("No similar traffic data found; skipping insight generation.")
