In [1]:
import influxdb_client_3
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions

In [2]:
# Connection settings are read from the environment so that the API token is
# never stored in the notebook. Host and database fall back to the test
# instance this notebook was written against; the token has no fallback, so a
# missing INFLUXDB_TOKEN raises KeyError right here instead of failing later.
# NOTE(review): the token that used to be hard-coded in this cell has been
# exposed in the notebook and should be revoked and re-issued.
import os

client = InfluxDBClient3(
    host=os.environ.get("INFLUXDB_HOST", "http://3.98.181.12:9000"),
    database=os.environ.get("INFLUXDB_DATABASE", "WFRtest"),
    token=os.environ["INFLUXDB_TOKEN"],
)

In [3]:
# Smoke-test write: one line-protocol record for the "home" measurement.
# The trailing timestamp is given in seconds, hence write_precision="s".
point = "home,room=Living\\ Room temp=21.1,hum=35.9,co=0i 1748678400"
client.write(write_precision="s", record=point)

In [None]:
# Execute an SQL query that selects the room of the sample point by its
# temperature and timestamp.
table = client.query(query='''SELECT room
                            FROM home
                            WHERE temp=21.1
                              AND time=from_unixtime(1748678400)''')
# table is a pyarrow.Table. Check that a row came back before indexing, so an
# empty result fails with a clear message instead of a bare IndexError.
assert table.num_rows > 0, "Query returned no rows; has the sample point been written?"
# Convert the pyarrow scalar to a plain Python str so the comparison does not
# depend on how pyarrow formats scalars.
room = table.column("room")[0].as_py()
assert room == 'Living Room', f"Expected 'Living Room', got {room!r}"

In [None]:
# Show the room value obtained by the query cell.
print(f"Room: {room}")

In [None]:
# Code below uploads to InfluxDB 3.0 using the InfluxDBClient3
import csv
from datetime import datetime, timezone
import time
import logging
from tqdm import tqdm
import cantools

from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions

# Error logging: records at ERROR level and above go to parse_errors.log.
# filemode 'w' opens the file for writing, so earlier contents are replaced.
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='parse_errors.log',
    filemode='w',
)


def _escape_lp_component(value, is_field_string_value=False):
    """Escape ``value`` for use inside an InfluxDB line-protocol record.

    Args:
        value: Object to embed; it is converted with ``str()`` first.
        is_field_string_value: When True, escape for a double-quoted string
            field value (backslashes and double quotes). Otherwise escape for
            a tag key, tag value or field key (commas, spaces, equals signs).

    Returns:
        The escaped string.
    """
    text = str(value)
    if is_field_string_value:
        return text.replace('\\', '\\\\').replace('"', '\\"')
    return text.replace(',', '\\,').replace(' ', '\\ ').replace('=', '\\=')


def parse_can_csv_row(row, db):
    """Decode one CAN CSV row into InfluxDB line-protocol records.

    Args:
        row: Mapping for one CSV row. Reads ``message_id`` (decimal CAN ID),
            ``byte0`` .. ``byte7`` (decimal byte values) and ``timestamp``
            (Unix time in seconds, may be fractional).
        db: Object exposing ``get_message_by_frame_id(int)``; the returned
            message must provide ``name``, ``decode(bytes)`` and
            ``get_signal_by_name(str)``.

    Returns:
        On success, a list with one line-protocol string per decoded signal
        (measurement ``canBus``, nanosecond timestamp). On failure, a
        one-element list whose string starts with ``"Error:"``. No exception
        is raised for malformed rows.
    """
    try:
        can_id_int = int(row['message_id'])
    except (KeyError, ValueError, TypeError):
        # TypeError covers a None value, which csv.DictReader produces for
        # rows that are shorter than the header.
        return [f"Error: Invalid CAN ID '{row.get('message_id')}'"]

    try:
        message = db.get_message_by_frame_id(can_id_int)
    except KeyError:
        return [f"Error: No message found for CAN ID {can_id_int}"]

    # All eight byte fields must be present and hold integers in 0..255.
    byte_values = []
    for i in range(8):
        byte_str = row.get(f'byte{i}')
        if byte_str is None or byte_str == '':
            return [f"Error: Missing or empty value for byte{i} for CAN ID {can_id_int}. All 8 byte fields are expected."]
        try:
            byte_val = int(byte_str)
        except (ValueError, TypeError) as e:
            return [f"Error: Non-integer byte value in CSV for CAN ID {can_id_int} (byte{i}: '{byte_str}') — {e}"]
        if not 0 <= byte_val <= 255:
            # Checked here so the message names the offending field; bytes()
            # would otherwise raise a ValueError that gets blamed on byte7.
            return [f"Error: Byte value out of range 0-255 in CSV for CAN ID {can_id_int} (byte{i}: '{byte_str}')"]
        byte_values.append(byte_val)
    data_for_decode = bytes(byte_values)

    try:
        decoded = message.decode(data_for_decode)
    except Exception as e:
        return [f"Error: Decoding error for CAN ID {can_id_int} with data {data_for_decode.hex()} — {e}"]

    try:
        ts_dt = datetime.fromtimestamp(float(row['timestamp']), tz=timezone.utc)
    except (KeyError, ValueError, TypeError, OverflowError, OSError):
        # Best-effort fallback for a missing, malformed or out-of-range
        # timestamp: stamp the row with the current time.
        ts_dt = datetime.now(timezone.utc)

    # Nanoseconds since the Unix epoch, computed with integer arithmetic so
    # the value is exact to the microsecond (float multiplication by 1e9
    # cannot represent every nanosecond count at this magnitude).
    since_epoch = ts_dt - datetime(1970, 1, 1, tzinfo=timezone.utc)
    timestamp_ns = (
        (since_epoch.days * 86_400 + since_epoch.seconds) * 1_000_000_000
        + since_epoch.microseconds * 1_000
    )

    measurement_name = "canBus"

    # Parts shared by every signal of this message.
    escaped_message_name = _escape_lp_component(message.name)
    escaped_can_id_str = _escape_lp_component(str(can_id_int))

    line_protocol_points = []
    for sig_name, raw_signal_value in decoded.items():
        sig = message.get_signal_by_name(sig_name)
        unit = getattr(sig, 'unit', '') or "N/A"

        if hasattr(raw_signal_value, 'value'):
            # Enum-like decoded value: numeric part in .value, label in .name.
            numeric_val = float(raw_signal_value.value)
            label_str = str(raw_signal_value.name)
        else:
            # Plain numeric value; its text form doubles as the label.
            numeric_val = float(raw_signal_value)
            label_str = str(raw_signal_value)

        escaped_sig_name = _escape_lp_component(sig_name)
        escaped_unit_val = _escape_lp_component(unit, is_field_string_value=True)
        escaped_label_val = _escape_lp_component(label_str, is_field_string_value=True)

        # Tag set: key1=value1,key2=value2
        tags_str = f"signalName={escaped_sig_name},messageName={escaped_message_name},canID={escaped_can_id_str}"

        # Field set: sensorReading is a float; unit and signalLabel are
        # quoted strings.
        # NOTE(review): a non-finite reading is rendered as 'nan'/'inf' here;
        # confirm the server accepts such values or filter them upstream.
        fields_str = f'sensorReading={numeric_val},unit="{escaped_unit_val}",signalLabel="{escaped_label_val}"'

        # Record format: measurement,tag_set field_set timestamp
        line_protocol_points.append(
            f"{measurement_name},{tags_str} {fields_str} {timestamp_ns}"
        )

    return line_protocol_points



def process_csv(file_path, db, influx_client, database_name, mps=400):
    """Decode every row of a CAN CSV file and write the points to InfluxDB.

    Args:
        file_path: Path of the CSV file; it is read fully into memory so the
            progress bar knows the total row count.
        db: Message database handed through to ``parse_can_csv_row``.
        influx_client: Client whose ``write`` method receives each batch.
        database_name: Database passed to every ``write`` call.
        mps: Number of successfully parsed rows to accumulate before a batch
            is written. After each such batch the loop sleeps so that at
            least one second has passed since the current window started.

    Rows that fail to parse are logged with ``logging.error`` and skipped.
    Whatever is still buffered after the last row is written at the end.
    """
    def send(records):
        # The one place that performs the write, shared by both flush sites.
        influx_client.write(
            database=database_name,
            record=records,
            write_precision="ns",
            write_options=WriteOptions(batch_size=10000, flush_interval=100)
        )

    pending = []
    rows_in_window = 0
    window_start = time.time()

    with open(file_path, newline='') as csvfile:
        rows = list(csv.DictReader(csvfile))
        progress = tqdm(rows, total=len(rows), desc="Uploading CAN data")
        for idx, row in enumerate(progress):
            pts = parse_can_csv_row(row, db)
            failed = bool(pts) and isinstance(pts[0], str) and pts[0].startswith("Error:")
            if failed:
                logging.error(f"Row {idx}: {pts[0]}")
            else:
                pending.extend(pts)
                rows_in_window += 1

            # Window not full yet: keep accumulating.
            if rows_in_window < mps:
                continue

            if pending:
                send(pending)
                # Rebind rather than clear, so the list handed to the client
                # is never mutated afterwards.
                pending = []
            elapsed = time.time() - window_start
            if elapsed < 1:
                time.sleep(1 - elapsed)
            window_start = time.time()
            rows_in_window = 0

    # Final partial batch.
    if pending:
        send(pending)


def main():
    """Load the DBC file, connect to InfluxDB and upload the cleaned CAN CSV.

    Connection settings are read from environment variables:

    * ``INFLUXDB_TOKEN`` - API token (required).
    * ``INFLUXDB_HOST`` - server URL; defaults to ``http://3.98.181.12:9000``.
    * ``INFLUXDB_DATABASE`` - target database; defaults to ``WFRtest``.

    A DBC file that cannot be loaded, or a missing token, is reported on
    stdout and ends the run early without raising.
    """
    # Local import so this function does not depend on another cell's imports.
    import os

    dbc_file = 'local_analysis/WFR25-3.dbc'
    try:
        db = cantools.database.load_file(dbc_file)
        print(f"Loaded DBC: {dbc_file}")
    except Exception as e:
        print(f"Failed loading DBC: {e}")
        return

    influx_url = os.environ.get("INFLUXDB_HOST", "http://3.98.181.12:9000")
    database_name = os.environ.get("INFLUXDB_DATABASE", "WFRtest")
    # The token is a secret and must not be stored in the notebook.
    # NOTE(review): the token previously hard-coded here has been exposed and
    # should be revoked and re-issued.
    influx_token = os.environ.get("INFLUXDB_TOKEN")
    if not influx_token:
        print("Missing InfluxDB token: set the INFLUXDB_TOKEN environment variable.")
        return

    client = InfluxDBClient3(
        host=influx_url,
        database=database_name,
        token=influx_token
    )

    csv_path = 'local_analysis/cleaned_can.csv'
    try:
        process_csv(csv_path, db, client, database_name, mps=10000)
    finally:
        # Release the client's resources even if the upload fails midway.
        client.close()

    print("Finished writing all points.")
    print("Errors (if any) logged in parse_errors.log")


# Start the upload when this code is executed as the main module.
if __name__ == "__main__":
    main()


In [9]:
# Fetch every row of the "home" measurement from the WFRtest database.
result = client.query(database="WFRtest", query="SELECT * FROM home")

In [10]:
# Print the query result: the table's schema followed by its column values.
print(result)

pyarrow.Table
co: int64
hum: double
room: string
temp: double
time: timestamp[ns] not null
----
co: [[0]]
hum: [[35.9]]
room: [["Living Room"]]
temp: [[21.1]]
time: [[2025-05-31 08:00:00.000000000]]
