diff --git a/README.md b/README.md index 74aa716..6b9ed0c 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,20 @@ The API is at . - **Python 3.11+** - **Postgres** -- **tsvector** - Superfast search +- **tsvector** - Superfast full-text search (with GIN index) +### Full-Text Search (tsvector) + +The prospects table includes a `search_vector` column (type: tsvector) that is automatically computed from all text fields on insert. A GIN index is created for this column, enabling fast and scalable full-text search queries. + +**How it works:** +- On every insert (via `/prospects/seed` or `/prospects/process`), the `search_vector` is computed from all text columns using PostgreSQL's `to_tsvector('english', ...)`. +- The GIN index (`idx_prospects_search_vector`) allows efficient search queries like: + +```sql +SELECT * FROM prospects WHERE search_vector @@ plainto_tsquery('english', 'search terms'); +``` + +This makes searching across all text fields in the prospects table extremely fast, even for large datasets. - **FastAPI** — RESTful API framework - **Uvicorn** — ASGI server - **Pytest** — testing framework @@ -60,5 +73,24 @@ requirements.txt | GET | `/` | Welcome message | | GET | `/health` | Health check — returns `ok` | | POST | `/echo` | Echoes the JSON `message` field | +| GET | `/prospects/seed` | (Re)create prospects table and seed with sample data | +| DELETE | `/prospects/empty` | (Legacy) Empties the prospects table | +| GET | `/prospects/process` | Process and insert all records from big.csv into prospects table | + +### Processing Large CSV Files + +The `/prospects/process` endpoint is designed for robust, scalable ingestion of large CSV files (e.g., 1300+ rows, 300KB+). It follows the same normalization and insertion pattern as `/prospects/seed`, but is optimized for large files: + + +#### Example usage + +1. Seed the table structure: + - `GET /prospects/seed` +2. (Optional) Empty the table: + - `DELETE /prospects/empty` +3. 
Process the large CSV: + - `GET /prospects/process` + +The endpoint will return the number of records inserted. This is the core ingestion workflow for production-scale data. diff --git a/app/__init__.py b/app/__init__.py index 068066f..7c58eef 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,4 +1,4 @@ """NX AI - FastAPI/Python/Postgres/tsvector""" # Current Version -__version__ = "1.1.2" +__version__ = "1.1.3" diff --git a/app/api/prospects/database/alter.py b/app/api/prospects/database/alter.py new file mode 100644 index 0000000..ad7b308 --- /dev/null +++ b/app/api/prospects/database/alter.py @@ -0,0 +1,64 @@ +from fastapi import APIRouter, status +from app.utils.db import get_db_connection + +router = APIRouter() + +@router.get("/prospects/alter", status_code=status.HTTP_200_OK) +def alter_prospects_table() -> dict: + """ + Checks if the 'prospects' table exists, then checks if the 'secondary_email' column exists. + If both exist, attempts to drop the 'secondary_email' column and returns the result. 
+ """ + import psycopg2 + column_name = 'tertiary_email_verification_source' # Change this variable to alter a different column + conn_gen = get_db_connection() + conn = next(conn_gen) + cur = conn.cursor() + try: + # Check if 'prospects' table exists + cur.execute(""" + SELECT EXISTS ( + SELECT 1 FROM information_schema.tables + WHERE table_name = 'prospects' + ); + """) + table_row = cur.fetchone() + if table_row is None: + result = {"detail": "Error: Could not fetch table existence result."} + return result + table_exists = table_row[0] + if not table_exists: + result = {"detail": "Table 'prospects' does not exist."} + return result + + # Check if the column exists + cur.execute(f""" + SELECT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'prospects' AND column_name = %s + ); + """, (column_name,)) + column_row = cur.fetchone() + if column_row is None: + result = {"detail": "Error: Could not fetch column existence result."} + return result + column_exists = column_row[0] + if not column_exists: + result = {"detail": f"Column '{column_name}' does not exist in 'prospects' table."} + return result + + # Try to drop the column + try: + cur.execute(f'ALTER TABLE prospects DROP COLUMN {column_name};') + conn.commit() + result = {"detail": f"Column '{column_name}' dropped successfully from 'prospects' table."} + except Exception as e: + conn.rollback() + result = {"detail": f"Failed to drop column: {str(e)}"} + except Exception as e: + conn.rollback() + result = {"detail": f"Error: {str(e)}"} + finally: + cur.close() + conn.close() + return result \ No newline at end of file diff --git a/app/api/prospects/data/big.csv b/app/api/prospects/database/big.csv similarity index 100% rename from app/api/prospects/data/big.csv rename to app/api/prospects/database/big.csv diff --git a/app/api/prospects/empty.py b/app/api/prospects/database/empty.py similarity index 100% rename from app/api/prospects/empty.py rename to app/api/prospects/database/empty.py 
diff --git a/app/api/prospects/database/process.py b/app/api/prospects/database/process.py new file mode 100644 index 0000000..3a469b5 --- /dev/null +++ b/app/api/prospects/database/process.py @@ -0,0 +1,71 @@ +import os, time +from fastapi import APIRouter, status +from app.utils.db import get_db_connection + +router = APIRouter() + +CSV_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), 'big.csv')) + + +import csv +import io + +def normalize_column(col): + import re + col = col.strip().lower().replace(' ', '_') + col = re.sub(r'[^a-z0-9_]', '', col) + if col and col[0].isdigit(): + col = '_' + col + return col + +@router.get("/prospects/process", status_code=status.HTTP_200_OK) +def process_prospects() -> dict: + """ + Process and insert data from the large CSV file (big.csv) into the prospects table. + The table must already exist with the correct columns (run seed and empty first). + This endpoint is robust and scalable for large files. + """ + import psycopg2 + BATCH_SIZE = 200 + conn_gen = get_db_connection() + conn = next(conn_gen) + cur = conn.cursor() + inserted = 0 + try: + with open(CSV_PATH, newline='', encoding='utf-8') as csvfile: + reader = csv.reader(csvfile) + columns_raw = next(reader) + remove_cols = {'secondary_email', 'secondary_email_source', 'secondary_email_status', 'secondary_email_verification_source'} + columns = [normalize_column(col) for col in columns_raw if normalize_column(col) not in remove_cols] + col_indices = [i for i, col in enumerate([normalize_column(col) for col in columns_raw]) if col not in remove_cols] + placeholders = ', '.join(['%s'] * len(columns)) + batch = [] + for row in reader: + filtered_row = [row[i] for i in col_indices] + text_content = ' '.join([str(val) for val in filtered_row if val is not None]) + batch.append(filtered_row + [text_content]) + if len(batch) >= BATCH_SIZE: + cur.executemany( + f"INSERT INTO prospects ({', '.join(columns)}, search_vector) VALUES ({placeholders}, 
to_tsvector('english', %s))", + batch + ) + inserted += len(batch) + batch = [] + if batch: + cur.executemany( + f"INSERT INTO prospects ({', '.join(columns)}, search_vector) VALUES ({placeholders}, to_tsvector('english', %s))", + batch + ) + inserted += len(batch) + conn.commit() + result = {"detail": f"Inserted {inserted} records from big.csv into prospects table."} + except psycopg2.errors.UndefinedTable: + conn.rollback() + result = {"detail": "Table 'prospects' does not exist. No records inserted."} + except Exception as e: + conn.rollback() + result = {"detail": f"Error: {str(e)}"} + finally: + cur.close() + conn.close() + return result \ No newline at end of file diff --git a/app/api/prospects/data/seed.csv b/app/api/prospects/database/seed.csv similarity index 100% rename from app/api/prospects/data/seed.csv rename to app/api/prospects/database/seed.csv diff --git a/app/api/prospects/seed.py b/app/api/prospects/database/seed.py similarity index 74% rename from app/api/prospects/seed.py rename to app/api/prospects/database/seed.py index 88bbb32..955221b 100644 --- a/app/api/prospects/seed.py +++ b/app/api/prospects/database/seed.py @@ -29,19 +29,36 @@ def seed_prospects() -> dict: import io reader = csv.reader(io.StringIO(csv_data)) columns_raw = next(reader) - columns = [normalize_column(col) for col in columns_raw] + # Remove 'Secondary Email' column and its variants + remove_cols = {'secondary_email', 'secondary_email_source', 'secondary_email_status', 'secondary_email_verification_source'} + columns = [normalize_column(col) for col in columns_raw if normalize_column(col) not in remove_cols] + col_indices = [i for i, col in enumerate([normalize_column(col) for col in columns_raw]) if col not in remove_cols] - # Drop and recreate table + + # Drop and recreate table with tsvector column cur.execute('DROP TABLE IF EXISTS prospects;') create_cols = ',\n '.join([f'{col} TEXT' for col in columns]) - cur.execute(f'''CREATE TABLE prospects (\n id SERIAL PRIMARY 
KEY,\n {create_cols}\n);''') + cur.execute(f''' + CREATE TABLE prospects ( + id SERIAL PRIMARY KEY, + {create_cols}, + search_vector tsvector + ); + ''') + # Create GIN index for full-text search + cur.execute('CREATE INDEX IF NOT EXISTS idx_prospects_search_vector ON prospects USING GIN (search_vector);') + - # Insert rows + # Insert rows with tsvector for row in reader: + # Only keep values for columns we want + filtered_row = [row[i] for i in col_indices] placeholders = ', '.join(['%s'] * len(columns)) + # Concatenate all text fields for tsvector + text_content = ' '.join([str(val) for val in filtered_row if val is not None]) cur.execute( - f"INSERT INTO prospects ({', '.join(columns)}) VALUES ({placeholders})", - row + f"INSERT INTO prospects ({', '.join(columns)}, search_vector) VALUES ({placeholders}, to_tsvector('english', %s))", + filtered_row + [text_content] ) conn.commit() diff --git a/app/api/prospects/prospects.py b/app/api/prospects/prospects.py index b911ab4..69b702c 100644 --- a/app/api/prospects/prospects.py +++ b/app/api/prospects/prospects.py @@ -6,6 +6,35 @@ router = APIRouter() + +# Endpoint to get unique values for specified fields +from fastapi import Query + +@router.get("/prospects/unique") +def get_unique_fields(fields: list[str] = Query(..., description="List of field names to get unique values for")) -> dict: + """Return lists of unique values and their counts for specified fields in the prospects table.""" + conn_gen = get_db_connection() + conn = next(conn_gen) + cur = conn.cursor() + result = {} + errors = {} + try: + for field in fields: + try: + cur.execute('SELECT "{0}", COUNT(*) FROM prospects WHERE "{0}" IS NOT NULL GROUP BY "{0}" ORDER BY COUNT(*) DESC;'.format(field.replace('"', '""'))) + values = [ + {"value": row[0], "count": row[1]} for row in cur.fetchall() + ] + result[field] = values + except Exception as e: + errors[field] = str(e) + meta = make_meta("success", f"Unique values and counts for fields: {fields}") + return {"meta": meta, 
"data": result, "errors": errors if errors else None} + finally: + cur.close() + conn.close() + + @router.get("/prospects") def root() -> dict: """Return all prospects table records""" @@ -24,13 +53,13 @@ def root() -> dict: }, ] try: - cur.execute('SELECT * FROM prospects;') + cur.execute('SELECT * FROM prospects LIMIT 200;') if cur.description is None: prospects = [] else: columns = [desc[0] for desc in cur.description] prospects = [dict(zip(columns, row)) for row in cur.fetchall()] - meta = make_meta("success", "Prospects List") + meta = make_meta("success", "Prospects List (max 200)") result = {"meta": meta, "data": prospects} except Exception as e: import psycopg2 @@ -44,3 +73,12 @@ def root() -> dict: cur.close() conn.close() return result + + +# New endpoint: /prospects/init +@router.get("/prospects/init") +def prospects_init() -> dict: + """Initialize prospects (placeholder endpoint)""" + meta = make_meta("success", "Initialized prospects (placeholder)") + data = {"message": "This is a placeholder for prospects/init."} + return {"meta": meta, "data": data} diff --git a/app/api/routes.py b/app/api/routes.py index 6c40fbb..94ad73f 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -13,13 +13,17 @@ from app.api.root import router as root_router from app.api.health import router as health_router -from app.api.prospects.prospects import router as prospects_router -from app.api.prospects.seed import router as prospects_seed_router -from app.api.prospects.empty import router as prospects_empty_router +from app.api.prospects.prospects import router as prospects_router +from app.api.prospects.database.alter import router as prospects_alter_router +from app.api.prospects.database.seed import router as prospects_seed_router +from app.api.prospects.database.empty import router as prospects_empty_router +from app.api.prospects.database.process import router as prospects_process_router router.include_router(root_router) router.include_router(health_router) 
router.include_router(prospects_router) +router.include_router(prospects_alter_router) router.include_router(prospects_seed_router) -router.include_router(prospects_empty_router) \ No newline at end of file +router.include_router(prospects_empty_router) +router.include_router(prospects_process_router) diff --git a/app/static/favicon.ico b/app/static/favicon.ico old mode 100755 new mode 100644 index 8e4f671..aba0d8c Binary files a/app/static/favicon.ico and b/app/static/favicon.ico differ diff --git a/tests/prospects/test_prospects.py b/tests/prospects/test_prospects.py new file mode 100644 index 0000000..c4cdd2e --- /dev/null +++ b/tests/prospects/test_prospects.py @@ -0,0 +1,47 @@ +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))) +import pytest +from fastapi.testclient import TestClient +from app.main import app + +client = TestClient(app) + +def test_prospects_init_returns_placeholder(): + response = client.get("/prospects/init") + assert response.status_code == 200 + data = response.json() + assert "meta" in data + assert "data" in data + assert data["data"]["message"] == "This is a placeholder for prospects/init." + +def test_prospects_returns_list(): + response = client.get("/prospects") + assert response.status_code == 200 + data = response.json() + assert "meta" in data + assert "data" in data + assert isinstance(data["data"], list) or isinstance(data["data"], dict) + +def test_prospects_unique_valid_fields(): + # This test assumes at least one valid field exists in the prospects table, e.g., 'id'. 
+ response = client.get("/prospects/unique?fields=id") + assert response.status_code == 200 + data = response.json() + assert "meta" in data + assert "data" in data + assert isinstance(data["data"], dict) + +def test_prospects_unique_invalid_field(): + response = client.get("/prospects/unique?fields=notafield") + assert response.status_code == 200 + data = response.json() + assert "meta" in data + assert "errors" in data + assert "notafield" in data["errors"] + +def test_prospects_init_meta_keys(): + response = client.get("/prospects/init") + meta = response.json()["meta"] + for key in ["severity", "title", "version", "base_url", "time"]: + assert key in meta diff --git a/tests/test_health.py b/tests/test_health.py new file mode 100644 index 0000000..dc37299 --- /dev/null +++ b/tests/test_health.py @@ -0,0 +1,24 @@ +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../"))) +import pytest +from fastapi.testclient import TestClient +from app.main import app + +client = TestClient(app) + +def test_health_endpoint(): + response = client.get("/health") + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + +def test_health_meta_keys(): + response = client.get("/health") + data = response.json() + if "meta" in data: + meta = data["meta"] + for key in ["severity", "title", "version", "base_url", "time"]: + assert key in meta + else: + # Legacy: no meta, just status + assert data == {"status": "ok"}