
TCP Data Ingestion Server

A fully containerized, robust, and asynchronous TCP ingestion server built with Python (asyncio), paired with a FastAPI REST API, and backed by a PostgreSQL database.

This project was built to address three specific challenges:

  1. Dynamic Data Framing & Formatting: Parsing newline-delimited (\n) JSON payloads from the byte stream, mapping the client IP address to the Device ID when the payload omits one, and recording exact ingestion timestamps.
  2. Flexible Connection Lifecycle: Gracefully supporting both persistent, long-running connections streaming multiple payloads over time, as well as short-lived connect-and-drop connections.
  3. High Scalability & Fault Tolerance: Employing a non-blocking asynchronous event loop capable of juggling thousands of concurrent connections. Database unavailability is handled via an internal asyncio.Queue and background retry workers, preventing data loss during DB downtime for as long as the process stays up and the queue has capacity.

🏗 Architecture Layers

  • TCP Server (tcp_server/server.py): Runs on 0.0.0.0:5000 (configurable via TCP_PORT). Accepts TCP sockets, reads payload bytes, validates the JSON framing, and pushes the records into an in-memory queue.
  • Database Writer (async worker): Continually pops the verified payloads from the memory queue and batches them into the PostgreSQL database. If the DB connection faults, it backs off and retries autonomously.
  • REST API (api/main.py): Runs on 0.0.0.0:8000. Exposes a Swagger-documented FastAPI service to query and filter the ingested records in real-time.
  • Database Engine (database/): Utilizes asyncpg combined with SQLAlchemy 2.0 async sessions for extremely fast non-blocking reads/writes.
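
The ingestion path described above can be sketched roughly as follows. This is an illustrative sketch, not the actual server.py API: names like parse_record and handle_client are hypothetical.

```python
import asyncio
import json
from datetime import datetime, timezone

# Hypothetical sketch of the TCP layer: read newline-delimited JSON,
# enrich each record, and hand it to the in-memory queue.

def parse_record(line: bytes, peer_ip: str) -> dict:
    """Validate one newline-delimited JSON frame and enrich it."""
    record = json.loads(line)
    # Prefer an explicit device_id from the payload; fall back to the client IP.
    record.setdefault("device_id", peer_ip)
    # Stamp the arrival time if the client did not supply a timestamp.
    record.setdefault("timestamp", datetime.now(timezone.utc).isoformat())
    return record

async def handle_client(reader, writer, queue: asyncio.Queue):
    """Serve one connection: persistent streams and connect-and-drop both work."""
    peer_ip = writer.get_extra_info("peername")[0]
    try:
        while line := await reader.readline():  # b"" at EOF ends the loop
            try:
                await queue.put(parse_record(line, peer_ip))
            except json.JSONDecodeError:
                continue  # drop malformed frames, keep the connection alive
    finally:
        writer.close()
```

Because the handler only ever awaits, a single event loop can multiplex thousands of such connections without threads.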

🚀 Getting Started

The entire architecture is packaged in Docker so you don't need any local Python virtual environments or database installations.

1. Set Up Environment Variables

Before running the project, create your secret .env file based on the provided template:

# Windows (PowerShell)
Copy-Item .env.example .env
# Linux/Mac
cp .env.example .env

(You can edit .env if you wish to change the default database passwords)

2. Bring the System Online

Start the PostgreSQL database, the TCP Server, and the REST API detached in the background:

docker-compose up --build -d

3. Verify Everything is Running

docker ps

You should see three containers running for the api, tcp_server, and db services (e.g. tcpserver-api-1, tcpserver-tcp_server-1, and tcpserver-db-1; exact names depend on your Compose project name).


🧪 How to Test Data Ingestion

Method A: Use the Python Test Client (Easiest)

We've included an isolated client script to quickly fire automated payloads into the TCP socket and then pull them back from the API to prove ingestion works.

python test_client.py
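
For reference, the core of such a client is just newline-framed JSON over a plain socket. A minimal sketch (hypothetical helper names; assumes the default port 5000):

```python
import json
import socket

def frame_record(record: dict) -> bytes:
    """Encode a record as one newline-delimited JSON frame."""
    return json.dumps(record).encode("utf-8") + b"\n"

def send_records(records, host="127.0.0.1", port=5000):
    """Open a short-lived connection, stream the frames, then drop it."""
    with socket.create_connection((host, port)) as sock:
        for record in records:
            sock.sendall(frame_record(record))

# Example: send_records([{"card_id": "SKETCH-TEST"}])
```

The same framing works for long-running connections: keep the socket open and call sendall per record as events occur.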

Method B: Send a Raw Manual Packet (Windows CMD/PowerShell)

You can manually verify the socket works using Windows PowerShell:

$tcp = New-Object System.Net.Sockets.TcpClient("127.0.0.1", 5000)
$stream = $tcp.GetStream()
$writer = New-Object System.IO.StreamWriter($stream)

# Send a raw JSON packet delimited by a newline
$writer.WriteLine('{"card_id": "RAW-TCP-TEST", "timestamp": "2026-03-31T12:00:00Z"}')
$writer.Flush()
$tcp.Close()

Method C: Mac/Linux Netcat

echo '{"card_id": "NETCAT-TEST"}' | nc localhost 5000

🔎 How to View the Data

You can explore your tracked events in the browser via the Swagger UI at http://localhost:8000/docs (FastAPI's built-in interactive documentation).


🛡 Demonstrating Fault Tolerance

A core requirement of this exercise is proving how the system handles a crashed database. You can simulate this live!

  1. Stop the database container temporarily:
    docker stop tcpserver-db-1
  2. Send TCP messages using python test_client.py or the manual PowerShell command. (You'll notice the TCP connections still succeed. The messages are queued safely in RAM.)
  3. Bring the database back online:
    docker start tcpserver-db-1
  4. Check your API at http://localhost:8000/data. Within seconds, the background worker will have flushed all of the queued "offline" records into the recovered database!
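
The recovery behaviour exercised above can be sketched as a queue-draining worker that retries the same batch until the database accepts it. This is illustrative code, not the shipped worker: insert_batch stands in for the real asyncpg/SQLAlchemy call.

```python
import asyncio

async def db_writer(queue: asyncio.Queue, insert_batch,
                    batch_size: int = 500, backoff: float = 1.0):
    """Drain the in-memory queue into the database, retrying on DB faults."""
    while True:
        # Block for the first item, then greedily fill the batch.
        batch = [await queue.get()]
        while len(batch) < batch_size and not queue.empty():
            batch.append(queue.get_nowait())
        while True:  # retry the same batch until the DB accepts it
            try:
                await insert_batch(batch)
                break
            except Exception:
                await asyncio.sleep(backoff)  # DB down: back off, data stays in RAM
```

Because the batch is only discarded after a successful commit, records queued while the database is down are flushed as soon as it recovers.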

📌 Assumptions Made

As requested, here are the key assumptions made during this implementation:

  1. Device ID vs Client Address: The prompt mentioned "client address (Device ID)". We assumed a reliable way to identify a device is by extracting its IP address from the TCP socket context (writer.get_extra_info('peername')). However, if the client sends a device_id in its payload, we prioritize that, using the IP address as a fallback.
  2. Data Streaming Frame Format: Since the exact stream structure was open to design, we assumed the packets are encoded as Newline Delimited JSON (NDJSON). This is an industry-standard mechanism for delimiting records in a byte stream without requiring binary length-prefix headers.
  3. Queue Strategy: We assumed a bounded queue (maxsize=100000) was preferred to prevent silent Out-Of-Memory (OOM) fatal crashes in the highly unlikely event the database is permanently destroyed or offline for a very long period.
  4. Database Insertion Bottlenecks: We assumed high-frequency event streams would bottleneck the database if rows were inserted one at a time. We engineered the worker to pull up to batch_size=500 items off the queue and commit them in a single transaction, drastically increasing throughput.
  5. Slowloris Timeout Protections: We assumed malicious or broken devices might connect and drip data without ever sending a newline. We wrapped the stream reader in an asyncio.wait_for(timeout=30.0) call to free resources held by idle connections.
  6. Graceful Shutdown: We assumed a docker stop command would otherwise discard records still waiting in the in-memory queue. The system traps SIGTERM/SIGINT signals, closes the TCP listener, and makes Docker wait until every record in the queue has been flushed into PostgreSQL before the Python process exits.
  7. REST API Query Optimization (B-Tree Indexing): We assumed that as the dataset reaches millions of rows, the FastAPI querying logic would slow down. We explicitly set index=True on the card_id and device_id columns in the SQLAlchemy models, building B-Tree indexes in Postgres so API lookups remain O(log N) regardless of table size.
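
The idle-timeout guard from assumption 5 can be sketched like this (illustrative helper; the timeout is parameterized here for clarity):

```python
import asyncio

async def read_frame(reader: asyncio.StreamReader, timeout: float = 30.0):
    """Read one newline-delimited frame, or give up on idle/dripping peers."""
    try:
        return await asyncio.wait_for(reader.readline(), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # caller should close the connection and free the slot
```

Returning None lets the connection handler distinguish a timed-out peer from a clean EOF (readline returns b"" on EOF) and close the socket accordingly.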
