From 69b44ef2ef7195073ef570ca118c052a493080bb Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Mon, 20 Apr 2026 13:58:01 +0100 Subject: [PATCH 1/5] Remove queue/next route and update get meta Delete the standalone /queue/next route (remove app/api/queue/routes/next.py and unregister its router in app/api/queue/__init__.py). Also adjust the GET queue endpoint's meta message from "Queue table info" to "Next in queue" to better reflect the returned data. --- app/api/queue/__init__.py | 2 - app/api/queue/routes/get.py | 2 +- app/api/queue/routes/next.py | 78 ------------------------------------ 3 files changed, 1 insertion(+), 81 deletions(-) delete mode 100644 app/api/queue/routes/next.py diff --git a/app/api/queue/__init__.py b/app/api/queue/__init__.py index 9705e10..57782d6 100644 --- a/app/api/queue/__init__.py +++ b/app/api/queue/__init__.py @@ -6,7 +6,6 @@ from .routes.empty import router as empty_router from .routes.get import router as get_router -from .routes.next import router as next_router from .routes.create import router as create_router from .routes.delete import router as delete_router @@ -19,7 +18,6 @@ router.include_router(drop_router) router.include_router(empty_router) router.include_router(get_router) -router.include_router(next_router) router.include_router(create_router) router.include_router(delete_router) router.include_router(linkedin_import_router.router) diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index 9ad55c5..07f8338 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -53,7 +53,7 @@ def read_queue( conn.close() return { - "meta": make_meta("success", "Queue table info"), + "meta": make_meta("success", "Next in queue"), "data": { "total": total_count, "filtered": filtered_count, diff --git a/app/api/queue/routes/next.py b/app/api/queue/routes/next.py deleted file mode 100644 index add3078..0000000 --- a/app/api/queue/routes/next.py +++ /dev/null @@ -1,78 +0,0 @@ -import os -from fastapi import APIRouter, HTTPException, Query -from app.utils.make_meta import make_meta -from app.utils.db import get_db_connection_direct - -router = APIRouter() - - -# Route: /queue/next?collection=prospects&group=linkedin -@router.get("/queue/next") -def get_next_queue( - collection: str = Query(None, description="Filter by collection name"), - group: str = Query(None, description="Filter by group name") -) -> dict: - """Return the next queue record filtered by collection/group, ordered by latest updated.""" - try: - conn = get_db_connection_direct() - cursor = conn.cursor() - - # Build query with optional filters - base_query = "SELECT * FROM queue" - filters = [] - params = [] - if collection: - filters.append("collection = %s") - params.append(collection) - if group: - filters.append('"group" = %s') - params.append(group) - where_clause = (" WHERE " + " AND ".join(filters)) if filters else "" - - # 1. Get the next record - query = base_query + where_clause + " ORDER BY updated DESC LIMIT 1;" - cursor.execute(query, params) - row = cursor.fetchone() - columns = [desc[0] for desc in cursor.description] if cursor.description else [] - record = dict(zip(columns, row)) if row and columns else None - - # 2. Get count of records matching filters - count_query = "SELECT COUNT(*) FROM queue" + where_clause + ";" - cursor.execute(count_query, params) - filtered_row = cursor.fetchone() - filtered_count = filtered_row[0] if filtered_row else 0 - - # 3. Get total count - cursor.execute("SELECT COUNT(*) FROM queue;") - total_row = cursor.fetchone() - total_count = total_row[0] if total_row else 0 - - # 4. Get table schema - cursor.execute("SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name = 'queue';") - schema = [ - {"name": row[0], "type": row[1], "nullable": row[2]} for row in cursor.fetchall() - ] - - conn.close() - - # Build a dynamic title with filters - filter_labels = [] - if collection: - filter_labels.append(f"collection='{collection}'") - if group: - filter_labels.append(f"group='{group}'") - filter_str = f" (filtered by {', '.join(filter_labels)})" if filter_labels else "" - title = f"Next queue record found{filter_str}" if record else "No queue record to show" - - return { - "meta": make_meta("success" if record else "info", title), - "data": { - "record": record, - "filtered_count": filtered_count, - "total_count": total_count, - "schema": schema, - "message": None if record else "Nothing to show for the given filters." - } - } - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) From 25ab979d20f415c204d2863dc1f0537426a53a18 Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Mon, 20 Apr 2026 17:05:26 +0100 Subject: [PATCH 2/5] LinkedIn CSV: support Job field Switch linkedin import to use the sample CSV and handle both 'Position' and 'Job' headers by normalizing 'Position' to 'Job'. Map row values to job (row.get('Job') or row.get('Position')) and update INSERTs to use the job column. Update the queue GET route meta/title and include an "in_queue" count in the response payload. Update tests to expect the new response keys and meta title. --- app/api/queue/csv/linkedin.py | 42 ++++++++++++++++++----------------- app/api/queue/routes/get.py | 5 +++-- tests/test_queue.py | 4 +++- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/app/api/queue/csv/linkedin.py b/app/api/queue/csv/linkedin.py index 77759ed..e376fe5 100644 --- a/app/api/queue/csv/linkedin.py +++ b/app/api/queue/csv/linkedin.py @@ -10,14 +10,13 @@ @router.post("/queue/csv/linkedin") def import_linkedin_csv() -> dict: """POST /queue/csv/linkedin: Import data from linkedin.csv into the queue table, robust for large files.""" - csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin.csv") + csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin_sample.csv") if not os.path.exists(csv_path): raise HTTPException(status_code=404, detail="linkedin.csv not found") try: conn = get_db_connection_direct() cursor = conn.cursor() with open(csv_path, newline='', encoding='utf-8') as csvfile: - # Find the header line dynamically header_line = None pre_data_lines = [] while True: @@ -25,14 +24,17 @@ def import_linkedin_csv() -> dict: line = csvfile.readline() if not line: break - if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"): + if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Job,Connected On") or \ + line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"): header_line = line.strip() break pre_data_lines.append(line) if not header_line: raise HTTPException(status_code=400, detail="CSV header not found.") - # Use DictReader with the found header + fieldnames = header_line.split(",") + # Map 'Position' to 'Job' for backward compatibility + fieldnames = [fn if fn != 'Position' else 'Job' for fn in fieldnames] reader = csv.DictReader(csvfile, fieldnames=fieldnames) now = int(time.time()) batch = [] @@ -40,37 +42,37 @@ def import_linkedin_csv() -> dict: first_row = None imported_count = 0 for row in reader: - # Skip any rows that are just blank or not data + if not any(row.values()): continue if first_row is None: first_row = row.copy() print("DEBUG: First parsed row from CSV:", first_row) batch.append([ - row.get('First Name'), # first_name - row.get('Last Name'), # last_name - row.get('URL'), # linkedin - row.get('Email Address'), # email - row.get('Company'), # company - row.get('Position'), # position - row.get('Connected On'), # connected_on - now, # created - now, # updated - False, # hidden - 'prospects', # collection - 'linkedin' # group - ]) + row.get('First Name'), # first_name + row.get('Last Name'), # last_name + row.get('URL'), # linkedin + row.get('Email Address'), # email + row.get('Company'), # company + row.get('Job') or row.get('Position'),# job (support both) + row.get('Connected On'), # connected_on + now, # created + now, # updated + False, # hidden + 'prospects', # collection + 'linkedin' # group + ]) imported_count += 1 if len(batch) >= batch_size: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (first_name, last_name, linkedin, email, company, job, connected_on, created, updated, hidden, collection, "group") VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) batch = [] if batch: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (first_name, last_name, linkedin, email, company, job, connected_on, created, updated, hidden, collection, "group") VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index 07f8338..e097e3f 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -53,13 +53,14 @@ def read_queue( conn.close() return { - "meta": make_meta("success", "Next in queue"), + "meta": make_meta("success", "Queue table info"), "data": { + "in_queue": total_count, "total": total_count, "filtered": filtered_count, "collections": collections, "groups": groups, - "next": next_record + "next": next_record, } } except Exception as e: diff --git a/tests/test_queue.py b/tests/test_queue.py index 574efe8..4f2cd33 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -15,7 +15,9 @@ def test_get_queue(): assert "in_queue" in queue_data assert "collections" in queue_data assert "groups" in queue_data - assert "example" in queue_data + assert "filtered" in queue_data + assert "total" in queue_data + assert "next" in queue_data meta = data["meta"] assert meta["severity"] == "success" assert meta["title"] == "Queue table info" From bf3955c8281364ddc8e7655913da1e12b5837282 Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Mon, 20 Apr 2026 18:27:28 +0100 Subject: [PATCH 3/5] Import Apollo CSV and fix LinkedIn import Rename apollo seed.csv to sample.csv and implement actual CSV parsing + batched DB inserts for Apollo imports (batch_size=500). Map Apollo columns into queue fields, set collection/group, track imported_count, and update error messages to reference sample.csv. Adjust LinkedIn import row layout and SQL column order to include updated/hidden/created fields and place name/linkedin columns in the correct positions for executemany inserts. --- app/api/queue/csv/apollo.py | 59 ++++++++++++++++--- .../queue/csv/apollo/{seed.csv => sample.csv} | 0 app/api/queue/csv/linkedin.py | 20 +++---- 3 files changed, 60 insertions(+), 19 deletions(-) rename app/api/queue/csv/apollo/{seed.csv => sample.csv} (100%) diff --git a/app/api/queue/csv/apollo.py b/app/api/queue/csv/apollo.py index 65c7349..39e9069 100644 --- a/app/api/queue/csv/apollo.py +++ b/app/api/queue/csv/apollo.py @@ -10,20 +10,61 @@ @router.post("/queue/csv/apollo") def import_apollo_csv() -> dict: """POST /queue/csv/apollo: Import data from apollo.csv into the queue table (template).""" - csv_path = os.path.join(os.path.dirname(__file__), "../csv/apollo/seed.csv") + csv_path = os.path.join(os.path.dirname(__file__), "../csv/apollo/sample.csv") if not os.path.exists(csv_path): - raise HTTPException(status_code=404, detail="seed.csv not found") + raise HTTPException(status_code=404, detail="sample.csv not found") try: conn = get_db_connection_direct() cursor = conn.cursor() - # TODO: Implement CSV parsing and DB insertion logic for Apollo format - # Example placeholder for batch import logic: - # with open(csv_path, newline='', encoding='utf-8') as csvfile: - # reader = csv.DictReader(csvfile) - # for row in reader: - # pass # Process each row + with open(csv_path, newline='', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + now = int(time.time()) + batch = [] + batch_size = 500 + imported_count = 0 + for row in reader: + if not any(row.values()): + continue + batch.append([ + now, # updated + False, # hidden + now, # created + row.get('Email', ''), # email + row.get('Company Name', ''), # company + row.get('Title', ''), # job + row.get('Person Linkedin Url', ''), # linkedin + row.get('First Name', ''), # first_name + row.get('Last Name', ''), # last_name + row.get('Seniority', None), # seniority + row.get('Sub Departments', None), # department + row.get('Corporate Phone', None), # phone + row.get('Country', None), # country + None, # connected (not present, set None) + 'apollo', # collection + 'magento' # group + ]) + imported_count += 1 + if len(batch) >= batch_size: + cursor.executemany( + '''INSERT INTO queue ( + updated, hidden, created, email, company, job, linkedin, first_name, last_name, seniority, department, phone, country, connected, collection, "group" + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + )''', + batch + ) + batch = [] + if batch: + cursor.executemany( + '''INSERT INTO queue ( + updated, hidden, created, email, company, job, linkedin, first_name, last_name, seniority, department, phone, country, connected, collection, "group" + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + )''', + batch + ) conn.commit() conn.close() - return {"meta": make_meta("success", "Apollo CSV import template executed"), "imported": 0} + return {"meta": make_meta("success", f"Apollo CSV imported: {imported_count} records imported"), "imported": imported_count} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) diff --git a/app/api/queue/csv/apollo/seed.csv b/app/api/queue/csv/apollo/sample.csv similarity index 100% rename from app/api/queue/csv/apollo/seed.csv rename to app/api/queue/csv/apollo/sample.csv diff --git a/app/api/queue/csv/linkedin.py b/app/api/queue/csv/linkedin.py index e376fe5..5d1c43a 100644 --- a/app/api/queue/csv/linkedin.py +++ b/app/api/queue/csv/linkedin.py @@ -49,30 +49,30 @@ def import_linkedin_csv() -> dict: first_row = row.copy() print("DEBUG: First parsed row from CSV:", first_row) batch.append([ - row.get('First Name'), # first_name - row.get('Last Name'), # last_name - row.get('URL'), # linkedin + now, # updated + False, # hidden + now, # created row.get('Email Address'), # email row.get('Company'), # company row.get('Job') or row.get('Position'),# job (support both) - row.get('Connected On'), # connected_on - now, # created - now, # updated - False, # hidden + row.get('Connected On'), # connected 'prospects', # collection - 'linkedin' # group + 'linkedin', # group + row.get('First Name'), # first_name + row.get('Last Name'), # last_name + row.get('URL') # linkedin ]) imported_count += 1 if len(batch) >= batch_size: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, linkedin, email, company, job, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (updated, hidden, created, email, company, job, connected, collection, "group", first_name, last_name, linkedin) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) batch = [] if batch: cursor.executemany( - '''INSERT INTO queue (first_name, last_name, linkedin, email, company, job, connected_on, created, updated, hidden, collection, "group") + '''INSERT INTO queue (updated, hidden, created, email, company, job, connected, collection, "group", first_name, last_name, linkedin) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', batch ) From ff27f2904df40d54eb7359131d72af8be3ce84df Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Mon, 20 Apr 2026 19:16:51 +0100 Subject: [PATCH 4/5] Remove redundant in_queue field Remove the 'in_queue' key from the read_queue response payload because it duplicated the 'total' value (total_count). This eliminates a redundant field and reduces confusion in the API response. --- app/api/queue/routes/get.py | 1 - 1 file changed, 1 deletion(-) diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index e097e3f..5d2034c 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -55,7 +55,6 @@ def read_queue( return { "meta": make_meta("success", "Queue table info"), "data": { - "in_queue": total_count, "total": total_count, "filtered": filtered_count, "collections": collections, From 78eadf494424198d0763c329b5a47068aee4fee2 Mon Sep 17 00:00:00 2001 From: Wei Zang Date: Wed, 22 Apr 2026 13:47:19 +0100 Subject: [PATCH 5/5] Rename drop endpoint to empty and nest queue filters Rename prompt/drop to prompt/empty and update router imports and handler names for clarity. Files changed: app/api/prompt/drop.py -> app/api/prompt/empty.py (endpoint path /prompt/empty, handler empty_prompt_table, removed unused os import), app/api/prompt/__init__.py and app/api/routes.py (update imports and router include names). Also adjust queue GET response shape (app/api/queue/routes/get.py) to group collection/group values and collections/groups under a new "filters" object with keys collectionFilter and groupFilter. Clients should be updated to use the new endpoint and the revised queue response structure. --- README.md | 3 +- app/api/prompt/__init__.py | 3 +- app/api/prompt/{drop.py => empty.py} | 12 +++---- app/api/prompt/prompt.py | 51 ++++++++++------------------ app/api/queue/routes/get.py | 8 +++-- app/api/routes.py | 4 +-- 6 files changed, 33 insertions(+), 48 deletions(-) rename app/api/prompt/{drop.py => empty.py} (68%) diff --git a/README.md b/README.md index ed61994..eac66c7 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,8 @@ FastAPI auto-generates interactive docs: #### Notable Endpoints - `GET /health` — Health check -- `GET/POST /prompt` — LLM prompt completion (formerly `/llm`) +- `GET /prompt` or `GET /prompts` — Prompt table metadata (`record_count`, `columns`) +- `POST /prompt` — LLM prompt completion (formerly `/llm`) - `GET/POST /resend` — Send email via Resend API (see implementation in `app/utils/notify/resend.py`) - `GET /prospects` — Paginated prospects - `POST /prospects/process` — Bulk CSV ingestion diff --git a/app/api/prompt/__init__.py b/app/api/prompt/__init__.py index 76637df..b35910c 100644 --- a/app/api/prompt/__init__.py +++ b/app/api/prompt/__init__.py @@ -1,6 +1,5 @@ """Prompt Routes""" - from .prompt import router as prompt_router from .linkedin import router as linkedin_router -from .drop import router as drop_router +from .empty import router as empty_router diff --git a/app/api/prompt/drop.py b/app/api/prompt/empty.py similarity index 68% rename from app/api/prompt/drop.py rename to app/api/prompt/empty.py index f016e33..a3a698c 100644 --- a/app/api/prompt/drop.py +++ b/app/api/prompt/empty.py @@ -1,6 +1,4 @@ -import os - -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends from app.utils.api_key_auth import get_api_key from app.utils.db import get_db_connection_direct @@ -8,10 +6,10 @@ router = APIRouter() -# PATCH /prompt/drop: empties the prompt table -@router.patch("/prompt/drop") -def drop_prompt_table(api_key: str = Depends(get_api_key)) -> dict: - """PATCH /prompt/drop: empties the prompt table.""" +# PATCH /prompt/empty: empties the prompt table +@router.patch("/prompt/empty") +def empty_prompt_table(api_key: str = Depends(get_api_key)) -> dict: + """PATCH /prompt/empty: empties the prompt table.""" try: conn = get_db_connection_direct() cur = conn.cursor() diff --git a/app/api/prompt/prompt.py b/app/api/prompt/prompt.py index 90a8117..9e5f7a4 100644 --- a/app/api/prompt/prompt.py +++ b/app/api/prompt/prompt.py @@ -1,5 +1,5 @@ import os -from fastapi import APIRouter, HTTPException, Query, Request, Depends +from fastapi import APIRouter, HTTPException, Depends from app.utils.make_meta import make_meta from app.utils.db import get_db_connection_direct from app.utils.api_key_auth import get_api_key @@ -7,49 +7,32 @@ router = APIRouter() @router.get("/prompt") -def get_prompt_records( - request: Request, - page: int = Query(1, ge=1, description="Page number (1-based)"), - page_size: int = Query(10, ge=1, le=100, description="Records per page"), - api_key: str = Depends(get_api_key) -) -> dict: - """GET /prompt: Paginated list of prompt completions.""" +@router.get("/prompts") +def get_prompt_table_metadata(api_key: str = Depends(get_api_key)) -> dict: + """GET /prompt: Return prompt table metadata.""" try: conn = get_db_connection_direct() cur = conn.cursor() - offset = (page - 1) * page_size cur.execute("SELECT COUNT(*) FROM prompt;") count_row = cur.fetchone() - total = count_row[0] if count_row and count_row[0] is not None else 0 - cur.execute(""" - SELECT id, prompt, completion, duration, time, data, model - FROM prompt - ORDER BY id DESC - LIMIT %s OFFSET %s; - """, (page_size, offset)) - records = [ - { - "id": row[0], - "prompt": row[1], - "completion": row[2], - "duration": row[3], - "time": row[4].isoformat() if row[4] else None, - "data": row[5], - "model": row[6], - } - for row in cur.fetchall() - ] + record_count = count_row[0] if count_row and count_row[0] is not None else 0 + cur.execute( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = 'prompt' + ORDER BY ordinal_position; + """ + ) + columns = [row[0] for row in cur.fetchall()] cur.close() conn.close() - meta = make_meta("success", f"Prompt {len(records)} records (page {page})") + meta = make_meta("success", "Prompt table metadata") return { "meta": meta, "data": { - "page": page, - "page_size": page_size, - "total": total, - "pages": (total + page_size - 1) // page_size, - "data": records, + "record_count": record_count, + "columns": columns, }, } except Exception as e: diff --git a/app/api/queue/routes/get.py b/app/api/queue/routes/get.py index 5d2034c..5f08743 100644 --- a/app/api/queue/routes/get.py +++ b/app/api/queue/routes/get.py @@ -57,8 +57,12 @@ def read_queue( "data": { "total": total_count, "filtered": filtered_count, - "collections": collections, - "groups": groups, + "filters": { + "collectionFilter": collection, + "groupFilter": group, + "collections": collections, + "groups": groups, + }, "next": next_record, } } diff --git a/app/api/routes.py b/app/api/routes.py index b10e210..e933002 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -10,7 +10,7 @@ from app.utils.notify.resend import router as resend_router from app.api.prompt.prompt import router as prompt_router from app.api.prompt.linkedin import router as linkedin_router -from app.api.prompt.drop import router as drop_router +from app.api.prompt.empty import router as prompts_empty_router from app.api.prospects.prospects import router as prospects_router from app.api.orders.orders import router as orders_router from app.api.queue import router as queue_router @@ -20,7 +20,7 @@ router.include_router(health_router) router.include_router(prompt_router) router.include_router(linkedin_router) -router.include_router(drop_router) +router.include_router(prompts_empty_router) router.include_router(prospects_router) router.include_router(orders_router) router.include_router(queue_router) \ No newline at end of file