# Trip Ingestion Project - Testing Mandatory Features
This notebook documents and tests all **mandatory features** of the data ingestion project.
Each section corresponds to one feature requirement from the assignment.

In [None]:
# trip_ingestion_test.ipynb

# ---
# # Trip Ingestion Project — Test Notebook
# This notebook documents and tests all mandatory features of the project:
# 1. Upload CSV to FastAPI
# 2. Track ingestion job status (via WebSocket)
# 3. Validate data stored in Postgres
# 4. Run analytics queries (grouping + weekly averages)
# 5. Show scalability proof of concept
# ---

import os
import csv
import requests
import websockets
import asyncio
import json
import pandas as pd
from dotenv import load_dotenv

# Base URL of the FastAPI service that every HTTP call in this notebook targets.
BASE_URL = "http://localhost:8000"

# Read a local .env file (if one exists) into the process environment.
load_dotenv()

# Postgres connection settings; each one falls back to a local default when
# the corresponding environment variable is not set.
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "tripsdb")
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASS = os.environ.get("DB_PASS", "postgres")


## 1. Health check API

In [None]:
import requests

# Health check: confirm the API is reachable before running the other cells.
# requests applies no timeout by default, so set one to avoid hanging the
# notebook when the service is down.
r = requests.get(f"{BASE_URL}/health", timeout=10)
# Turn a 4xx/5xx reply into an exception so an unhealthy service is not
# mistaken for a passing check.
r.raise_for_status()
print(r.json())

## 2. Automated Process to Ingest and Store the Data
**Goal:** Upload a CSV file via API and store it in Postgres.


**Test:** Send a request to the API and check that rows appear in the `trips` table.

In [None]:
import requests

# Upload the sample CSV to the ingestion endpoint as multipart form data.
# The URL is derived from BASE_URL so that changing the service address in the
# configuration cell also redirects this request.
url = f"{BASE_URL}/ingest"
file_path = "../source_data/trips.csv"

# Open in binary mode; requests streams the handle as the "file" form field.
with open(file_path, "rb") as f:
    response = requests.post(url, files={"file": f})

print(response.status_code)
print(response.json())


Expected output:

```python
{'job_id': '<uuid-string>',
 'status': 'running',
 'message': 'Started ingestion: trips.csv'}
```

## 3. Track ingestion status via WebSocket // Inform User About Status of Ingestion Without Polling
**Goal:** Use WebSockets to receive job status in real-time.


**Test:** Connect to `/ws/{job_id}` endpoint.

In [None]:
async def track_status(job_id):
    uri = f"ws://localhost:8000/ws/{job_id}"
    async with websockets.connect(uri) as ws:
        while True:
            msg = await ws.recv()
            data = json.loads(msg)
            print(data)
            if data["status"].startswith("completed") or data["status"].startswith("failed"):
                break

job_id = resp.json()["job_id"]
print(f"Tracking status for job_id: {job_id}")
await track_status(job_id)


In [None]:
import geohash2

# Sanity check of the geohash library: encode one sample coordinate pair
# at precision 5 and show the resulting hash.
sample_geohash = geohash2.encode(4.6, -74.1, precision=5)
print(sample_geohash)

## 4. Validate stored data in Postgres

In [None]:

# Count the rows in `trips` to confirm that the ingestion wrote to Postgres.
from urllib.parse import quote_plus

from sqlalchemy import create_engine, text

# Build the connection URL from the DB_* settings defined in the configuration
# cell instead of hard-coding credentials. User and password are URL-escaped so
# that characters such as "@" or "/" do not break URL parsing.
db_url = (
    f"postgresql://{quote_plus(DB_USER)}:{quote_plus(DB_PASS)}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
engine = create_engine(db_url)
with engine.connect() as conn:
    result = conn.execute(text("SELECT count(*) FROM trips;"))
    print("Trips loaded:", result.scalar())

## 5. Analytics — Grouping trips by region // Group Trips with Similar Origin, Destination, and Time of Day
**Goal:** Trips are grouped by geohash of origin/destination and time bucket.

**Test:** Run a query grouping trips and confirm grouping works.

In [None]:
# Number of trips per region, largest first.
from sqlalchemy import text

sql = """
SELECT region, COUNT(*) AS trip_count
FROM trips
GROUP BY region
ORDER BY trip_count DESC;
"""

# Reuse the `engine` created in the validation cell above rather than building
# a second engine from a hard-coded connection string.
with engine.connect() as conn:
    # Wrapping the statement in text() lets pandas run it through SQLAlchemy
    # as a SQL expression rather than a raw driver string.
    trips_by_region = pd.read_sql(text(sql), conn)
trips_by_region


In [None]:
# Ten largest groups of trips that share an origin geohash, a destination
# geohash and a time-of-day bucket.
query = """
SELECT origin_geohash, dest_geohash, tod_bucket, COUNT(*) AS trips
FROM trip_groups
GROUP BY 1,2,3
ORDER BY trips DESC
LIMIT 10;
"""
group_stmt = text(query)
with engine.connect() as conn:
    top_groups = conn.execute(group_stmt).fetchall()
print(top_groups)

## 6. Analytics — Weekly averages inside bounding box // Weekly Average Number of Trips for an Area
**Goal:** Calculate weekly averages within a bounding box.

**Test:** Use PostGIS query with bounding box filter.

In [None]:
# Average number of trips per week whose origin lies inside a fixed
# bounding box (SRID 4326): count per week first, then average the counts.
query = """
WITH weekly AS (
  SELECT date_trunc('week', trip_ts) wk, count(*) c
  FROM trips
  WHERE ST_Within(origin, ST_MakeEnvelope(-74.2,4.5,-74.0,4.8,4326))
  GROUP BY 1
)
SELECT avg(c) AS weekly_avg FROM weekly;
"""
weekly_stmt = text(query)
with engine.connect() as conn:
    weekly_avg_rows = conn.execute(weekly_stmt).fetchall()
print(weekly_avg_rows)

## 7. Inform User About Status of Ingestion Without Polling
**Goal:** Use WebSockets to receive job status in real-time.


**Test:** Connect to `/ws/{job_id}` endpoint.

In [None]:
import websockets, asyncio, json

async def test_ws(job_id):
    uri = f"ws://localhost:8000/ws/{job_id}"
    async with websockets.connect(uri) as websocket:
        for _ in range(5):
            msg = await websocket.recv()
            print("Received:", json.loads(msg))

# Replace with a valid job_id returned from ingestion
#job_id = "REPLACE_WITH_JOB_ID"
#job_id = resp.json()["job_id"]
asyncio.run(test_ws(job_id))

## 8. Scalability Proof to 100M Entries
**Goal:** Demonstrate ingestion and query performance at scale.


**Test:** Generate synthetic trips and bulk load them into Postgres, then measure throughput.

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Generate 1 million synthetic rows (small test, extrapolate to 100M)
n = 10**6
base_time = datetime(2025,1,1)
# Seeded generator so that re-running the cell produces the same file.
rng = np.random.default_rng(42)
data = {
    # One trip per minute starting at base_time; date_range builds the column
    # in one vectorised call instead of a million-iteration Python loop.
    "trip_ts": pd.date_range(start=base_time, periods=n, freq="min"),
    # NOTE(review): the analytics query in this notebook groups by "region";
    # confirm which column name the loader expects before bulk-loading this file.
    "city": ["testcity"]*n,
    # Coordinates are drawn uniformly from the same bounding box that the
    # weekly-average query filters on.
    "origin_lat": rng.uniform(4.5, 4.8, n),
    "origin_lon": rng.uniform(-74.2, -74.0, n),
    "dest_lat": rng.uniform(4.5, 4.8, n),
    "dest_lon": rng.uniform(-74.2, -74.0, n),
}
df = pd.DataFrame(data)
df.to_csv("synthetic_trips.csv", index=False)
print("Synthetic CSV generated with", len(df), "rows")

In [None]:
# Ask the API for the weekly average number of trips in a single region.
params = dict(region="Prague")
weekly_average_url = f"{BASE_URL}/trips/weekly_average/"
response = requests.get(weekly_average_url, params=params)
print(response.json())

Next step: Use Postgres `COPY` command or SQLAlchemy bulk insert to load this file.
Then run `EXPLAIN ANALYZE` queries to measure performance and document results here.