Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ FastAPI auto-generates interactive docs:
#### Notable Endpoints

- `GET /health` — Health check
- `GET/POST /prompt` — LLM prompt completion (formerly `/llm`)
- `GET /prompts` — Prompt table metadata (`record_count`, `columns`)
- `POST /prompt` — LLM prompt completion (formerly `/llm`)
- `GET/POST /resend` — Send email via Resend API (see implementation in `app/utils/notify/resend.py`)
- `GET /prospects` — Paginated prospects
- `POST /prospects/process` — Bulk CSV ingestion
Expand Down
3 changes: 1 addition & 2 deletions app/api/prompt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Prompt Routes"""


from .prompt import router as prompt_router
from .linkedin import router as linkedin_router
from .drop import router as drop_router
from .empty import router as empty_router
12 changes: 5 additions & 7 deletions app/api/prompt/drop.py → app/api/prompt/empty.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import os

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends

from app.utils.api_key_auth import get_api_key
from app.utils.db import get_db_connection_direct
from app.utils.make_meta import make_meta

router = APIRouter()

# PATCH /prompt/drop: empties the prompt table
@router.patch("/prompt/drop")
def drop_prompt_table(api_key: str = Depends(get_api_key)) -> dict:
"""PATCH /prompt/drop: empties the prompt table."""
# PATCH /prompt/empty: empties the prompt table
@router.patch("/prompt/empty")
def empty_prompt_table(api_key: str = Depends(get_api_key)) -> dict:
"""PATCH /prompt/empty: empties the prompt table."""
try:
conn = get_db_connection_direct()
cur = conn.cursor()
Expand Down
51 changes: 17 additions & 34 deletions app/api/prompt/prompt.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,38 @@
import os
from fastapi import APIRouter, HTTPException, Query, Request, Depends
from fastapi import APIRouter, HTTPException, Depends
from app.utils.make_meta import make_meta
from app.utils.db import get_db_connection_direct
from app.utils.api_key_auth import get_api_key

router = APIRouter()

@router.get("/prompt")
def get_prompt_records(
request: Request,
page: int = Query(1, ge=1, description="Page number (1-based)"),
page_size: int = Query(10, ge=1, le=100, description="Records per page"),
api_key: str = Depends(get_api_key)
) -> dict:
"""GET /prompt: Paginated list of prompt completions."""
@router.get("/prompts")
def get_prompt_table_metadata(api_key: str = Depends(get_api_key)) -> dict:
"""GET /prompt: Return prompt table metadata."""
try:
conn = get_db_connection_direct()
cur = conn.cursor()
offset = (page - 1) * page_size
cur.execute("SELECT COUNT(*) FROM prompt;")
count_row = cur.fetchone()
total = count_row[0] if count_row and count_row[0] is not None else 0
cur.execute("""
SELECT id, prompt, completion, duration, time, data, model
FROM prompt
ORDER BY id DESC
LIMIT %s OFFSET %s;
""", (page_size, offset))
records = [
{
"id": row[0],
"prompt": row[1],
"completion": row[2],
"duration": row[3],
"time": row[4].isoformat() if row[4] else None,
"data": row[5],
"model": row[6],
}
for row in cur.fetchall()
]
record_count = count_row[0] if count_row and count_row[0] is not None else 0
cur.execute(
"""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'prompt'
ORDER BY ordinal_position;
"""
)
columns = [row[0] for row in cur.fetchall()]
cur.close()
conn.close()
meta = make_meta("success", f"Prompt {len(records)} records (page {page})")
meta = make_meta("success", "Prompt table metadata")
return {
"meta": meta,
"data": {
"page": page,
"page_size": page_size,
"total": total,
"pages": (total + page_size - 1) // page_size,
"data": records,
"record_count": record_count,
"columns": columns,
},
}
except Exception as e:
Expand Down
2 changes: 0 additions & 2 deletions app/api/queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from .routes.empty import router as empty_router

from .routes.get import router as get_router
from .routes.next import router as next_router

from .routes.create import router as create_router
from .routes.delete import router as delete_router
Expand All @@ -19,7 +18,6 @@
router.include_router(drop_router)
router.include_router(empty_router)
router.include_router(get_router)
router.include_router(next_router)
router.include_router(create_router)
router.include_router(delete_router)
router.include_router(linkedin_import_router.router)
Expand Down
59 changes: 50 additions & 9 deletions app/api/queue/csv/apollo.py
Original file line number Diff line number Diff line change
# Single source of truth for the 16-column queue insert; previously this SQL
# string was duplicated verbatim in two executemany() calls, which risks the
# copies drifting apart.
_APOLLO_INSERT_SQL = '''INSERT INTO queue (
    updated, hidden, created, email, company, job, linkedin, first_name, last_name, seniority, department, phone, country, connected, collection, "group"
) VALUES (
    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)'''

@router.post("/queue/csv/apollo")
def import_apollo_csv() -> dict:
    """POST /queue/csv/apollo: Import rows from the bundled Apollo sample CSV into the queue table.

    Reads ``../csv/apollo/sample.csv`` (relative to this module), inserts rows
    in batches of 500, and commits once at the end.

    Returns:
        dict with a ``meta`` status block and ``imported`` — the number of
        non-blank CSV rows inserted.

    Raises:
        HTTPException: 404 if the sample CSV is missing; 500 on any parse or
        database error.
    """
    csv_path = os.path.join(os.path.dirname(__file__), "../csv/apollo/sample.csv")
    if not os.path.exists(csv_path):
        raise HTTPException(status_code=404, detail="sample.csv not found")
    try:
        conn = get_db_connection_direct()
        try:
            cursor = conn.cursor()
            with open(csv_path, newline='', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                now = int(time.time())
                batch = []
                batch_size = 500  # flush in chunks to bound memory on large files
                imported_count = 0
                for row in reader:
                    # Skip fully blank rows (trailing separators / padding lines).
                    if not any(row.values()):
                        continue
                    batch.append([
                        now,                                   # updated
                        False,                                 # hidden
                        now,                                   # created
                        row.get('Email', ''),                  # email
                        row.get('Company Name', ''),           # company
                        row.get('Title', ''),                  # job
                        row.get('Person Linkedin Url', ''),    # linkedin
                        row.get('First Name', ''),             # first_name
                        row.get('Last Name', ''),              # last_name
                        row.get('Seniority', None),            # seniority
                        row.get('Sub Departments', None),      # department
                        row.get('Corporate Phone', None),      # phone
                        row.get('Country', None),              # country
                        None,                                  # connected (not in Apollo export)
                        'apollo',                              # collection
                        'magento'                              # group
                    ])
                    imported_count += 1
                    if len(batch) >= batch_size:
                        cursor.executemany(_APOLLO_INSERT_SQL, batch)
                        batch = []
                # Flush the final partial batch.
                if batch:
                    cursor.executemany(_APOLLO_INSERT_SQL, batch)
            conn.commit()
            cursor.close()
        finally:
            # Previously the connection leaked whenever parsing or an insert
            # raised; always release it. Closing without commit discards the
            # partial transaction, so a failed import inserts nothing.
            conn.close()
        return {"meta": make_meta("success", f"Apollo CSV imported: {imported_count} records imported"), "imported": imported_count}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
File renamed without changes.
42 changes: 22 additions & 20 deletions app/api/queue/csv/linkedin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,67 +10,69 @@
@router.post("/queue/csv/linkedin")
def import_linkedin_csv() -> dict:
"""POST /queue/csv/linkedin: Import data from linkedin.csv into the queue table, robust for large files."""
csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin.csv")
csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin_sample.csv")
if not os.path.exists(csv_path):
raise HTTPException(status_code=404, detail="linkedin.csv not found")
try:
conn = get_db_connection_direct()
cursor = conn.cursor()
with open(csv_path, newline='', encoding='utf-8') as csvfile:
# Find the header line dynamically
header_line = None
pre_data_lines = []
while True:
pos = csvfile.tell()
line = csvfile.readline()
if not line:
break
if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"):
if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Job,Connected On") or \
line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"):
header_line = line.strip()
break
pre_data_lines.append(line)
if not header_line:
raise HTTPException(status_code=400, detail="CSV header not found.")
# Use DictReader with the found header

fieldnames = header_line.split(",")
# Map 'Position' to 'Job' for backward compatibility
fieldnames = [fn if fn != 'Position' else 'Job' for fn in fieldnames]
reader = csv.DictReader(csvfile, fieldnames=fieldnames)
now = int(time.time())
batch = []
batch_size = 500
first_row = None
imported_count = 0
for row in reader:
# Skip any rows that are just blank or not data

if not any(row.values()):
continue
if first_row is None:
first_row = row.copy()
print("DEBUG: First parsed row from CSV:", first_row)
batch.append([
row.get('First Name'), # first_name
row.get('Last Name'), # last_name
row.get('URL'), # linkedin
row.get('Email Address'), # email
row.get('Company'), # company
row.get('Position'), # position
row.get('Connected On'), # connected_on
now, # created
now, # updated
False, # hidden
'prospects', # collection
'linkedin' # group
])
now, # updated
False, # hidden
now, # created
row.get('Email Address'), # email
row.get('Company'), # company
row.get('Job') or row.get('Position'),# job (support both)
row.get('Connected On'), # connected
'prospects', # collection
'linkedin', # group
row.get('First Name'), # first_name
row.get('Last Name'), # last_name
row.get('URL') # linkedin
])
imported_count += 1
if len(batch) >= batch_size:
cursor.executemany(
'''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group")
'''INSERT INTO queue (updated, hidden, created, email, company, job, connected, collection, "group", first_name, last_name, linkedin)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
batch
)
batch = []
if batch:
cursor.executemany(
'''INSERT INTO queue (first_name, last_name, linkedin, email, company, position, connected_on, created, updated, hidden, collection, "group")
'''INSERT INTO queue (updated, hidden, created, email, company, job, connected, collection, "group", first_name, last_name, linkedin)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
batch
)
Expand Down
10 changes: 7 additions & 3 deletions app/api/queue/routes/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ def read_queue(
"data": {
"total": total_count,
"filtered": filtered_count,
"collections": collections,
"groups": groups,
"next": next_record
"filters": {
"collectionFilter": collection,
"groupFilter": group,
"collections": collections,
"groups": groups,
},
"next": next_record,
}
}
except Exception as e:
Expand Down
78 changes: 0 additions & 78 deletions app/api/queue/routes/next.py

This file was deleted.

Loading
Loading