<a href="https://colab.research.google.com/github/Tnsr-Q/Adala-/blob/main/Whitlock_Data_Injection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Overview**

This guide describes a data ingestion and indexing pipeline for media episodes. It covers data preparation, bulk loading into Redis, vector indexing for semantic search via Mem0, and verification checks, followed by techniques for scaling to larger datasets and keeping schemas consistent. The code snippets target Google Colab, with notes on running shell commands and connecting to remote services.

-----

### **Step 1: Prepare Your Data Files**

First, we'll transform the source data into formats optimized for bulk loading and analysis. We will use NDJSON for episode data due to its stream-friendly nature and a standard CSV for relational "mentions" data.

#### **1.1 Format Episodes as NDJSON**

Convert your existing episode JSON files into Newline Delimited JSON (NDJSON), where each line is a self-contained JSON object.

<details>
<summary><strong>Google Colab Note</strong></summary>

To run shell commands in Colab, prefix them with `!`. You'll first need to upload your data or mount your Google Drive.

</details>

```bash
# In your data directory, combine all JSON files into a single NDJSON file.
# The `jq -c .` command compacts each JSON object into a single line.
!jq -c . *.json > episodes.ndjson
```
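
Where `jq` is not available, the same conversion can be done in Python. The following is a minimal sketch, assuming each input file holds exactly one JSON object; the function name `write_ndjson` is illustrative and not part of the original pipeline.

```python
import json


def write_ndjson(json_paths, out_path):
    """Write one compact JSON object per line; return the number of records."""
    count = 0
    with open(out_path, "w", encoding="utf-8") as out:
        # Sort the inputs so the output order is reproducible between runs.
        for path in sorted(json_paths):
            with open(path, encoding="utf-8") as f:
                record = json.load(f)
            # Compact separators mirror the single-line output of `jq -c`.
            line = json.dumps(record, separators=(",", ":"), ensure_ascii=False)
            out.write(line + "\n")
            count += 1
    return count
```

A typical call would be `write_ndjson(glob.glob("*.json"), "episodes.ndjson")` after `import glob`.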

#### **1.2 Create Celebrity Mentions CSV**

Generate a CSV file that maps celebrity names to the episodes in which they are mentioned, including a timestamp and category.

```bash
# Create the header row for the CSV file.
!echo "name,episode_id,timestamp,category" > mentions.csv

# Append mentions by parsing the NDJSON file.
# This example extracts main topics and categorizes them as 'sports'.
# The timestamp is the processing time (`now`) as Unix epoch seconds,
# so it can serve as a numeric sorted-set score when loaded into Redis.
!jq -r '.video_metadata.video_id as $id | .content_analysis.main_topics[] | "\(.),\($id),\(now | floor),sports"' episodes.ndjson >> mentions.csv
```
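
String interpolation in `jq` does not quote fields, so a topic that contains a comma would break the CSV. Below is a hedged Python sketch of the same extraction using the standard `csv` module. It assumes the episode structure used throughout this guide and stores the timestamp as Unix epoch seconds (an assumption made here so the value can be used directly as a sorted-set score); `write_mentions` and its parameters are illustrative names.

```python
import csv
import json
import time


def write_mentions(ndjson_path, csv_path, category="sports", now=None):
    """Write one CSV row per (topic, episode); return the number of data rows."""
    # The processing time stands in for the mention time, as in the jq command.
    ts = int(time.time() if now is None else now)
    rows = 0
    with open(ndjson_path, encoding="utf-8") as src, \
         open(csv_path, "w", newline="", encoding="utf-8") as dst:
        writer = csv.writer(dst)
        writer.writerow(["name", "episode_id", "timestamp", "category"])
        for line in src:
            if not line.strip():
                continue  # tolerate blank lines in the NDJSON file
            episode = json.loads(line)
            video_id = episode["video_metadata"]["video_id"]
            topics = episode.get("content_analysis", {}).get("main_topics", [])
            for topic in topics:
                writer.writerow([topic, video_id, ts, category])
                rows += 1
    return rows
```

Fields containing commas are quoted on output, so the file should be read back with a CSV parser rather than by splitting on commas.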

-----

### **Step 2: One-Time Redis Bulk Load**

With the data prepared, we load it into Redis in bulk using the pipe mode of `redis-cli`. Pipe mode streams commands to the server without waiting for each reply, which avoids paying a network round trip per command.

#### **2.1 Install Redis Tools & Connect from Colab**

You'll need `redis-cli`, which is part of the `redis-tools` package.

```bash
# Install redis-tools in the Colab environment
!sudo apt-get install redis-tools -y
!pip install redis
```

<details>
<summary><strong>Connecting to a Remote Redis from Colab</strong></summary>

When running commands like `redis-cli`, you must specify the host and port of your remote Redis instance (e.g., from Redis Cloud or Aiven).

Example: `!redis-cli -h <your-redis-host> -p <port> -a <password> --pipe`

</details>

#### **2.2 Load Episode Data & Mentions**

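Use pipe mode for a high-throughput load. Pipe mode is not transactional: commands are applied as they arrive, so a failure partway through leaves a partial load.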

```bash
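# 1. Load all episodes. Pipe mode expects Redis commands (RESP or inline), not raw JSON,
#    so each NDJSON line must first be converted into a command such as HSET.
#    `to_resp.py` is a hypothetical converter script (NDJSON on stdin, RESP on stdout).
# Replace with your Redis connection details.
!python to_resp.py < episodes.ndjson | redis-cli -h <host> -p <port> -a <password> --pipe

# 2. Load celebrity mentions into a Redis Sorted Set, which supports time-range queries.
#    NR > 1 skips the CSV header; the key is quoted because names may contain spaces.
#    ZADD scores must be numeric, so the timestamp column must hold Unix epoch seconds.
!awk -F, 'NR > 1 {print "ZADD \"person:" $1 ":episodes\" " $3 " " $2}' mentions.csv | redis-cli -h <host> -p <port> -a <password> --pipe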
```
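
Pipe mode reads Redis protocol from standard input, so NDJSON lines have to be wrapped in commands before they are piped. The sketch below shows one way to do that, assuming each episode is stored as a hash at `episode:{video_id}:legacy` with the JSON in a `raw` field (the layout the indexer in Step 3 reads). The function names are illustrative.

```python
import json


def encode_command(*args):
    """Encode one Redis command as a RESP array of bulk strings."""
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        # Bulk string: byte length, CRLF, payload, CRLF.
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


def episode_to_resp(line):
    """Turn one NDJSON line into an HSET command for the episode hash."""
    episode = json.loads(line)
    video_id = episode["video_metadata"]["video_id"]
    key = f"episode:{video_id}:legacy"
    return encode_command("HSET", key, "raw", line.strip())
```

Saved as a script (for instance under a hypothetical name such as `to_resp.py`) that writes `episode_to_resp(line)` to `sys.stdout.buffer` for every line on stdin, the output can be piped straight into `redis-cli --pipe`.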

-----

### **Step 3: Automated Vector Indexing with Mem0**

Next, we index the episode content with Mem0 to enable semantic search. The Python script below iterates over the episode keys loaded into Redis, extracts the relevant fields, and sends them to the Mem0 indexing service.

#### **`mem0_indexer.py`**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import asyncio
import json
import os
import redis
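# NOTE: `Mem0Client` and the `add(id=..., text=..., metadata=...)` call used below
# follow this guide; verify the class name and signature against your installed Mem0 SDK.
from mem0 import Mem0Client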

# --- Configuration ---
# It's best practice to use environment variables for credentials.
# In Colab, store them under the 'Secrets' tab and copy each one into
# os.environ (e.g., via google.colab.userdata.get) before running this script.
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
MEM0_API_KEY = os.getenv("MEM0_API_KEY")

# --- Client Initialization ---
try:
    r = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True # Simplifies handling of keys/values
    )
    r.ping() # Verify connection
    print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    # Stop here: nothing below can run without a Redis connection.
    raise SystemExit(f"Error connecting to Redis: {e}")

mem0 = Mem0Client(api_key=MEM0_API_KEY)

async def index_episodes():
    """
    Scans for legacy episode keys in Redis, extracts metadata,
    and indexes it as a vector in Mem0.
    """
    print("Starting episode indexing...")
    keys_processed = 0
    # Use scan_iter for memory-efficient iteration over large key sets
    for key in r.scan_iter("episode:*:legacy"):
        try:
            raw = r.hget(key, "raw")
            if raw is None:
                print(f"Skipping key {key}: no 'raw' field.")
                continue
            data = json.loads(raw)

            # Extract features for vectoring and metadata with safe .get() calls
            main_topics_vector = data.get("content_analysis", {}).get("main_topics", [])
            video_meta = data.get("video_metadata", {})
            whitlock_tech = data.get("whitlock_techniques", {})

            controversy_elements = whitlock_tech.get("controversy_elements", [])
            spice_level = max(c.get("controversy_level", 0) for c in controversy_elements) if controversy_elements else 0

            await mem0.add(
                id=key, # Use a deterministic ID
                text=", ".join(main_topics_vector), # Text to be embedded by Mem0
                metadata={
                    "duration": video_meta.get("duration_seconds"),
                    "spice_score": spice_level
                }
            )
            keys_processed += 1
            if keys_processed % 100 == 0:
                print(f"Indexed {keys_processed} vectors...")

        except json.JSONDecodeError:
            print(f"Skipping key {key}: Invalid JSON in 'raw' field.")
        except Exception as e:
            print(f"An error occurred while processing key {key}: {e}")

    print(f"Finished indexing. Total vectors processed: {keys_processed}")

if __name__ == "__main__":
    asyncio.run(index_episodes())

#### **Run the Indexer**

```bash
!python mem0_indexer.py
```

-----

### **Step 4: Verification and Quick Checks**

Perform these quick "smoke tests" to validate that the data has been loaded and indexed correctly.

#### **4.1 Check Redis**

Confirm that the keys have been created and populated.

```bash
# Get a summary of large keys, filtering for our episode data
!redis-cli -h <host> -p <port> -a <password> --bigkeys | grep "episode"

# Check the cardinality (count) of a specific celebrity's sorted set.
!redis-cli -h <host> -p <port> -a <password> ZCARD "person:Deion Sanders:episodes"
```
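
The same checks can be run from Python, which is convenient inside a notebook. This is a minimal sketch that assumes a redis-py style client exposing `scan_iter` and `zcard`; the helper name `summarize_load` is illustrative.

```python
def summarize_load(client, person):
    """Count episode hashes and one person's mentions using a redis-py style client."""
    # scan_iter walks the keyspace incrementally instead of blocking like KEYS.
    episode_keys = sum(1 for _ in client.scan_iter(match="episode:*:legacy"))
    mentions = client.zcard(f"person:{person}:episodes")
    return {"episode_keys": episode_keys, "mentions": mentions}


# Example (requires a reachable Redis instance; connection values are placeholders):
# import redis
# client = redis.Redis(host="localhost", port=6379, decode_responses=True)
# print(summarize_load(client, "Deion Sanders"))
```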

#### **4.2 Check Mem0**

Query the Mem0 API to ensure your vectors have been indexed and are searchable.

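<details>
<summary><strong>Setting API Key in Colab</strong></summary>

Store your Mem0 API key in Colab's secrets manager rather than in the notebook. Secrets are not exported to the shell automatically: read the value in Python with `google.colab.userdata.get("MEM0_KEY")` and assign it to `os.environ["MEM0_KEY"]` so that the `curl` command can reference it as `$MEM0_KEY`.

</details>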

```bash
# Example search query for episodes about the "NFL"
!curl -X POST "https://api.mem0.ai/v1/search" \
     -H "Content-Type: application/json" \
     -H "Authorization: Bearer $MEM0_KEY" \
     -d '{
         "query": "episodes about the NFL",
         "limit": 5
     }'
```

-----

### **Critical Pro Tips for Scaling**

#### **Handling 500+ Episodes with Parallel Ingestion**

For very large datasets, split the data into chunks and upload them in parallel.

```bash
# Step 1: Split the master NDJSON file into smaller chunks of 100 lines each
!split -l 100 episodes.ndjson episodes_chunk_

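# Step 2: Use GNU Parallel to convert and upload chunks concurrently (install if needed).
# Each chunk must be turned into Redis commands before it is piped;
# `to_resp.py` is a hypothetical converter script (NDJSON on stdin, RESP on stdout).
!sudo apt-get install parallel -y
!parallel -j 4 "python to_resp.py < {} | redis-cli -h <host> -p <port> -a <password> --pipe" ::: episodes_chunk_*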
```
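
The chunking step can also be done in Python, which keeps whole NDJSON records together in each batch. A minimal sketch follows; the helper name `chunked` is illustrative, and the batch size of 100 simply mirrors the `split -l 100` call.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Example: iterate over batches of 100 episode lines.
# with open("episodes.ndjson", encoding="utf-8") as f:
#     for batch in chunked(f, 100):
#         ...  # convert the batch to Redis commands and load it
```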

#### **Automated Schema Migration and Enforcement**

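To maintain data consistency, you can manage data schemas programmatically. This example records the expected fields for each collection in Google Firestore. Writing the schema document does not by itself reject invalid writes, so security rules or backend code must check documents against it.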

##### **`schema_enforcer.py`**

In [None]:
from google.cloud import firestore
from google.cloud.firestore import SERVER_TIMESTAMP

# Define the expected fields for your collections.
# Best practice: manage this in a separate config file.
SCHEMA_MAP = {
    "episodes": ["title", "air_date", "video_id", "source_url"],
    "people": ["name", "category", "first_seen"],
}

def enforce_schemas():
    """
    Writes a schema document to a Firestore `_schemas` collection.
    This can be used by security rules or backend services to validate documents.
    """
    # In Colab, you would authenticate first:
    # from google.colab import auth
    # auth.authenticate_user()
    db = firestore.Client()
    print("Enforcing Firestore schemas...")

    for collection_name, fields in SCHEMA_MAP.items():
        schema_doc_ref = db.document(f"_schemas/{collection_name}")
        schema_data = {
            "fields": {field_name: {"type": "string", "required": True} for field_name in fields},
            "last_updated": SERVER_TIMESTAMP
        }
        schema_doc_ref.set(schema_data)
        print(f"  - Schema for '{collection_name}' has been written.")

if __name__ == "__main__":
    enforce_schemas()
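
A backend service could check a document against a stored schema before writing it. The sketch below assumes the schema document has the shape written above (a `fields` map whose entries carry `type` and `required`) and only understands the `string` type; `validate_document` is an illustrative name.

```python
def validate_document(schema_doc, document):
    """Return a list of problems found when checking `document` against `schema_doc`."""
    problems = []
    for name, spec in sorted(schema_doc.get("fields", {}).items()):
        if name not in document:
            if spec.get("required"):
                problems.append(f"missing required field: {name}")
        elif spec.get("type") == "string" and not isinstance(document[name], str):
            problems.append(f"field {name} should be a string")
    return problems


# Example with a schema shaped like the `_schemas/episodes` document:
# schema = {"fields": {"title": {"type": "string", "required": True}}}
# validate_document(schema, {})  # one problem: the title is missing
```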

-----

### **Data Architecture Schemas**

The following sections define the core data structures used across the system, from the detailed episode analysis to the schemas for Redis and the Reinforcement Learning environment.

#### **Core Episode Processing Schema**

This is the canonical JSON structure for a single, fully analyzed episode.

```json
{
  "video_metadata": {
    "video_id": "string",
    "title": "string",
    "upload_date": "datetime_iso8601",
    "duration_seconds": "integer"
  },
  "content_analysis": {
    "main_topics": ["string"],
    "primary_thesis": "string",
    "argument_structure": {
      "opening_hook": "string",
      "evidence_presented": ["string"],
      "conclusion": "string"
    }
  },
  "whitlock_techniques": {
    "signature_phrases": ["string"],
    "controversy_elements": [
      {
        "spicy_quote": "string",
        "controversy_level": "float",
        "timestamp": "string"
      }
    ]
  },
  "performance_analytics": {
    "engagement_rate": "float",
    "comment_sentiment": "float",
    "viral_potential": "float"
  },
  "mobile_optimization": {
    "clip_recommendations": [
      {
        "start_time": "string",
        "end_time": "string",
        "clip_title": "string",
        "viral_potential": "float"
      }
    ]
  }
}
```
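
Incoming episodes can be checked against this structure before they are loaded. The following sketch is deliberately partial: which sections and fields count as required is an assumption made for illustration (`REQUIRED_SECTIONS` is a hypothetical name), and only `controversy_level` is type-checked.

```python
# Assumed minimum for a loadable episode; adjust to your own requirements.
REQUIRED_SECTIONS = {
    "video_metadata": ["video_id", "title", "upload_date", "duration_seconds"],
    "content_analysis": ["main_topics", "primary_thesis", "argument_structure"],
}


def check_episode(episode):
    """Return a list of structural problems found in one episode dict."""
    problems = []
    for section, keys in REQUIRED_SECTIONS.items():
        block = episode.get(section)
        if not isinstance(block, dict):
            problems.append(f"missing section: {section}")
            continue
        for key in keys:
            if key not in block:
                problems.append(f"missing field: {section}.{key}")
    elements = episode.get("whitlock_techniques", {}).get("controversy_elements", [])
    for i, element in enumerate(elements):
        level = element.get("controversy_level")
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(level, bool) or not isinstance(level, (int, float)):
            problems.append(f"controversy_elements[{i}].controversy_level is not numeric")
    return problems
```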

#### **Redis Schema Architecture**

This defines how data is structured within Redis for high-performance access.

In [None]:
# This Python dictionary serves as a reference for your Redis key structures.

REDIS_SCHEMAS = {
    "episode_data": {
        "key_format": "episode:{video_id}:legacy",
        "type": "Hash",
        "fields": {
            "raw": "The full JSON object of the episode, compressed (e.g., gzip).",
            "timestamp": "ISO 8601 timestamp for time-based filtering."
        }
    },
    "person_mentions": {
        "key_format": "person:{normalized_name}:episodes",
        "type": "Sorted Set (ZSET)",
        "description": "Stores episode IDs where a person is mentioned.",
        "score": "Unix timestamp of the mention.",
        "value": "episode_id"
    },
    "historical_stats": {
        "key_format": "whitlock:stats:{period}", # e.g., period = "90d"
        "type": "Hash",
        "fields": {
            "avg_spice_score": "float",
            "top_topics": "JSON string array",
            "top_mentions": "JSON string array"
        }
    }
}
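
Building keys through small helpers keeps writers and readers in agreement on these formats. In the sketch below, the normalization rule for person names (collapse whitespace, lowercase, join with underscores) is a hypothetical choice, since the schema only says `normalized_name`; whatever rule you pick must be applied on both the write and the read path.

```python
def episode_key(video_id):
    """Key of the hash holding one episode's raw JSON."""
    return f"episode:{video_id}:legacy"


def normalize_name(name):
    """Hypothetical normalization: collapse whitespace, lowercase, join with underscores."""
    return "_".join(name.split()).lower()


def person_key(name):
    """Key of the sorted set listing the episodes that mention a person."""
    return f"person:{normalize_name(name)}:episodes"


def stats_key(period):
    """Key of the hash holding aggregate stats for a period such as '90d'."""
    return f"whitlock:stats:{period}"
```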

#### **Mem0 Indexing & RL Environment Schema**

This defines the structure for Mem0, covering both semantic search indexing and the Reinforcement Learning environment's configuration.

In [None]:
# This file outlines the schemas for Mem0.

# 1. Mem0 Reinforcement Learning (RL) Environment Schema
# This dictionary is stored as metadata against a single vector in Mem0.
RL_ENVIRONMENT_SCHEMA = {
    "state_space": {
        "description": "Defines the inputs the agent observes.",
        "current_episode_content": "embedding:768",
        "live_audience_feedback": "embedding:256",
        "moral_alignment_score": "float"
    },
    "action_space": {
        "description": "Defines the possible actions the agent can take.",
        "actions": [
            {"name": "pivot_topic", "params": {"target_topic": "string", "intensity": "float"}},
            {"name": "introduce_rhetorical_device", "params": {"device_type": "enum"}}
        ]
    },
    "reward_components": {
        "description": "Defines the sources and weights for the reward signal.",
        "engagement": {"weight": 0.6, "source": "redis"},
        "moral_alignment": {"weight": 0.3, "source": "cerebras_api"},
        "novelty": {"weight": 0.1, "source": "mem0_search"}
    }
}

# 2. Example: Storing the RL schema in Mem0
async def initialize_rl_env_in_mem0(mem0_client):
    """Stores the RL environment schema in Mem0 for easy retrieval."""
    await mem0_client.add(
        id="rl_env:whitlock_production_v1",
        text="Configuration for the Whitlock production RL environment.",
        metadata=RL_ENVIRONMENT_SCHEMA
    )
    print("RL Environment schema successfully stored in Mem0.")
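
The schema lists weights for the reward components but does not say how they are combined. One plausible reading is a weighted sum, sketched below under that assumption; `combine_reward` and the shape of the `signals` dict are illustrative, and `REWARD_COMPONENTS` is a local copy of the `reward_components` entry so the example stands alone.

```python
# Local copy of the "reward_components" entry from the schema above.
REWARD_COMPONENTS = {
    "description": "Defines the sources and weights for the reward signal.",
    "engagement": {"weight": 0.6, "source": "redis"},
    "moral_alignment": {"weight": 0.3, "source": "cerebras_api"},
    "novelty": {"weight": 0.1, "source": "mem0_search"},
}


def combine_reward(signals, components=REWARD_COMPONENTS):
    """Weighted sum of per-component signals (assumed combination rule)."""
    total = 0.0
    for name, spec in components.items():
        if not isinstance(spec, dict) or "weight" not in spec:
            continue  # skip non-component entries such as "description"
        # A component with no observed signal contributes zero.
        total += spec["weight"] * signals.get(name, 0.0)
    return total
```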

#### **Firestore Temporary Article Pipeline**

This schema defines the structure for temporary articles managed in Firestore, which pass through a multi-stage verification process.

In [None]:
# This file defines the schema for the Firestore-based verification pipeline.

from pydantic import BaseModel, Field
from typing import List, Dict

class Claim(BaseModel):
    claim_text: str
    verification_status: str = "unverified"
    cerebras_analysis: Dict = Field(default_factory=dict)

class ProcessingStage(BaseModel):
    completed: bool = False
    approval_status: str = "pending"
    score: float = 0.0
    notes: str = ""

class FirestoreArticle(BaseModel):
    """Pydantic model for a document in the 'temp_articles' collection."""
    raw_content: str
    source_episode_id: str
    created_at: str # ISO 8601 timestamp
    status: str = "pending_verification"
    extracted_claims: List[Claim] = Field(default_factory=list)
    processing_stages: Dict[str, ProcessingStage] = Field(default_factory=dict) # e.g., {"ppo_eval": {...}, "shisa_approval": {...}}

# This Pydantic model can be used in your application to validate data
# before writing to Firestore, ensuring data integrity.
#
# Example Usage:
# article_data = FirestoreArticle(raw_content="...", source_episode_id="abc-123", ...)
# firestore_client.collection("temp_articles").add(article_data.model_dump())
