In [7]:
import pandas as pd
import numpy as np

### Strategy mapping

In [1]:
# Strategy name -> integer code used in the encoded state vector.
# on_enter, on_exit, on_both are omitted to simplify the model.
STRATEGY_MAP = dict(zip(
    ("monitoring", "on_change", "hysteresis", "on_delta_change"),
    range(4),
))
# Inverse lookup: integer code -> strategy name.
REVERSE_STRATEGY_MAP = dict(zip(STRATEGY_MAP.values(), STRATEGY_MAP))

### State encoder / decoder

In [2]:
# Number of OBD parameter slots in the encoded state vector; each slot holds
# four entries (pid, enabled, strategy, min_time).
MAX_OBD_PARAMS = 3

def encode_config(config: dict) -> list:
    """
    Flattens a configuration dict into a fixed-length state vector.

    Layout: MAX_OBD_PARAMS slots of [pid, enabled, strategy, min_time],
    followed by [send_period, min_saved_records].

    Args:
        config: Dict with an optional "obd_parameters" list (one dict per PID)
            and an optional "global_settings" dict. A missing or None section
            is treated as empty.

    Returns:
        A list of length MAX_OBD_PARAMS * 4 + 2. Parameters beyond the first
        MAX_OBD_PARAMS are ignored; unused slots are filled with zeros.
    """
    vector = []

    # A missing/None parameter list encodes as all-empty slots, mirroring the
    # way a missing "global_settings" section falls back to its defaults.
    params = config.get("obd_parameters") or []

    # OBD parameter block
    for i in range(MAX_OBD_PARAMS):
        if i < len(params):
            param = params[i]
            pid = param.get("pid", 0)
            enabled = 1 if param.get("enabled", False) else 0
            # Unknown strategy names fall back to code 0.
            strategy = STRATEGY_MAP.get(param.get("strategy", "monitoring"), 0)
            min_time = param.get("min_time", 10)
        else:
            pid, enabled, strategy, min_time = 0, 0, 0, 0
        vector.extend([pid, enabled, strategy, min_time])

    # Global settings
    global_ = config.get("global_settings") or {}
    vector.extend([
        global_.get("send_period", 10),
        global_.get("min_saved_records", 1)
    ])

    return vector


def decode_config(vector: list) -> dict:
    """
    Rebuilds a configuration dict from an encoded state vector.

    The vector is read as MAX_OBD_PARAMS slots of
    [pid, enabled, strategy, min_time] followed by
    [send_period, min_saved_records]. Slots whose pid is 0 are treated as
    empty and skipped; unknown strategy codes decode to "monitoring".
    """
    slot_width = 4
    parameters = []

    for slot in range(MAX_OBD_PARAMS):
        start = slot * slot_width
        pid, enabled, strategy, min_time = vector[start:start + slot_width]
        if pid == 0:
            continue  # empty slot
        parameters.append({
            "pid": pid,
            "enabled": bool(enabled),
            "strategy": REVERSE_STRATEGY_MAP.get(strategy, "monitoring"),
            "min_time": min_time
        })

    globals_start = MAX_OBD_PARAMS * slot_width
    return {
        "obd_parameters": parameters,
        "global_settings": {
            "send_period": vector[globals_start],
            "min_saved_records": vector[globals_start + 1],
        },
    }

### Actions 

In [3]:
"""
    new_state, trace = apply_action(state_vector, 4)
    print("New state:", new_state)
    print("Action trace:", trace)
"""


def apply_action(vector: list[int], action_id: int, baseline: list[int] = None) -> tuple[list[int], dict]:
    """
    Applies one discrete action to a copy of the state vector.

    Action ids, where i is an OBD parameter slot index:
        i        toggle the slot's enabled flag
        i + 3    advance the slot's strategy code to the next one (wraps around)
        i + 6    raise the slot's min_time by 1 (capped at 30)
        i + 9    lower the slot's min_time by 1 (floored at 1)
        12 / 13  lower / raise send_period by 1 (kept within 1..60)
        14 / 15  lower / raise min_saved_records by 1 (kept within 1..10)
        16       replace the state with a copy of `baseline` (only when a
                 non-empty baseline is supplied)

    Any other id leaves the copy unchanged and trace["change"] stays None.

    Returns:
        (new_vector, trace) where trace is {"action_id": ..., "change": <text or None>}.
        The input `vector` itself is never modified.
    """
    state = vector.copy()
    trace = {"action_id": action_id, "change": None}
    cycle_order = [0, 1, 2, 3]  # monitoring, on_change, hysteresis, delta_change

    for slot in range(MAX_OBD_PARAMS):
        offset = slot * 4
        pid, enabled, strategy, min_time = (state[offset + k] for k in range(4))

        if action_id == slot:
            # 0–2: toggle enable
            state[offset + 1] = 1 - enabled
            trace["change"] = f"PID {pid} enable toggled: {enabled} → {state[offset + 1]}"
        elif action_id == slot + 3:
            # 3–5: cycle strategy (unknown codes are treated as position 0)
            position = cycle_order.index(strategy) if strategy in cycle_order else 0
            following = cycle_order[(position + 1) % len(cycle_order)]
            state[offset + 2] = following
            trace["change"] = f"PID {pid} strategy changed: {strategy} → {following}"
        elif action_id == slot + 6:
            # 6–8: increase min_time
            raised = min(min_time + 1, 30)
            state[offset + 3] = raised
            trace["change"] = f"PID {pid} min_time increased: {min_time} → {raised}"
        elif action_id == slot + 9:
            # 9–11: decrease min_time
            lowered = max(min_time - 1, 1)
            state[offset + 3] = lowered
            trace["change"] = f"PID {pid} min_time decreased: {min_time} → {lowered}"
        else:
            continue  # this slot is not the target of the action
        return state, trace

    # Globals live right after the per-PID slots.
    period_idx = MAX_OBD_PARAMS * 4
    saved_idx = period_idx + 1
    send_period = state[period_idx]
    min_saved = state[saved_idx]

    if action_id == 12:
        state[period_idx] = max(1, send_period - 1)
        trace["change"] = f"send_period decreased: {send_period} → {state[period_idx]}"
    elif action_id == 13:
        state[period_idx] = min(60, send_period + 1)
        trace["change"] = f"send_period increased: {send_period} → {state[period_idx]}"
    elif action_id == 14:
        state[saved_idx] = max(1, min_saved - 1)
        trace["change"] = f"min_saved_records decreased: {min_saved} → {state[saved_idx]}"
    elif action_id == 15:
        state[saved_idx] = min(10, min_saved + 1)
        trace["change"] = f"min_saved_records increased: {min_saved} → {state[saved_idx]}"
    elif action_id == 16 and baseline:
        state = baseline.copy()
        trace["change"] = "configuration reset to baseline"

    return state, trace





### Reward function

In [44]:
"""
reward = latency_score + sum(pid_scores)

compute_reward() inputs: 
    latency_ms: int
    pid_data_list: List[Dict]  # One per PID

    
    latency_ms = 1850
    pid_data_list = [
        {
            "pid": 12,
            "values": [1500, 1500, 1500],
            "strategy": "on_change",
            "precision": 100,
            "valid_range": (800, 6000)
        },
        ...
    ]

Precision
    This defines how much a value must change to be considered “meaningful.”

    Even if all values are valid, we may want to ignore tiny fluctuations (e.g., RPM changes by 1 unit) and log only useful variation (e.g., ±100 RPM).
    
"""

def latency_score(latency_ms: int) -> float:
    """
    Maps an upload latency in milliseconds to a score.

    - up to 10 s: 1.0
    - 10 s to 120 s: falls linearly from 1.0 to 0.0
    - above 120 s: flat penalty of -0.5
    """
    fast_ms, slow_ms = 10_000, 120_000

    if latency_ms <= fast_ms:
        return 1.0
    if latency_ms <= slow_ms:
        return 1 - (latency_ms - fast_ms) / (slow_ms - fast_ms)
    return -0.5
    

def compute_pid_scores(pid_data_list: list[dict]) -> list[dict]:
    """
    Scores every PID entry and returns one {"pid", "score"} dict per entry.

    Each entry must provide "pid", "values", "strategy", "precision" and
    "valid_range"; only the last four are forwarded to data_quality_score.
    Scores are rounded to three decimals.
    """
    def _scored(item: dict) -> dict:
        quality = data_quality_score(
            values=item["values"],
            strategy=item["strategy"],
            precision=item["precision"],
            valid_range=item["valid_range"],
        )
        return {"pid": item["pid"], "score": round(quality, 3)}

    return [_scored(item) for item in pid_data_list]



def data_quality_score(
    values: list[float],
    strategy: str,
    precision: float,
    valid_range: tuple[float, float],
) -> float:
    """
    Rates how well one PID's logged values fit its logging strategy.

    Returns -1.0 when there are no values or any value lies outside
    valid_range (bounds inclusive). Otherwise the spread max - min is compared
    with `precision`; a spread of at least `precision` counts as significant,
    and the score is looked up per strategy. Unrecognized strategies score 0.0.
    """
    if not values:
        return -1.0

    lower, upper = valid_range[0], valid_range[1]
    if any(not (lower <= v <= upper) for v in values):
        return -1.0

    is_significant = (max(values) - min(values)) >= precision

    # strategy -> (score when variation is significant, score otherwise)
    outcomes = {
        "on_change": (0.5, 1.0),
        "on_delta_change": (1.0, -0.5),
        "hysteresis": (0.8, 0.2),
        "monitoring": (0.6, -0.2),
    }
    pair = outcomes.get(strategy)
    if pair is None:
        return 0.0  # Fallback for unrecognized strategies
    return pair[0] if is_significant else pair[1]


def compute_average_quality(pid_data_list: list[dict]) -> float:
    """
    Computes the total quality score by summing each PID's score.
    Each PID contributes equally to the total reward (no weighting).

    Despite the name, the result is a sum, not a mean. Each entry must provide
    "values", "strategy", "precision" and "valid_range"; other keys such as
    "pid" are ignored. Returns 0.0 for an empty list, otherwise the sum
    rounded to three decimals.
    """
    # Pass the scoring inputs explicitly: unpacking the whole entry with **
    # would forward "pid" too, which data_quality_score does not accept.
    scores = [
        data_quality_score(
            values=entry["values"],
            strategy=entry["strategy"],
            precision=entry["precision"],
            valid_range=entry["valid_range"],
        )
        for entry in pid_data_list
    ]
    return round(sum(scores), 3) if scores else 0.0


def clip_reward(score, min_val=-1.0, max_val=3.0):
    """
    Clamps a reward: caps it at max_val, then raises it to at least min_val.

    Clipping is optional. It helps reduce the training dominance of extreme
    values and is useful when tighter control over the stability of the
    agent's learning is needed.
    """
    capped = max_val if max_val < score else score
    return min_val if min_val > capped else capped


def compute_reward(latency_ms: int, pid_data_list: list[dict]) -> float:
    """
    Composite reward for one configuration.

    The latency is scored once via latency_score, the per-PID quality total
    comes from compute_average_quality, and the two are added and rounded to
    three decimals.
    """
    latency_part = latency_score(latency_ms)
    quality_part = compute_average_quality(pid_data_list)
    combined = latency_part + quality_part
    return round(combined, 3)

def compute_reward_with_details(latency_ms: int, pid_data_list: list[dict]) -> tuple[float, dict]:
    """
    Computes the reward together with its components.

    Returns:
        (total_reward, breakdown) where total_reward is the latency score plus
        the sum of the per-PID scores, rounded to three decimals, and
        breakdown holds "latency_ms", "latency_score", "pid_scores" and
        "total_pid_score".
    """
    latency_part = latency_score(latency_ms)
    per_pid = compute_pid_scores(pid_data_list)

    pid_total = 0
    for item in per_pid:
        pid_total += item["score"]

    return round(latency_part + pid_total, 3), {
        "latency_ms": latency_ms,
        "latency_score": round(latency_part, 3),
        "pid_scores": per_pid,
        "total_pid_score": round(pid_total, 3),
    }

### Loop

In [5]:
# Sketch of the intended training loop; it does not run as written.
# NOTE(review): `baseline`, `N`, `choose_action`, `run_or_replay_sim` and
# `update_q` are not defined in the cells shown in this notebook; the recorded
# output of this cell is a NameError on `baseline`.
# NOTE(review): compute_reward is defined in this notebook with two parameters
# (latency_ms, pid_data_list) but is called here with four arguments — the
# signature and this call need to be reconciled.
# NOTE(review): apply_action is called without `baseline`, so action 16
# (reset to baseline) leaves the state unchanged in this loop.
Q = {}  # state-action table
state = encode_config(baseline)

for episode in range(N):
    # epsilon=0.1 is presumably the exploration rate of choose_action — TODO confirm
    action = choose_action(Q, state, epsilon=0.1)
    state_next, trace = apply_action(state, action)
    
    # Simulate or replay results for this config
    latency_list, pid_data_list, msg_sent, msg_meaningful = run_or_replay_sim(state_next)
    
    reward = compute_reward(latency_list, pid_data_list, msg_sent, msg_meaningful)
    
    # Update Q
    update_q(Q, state, action, reward, state_next)
    
    # Move to next
    state = state_next


NameError: name 'baseline' is not defined

### References


**Implementation references**

- OpenAI Spinning Up: https://spinningup.openai.com
Although it focuses more on policy-gradient methods, it gives good context on where Q-learning fits in the broader RL ecosystem.

- RL Course by David Silver (DeepMind)
Lectures 4–6 cover model-free methods, including Q-Learning.

- Towards Data Science
https://towardsdatascience.com/reinforcement-learning-explained-visually-part-4-q-learning-step-by-step-b65efb731d3e/



**Academic references**

1. Watkins, C.J.C.H., & Dayan, P. (1992)
   Q-learning: https://link.springer.com/article/10.1007/BF00992698

   
2. Sutton, R. S., & Barto, A. G. (2018)
    Reinforcement Learning: An Introduction (2nd Edition)
    Chapter 6 covers Q-Learning in depth.
    http://incompleteideas.net/book/the-book-2nd.html 

In [25]:
def flag_latency_outliers(freq_df: pd.DataFrame, threshold_ms: int = 7_200_000) -> pd.DataFrame:
    """
    Marks rows whose upload latency exceeds `threshold_ms` (default: 2 hours).

    Works on `freq_df` in place and returns the same object: the
    "ts_recorded" / "ts_uploaded" columns are converted to datetimes, and the
    columns "latency_ms" (upload time minus record time, in milliseconds) and
    "is_latency_outlier" (latency strictly above the threshold) are added.
    """
    for column in ("ts_recorded", "ts_uploaded"):
        freq_df[column] = pd.to_datetime(freq_df[column])

    upload_delay = freq_df["ts_uploaded"] - freq_df["ts_recorded"]
    freq_df["latency_ms"] = upload_delay.dt.total_seconds() * 1000
    freq_df["is_latency_outlier"] = freq_df["latency_ms"].gt(threshold_ms)

    return freq_df


def get_latency(start_ts, end_ts, method='median',
                csv_path='../../data_proc/csv_data/qa_device/frequencies.csv'):
    """
    Aggregates upload latency (ms) for records recorded within a time window.

    Args:
        start_ts, end_ts: Inclusive bounds on "ts_recorded"; anything
            pd.to_datetime accepts.
        method: 'median' or 'mean'.
        csv_path: CSV file with "ts_recorded" and "ts_uploaded" columns.
            Defaults to the QA device frequencies export.

    Returns:
        The median or mean of (ts_uploaded - ts_recorded) in milliseconds, or
        None when no record falls inside the window.

    Raises:
        ValueError: If `method` is neither 'median' nor 'mean'.
    """
    # Validate up front so a bad argument fails before the CSV is read.
    if method not in ('median', 'mean'):
        raise ValueError("method must be 'median' or 'mean'")

    freq_df = pd.read_csv(csv_path, low_memory=False)
    # Drop the unnamed index column only if the export contains one.
    freq_df = freq_df.drop(columns='Unnamed: 0', errors='ignore')

    # Parse timestamps
    recorded = pd.to_datetime(freq_df["ts_recorded"])
    uploaded = pd.to_datetime(freq_df["ts_uploaded"])

    # Filter within time range
    in_window = (recorded >= pd.to_datetime(start_ts)) & (recorded <= pd.to_datetime(end_ts))
    if not in_window.any():
        return None  # or raise an exception if preferred

    # Compute latency in ms
    latency_ms = (uploaded[in_window] - recorded[in_window]).dt.total_seconds() * 1000

    return latency_ms.median() if method == 'median' else latency_ms.mean()


# Define a field-to-PID mapping
# Keys are column names looked up in the obd_export DataFrame; values are the
# integer PID identifiers reported in the reward inputs.
# NOTE(review): the ID values are presumably the device/project PID numbering —
# verify against the device's PID table before relying on them.
FIELD_TO_PID = {
    "obd.rpm.value": 12,
    "obd.speed.value": 13,
    "obd.fuel_level.value": 28,
    "obd.coolant_temp.value": 38,
    "obd.engine_load.value": 39,
    "obd.intake_temp.value": 20,
    "obd.maf.value": 21,
    "obd.throttle_pos.value": 41,
    "obd.ambient_air_temp.value": 131,
    "obd.distance_since_codes_clear.value": 31,
    "obd.time_since_codes_cleared.value": 47,
    # Add more as needed...
}


def extract_pid_statistics(obd_df: pd.DataFrame, start_ts: str, end_ts: str) -> list[dict]:
    """
    Extracts per-PID reward inputs (PID ID, values, precision, valid range)
    from an obd_export dataframe filtered by timestamp range.

    Args:
        obd_df: DataFrame with an "@ts" column and any of the FIELD_TO_PID
            columns. It is not modified.
        start_ts, end_ts: Inclusive bounds on "@ts".

    Returns:
        One dict per mapped field that has at least one non-null value in the
        window, with keys "pid", "values", "precision" and "valid_range".
        precision is 10% of the observed range (1.0 when all values are equal)
        and valid_range is the observed min/max widened by 10% of the range,
        so both are derived from the data itself. "strategy" is not set here
        and must be added before the entries are scored.
    """
    # Parse timestamps into a local Series so the caller's DataFrame is left
    # untouched (no hidden state change between notebook cells).
    timestamps = pd.to_datetime(obd_df["@ts"])
    in_window = (timestamps >= pd.to_datetime(start_ts)) & (timestamps <= pd.to_datetime(end_ts))
    window_df = obd_df[in_window]

    results = []

    for field, pid in FIELD_TO_PID.items():
        if field not in window_df.columns:
            continue

        values = window_df[field].dropna().astype(float).tolist()
        if not values:
            continue

        vmin, vmax = min(values), max(values)
        vrange = vmax - vmin

        precision = round(vrange * 0.1, 3) if vrange > 0 else 1.0
        valid_range = (vmin - vrange * 0.1, vmax + vrange * 0.1)

        results.append({
            "pid": pid,
            "values": values,
            "precision": precision,
            "valid_range": valid_range
        })

    return results

In [27]:
def summarize_pid_statistics(pid_data_list: list[dict]) -> pd.DataFrame:
    """
    Returns a DataFrame summarizing each PID's stats (excluding raw values).

    Each entry must provide "pid", "values", "precision" and "valid_range".
    The result has the columns PID, Count, Min, Max, Precision and
    Valid Range, sorted by PID. An empty input yields an empty DataFrame with
    those same columns.
    """
    columns = ["PID", "Count", "Min", "Max", "Precision", "Valid Range"]
    summary = []

    for entry in pid_data_list:
        values = entry["values"]
        summary.append({
            "PID": entry["pid"],
            "Count": len(values),
            "Min": min(values) if values else None,
            "Max": max(values) if values else None,
            "Precision": entry["precision"],
            "Valid Range": entry["valid_range"],
        })

    # Naming the columns explicitly keeps "PID" present for an empty input,
    # where sort_values would otherwise fail on a column-less frame.
    return pd.DataFrame(summary, columns=columns).sort_values(by="PID")


In [28]:
# Load the OBD export without its unnamed index column, then compute per-PID
# statistics over the full observation window.
obd_df = pd.read_csv(
    '../../data_proc/csv_data/qa_device/obd_export.csv', low_memory=False
).drop('Unnamed: 0', axis=1)
obd_stats = extract_pid_statistics(obd_df, "2025-05-07T06:40:38Z", "2025-05-23T14:12:00Z")

In [29]:
summarize_pid_statistics(obd_stats)

Unnamed: 0,PID,Count,Min,Max,Precision,Valid Range
0,12,77036,0.0,7975.0,797.5,"(-797.5, 8772.5)"
1,13,77054,0.0,199.0,19.9,"(-19.900000000000002, 218.9)"
5,20,918,-40.0,60.0,10.0,"(-50.0, 70.0)"
6,21,49,0.0,70.0,7.0,"(-7.0, 77.0)"
2,28,16640,0.0,74.0,7.4,"(-7.4, 81.4)"
9,31,77052,0.0,16013.0,1601.3,"(-1601.3000000000002, 17614.3)"
3,38,49,-40.0,110.0,15.0,"(-55.0, 125.0)"
4,39,16639,0.0,69.0,6.9,"(-6.9, 75.9)"
7,41,869,0.0,0.0,1.0,"(0.0, 0.0)"
10,47,77052,0.0,2997.0,299.7,"(-299.7, 3296.7)"


In [45]:
# Build the two reward inputs: an aggregate latency and the per-PID statistics.
# NOTE(review): the latency window (2025-05-07) and the PID-statistics window
# (2025-05-16 to 2025-05-17) do not overlap — confirm this is intended, since
# both feed the same reward.
latency_ms = get_latency("2025-05-07T06:40:38Z", "2025-05-07T23:59:00Z")
obd_stats = extract_pid_statistics(obd_df, "2025-05-16T06:40:38Z", "2025-05-17T23:59:00Z" )

In [47]:
# TODO read actual configuration
# extract_pid_statistics does not set "strategy", which compute_pid_scores
# reads from every entry, so a placeholder strategy is filled in here.
for entry in obd_stats:
    entry["strategy"] = "on_change"  # or set based on real config if available

In [48]:
# Score the configuration and print the per-PID and latency components.
# NOTE(review): get_latency returns None when no record falls in its window;
# latency_score compares its argument with integers, so a None here would
# fail — guard if the window may be empty.
reward, details = compute_reward_with_details(latency_ms, obd_stats)

print("Total Reward:", reward)
print("Breakdown:")
for pid_entry in details["pid_scores"]:
    print(f"  PID {pid_entry['pid']}: {pid_entry['score']}")
print("Latency Score:", details["latency_score"])

Total Reward: 4.0
Breakdown:
  PID 12: 0.5
  PID 13: 0.5
  PID 131: 1.0
  PID 31: 0.5
  PID 47: 0.5
Latency Score: 1.0


In [51]:
details

{'latency_ms': np.float64(2259.188),
 'latency_score': 1.0,
 'pid_scores': [{'pid': 12, 'score': 0.5},
  {'pid': 13, 'score': 0.5},
  {'pid': 131, 'score': 1.0},
  {'pid': 31, 'score': 0.5},
  {'pid': 47, 'score': 0.5}],
 'total_pid_score': 3.0}