# Level 3: Converting data to Line Protocol & uploading to InfluxDB

## Requirements

1. [InfluxDB installed](https://www.influxdata.com/downloads/).
2. Store the InfluxDB API token in the `.env` file as `INFLUXDB_TOKEN`.
3. Prepare the preprocessed CSV data using the previous notebook or other tools.

In [1]:
!mkdir -p ../data_irem/line_protocol

In [2]:
import pandas as pd
import os
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
from pathlib import Path

from dotenv import load_dotenv
load_dotenv("../.env")


# Directory holding the preprocessed CSV files read by the pipeline below.
# NOTE: "PREROCESSED" is a typo of "PREPROCESSED"; the name is kept because
# later cells reference it.
DATA_PREROCESSED_DIR = Path("../data_irem/csv")
# Directory where the generated .line (Line Protocol) files are saved.
DATA_LINE_PROTOCOL_DIR = Path("../data_irem/line_protocol")

# InfluxDB API token read from the environment (populated from ../.env by
# load_dotenv above); None if INFLUXDB_TOKEN is not set.
TOKEN = os.environ.get("INFLUXDB_TOKEN")
# Local InfluxDB instance and the organization used for all writes.
URL = "http://localhost:8086"
ORG = "radem"

# Bucket that all particle measurements are written to.
BUCKET = "radem"

## Tools

### Setting up the InfluxDB connection

In [5]:
def get_write_api(url: str = URL, token: str = TOKEN, org: str = ORG):
    """Create an InfluxDB client and hand back its synchronous write API.

    The defaults come from the module-level URL/TOKEN/ORG constants and are
    captured when this function is defined. Only the write API is returned.
    The client that owns it is not exposed, so callers can close the write
    API but not the client itself.
    """
    write_options = SYNCHRONOUS
    connection = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
    return connection.write_api(write_options=write_options)

### Reading preprocessed CSV data

In [6]:
def read_particles(filename: Path) -> pd.DataFrame:
    """Load a preprocessed particle CSV and prepare it for Line Protocol export.

    The CSV must provide ``time``, ``bin`` and ``value`` columns.

    Args:
        filename: Path (or any path-like/buffer accepted by ``pd.read_csv``)
            of the preprocessed CSV.

    Returns:
        The parsed frame with these changes:
            * ``time`` is converted to datetimes.
            * ``bin`` is cast to int8.
            * ``value`` is cast to int64.
            * A new ``time_ns`` column holds integer nanoseconds since the
              Unix epoch (UTC for timezone-aware input).
    """
    df = pd.read_csv(filename)

    # Parse timestamps once; re-parsing the converted column is redundant.
    df['time'] = pd.to_datetime(df['time'])

    # Convert to integer nanoseconds explicitly via datetime64[ns]. A bare
    # .astype('int64') counts in the column's own resolution unit, which is
    # not guaranteed to be ns across pandas versions. The timestamp would
    # then be off by orders of magnitude when written with ns precision.
    # For tz-aware input this conversion yields UTC instants.
    df['time_ns'] = df['time'].to_numpy(dtype='datetime64[ns]').astype('int64')

    # Compact/explicit integer types for the tag (bin) and field (value).
    df["bin"] = df["bin"].astype("int8")
    df["value"] = df["value"].astype("int64")

    return df

### Converting DataFrame -> Line Protocol

In [7]:
def convert_particles_to_line_protocol(df: pd.DataFrame, measurement_name: str) -> pd.DataFrame:
    """Render particle rows as InfluxDB Line Protocol strings.

    Each row becomes ``<measurement>,bin=<bin> value=<value>i <time_ns>``.
    ``bin`` is written as a tag, ``value`` as an integer field (``i`` suffix)
    and ``time_ns`` as the timestamp.

    Args:
        df: Frame with ``bin``, ``value`` and ``time_ns`` columns, e.g. as
            returned by ``read_particles``.
        measurement_name: Measurement to write into. Commas and spaces are
            backslash-escaped, as Line Protocol requires for measurement names.

    Returns:
        A single-column DataFrame (column ``line``) sharing ``df``'s index.
    """
    # Line Protocol: commas and spaces in a measurement name must be escaped.
    measurement = measurement_name.replace(",", r"\,").replace(" ", r"\ ")

    lines = (
        measurement
        + ",bin=" + df["bin"].astype(str)
        + " value=" + df["value"].astype(str)
        + "i " + df["time_ns"].astype(str)
    )
    # Name the column explicitly. pd.DataFrame(series, columns=["line"]) only
    # works while the concatenated Series happens to be unnamed. A named
    # Series would instead yield an all-NaN "line" column.
    return lines.to_frame(name="line")

### Saving Line Protocol file

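Example line (format `measurement,tag=... field=... timestamp_ns`): `my_measurement,event_type=e,channel=0 value=123 1556813561098000000`. Lines produced by this notebook look like `irem_d1,bin=3 value=42i 1721154997000000000`, where the `i` suffix marks `value` as an integer field.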


In [8]:
def save_line_protocol(df: pd.DataFrame, filename: Path):
    """Write the ``line`` column of ``df`` to ``filename``, one record per line.

    Records are written verbatim, each terminated by ``\\n``, in UTF-8.
    ``DataFrame.to_csv`` is deliberately not used. With its default comma
    delimiter and minimal quoting it wraps every Line Protocol record (which
    always contains a comma after the measurement name) in double quotes.
    The resulting file would not be valid Line Protocol.
    """
    with open(filename, "w", encoding="utf-8", newline="\n") as fh:
        # Generator keeps memory flat even for tens of millions of lines.
        fh.writelines(f"{line}\n" for line in df["line"])

### Reading Line Protocol file

In [9]:
def read_line_protocol(filename: Path) -> pd.DataFrame:
    """Load a Line Protocol file into a one-column DataFrame.

    Every physical line of the file becomes one row of the ``line`` column.
    A NUL character is used as the separator because it does not occur in
    the generated records, so no record is split into several fields.
    """
    read_options = dict(header=None, sep='\0', names=['line'])
    lines_df = pd.read_csv(str(filename), **read_options)
    return lines_df

### Upload data to InfluxDB

In [10]:
def upload_line_protocol(
        write_api: "influxdb_client.WriteApi",
        df_lines: pd.DataFrame,
        bucket: str,
        org: str,
        batch_size: int = 1000000) -> None:
    """Write Line Protocol records to InfluxDB in fixed-size batches.

    Args:
        write_api: InfluxDB write API, e.g. from ``get_write_api``.
        df_lines: Frame whose ``line`` column holds Line Protocol strings.
            Its index may be arbitrary; batching is positional.
        bucket: Destination bucket.
        org: Organization passed along with every write.
        batch_size: Maximum number of records per ``write`` call.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    lines = df_lines['line']
    total = len(lines)
    for start in range(0, total, batch_size):
        stop = min(start + batch_size, total)  # exclusive end position

        print(f"Uploading batch of {stop - start} records, from {start} to {stop - 1}.")

        # Positional slicing: .loc slices by index label (inclusive end), so
        # it silently selects the wrong rows unless df_lines has a 0..n-1
        # RangeIndex.
        write_api.write(bucket, org, lines.iloc[start:stop].tolist())

    write_api.flush()

## Pipelines

### Particles

In [11]:
# WARNING: Be prepared for ~20 GB of RAM usage with the largest streams.
# For each particle stream:
#   1. pick the newest preprocessed CSV,
#   2. convert it to Line Protocol,
#   3. save that as a .line file,
#   4. upload it to InfluxDB.

for particle in ["irem_d1", "irem_d2", "irem_coin", "irem_d3"]:
    print(f"Uploading {particle} data...")

    # 1. get the newest CSV filename. The lexicographic sort picks the latest
    #    export as long as the suffix is a sortable timestamp
    #    (e.g. irem_d1_20240716T183637.csv).
    csv_candidates = sorted(DATA_PREROCESSED_DIR.glob(f"{particle}_2*.csv"))
    if not csv_candidates:
        # Fail with a clear message instead of a bare IndexError from [-1].
        raise FileNotFoundError(
            f"No preprocessed CSV matching {particle}_2*.csv in {DATA_PREROCESSED_DIR}"
        )
    csv_filename = csv_candidates[-1]

    # 2. read the CSV file
    df = read_particles(csv_filename)
    print(f"Read {len(df)} records from {csv_filename}")

    # 3. convert the DataFrame to line protocol
    df_lines = convert_particles_to_line_protocol(df, particle)

    # 4. (optional) save the line protocol to a file
    line_protocol_filename = DATA_LINE_PROTOCOL_DIR / f"{csv_filename.stem}.line"
    save_line_protocol(df_lines, line_protocol_filename)

    # 5. upload the line protocol to InfluxDB; close the write API even if
    #    the upload fails part-way.
    write_api = get_write_api(
        url=URL,
        token=TOKEN,
        org=ORG)
    try:
        upload_line_protocol(
            write_api=write_api,
            df_lines=df_lines,
            bucket=BUCKET,
            org=ORG)
    finally:
        write_api.close()


Uploading irem_d1 data...
Read 53417756 records from ../data_irem/csv/irem_d1_20240716T183637.csv
Uploading batch of 1000000 records, from 0 to 999999.
Uploading batch of 1000000 records, from 1000000 to 1999999.
Uploading batch of 1000000 records, from 2000000 to 2999999.
Uploading batch of 1000000 records, from 3000000 to 3999999.
Uploading batch of 1000000 records, from 4000000 to 4999999.
Uploading batch of 1000000 records, from 5000000 to 5999999.
Uploading batch of 1000000 records, from 6000000 to 6999999.
Uploading batch of 1000000 records, from 7000000 to 7999999.
Uploading batch of 1000000 records, from 8000000 to 8999999.
Uploading batch of 1000000 records, from 9000000 to 9999999.
Uploading batch of 1000000 records, from 10000000 to 10999999.
Uploading batch of 1000000 records, from 11000000 to 11999999.
Uploading batch of 1000000 records, from 12000000 to 12999999.
Uploading batch of 1000000 records, from 13000000 to 13999999.
Uploading batch of 1000000 records, from 140000

: 