Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,20 @@ The API is at <http://localhost:8000>.

- **Python 3.11+**
- **Postgres**
- **tsvector** - Superfast search
- **tsvector** - Superfast full-text search (with GIN index)
### Full-Text Search (tsvector)

The prospects table includes a `search_vector` column (type: tsvector) that is automatically computed from all text fields on insert. A GIN index is created for this column, enabling fast and scalable full-text search queries.

**How it works:**
- On every insert (via `/prospects/seed` or `/prospects/process`), the `search_vector` is computed from all text columns using PostgreSQL's `to_tsvector('english', ...)`.
- The GIN index (`idx_prospects_search_vector`) allows efficient search queries like:

```sql
SELECT * FROM prospects WHERE search_vector @@ plainto_tsquery('english', 'search terms');
```

This makes searching across all text fields in the prospects table extremely fast, even for large datasets.
- **FastAPI** — RESTful API framework
- **Uvicorn** — ASGI server
- **Pytest** — testing framework
Expand Down Expand Up @@ -60,5 +73,24 @@ requirements.txt
| GET | `/` | Welcome message |
| GET | `/health` | Health check — returns `ok` |
| POST | `/echo` | Echoes the JSON `message` field |
| GET | `/prospects/seed` | (Re)create prospects table and seed with sample data |
| DELETE | `/prospects/empty` | (Legacy) Empties the prospects table |
| GET | `/prospects/process` | Process and insert all records from big.csv into prospects table |

### Processing Large CSV Files

The `/prospects/process` endpoint is designed for robust, scalable ingestion of large CSV files (e.g., 1300+ rows, 300KB+). It follows the same normalization and insertion pattern as `/prospects/seed`, but is optimized for large files by inserting rows in batches.


#### Example usage

1. Seed the table structure:
- `GET /prospects/seed`
2. (Optional) Empty the table:
- `DELETE /prospects/empty`
3. Process the large CSV:
- `GET /prospects/process`

The endpoint will return the number of records inserted. This is the core ingestion workflow for production-scale data.


2 changes: 1 addition & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""NX AI - FastAPI/Python/Postgres/tsvector"""

# Current Version
__version__ = "1.1.2"
__version__ = "1.1.3"
64 changes: 64 additions & 0 deletions app/api/prospects/database/alter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from fastapi import APIRouter, status
from app.utils.db import get_db_connection

router = APIRouter()

@router.get("/prospects/alter", status_code=status.HTTP_200_OK)
def alter_prospects_table() -> dict:
    """
    Drop a single column from the 'prospects' table.

    Checks that the 'prospects' table exists, then that the column named by
    ``column_name`` exists, and only then attempts to drop it. Always returns
    a ``{"detail": ...}`` dict describing the outcome (success or the reason
    nothing was dropped).
    """
    # NOTE: column_name is interpolated directly into the ALTER statement
    # below, so it must remain a developer-controlled constant — never user
    # input.
    column_name = 'tertiary_email_verification_source'  # Change this variable to alter a different column
    conn_gen = get_db_connection()
    conn = next(conn_gen)
    cur = conn.cursor()
    try:
        # Check if 'prospects' table exists
        cur.execute("""
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_name = 'prospects'
            );
        """)
        table_row = cur.fetchone()
        if table_row is None:
            result = {"detail": "Error: Could not fetch table existence result."}
            return result
        table_exists = table_row[0]
        if not table_exists:
            result = {"detail": "Table 'prospects' does not exist."}
            return result

        # Check if the column exists (parameterized — column_name is safe here)
        cur.execute("""
            SELECT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'prospects' AND column_name = %s
            );
        """, (column_name,))
        column_row = cur.fetchone()
        if column_row is None:
            result = {"detail": "Error: Could not fetch column existence result."}
            return result
        column_exists = column_row[0]
        if not column_exists:
            result = {"detail": f"Column '{column_name}' does not exist in 'prospects' table."}
            return result

        # Try to drop the column; report failure without raising to the client
        try:
            cur.execute(f'ALTER TABLE prospects DROP COLUMN {column_name};')
            conn.commit()
            result = {"detail": f"Column '{column_name}' dropped successfully from 'prospects' table."}
        except Exception as e:
            conn.rollback()
            result = {"detail": f"Failed to drop column: {str(e)}"}
    except Exception as e:
        conn.rollback()
        result = {"detail": f"Error: {str(e)}"}
    finally:
        cur.close()
        conn.close()
    return result
File renamed without changes.
File renamed without changes.
71 changes: 71 additions & 0 deletions app/api/prospects/database/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import os, time
from fastapi import APIRouter, status
from app.utils.db import get_db_connection

router = APIRouter()

CSV_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), 'data/big.csv'))


import csv
import io

def normalize_column(col):
    """Normalize a raw CSV header into a safe SQL column name.

    Lowercases, replaces spaces with underscores, strips any character
    outside [a-z0-9_], and prefixes a leading underscore when the result
    would start with a digit (identifiers may not begin with a digit).
    """
    import re
    cleaned = re.sub(r'[^a-z0-9_]', '', col.strip().lower().replace(' ', '_'))
    return f'_{cleaned}' if cleaned and cleaned[0].isdigit() else cleaned

@router.get("/prospects/process", status_code=status.HTTP_200_OK)
def process_prospects() -> dict:
    """
    Process and insert data from the large CSV file (big.csv) into the prospects table.

    The table must already exist with the correct columns (run seed and empty first).
    Rows are inserted in batches of ``BATCH_SIZE`` via ``executemany``; each row
    also gets a ``search_vector`` computed with ``to_tsvector`` from all of its
    text fields. Returns a ``{"detail": ...}`` dict with the insert count or an
    error description.
    """
    import psycopg2
    BATCH_SIZE = 200
    conn_gen = get_db_connection()
    conn = next(conn_gen)
    cur = conn.cursor()
    inserted = 0
    try:
        with open(CSV_PATH, newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            columns_raw = next(reader)
            remove_cols = {'secondary_email', 'secondary_email_source', 'secondary_email_status', 'secondary_email_verification_source'}
            # Normalize each header exactly once, then filter columns and
            # remember the original positions of the kept columns.
            normalized = [normalize_column(col) for col in columns_raw]
            columns = [col for col in normalized if col not in remove_cols]
            col_indices = [i for i, col in enumerate(normalized) if col not in remove_cols]
            placeholders = ', '.join(['%s'] * len(columns))
            # Build the INSERT statement once; it is identical for every batch.
            insert_sql = (
                f"INSERT INTO prospects ({', '.join(columns)}, search_vector) "
                f"VALUES ({placeholders}, to_tsvector('english', %s))"
            )
            batch = []
            for row in reader:
                filtered_row = [row[i] for i in col_indices]
                # Concatenate all kept fields into one document for the tsvector.
                text_content = ' '.join(str(val) for val in filtered_row if val is not None)
                batch.append(filtered_row + [text_content])
                if len(batch) >= BATCH_SIZE:
                    cur.executemany(insert_sql, batch)
                    inserted += len(batch)
                    batch = []
            # Flush the final partial batch, if any.
            if batch:
                cur.executemany(insert_sql, batch)
                inserted += len(batch)
        conn.commit()
        result = {"detail": f"Inserted {inserted} records from big.csv into prospects table."}
    except psycopg2.errors.UndefinedTable:
        conn.rollback()
        result = {"detail": "Table 'prospects' does not exist. No records inserted."}
    except Exception as e:
        conn.rollback()
        result = {"detail": f"Error: {str(e)}"}
    finally:
        cur.close()
        conn.close()
    return result
File renamed without changes.
29 changes: 23 additions & 6 deletions app/api/prospects/seed.py → app/api/prospects/database/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,36 @@ def seed_prospects() -> dict:
import io
reader = csv.reader(io.StringIO(csv_data))
columns_raw = next(reader)
columns = [normalize_column(col) for col in columns_raw]
# Remove 'Secondary Email' column and its variants
remove_cols = {'secondary_email', 'secondary_email_source', 'secondary_email_status', 'secondary_email_verification_source'}
columns = [normalize_column(col) for col in columns_raw if normalize_column(col) not in remove_cols]
col_indices = [i for i, col in enumerate([normalize_column(col) for col in columns_raw]) if col not in remove_cols]

# Drop and recreate table

# Drop and recreate table with tsvector column
cur.execute('DROP TABLE IF EXISTS prospects;')
create_cols = ',\n '.join([f'{col} TEXT' for col in columns])
cur.execute(f'''CREATE TABLE prospects (\n id SERIAL PRIMARY KEY,\n {create_cols}\n);''')
cur.execute(f'''
CREATE TABLE prospects (
id SERIAL PRIMARY KEY,
{create_cols},
search_vector tsvector
);
''')
# Create GIN index for full-text search
cur.execute('CREATE INDEX IF NOT EXISTS idx_prospects_search_vector ON prospects USING GIN (search_vector);')


# Insert rows
# Insert rows with tsvector
for row in reader:
# Only keep values for columns we want
filtered_row = [row[i] for i in col_indices]
placeholders = ', '.join(['%s'] * len(columns))
# Concatenate all text fields for tsvector
text_content = ' '.join([str(val) for val in filtered_row if val is not None])
cur.execute(
f"INSERT INTO prospects ({', '.join(columns)}) VALUES ({placeholders})",
row
f"INSERT INTO prospects ({', '.join(columns)}, search_vector) VALUES ({placeholders}, to_tsvector('english', %s))",
filtered_row + [text_content]
)

conn.commit()
Expand Down
42 changes: 40 additions & 2 deletions app/api/prospects/prospects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,35 @@

router = APIRouter()


# Endpoint to get unique values for specified fields
from fastapi import Query

@router.get("/prospects/unique")
def get_unique_fields(fields: list[str] = Query(..., description="List of field names to get unique values for")) -> dict:
    """Return lists of unique values and their counts for specified fields in the prospects table.

    A field that cannot be queried (e.g. a nonexistent column) is reported in
    the ``errors`` mapping instead of failing the whole request.
    """
    conn_gen = get_db_connection()
    conn = next(conn_gen)
    cur = conn.cursor()
    result = {}
    errors = {}
    try:
        for field in fields:
            # `field` comes from the query string (untrusted): escape embedded
            # double quotes so it cannot break out of the quoted identifier.
            ident = '"' + field.replace('"', '""') + '"'
            try:
                cur.execute(
                    f'SELECT {ident}, COUNT(*) FROM prospects '
                    f'WHERE {ident} IS NOT NULL GROUP BY {ident} ORDER BY COUNT(*) DESC;'
                )
                result[field] = [
                    {"value": row[0], "count": row[1]} for row in cur.fetchall()
                ]
            except Exception as e:
                # A failed statement aborts the current Postgres transaction;
                # roll back so the remaining fields can still be queried.
                conn.rollback()
                errors[field] = str(e)
        meta = make_meta("success", f"Unique values and counts for fields: {fields}")
        return {"meta": meta, "data": result, "errors": errors if errors else None}
    finally:
        cur.close()
        conn.close()


@router.get("/prospects")
def root() -> dict:
"""Return all prospects table records"""
Expand All @@ -24,13 +53,13 @@ def root() -> dict:
},
]
try:
cur.execute('SELECT * FROM prospects;')
cur.execute('SELECT * FROM prospects LIMIT 200;')
if cur.description is None:
prospects = []
else:
columns = [desc[0] for desc in cur.description]
prospects = [dict(zip(columns, row)) for row in cur.fetchall()]
meta = make_meta("success", "Prospects List")
meta = make_meta("success", "Prospects List (max 200)")
result = {"meta": meta, "data": prospects}
except Exception as e:
import psycopg2
Expand All @@ -44,3 +73,12 @@ def root() -> dict:
cur.close()
conn.close()
return result


# New endpoint: /prospects/init
@router.get("/prospects/init")
def prospects_init() -> dict:
    """Initialize prospects (placeholder endpoint)."""
    return {
        "meta": make_meta("success", "Initialized prospects (placeholder)"),
        "data": {"message": "This is a placeholder for prospects/init."},
    }
12 changes: 8 additions & 4 deletions app/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
from app.api.root import router as root_router
from app.api.health import router as health_router

from app.api.prospects.prospects import router as prospects_router

from app.api.prospects.seed import router as prospects_seed_router
from app.api.prospects.empty import router as prospects_empty_router
from app.api.prospects.prospects import router as prospects_router
from app.api.prospects.database.alter import router as prospects_alter_router
from app.api.prospects.database.seed import router as prospects_seed_router
from app.api.prospects.database.empty import router as prospects_empty_router
from app.api.prospects.database.process import router as prospects_process_router

# Register all sub-routers exactly once.
# Fix: prospects_empty_router was previously included twice, registering
# duplicate routes for the empty endpoint.
router.include_router(root_router)
router.include_router(health_router)
router.include_router(prospects_router)
router.include_router(prospects_alter_router)
router.include_router(prospects_seed_router)
router.include_router(prospects_empty_router)
router.include_router(prospects_process_router)
Binary file modified app/static/favicon.ico
100755 → 100644
Binary file not shown.
47 changes: 47 additions & 0 deletions tests/prospects/test_prospects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")))
import pytest
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_prospects_init_returns_placeholder():
    """/prospects/init responds 200 with the standard envelope and placeholder message."""
    resp = client.get("/prospects/init")
    assert resp.status_code == 200
    payload = resp.json()
    for key in ("meta", "data"):
        assert key in payload
    assert payload["data"]["message"] == "This is a placeholder for prospects/init."

def test_prospects_returns_list():
    """/prospects responds 200 with meta and data; data is a list or dict."""
    resp = client.get("/prospects")
    assert resp.status_code == 200
    payload = resp.json()
    assert "meta" in payload
    assert "data" in payload
    assert isinstance(payload["data"], (list, dict))

def test_prospects_unique_valid_fields():
    """Unique-values endpoint returns a dict keyed by field.

    Assumes at least one valid field exists in the prospects table, e.g. 'id'.
    """
    resp = client.get("/prospects/unique?fields=id")
    assert resp.status_code == 200
    payload = resp.json()
    for key in ("meta", "data"):
        assert key in payload
    assert isinstance(payload["data"], dict)

def test_prospects_unique_invalid_field():
    """An unknown field is reported in the errors mapping, not as an HTTP error."""
    payload = client.get("/prospects/unique?fields=notafield").json()
    assert client.get("/prospects/unique?fields=notafield").status_code == 200
    assert "meta" in payload
    assert "errors" in payload
    assert "notafield" in payload["errors"]

def test_prospects_init_meta_keys():
    """The meta envelope from /prospects/init carries all standard keys."""
    meta = client.get("/prospects/init").json()["meta"]
    expected = ("severity", "title", "version", "base_url", "time")
    assert all(key in meta for key in expected)
24 changes: 24 additions & 0 deletions tests/test_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../")))
import pytest
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_health_endpoint():
    """/health responds 200 with the plain ok status body."""
    resp = client.get("/health")
    assert resp.status_code == 200
    body = resp.json()
    assert body == {"status": "ok"}

def test_health_meta_keys():
    """If /health carries a meta envelope it has the standard keys; otherwise the legacy ok body."""
    payload = client.get("/health").json()
    if "meta" in payload:
        meta = payload["meta"]
        expected = ("severity", "title", "version", "base_url", "time")
        assert all(key in meta for key in expected)
    else:
        # Legacy: no meta, just status
        assert payload == {"status": "ok"}
Loading