# Data Ingestion to StreamCI

## 📡 Set URL, Request Headers, and Authentication Information

Before making a request to **StreamCI**, we need to define the basic configuration:

- The **API endpoint** we’re targeting is **`/data`**, used for sending sensor records (e.g., via `insert`, `bulkload`, `upload`).
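- The **HTTP headers** specify that we’re sending data in `application/json` format. (`requests` also sets this header automatically when the body is passed via `json=`.)
- The **authentication object** includes the `target` user, the authentication type (`authtype`), and a `secret_key`.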

We'll use `requests`, a third-party HTTP library (`pip install requests`), together with the standard-library modules `json`, `random`, `time`, and `datetime`. These also support later steps involving simulation and visualization.


In [None]:
import requests       # For sending HTTP POST requests
import json           # To convert Python dictionaries to JSON
import random         # To generate random sensor data
import time           # Used to control timing in the data simulation loop at the end
from datetime import datetime, timezone  # For precise UTC timestamps

# The target URL where the data will be sent
URL = "https://api.streamci.org/data"

# Headers to indicate we're sending JSON data in the request body
HEADERS = {
    "Content-Type": "application/json"
}

# Authentication information required by the API
# This will be included in the payload of the request
AUTH_INFO = {
    "target": "userXX",
    "authtype": "secret",
    "secret_key": "<password>"
}
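Hardcoding `secret_key` in a notebook makes it easy to leak the credential when the notebook is shared. Below is a minimal sketch that reads the key from an environment variable instead. The variable name `STREAMCI_SECRET_KEY` and the helper `load_auth_info` are hypothetical and are not part of the StreamCI API.

```python
import os

def load_auth_info(target: str, env_var: str = "STREAMCI_SECRET_KEY") -> dict:
    """Build the auth object from an environment variable (hypothetical variable name)."""
    secret = os.environ.get(env_var)
    if not secret:
        # Fail early instead of sending a request that is bound to be rejected
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return {"target": target, "authtype": "secret", "secret_key": secret}

# Usage, assuming the variable was exported before starting Jupyter:
# AUTH_INFO = load_auth_info("userXX")
```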

---
## 🚀 Construct and Send a JSON Payload

Now that we've set up our URL, headers, and authentication, we'll construct a sample JSON payload and send it to StreamCI using a POST request.

This payload includes:
- The `auth` block with credentials.
- A `request` block with:
  - `method`: specifying the action to perform (e.g., `insert`).
  - `data`: a sensor reading with a timestamp, sensor ID, and random values.

We use Python’s `random` module to simulate sensor values and `requests.post()` to send the data.

In [None]:
# Create a JSON payload that contains:
# - Auth info for the API
# - A request object with the method "insert"
# - Data for a simulated sensor reading

payload = {
    "auth": AUTH_INFO,
    "request": {
        "method": "insert",
        "data": {
            "time": {"$date": "2025-08-04T10:00:00.001Z"},  # ISO 8601 UTC timestamp (under the $date key)
            "deviceID": "sensor1",                          # Simulated sensor ID
            "val1": random.randint(0, 50),                  # Random integer value (0–50)
            "val2": round(random.uniform(0, 25.0), 2),      # Random float value (0–25.0), 2 decimal places
        }
    }
}

# Send the payload as a JSON POST request to the server
response = requests.post(URL, json=payload, headers=HEADERS)

# Print the server’s response (parsed from JSON)
print("Response:", response.json())
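When you pass `json=payload`, `requests` serializes the dictionary to JSON and sends it as the request body. To check exactly what the server will receive before posting, you can serialize the payload yourself. This is a minimal sketch: `build_insert_payload` is a hypothetical helper, and the placeholder credentials stand in for `AUTH_INFO`.

```python
import json

def build_insert_payload(auth: dict, record: dict) -> dict:
    """Wrap one record in the auth/request envelope used by the `insert` method."""
    return {"auth": auth, "request": {"method": "insert", "data": record}}

# Placeholder credentials standing in for AUTH_INFO
demo_auth = {"target": "userXX", "authtype": "secret", "secret_key": "<password>"}
demo_record = {
    "time": {"$date": "2025-08-04T10:00:00.001Z"},
    "deviceID": "sensor1",
    "val1": 42,
    "val2": 12.34,
}

preview = build_insert_payload(demo_auth, demo_record)
# indent=2 is only for readability; requests sends the compact form
print(json.dumps(preview, indent=2))
```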

---
## 📚 Bulkload Multiple Sensor Records with One API Request

Instead of sending one record at a time, we can use the `bulkload` method to send a **batch of data points** in a single request. This is more efficient for time-series or sensor data because the whole batch travels in one HTTP request instead of one request per record.

Each record in the `data` list includes:
- A timestamp in ISO 8601 UTC format (under the `$date` key)
- A sensor ID
- Randomly generated values for `val1` and `val2` to simulate realistic sensor data

This technique is especially useful when backfilling historical data or submitting multiple readings at once.
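For backfilling, the `$date` timestamps can be generated at a fixed interval instead of being typed by hand. This is a minimal sketch: `to_date` and `backfill_records` are hypothetical helpers, and the millisecond `...Z` format simply mirrors the timestamps used in this notebook.

```python
import random
from datetime import datetime, timedelta, timezone

def to_date(dt: datetime) -> dict:
    """Format an aware datetime as {"$date": "YYYY-MM-DDTHH:MM:SS.mmmZ"} in UTC."""
    dt = dt.astimezone(timezone.utc)
    return {"$date": dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"}

def backfill_records(device_id: str, start: datetime, count: int, step: timedelta) -> list:
    """Return `count` simulated records spaced `step` apart, starting at `start`."""
    return [
        {
            "time": to_date(start + n * step),
            "deviceID": device_id,
            "val1": random.randint(0, 50),
            "val2": round(random.uniform(0, 25.0), 2),
        }
        for n in range(count)
    ]

start = datetime(2025, 8, 4, 10, 1, 0, 1000, tzinfo=timezone.utc)
records = backfill_records("sensor1", start, count=3, step=timedelta(minutes=1))
print([r["time"]["$date"] for r in records])
# ['2025-08-04T10:01:00.001Z', '2025-08-04T10:02:00.001Z', '2025-08-04T10:03:00.001Z']
```

The resulting `records` list can be used directly as the `data` field of a `bulkload` request.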

> ⚠️ **Note:**  
> The `bulkload` method is best suited for **small batches** of sensor data — such as inserting a few records at once or backfilling recent values.
>
> For **larger files** (e.g., NDJSON, CSV) or **streamed uploads**, use the dedicated `"method": "upload"` shown in the File Upload section below. It is optimized to handle larger payloads efficiently, without exhausting memory or blocking the server.


In [None]:
# Create a payload for bulk loading multiple sensor readings
payload = {
    "auth": AUTH_INFO,   # Use the same auth info defined earlier
    "request": {
        "method": "bulkload",  # API method for bulk data submission
        "datatype": "json",    # Indicate that the data format is JSON
        "data": [{
            "time": {"$date": "2025-08-04T10:01:00.001Z"},
            "deviceID": "sensor1",
            "val1": random.randint(0, 50),
            "val2": round(random.uniform(0, 25.0), 2),
        },{
            "time": {"$date": "2025-08-04T10:02:00.001Z"},
            "deviceID": "sensor2",
            "val1": random.randint(40, 80),
            "val2": round(random.uniform(20, 75.0), 2),
        },{
            "time": {"$date": "2025-08-04T10:03:00.001Z"},
            "deviceID": "sensor3",
            "val1": random.randint(70, 100),            # randint() requires integer bounds
            "val2": round(random.uniform(70, 100.0), 2),
        }]
    }
}

# Send the POST request with the bulk data
response = requests.post(URL, json=payload, headers=HEADERS)

# Print the parsed response from the server
print("Response:", response.json())
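Because `bulkload` is meant for small batches, a longer list of records can be split across several requests. This is a minimal sketch. The batch size of 100 is an arbitrary assumption, since this notebook does not state a server-side limit, and `bulkload_payloads` is a hypothetical helper.

```python
from itertools import islice
from typing import Iterable, Iterator

def bulkload_payloads(auth: dict, records: Iterable[dict], batch_size: int = 100) -> Iterator[dict]:
    """Yield one `bulkload` payload per batch of at most `batch_size` records."""
    it = iter(records)
    # islice takes the next batch; an empty list means the input is exhausted
    while batch := list(islice(it, batch_size)):
        yield {
            "auth": auth,
            "request": {"method": "bulkload", "datatype": "json", "data": batch},
        }

# Usage sketch (many_records is a hypothetical list of records):
# for payload in bulkload_payloads(AUTH_INFO, many_records, batch_size=100):
#     requests.post(URL, json=payload, headers=HEADERS).raise_for_status()
```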

---
## 📁 File Upload – Step 1: Preview the NDJSON File

Before performing a file upload, it’s helpful to preview the contents of the NDJSON file.

**NDJSON (Newline-Delimited JSON)** is a lightweight format where each line is a standalone JSON object — ideal for streaming data without loading the entire file into memory.
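In practice, writing NDJSON means calling `json.dumps` once per record and putting each result on its own line. Reading it back means calling `json.loads` once per non-empty line. The small illustration below uses an in-memory buffer. The records are made up and only mirror this notebook's field names.

```python
import io
import json

sample_records = [
    {"time": {"$date": "2025-08-04T10:00:00.001Z"}, "deviceID": "sensor1", "val1": 12, "val2": 3.5},
    {"time": {"$date": "2025-08-04T10:01:00.001Z"}, "deviceID": "sensor2", "val1": 55, "val2": 40.25},
]

# Write: one JSON object per line, each terminated by a newline
buf = io.StringIO()
for rec in sample_records:
    buf.write(json.dumps(rec) + "\n")

# Read: parse line by line, skipping blank lines
buf.seek(0)
parsed = [json.loads(line) for line in buf if line.strip()]
print(parsed == sample_records)  # True
```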

In this step, we:
- Open the file
- Print the first 5 records for inspection
- Count the total number of records


In [None]:
file_path = "data.ndjson"
count = 0

# Open and scan the file
with open(file_path, "r", encoding="utf-8") as f:
    for line in f:
        if line.strip():  # Skip empty lines
            record = json.loads(line)
            if count < 5:
                print(record)  # Preview the first few entries
            count += 1

print(f"\nTotal records: {count}")
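Before uploading, it can also be worth checking that every line parses and carries the fields the earlier examples use (`time` with a `$date` value, and `deviceID`). This is a minimal sketch. Which fields StreamCI actually requires is an assumption here, and `validate_ndjson` is a hypothetical helper.

```python
import json

REQUIRED_FIELDS = ("time", "deviceID")  # assumption, based on the records in this notebook

def validate_ndjson(path: str) -> list:
    """Return a list of (line_number, problem) tuples; an empty list means no issues found."""
    problems = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue  # blank lines are skipped, as in the preview above
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                problems.append((lineno, f"invalid JSON: {e.msg}"))
                continue
            if not isinstance(record, dict):
                problems.append((lineno, "not a JSON object"))
                continue
            missing = [k for k in REQUIRED_FIELDS if k not in record]
            if missing:
                problems.append((lineno, f"missing fields: {missing}"))
            elif not isinstance(record["time"], dict) or "$date" not in record["time"]:
                problems.append((lineno, 'time is not a {"$date": ...} object'))
    return problems

# Usage: print(validate_ndjson("data.ndjson") or "No problems found")
```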

## 📁 File Upload – Step 2: Upload File via `multipart/form-data`

This step sends the NDJSON file to the server using a **`POST` request with `multipart/form-data`**, which is commonly used for file uploads.

We attach:
- the **file** as a binary stream,
- the **auth object** (JSON-encoded),
- the **request payload** (specifying the `upload` method and the data type).

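This is appropriate for **streaming files**, where the server can process the data incrementally without exhausting memory. On the client side, however, `requests` reads the whole file into memory to build the multipart body. For very large files, a streaming encoder such as `MultipartEncoder` from the third-party `requests-toolbelt` package avoids that.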


In [None]:
import os  # For os.path.basename

# Send multipart/form-data with file, auth, and request
with open(file_path, "rb") as f:
    files = {
        "file": (os.path.basename(file_path), f, "application/octet-stream")  # Upload as a binary stream
    }
    
    payload = {
        "auth": json.dumps(AUTH_INFO),
        "request": json.dumps({
            "method": "upload", 
            "datatype": "json"
        })
    }

    try:
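        # Send the multipart POST request. HEADERS is deliberately not passed:
        # requests sets the multipart Content-Type (including its boundary) itself.
        response = requests.post(URL, files=files, data=payload, verify=True)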
        print("Response:", response.json())
    except requests.exceptions.RequestException as e:
        print("Request failed:", e)
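To see what this multipart request contains without contacting the server, you can build it with `requests.Request(...).prepare()` and inspect the result. The printed Content-Type shows the boundary that `requests` generates to separate the form parts. The following sketch uses a small in-memory file and a placeholder URL.

```python
import io
import json
import requests

files = {"file": ("data.ndjson", io.BytesIO(b'{"deviceID": "sensor1"}\n'), "application/octet-stream")}
form = {
    "auth": json.dumps({"target": "userXX", "authtype": "secret", "secret_key": "<password>"}),
    "request": json.dumps({"method": "upload", "datatype": "json"}),
}

# Build the request exactly as requests.post would, but without sending it
prepared = requests.Request("POST", "https://example.invalid/data", files=files, data=form).prepare()

print(prepared.headers["Content-Type"])  # multipart/form-data; boundary=...
print(prepared.body[:300].decode())      # the first form parts, as raw text
```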

---
## ⏱ Streaming Simulation: Let’s Keep Ingesting Data

The code blocks below mimic a sensor sending data every few seconds, using the `insert` method repeatedly in a loop.
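In the streaming loop at the end of this section, the 3 s pause comes on top of the time each request takes. As a result, records end up spaced slightly more than 3 s apart, and the schedule drifts over a long run. If a steady rate matters, one option is to schedule each call against a monotonic clock. This is a minimal sketch: `run_at_fixed_rate` is a hypothetical helper, and the `clock` and `sleep` parameters exist only so the timing can be tested without actually waiting.

```python
import time
from typing import Callable

def run_at_fixed_rate(action: Callable[[], None], interval: float, count: int,
                      clock: Callable[[], float] = time.monotonic,
                      sleep: Callable[[float], None] = time.sleep) -> None:
    """Call `action` `count` times, starting each call `interval` seconds after the previous start."""
    next_start = clock()
    for _ in range(count):
        action()
        next_start += interval
        # Sleep only for what is left of the interval; skip sleeping if the action overran it
        remaining = next_start - clock()
        if remaining > 0:
            sleep(remaining)

# Usage sketch: run_at_fixed_rate(send_one_record, interval=3, count=1200)
# where send_one_record is your own function that posts one insert payload.
```

If an action overruns its slot, the next calls run back-to-back until the schedule catches up. The notebook's fixed-delay approach avoids that burst at the cost of drift.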



#### 🧠 `generate_random_data()` – Create One Record at a Time

This function returns a single synthetic sensor reading with the following format:

- `"deviceID"` is randomly selected from `sensor1`, `sensor2`, or `sensor3`
- `"val1"` and `"val2"` vary based on the sensor type
- `"time"` is set to the current UTC timestamp (to whole-second precision) using the `"$date"` format



#### 🔄 Send a New Record Every Few Seconds

The loop below sends 1,200 records to the server using the `insert` method and pauses 3 seconds after each request. A full run therefore takes at least 1,200 × 3 s = 3,600 s, or about one hour:

In [None]:
def generate_random_data():
    device_ids = ["sensor1", "sensor2", "sensor3"]
    device_id = random.choice(device_ids)

    # Generate values based on the device type
    if device_id == "sensor1":
        val1 = random.randint(0, 50)
        val2 = round(random.uniform(0.0, 25.0), 2)
    elif device_id == "sensor2":
        val1 = random.randint(40, 80)
        val2 = round(random.uniform(20.0, 75.0), 2)
    else:  # sensor3
        val1 = random.randint(70, 100)
        val2 = round(random.uniform(70.0, 100.0), 2)

    return {
        "time": {"$date": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")},
        "deviceID": device_id,
        "val1": val1,
        "val2": val2
    }
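The per-sensor ranges can also live in a lookup table, which keeps them in one place as sensors are added. Below is a sketch of an equivalent, table-driven variant. `SENSOR_RANGES` and `generate_reading` are hypothetical names. Unlike `generate_random_data()`, which truncates the timestamp to whole seconds, this version keeps millisecond precision, like the hand-written examples earlier.

```python
import random
from datetime import datetime, timezone

# (val1 integer range, val2 float range) per sensor, copied from generate_random_data()
SENSOR_RANGES = {
    "sensor1": ((0, 50), (0.0, 25.0)),
    "sensor2": ((40, 80), (20.0, 75.0)),
    "sensor3": ((70, 100), (70.0, 100.0)),
}

def generate_reading() -> dict:
    device_id = random.choice(list(SENSOR_RANGES))
    (lo1, hi1), (lo2, hi2) = SENSOR_RANGES[device_id]
    now = datetime.now(timezone.utc)
    return {
        # isoformat(timespec="milliseconds") ends in "+00:00"; swap it for "Z"
        "time": {"$date": now.isoformat(timespec="milliseconds").replace("+00:00", "Z")},
        "deviceID": device_id,
        "val1": random.randint(lo1, hi1),
        "val2": round(random.uniform(lo2, hi2), 2),
    }
```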

In [None]:
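sent = 0  # Number of records the server accepted
while sent < 1200:
    payload = {
        "auth": AUTH_INFO,
        "request": {
            "method": "insert",
            "data": generate_random_data()
        }
    }
    try:
        response = requests.post(URL, json=payload, headers=HEADERS, timeout=10)
        response.raise_for_status()  # Treat HTTP error codes as failures too
        sent += 1
    except requests.exceptions.RequestException as e:
        # Catch only network/HTTP errors, so interrupting the kernel (KeyboardInterrupt) still stops the loop
        print("Request failed, will retry:", e)

    # Pause after every attempt, including failed ones, so errors don't cause a tight retry loop
    time.sleep(3)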
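The streaming loop above does not slow down when requests fail. If the server is unavailable for a while, you can reduce pointless traffic by increasing the wait after each consecutive failure (exponential backoff). This is a minimal sketch. `backoff_delay` and its defaults are hypothetical choices, and `failures == 0` reproduces the notebook's 3 s interval.

```python
def backoff_delay(failures: int, base: float = 3.0, factor: float = 2.0, cap: float = 60.0) -> float:
    """Seconds to wait given the number of consecutive failed attempts (0 = last attempt succeeded)."""
    # Clamp the exponent so factor ** failures cannot overflow during a long outage
    return min(cap, base * factor ** min(failures, 32))

# Usage sketch inside the streaming loop:
#   reset failures = 0 after a successful post, do failures += 1 after a RequestException,
#   then call time.sleep(backoff_delay(failures)) instead of time.sleep(3)
print([backoff_delay(n) for n in range(7)])  # [3.0, 6.0, 12.0, 24.0, 48.0, 60.0, 60.0]
```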
