In [128]:
data_path = r"dmv_site_data_base"
def convert_backslashes(input_string):
    return input_string.replace("\\", "/")
data_path = convert_backslashes(data_path)

In [129]:
import redis

# Basic connection to Redis Stack running locally
client = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

# Test the connection
try:
    response = client.ping()
    print("Connection to Redis successful!")
    print("Redis PING response:", response)
except redis.exceptions.ConnectionError as e:
    print(f"Connection to Redis failed: {e}")

Connection to Redis successful!
Redis PING response: True


In [130]:
#non recursive search for json files. Use this when providing the specific directory of the json 
import json 
from pathlib import Path
def load_jsons_to_redis(directory_path, client):
    """
    Load all JSON files from a directory into Redis
    
    Args:
        directory_path (str): Path to directory containing JSON files
        client: Redis client instance
    
    Returns:
        tuple: (number of successful insertions, list of failed files)
    """
    # Convert to Path object for better path handling
    json_dir = Path(directory_path)
    
    # Ensure directory exists
    if not json_dir.is_dir():
        raise ValueError(f"Directory not found: {directory_path}")
    
    pipeline = client.pipeline()
    file_count = 0
    failed_files = []
    
    # Process each JSON file in directory
    for json_file in json_dir.glob('*.json'):
        try:
            with open(json_file, 'r') as file:
                sections = json.load(file)
                
                # Handle both single objects and arrays of objects
                if isinstance(sections, dict):
                    sections = [sections]
                
                # Add each section to pipeline
                for i, section in enumerate(sections, start=file_count + 1):
                    redis_key = f"{data_path}:{i:05}"
                    pipeline.json().set(redis_key, "$", section)
                
                file_count += len(sections)
                
        except json.JSONDecodeError as e:
            print(f"Invalid JSON in file {json_file.name}: {e}")
            failed_files.append((json_file.name, "Invalid JSON"))
        except Exception as e:
            print(f"Error processing file {json_file.name}: {e}")
            failed_files.append((json_file.name, str(e)))
    
    # Execute pipeline if there are commands
    if file_count > 0:
        try:
            res = pipeline.execute()
            print(f"Inserted {len(res)} documents successfully!")
        except Exception as e:
            print(f"Redis pipeline execution error: {e}")
            return 0, failed_files
    else:
        print("No JSON files found in directory")
        return 0, failed_files
    
    return file_count, failed_files

load_jsons_to_redis(data_path, client)

No JSON files found in directory


(0, [])

In [131]:
#recursive search. will insert all jsons found in subdirectories. Note that this code creates redis keys that look different than the non-recursive code
import json 
from pathlib import Path
import os

def load_jsons_to_redis(directory_path, client):
    """
    Load all JSON files from a directory and its subdirectories into Redis
    
    Args:
        directory_path (str): Path to root directory containing JSON files
        client: Redis client instance
    
    Returns:
        tuple: (number of successful insertions, list of failed files)
    """
    json_dir = Path(directory_path)
    
    if not json_dir.is_dir():
        raise ValueError(f"Directory not found: {directory_path}")
    
    pipeline = client.pipeline()
    file_count = 0
    failed_files = []
    
    # Recursively search for JSON files
    for json_file in json_dir.rglob('*.json'):
        try:
            with open(json_file, 'r') as file:
                sections = json.load(file)
                
                if isinstance(sections, dict):
                    sections = [sections]
                
                # Generate Redis key based on file path
                relative_path = json_file.relative_to(json_dir)
                key_suffix = str(relative_path.with_suffix('')).replace(os.sep, ':')
                base_key = str(json_dir).replace(os.sep, ':')
                
                for i, section in enumerate(sections, start=file_count + 1):
                    redis_key = f"{base_key}:{key_suffix}:{i:05}"
                    pipeline.json().set(redis_key, "$", section)
                
                file_count += len(sections)
                
        except json.JSONDecodeError as e:
            print(f"Invalid JSON in {json_file.relative_to(json_dir)}: {e}")
            failed_files.append((str(json_file.relative_to(json_dir)), "Invalid JSON"))
        except Exception as e:
            print(f"Error processing {json_file.relative_to(json_dir)}: {e}")
            failed_files.append((str(json_file.relative_to(json_dir)), str(e)))
    
    if file_count > 0:
        try:
            res = pipeline.execute()
            print(f"Inserted {len(res)} documents successfully!")
        except Exception as e:
            print(f"Redis pipeline execution error: {e}")
            return 0, failed_files
    else:
        print("No JSON files found in directory structure")
        return 0, failed_files
    
    return file_count, failed_files

load_jsons_to_redis(data_path, client)

Inserted 320 documents successfully!


(320, [])

In [132]:
import numpy as np
from openai import OpenAI
import os
from dotenv import load_dotenv

load_dotenv(dotenv_path="c:/Users/benja/startup_projects/civgen/.env")
openai_client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY")
)

# Retrieve keys from Redis
keys = sorted(client.keys(f"{data_path}:*"))
content = client.json().mget(keys, "$.text_content")

# Flatten list while keeping track of corresponding keys
flattened_content = []
flattened_keys = []
for key, sublist in zip(keys, content):
    for item in sublist:
        flattened_content.append(item)
        flattened_keys.append(key)

# Filter and track invalid entries with their corresponding keys
valid_content = []
valid_keys = []
invalid_entries = []

for key, text in zip(flattened_keys, flattened_content):
    if isinstance(text, str):
        if text.strip():
            valid_content.append(text)
            valid_keys.append(key)
        else:
            invalid_entries.append(('empty_string', key, text))
    else:
        invalid_entries.append(('non_string', key, text))

# Print detailed invalid entries report
if invalid_entries:
    print(f"\nFound {len(invalid_entries)} invalid entries:")
    print("\nDetailed Invalid Entries Report:")
    print("-" * 60)
    for i, (error_type, key, value) in enumerate(invalid_entries, 1):
        print(f"{i}. Redis Key: {key}")
        print(f"   Error Type: {error_type}")
        print(f"   Value: {repr(value)}")
        print("-" * 60)
else:
    print("\nAll entries had valid string content")

# Then continue with your existing code...
content = valid_content
if not content:
    raise ValueError("No valid content available for generating embeddings.")

# Call OpenAI's embedding API
response = openai_client.embeddings.create(
    input=content,
    model="text-embedding-3-large"
)

# Extract embeddings using correct attribute access
embeddings = [item.embedding for item in response.data]

# Convert embeddings to float32 and list format
embeddings_array = np.array(embeddings, dtype=np.float32).tolist()

VECTOR_DIMENSION = len(embeddings_array[0]) if embeddings_array else 0
print(f"Generated {len(embeddings_array)} embeddings with dimension {VECTOR_DIMENSION}")


All entries had valid string content
Generated 320 embeddings with dimension 3072


In [133]:
pipeline = client.pipeline()
for key, embedding in zip(keys, embeddings):
  #note that its named section embeddings and not content embeddings
  pipeline.json().set(key, "$.section_embeddings", embedding)
pipeline.execute()


[True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,

In [134]:
import redis
from redis.commands.search.field import (
    TextField,
    VectorField,
)
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

# Define the schema fields based on your JSON structure
schema = (
    TextField("$.url", no_stem=True, as_name="url"),
    TextField("$.title", no_stem=True, as_name="title"),
    TextField("$.text_content", no_stem=True, as_name="text_content"),
    TextField("$.attachments.*", no_stem=True, as_name="attachment_paths"),
    VectorField(
        "$.section_embeddings",  # You'll need to add this to your JSON
        "FLAT",
        {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIMENSION,  # Replace with your actual dimension
            "DISTANCE_METRIC": "COSINE",
        },
        as_name="vector"
    ),
    TextField("$.page", no_stem=True, as_name="page")
)

# Define the index
definition = IndexDefinition(prefix=[f"{data_path}:"], index_type=IndexType.JSON)

# Create the index
res = client.ft(f"idx:{data_path}_vss").create_index(fields=schema, definition=definition)

In [135]:
import numpy as np
from redis.commands.search.query import Query

# 1. Encode query (keep this the same)
query_text = "How do I register a low powered bike?"
query_vector = np.array(
    openai_client.embeddings.create(
        input=query_text,
        model="text-embedding-3-large"
    ).data[0].embedding, 
    dtype=np.float32
)

# 2. Update query to match schema fields
query = (
    Query('(*)=>[KNN 2 @vector $query_vector AS vector_score]')
    .sort_by('vector_score')
    .return_fields(
        'vector_score', 
        'url',         # From TextField("$.url")
        'title',       # From TextField("$.title")
        'text_content',# From TextField("$.text_content")
        'attachment_paths',  # From TextField("$.attachments.*")
        'page'
    )
    .dialect(2)
)

# 3. Execute search (keep the same)
result = client.ft(f'idx:{data_path}_vss').search(
    query,
    {'query_vector': query_vector.tobytes()}
)

# 4. Process results with schema-aligned fields
for doc in result.docs:
    print(f"""
    Score: {doc.vector_score}
    URL: {doc.url}
    Title: {doc.title}
    Content: {doc.text_content}
    Attachments: {doc.attachment_paths}
    Page: {doc.page}
    """)



    Score: 0.536603569984
    URL: https://www.dmv.virginia.gov/vehicles/registration/moped
    Title: All About Mopeds
    Content: A moped is a small motorized vehicle that can be powered by pedals or an engine. Different rules apply to mopeds than bicycles or motorcycles.
Moped Rules of the Road
You must be at least age 16 to drive a moped.
Every moped driver operating on Virginia roadways must carry a government-issued photo ID (does not have to be a driver's license).
Every driver and passenger must wear a Virginia State Police-approved helmet while riding a moped. Drivers must also wear a face shield, safety glasses or goggles unless the moped has a windshield.
It is illegal to drive a mopeds on the interstate.
It is illegal to drive a moped if your license is suspended or revoked for convictions of DUI, underage consumption of alcohol, refusing a blood/breath test, or driving while suspended/revoked for a DUI-related offense.
If your driving privilege is suspended or revoked fo