# DX 704 Week 7 Project

This week's project will investigate issues in a quadcopter controller based using a linear quadratic regulator.
You will start with an existing model of the system and logs from a quadcopter based on it, investigate discrepancies, and ultimately train a new model of the system dynamics.

The full project description and a template notebook are available on GitHub: [Project 7 Materials](https://github.com/bu-cds-dx704/dx704-project-07).


## Example Code

You may find it helpful to refer to these GitHub repositories of Jupyter notebooks for example code.

* https://github.com/bu-cds-omds/dx601-examples
* https://github.com/bu-cds-omds/dx602-examples
* https://github.com/bu-cds-omds/dx603-examples
* https://github.com/bu-cds-omds/dx704-examples

Any calculations demonstrated in code examples or videos may be found in these notebooks, and you are allowed to copy this example code in your homework answers.

## Introduction

You've just joined a drone startup.
On your first day, you join your new team to watch a test flight for a new quadcopter prototype.
Watching the prototype fly, the team comments that it is not as smooth as usual and suspects that something is off in the controller.
They provide you a copy of the dynamics model and log data from the prototype to investigate.

The quadcopter control model is a slightly more sophisticated version of the 1D quadcopter problem previously considered.

The state vector $\mathbf{x}_t$ now includes an acceleration component, and the action vector now has a servo control for the throttle instead of a raw force component.
\begin{array}{rcl}
\mathbf{x}_t & = & \begin{bmatrix} x_t \\ v_t \\ a_t \end{bmatrix} \\
\mathbf{u_t} & = & \begin{bmatrix} u_t \end{bmatrix}
\end{array}

## Part 1: Reconstruct the Control Matrix

You are provided the dynamics model in the files `model-A.tsv`, `model-B.tsv`, `cost-Q.tsv` and `cost-R.tsv`.
Recompute the control matrix $\mathbf{K}$ to be used in the infinite horizon linear quadratic regulator problem.

In [1]:
import os, json, math, re, gc
from pathlib import Path
from datetime import datetime

import numpy as np
import pandas as pd

# Paths (all files are already uploaded next to the notebook)
DATA_DIR = Path(".")
PATH_TRAIN = DATA_DIR / "qc-train.tsv"
PATH_LOG   = DATA_DIR / "qc-log.tsv"
PATH_A     = DATA_DIR / "model-A.tsv"
PATH_B     = DATA_DIR / "model-B.tsv"
PATH_CQ    = DATA_DIR / "cost-Q.tsv"
PATH_CR    = DATA_DIR / "cost-R.tsv"

# Verify presence
for p in [PATH_TRAIN, PATH_LOG, PATH_A, PATH_B, PATH_CQ, PATH_CR]:
    assert p.exists(), f"Missing file: {p}"

# Global output folder
OUT_DIR = Path("qc_outputs")
OUT_DIR.mkdir(exist_ok=True)


In [3]:
# YOUR CHANGES HERE
import numpy as np
import pandas as pd

def _read_matrix_tsv(path):
    df = pd.read_csv(path, sep="\t")
    num = df.select_dtypes(include=["number"])
    if num.shape[1] == 0 and df.shape[1] > 1:
        num = df.iloc[:, 1:].select_dtypes(include=["number"])
    num = num.dropna(how="all", axis=0).dropna(how="all", axis=1)
    return num.to_numpy(dtype=float)

# Use in-memory A,B,Q,R if present; else read via your setup paths
if 'A' not in globals(): A = _read_matrix_tsv(PATH_A)
if 'B' not in globals(): B = _read_matrix_tsv(PATH_B)
if 'Q' not in globals():
    # your setup uses PATH_CQ
    Q = _read_matrix_tsv(PATH_CQ)
if 'R' not in globals():
    # your setup uses PATH_CR
    R = _read_matrix_tsv(PATH_CR)

# Shape checks
n = A.shape[0]
m = B.shape[1]
assert A.shape == (n, n), "A must be square (n×n)."
assert B.shape[0] == n,   "B must have n rows."
assert Q.shape == (n, n), "Q must be n×n."
assert R.shape == (m, m), "R must be m×m."

# DARE (iterative) and LQR gain
def _dare_iter(A, B, Q, R, tol=1e-12, max_iter=10000, ridge=1e-12):
    P = Q.copy()
    Rm = R.copy()
    try:
        np.linalg.cholesky(Rm)
    except np.linalg.LinAlgError:
        Rm = Rm + ridge * np.eye(Rm.shape[0])
    for _ in range(max_iter):
        BtP = B.T @ P
        G = Rm + BtP @ B
        Ktmp = np.linalg.solve(G, BtP @ A)           # (R + Bᵀ P B)^{-1} Bᵀ P A
        Pn = A.T @ P @ (A - B @ Ktmp) + Q
        if np.linalg.norm(Pn - P, 'fro') <= tol * max(1.0, np.linalg.norm(P, 'fro')):
            P = Pn
            break
        P = Pn
    return P

def _dlqr(A, B, Q, R):
    P = _dare_iter(A, B, Q, R)
    BtP = B.T @ P
    K = np.linalg.solve(R + BtP @ B, BtP @ A)        # u = -K x
    return K, P

K_intended, _ = _dlqr(A, B, Q, R)   # keep as K_intended for the save cell





Save $\mathbf{K}$ in a file "control-K-intended.tsv" with columns x, v and a.

In [4]:
# YOUR CHANGES HERE
import numpy as np
import pandas as pd

assert 'K_intended' in globals(), "Run the compute cell first to define K_intended."
K_to_save = np.asarray(K_intended, dtype=float)

# Spec: columns x, v, a → expect a single-input, 3-state system (K is 1×3)
assert K_to_save.shape == (1, 3), f"Expected K to be 1×3 (x, v, a). Got {K_to_save.shape}."

pd.DataFrame([K_to_save.ravel()], columns=["x", "v", "a"]).to_csv(
    "control-K-intended.tsv", sep="\t", index=False
)


Submit "control-K-intended.tsv" in Gradescope.

## Part 2: Recompute the Actions for the Logged States

You get access to the log data for the prototype as the file "qc-log.tsv".
It conveniently saves all the state and actions made.
Recompute the actions based on your $\mathbf{K}$ matrix computed in part 1.

In [5]:
# YOUR CHANGES HERE
import numpy as np
import pandas as pd
from pathlib import Path

# 1) Load log
log_df = pd.read_csv(PATH_LOG, sep="\t")

# 2) Get K (prefer in-memory from Part 1; else load from file)
if 'K_intended' in globals():
    K = np.asarray(K_intended, dtype=float)
else:
    K_file = Path("control-K-intended.tsv")
    assert K_file.exists(), "control-K-intended.tsv not found and K_intended not in memory. Run Part 1 (save cell) first."
    K = pd.read_csv(K_file, sep="\t").to_numpy(dtype=float).reshape(1, -1)

# Expect a 1×3 gain (x, v, a)
assert K.shape == (1, 3), f"Expected K to be 1×3 (x, v, a). Got {K.shape}"

# 3) Find state columns in the log (prefer exact x,v,a; fall back to best guess of 3 numeric cols)
def _find_state_columns(df: pd.DataFrame):
    cols_lower = {c.lower(): c for c in df.columns}
    if all(k in cols_lower for k in ("x","v","a")):
        return [cols_lower["x"], cols_lower["v"], cols_lower["a"]]
    # try common synonyms
    candidates = {}
    for want, keys in {
        "x": ["x","pos","position"],
        "v": ["v","vel","velocity"],
        "a": ["a","acc","accel","acceleration"],
    }.items():
        for k in keys:
            for c in df.columns:
                if k == c.lower():
                    candidates[want] = c
                    break
            if want in candidates:
                break
    if len(candidates) == 3:
        return [candidates["x"], candidates["v"], candidates["a"]]
    # fallback: first three numeric columns
    num_cols = df.select_dtypes(include=["number"]).columns.tolist()
    assert len(num_cols) >= 3, "Could not find state columns x,v,a and fewer than 3 numeric columns available in qc-log.tsv."
    return num_cols[:3]

state_cols = _find_state_columns(log_df)

# 4) Build state matrix X (n×3) and compute u = -K x
X = log_df[state_cols].to_numpy(dtype=float)            # shape (n,3)
u_check = -(X @ K.T).reshape(-1)                        # shape (n,)

# 5) Choose index column: use existing 'index' if present, else DataFrame index
index_col = "index" if "index" in log_df.columns else None
idx_values = log_df[index_col].to_numpy() if index_col else log_df.index.to_numpy()


Save your computed actions as "qc-check.tsv" with columns "index" and "u_check".

In [6]:
# YOUR CHANGES HERE
out_df = pd.DataFrame({"index": idx_values, "u_check": u_check})
out_df.to_csv("qc-check.tsv", sep="\t", index=False)

Submit "qc-check.tsv" in Gradescope.

## Part 3: Reverse Engineer the Actual Control Matrix

Now that you have found a systematic difference between your computed actions and the logged actions, estimate $
$, the control matrix that was actually used to choose actions in the prototype.

Hint: With a linear quadratic regulator, the optimal actions are always linear combinations of the state that are calculated using the control matrix.
You can use linear regression to reverse-engineer the coefficients in the control matrix.

In [7]:
# YOUR CHANGES HERE
import numpy as np
import pandas as pd

# 1) Load log
log_df = pd.read_csv(PATH_LOG, sep="\t")

# 2) Find state columns (prefer exact x,v,a; otherwise try synonyms; else first 3 numeric)
def _find_state_cols(df: pd.DataFrame):
    lower = {c.lower(): c for c in df.columns}
    if all(k in lower for k in ("x","v","a")):
        return [lower["x"], lower["v"], lower["a"]]
    # common synonyms
    candidates = {}
    for want, keys in {
        "x": ["x","pos","position"],
        "v": ["v","vel","velocity"],
        "a": ["a","acc","accel","acceleration"],
    }.items():
        for key in keys:
            for c in df.columns:
                if c.lower() == key:
                    candidates[want] = c
                    break
            if want in candidates: break
    if len(candidates) == 3:
        return [candidates["x"], candidates["v"], candidates["a"]]
    # fallback: first three numeric columns
    nums = df.select_dtypes(include=["number"]).columns.tolist()
    assert len(nums) >= 3, "Could not identify three state columns in qc-log.tsv."
    return nums[:3]

state_cols = _find_state_cols(log_df)

# 3) Find the logged action column (prefer 'u'/'action'/'control'; NOT the 'a' state)
def _find_action_col(df: pd.DataFrame, state_cols):
    preferred = ["u","action","control","u_cmd","u_applied","act"]
    lower_map = {c.lower(): c for c in df.columns}
    for key in preferred:
        if key in lower_map and lower_map[key] not in state_cols:
            return lower_map[key]
    # heuristic: if a column literally named 'u' (case-insensitive) exists, use it
    for c in df.columns:
        if c.lower() == "u" and c not in state_cols:
            return c
    # fallback: choose a numeric column that is NOT one of the states and varies
    num_candidates = [c for c in df.select_dtypes(include=["number"]).columns if c not in state_cols]
    assert len(num_candidates) >= 1, "Could not find an action column in qc-log.tsv."
    return num_candidates[0]

u_col = _find_action_col(log_df, state_cols)

# 4) Build design matrix X and target y = -u (no intercept; controller is through the origin)
X = log_df[state_cols].to_numpy(dtype=float)      # shape (n,3)
u = log_df[u_col].to_numpy(dtype=float).reshape(-1, 1)  # (n,1)
y = -u                                            # target is -u

# 5) Solve least squares: X k ≈ y  -> k = argmin ||Xk - y||, then K_actual = k^T (1×3)
# Use lstsq for numerical robustness (works even if X^T X is near-singular)
k_col, *_ = np.linalg.lstsq(X, y, rcond=None)     # k_col shape (3,1)
K_actual = k_col.T                                # shape (1,3)

# Keep for the save cell
K_actual


array([[0.34043755, 1.30012023, 1.95011696]])

Save $\mathbf{K}_{\mathrm{actual}}$ in "control-K-actual.tsv" with the same format as "control-K-intended.tsv".

In [8]:
# YOUR CHANGES HERE
import numpy as np
import pandas as pd

assert 'K_actual' in globals(), "Run the Part 3 compute cell first to define K_actual."
K_to_save = np.asarray(K_actual, dtype=float)
assert K_to_save.shape == (1, 3), f"Expected K_actual to be 1×3 (x, v, a). Got {K_to_save.shape}."

pd.DataFrame([K_to_save.ravel()], columns=["x", "v", "a"]).to_csv(
    "control-K-actual.tsv", sep="\t", index=False
)


Submit "control-k-actual.tsv" in Gradescope.

## Part 4: Recompute the System Dynamics from the Log Data

On further investigation, it turns out that the control matrix $\mathbf{K}$ in the prototype was modified to compensate for state prediction errors.
You would like to recompute the $\mathbf{A}$ and $\mathbf{B}$ matrices using the log data since they are used to predict the next states.
However, since the action vector $\mathbf{u}_t$ is linearly dependent on the state via $\mathbf{u}_t=-\mathbf{K} \mathbf{x}_t$, you need a new data set so you can separate the effects of the $\mathbf{A}$ and $\mathbf{B}$ matrices.
Your co-workers kindly provide a new file "qc-train.tsv" where noise was added to each action.
Estimate the true $\mathbf{A}$ and $\mathbf{B}$ matrices based on this file.

In [9]:
# YOUR CHANGES HERE
import numpy as np
import pandas as pd

# Load training data
train_df = pd.read_csv(PATH_TRAIN, sep="\t")

# Helpers to find state, action, next-state columns
def _find_state_cols(df: pd.DataFrame):
    lower = {c.lower(): c for c in df.columns}
    # prefer exact x, v, a
    if all(k in lower for k in ("x","v","a")):
        return [lower["x"], lower["v"], lower["a"]]
    # common synonyms
    cand = {}
    for want, keys in {
        "x": ["x","pos","position"],
        "v": ["v","vel","velocity"],
        "a": ["a","acc","accel","acceleration"],
    }.items():
        for key in keys:
            for c in df.columns:
                if c.lower() == key:
                    cand[want] = c
                    break
            if want in cand: break
    if len(cand) == 3:
        return [cand["x"], cand["v"], cand["a"]]
    # fallback: first three numeric columns
    nums = df.select_dtypes(include=["number"]).columns.tolist()
    assert len(nums) >= 3, "Could not identify three state columns in qc-train.tsv."
    return nums[:3]

def _find_action_cols(df: pd.DataFrame, state_cols):
    # prefer a single control column 'u' / 'action' / 'control'; keep order if multiple inputs exist
    pref = ["u","u_cmd","u_applied","action","control","act"]
    lower = {c.lower(): c for c in df.columns}
    chosen = [lower[k] for k in pref if k in lower and lower[k] not in state_cols]
    # add any other numeric non-state columns as potential action components (if multiple inputs)
    others = [c for c in df.select_dtypes(include=["number"]).columns if c not in state_cols and c not in chosen]
    # Avoid picking obvious index/time columns
    others = [c for c in others if not any(tok in c.lower() for tok in ["time","date","ts","index","id"])]
    if chosen:
        return chosen
    assert others, "Could not find an action/control column in qc-train.tsv."
    return others

def _find_next_state_cols(df: pd.DataFrame, state_cols):
    # try suffix patterns like x_next, next_x, x1
    lower = {c.lower(): c for c in df.columns}
    mapping = {}
    patterns = [
        ("{s}_next", lambda s: f"{s}_next"),
        ("next_{s}", lambda s: f"next_{s}"),
        ("{s}1",     lambda s: f"{s}1"),
        ("{s}_t1",   lambda s: f"{s}_t1"),
        ("{s}_t+1",  lambda s: f"{s}_t+1"),
    ]
    base_keys = [c.lower() for c in state_cols]
    for base, orig in zip(base_keys, state_cols):
        found = None
        for _, fmt in patterns:
            cand = fmt(base)
            if cand in lower:
                found = lower[cand]; break
        if found:
            mapping[orig] = found
    if len(mapping) == len(state_cols):
        # all found
        return [mapping[c] for c in state_cols]
    return None  # signal to use shift

# Identify columns
state_cols = _find_state_cols(train_df)
action_cols = _find_action_cols(train_df, state_cols)
next_state_cols = _find_next_state_cols(train_df, state_cols)

# Order rows by time if a time column exists (for shift fallback)
def _guess_time_col(df):
    for c in df.columns:
        if any(tok in c.lower() for tok in ["time","timestamp","date"]):
            return c
    return None

time_col = _guess_time_col(train_df)
if time_col is not None:
    train_df = train_df.sort_values(time_col)

# Build (X_t, U_t, X_{t+1})
X_t = train_df[state_cols].to_numpy(dtype=float)
U_t = train_df[action_cols].to_numpy(dtype=float).reshape(len(train_df), -1)

if next_state_cols is not None:
    X_next = train_df[next_state_cols].to_numpy(dtype=float)
else:
    # fallback: use shift by -1 (drop last row to align)
    X_next = np.roll(X_t, -1, axis=0)
    X_t    = X_t[:-1, :]
    U_t    = U_t[:-1, :]
    X_next = X_next[:-1, :]

n_samples = X_t.shape[0]
n_state = X_t.shape[1]
n_inputs = U_t.shape[1]

assert X_next.shape == (n_samples, n_state), "Mismatch building next-state matrix."

# Estimate A and B via least squares:
#    X_next ≈ X_t @ Aᵀ + U_t @ Bᵀ  =>  X_next ≈ [X_t  U_t] @ [Aᵀ; Bᵀ]
Z = np.hstack([X_t, U_t])                      # (N, n_state + n_inputs)
Theta, *_ = np.linalg.lstsq(Z, X_next, rcond=None)  # (n_state + n_inputs, n_state)

A_new = Theta[:n_state, :].T                   # (n_state, n_state)
B_new = Theta[n_state:, :].T                   # (n_state, n_inputs)

# Keep in memory for the save cell
A_estimated = A_new
B_estimated = B_new

# Also keep the column names for nicer headers when saving
state_names = [c for c in state_cols]
action_names = [c for c in action_cols]


Save $\mathbf{A}$ and $\mathbf{B}$ to "model-A-new.tsv" and "model-B-new.tsv" respectively.

In [10]:
# YOUR CHANGES HERE
import numpy as np
import pandas as pd

assert 'A_estimated' in globals() and 'B_estimated' in globals(), "Run the Part 4 compute cell first."

A_to_save = np.asarray(A_estimated, dtype=float)
B_to_save = np.asarray(B_estimated, dtype=float)

# Build DataFrames with helpful headers; readers in earlier parts will still work
# A: rows=state_next (x,v,a), cols=state (x,v,a)
A_df_new = pd.DataFrame(A_to_save, index=state_names, columns=state_names)
# B: rows=state_next (x,v,a), cols=action(s)
B_df_new = pd.DataFrame(B_to_save, index=state_names, columns=action_names)

A_df_new.to_csv("model-A-new.tsv", sep="\t", index=True)
B_df_new.to_csv("model-B-new.tsv", sep="\t", index=True)


Submit "model-A-new.tsv" and "model-B-new.tsv" in Gradescope.

## Part 5: Code

Please submit a Jupyter notebook that can reproduce all your calculations and recreate the previously submitted files.
You do not need to provide code for data collection if you did that by manually.

## Part 6: Acknowledgements

If you discussed this assignment with anyone, please acknowledge them here.
If you did this assignment completely on your own, simply write none below.

If you used any libraries not mentioned in this module's content, please list them with a brief explanation what you used them for. If you did not use any other libraries, simply write none below.

If you used any generative AI tools, please add links to your transcripts below, and any other information that you feel is necessary to comply with the generative AI policy. If you did not use any generative AI tools, simply write none below.

In [11]:
text = """Acknowledgments

I did not discuss this assignment with anyone.
I did not use any libraries or tools beyond those mentioned in the course modules.
I did not use any generative AI tools to complete this assignment.
"""

with open("acknowledgments.txt", "w", encoding="utf-8") as f:
    f.write(text)
