In [None]:
!pip install hdbscan

In [7]:
import ast
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader

In [2]:
# -------------------------------
# 1️⃣ 載入 CSV
# -------------------------------
def load_track_csv(path):
    """
    假設 CSV 每一列是 [frame_id, joint_id, x, y, score]
    或者是展開格式 (T, 33*3)
    """
    data = np.loadtxt(path, delimiter=",", skiprows=1)  # 視 CSV 格式調整
    return data

In [3]:
# -------------------------------
# 2️⃣ 前處理 Pose
# -------------------------------
def preprocess_pose(data):
    """
    把 CSV 轉成 (T, 33, 3)
    假設 data 是 (T, 33*3)
    """
    # 如果 data 已經是 (T, 33, 3)，就不用 reshape
    if data.ndim == 2 and data.shape[1] == 33*3:
        frames = data.reshape(-1, 33, 3)
    elif data.ndim == 3 and data.shape[1] == 33 and data.shape[2] == 3:
        frames = data
    else:
        raise ValueError(f"Unexpected data shape: {data.shape}")
    return frames.astype(np.float32)

In [4]:
# -------------------------------
# 3️⃣ Sliding Window
# -------------------------------
def sliding_window(frames, window=48, step=24):
    """
    frames: (T, 33, 3)
    return: (num_slices, window, 33, 3)
    """
    T = frames.shape[0]
    slices = []
    for start in range(0, T - window + 1, step):
        slices.append(frames[start:start + window])
    slices = np.array(slices, dtype=np.float32)
    return slices

In [6]:
import pandas as pd
df = pd.read_csv("track_2.csv")
print(df.head())
print(df.shape)

              frame                               p0  \
0  frame_000350.jpg  ('0.5185', '0.1512', '-2.2092')   
1  frame_000351.jpg  ('0.5241', '0.1518', '-2.2169')   
2  frame_000352.jpg  ('0.5127', '0.1494', '-2.2461')   
3  frame_000353.jpg  ('0.5147', '0.1479', '-2.2896')   
4  frame_000354.jpg  ('0.5164', '0.1488', '-2.4700')   

                                p1                               p2  \
0  ('0.5506', '0.1301', '-2.2026')  ('0.5668', '0.1285', '-2.2040')   
1  ('0.5575', '0.1299', '-2.2069')  ('0.5730', '0.1280', '-2.2084')   
2  ('0.5469', '0.1273', '-2.2364')  ('0.5648', '0.1260', '-2.2378')   
3  ('0.5476', '0.1248', '-2.2771')  ('0.5650', '0.1236', '-2.2786')   
4  ('0.5478', '0.1258', '-2.4605')  ('0.5646', '0.1243', '-2.4620')   

                                p3                               p4  \
0  ('0.5809', '0.1271', '-2.2037')  ('0.5015', '0.1272', '-2.2168')   
1  ('0.5867', '0.1265', '-2.2081')  ('0.5087', '0.1279', '-2.2250')   
2  ('0.5792', '0.1252',

In [8]:
def load_track_csv(path):
    df = pd.read_csv(path)
    # 去掉 frame 名稱欄
    df = df.drop(columns=["frame"])
    
    # 每個欄位都是一個 tuple 字串，要轉換
    def parse_tuple(s):
        return tuple(map(float, ast.literal_eval(s)))
    
    # 把 dataframe 每個元素都轉成 (x, y, z)
    parsed = df.applymap(parse_tuple).to_numpy()
    
    # 現在 parsed.shape = (T, 33)，裡面每個元素是一個 tuple(3,)
    # 需要轉成 numpy array (T, 33, 3)
    frames = np.array([[list(joint) for joint in row] for row in parsed], dtype=np.float32)
    return frames  # (T, 33, 3)

# 測試
frames = load_track_csv("track_2.csv")
print("frames.shape =", frames.shape)  # 應該是 (2657, 33, 3)

  parsed = df.applymap(parse_tuple).to_numpy()


frames.shape = (2657, 33, 3)


In [9]:
slices = sliding_window(frames)
print(slices.shape)  # (B, W, 33, 3)

(109, 48, 33, 3)


In [5]:
# -------------------------------
# 4️⃣ 主程式
# -------------------------------
frames = preprocess_pose(load_track_csv("track_2.csv"))
print("frames.shape =", frames.shape)  # Debug: 應該是 (T, 33, 3)

slices = sliding_window(frames)
print("slices.shape =", slices.shape)  # Debug: 應該是 (B, W, 33, 3)

B, W, J, C = slices.shape
slices_flat = slices.reshape(B, W, J * C)

ValueError: could not convert string 'frame_000350.jpg' to float64 at row 0, column 1.

In [None]:
# -------------------------------
# 5️⃣ DataLoader
# -------------------------------
dataset = TensorDataset(torch.tensor(slices_flat, dtype=torch.float32))
loader = DataLoader(dataset, batch_size=32, shuffle=True)

print("DataLoader 準備完成，批次數 =", len(loader))

In [10]:
# pipeline_full.py
import ast
import json
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from typing import List, Tuple, Dict

# -------------------------------
# 1) CSV -> (T, J, C)
# -------------------------------
def load_track_csv(path: str) -> np.ndarray:
    df = pd.read_csv(path)
    if "frame" in df.columns:
        df = df.drop(columns=["frame"])
    # parse tuple strings like "('0.5185','0.1512','-2.2092')"
    def parse_tuple(s):
        return tuple(map(float, ast.literal_eval(s)))
    parsed = df.applymap(parse_tuple).to_numpy()  # (T, J) of tuples
    frames = np.array([[list(j) for j in row] for row in parsed], dtype=np.float32)  # (T, J, 3)
    return frames

# -------------------------------
# 2) Preprocess: hip-centering + shoulder normalization
# -------------------------------
def preprocess_pose(frames: np.ndarray,
                    hip_idx: int = 0,      # choose index for hip / mid-hip
                    left_sh_idx: int = 11, # example indices (depends on your keypoints)
                    right_sh_idx: int = 12) -> np.ndarray:
    """
    frames: (T, J, 3)
    return: (T, J, 3) centered & normalized
    - translate so hip at origin
    - scale so shoulder distance = 1 (or other)
    """
    frames = frames.copy()
    # translate: subtract hip position per frame
    hip_pos = frames[:, hip_idx:hip_idx+1, :]  # (T,1,3)
    frames = frames - hip_pos  # hip at origin

    # shoulder distance per frame
    shoulder_vec = frames[:, left_sh_idx, :] - frames[:, right_sh_idx, :]  # (T,3)
    shoulder_dist = np.linalg.norm(shoulder_vec, axis=1)  # (T,)
    # avoid division by zero
    shoulder_dist[shoulder_dist == 0] = 1.0
    # scale each frame to make shoulder_dist == 1.0 (or any scale)
    frames = frames / shoulder_dist[:, None, None]
    return frames.astype(np.float32)

# -------------------------------
# 3) Sliding window
# -------------------------------
def sliding_window(frames: np.ndarray, window: int = 48, step: int = 24) -> np.ndarray:
    T = frames.shape[0]
    slices = []
    for start in range(0, T - window + 1, step):
        slices.append(frames[start:start + window])  # (window, J, C)
    return np.array(slices, dtype=np.float32)  # (B, W, J, C)

# -------------------------------
# 4) Flatten for LSTM input
# -------------------------------
def flatten_slices(slices: np.ndarray) -> np.ndarray:
    B, W, J, C = slices.shape
    return slices.reshape(B, W, J * C)  # (B, W, feat_dim)

# -------------------------------
# 5) LSTM Autoencoder (simple)
# -------------------------------
class LSTMAutoencoder(nn.Module):
    def __init__(self, feat_dim: int, hidden_dim: int = 128, latent_dim: int = 32, num_layers: int = 1):
        super().__init__()
        self.encoder_lstm = nn.LSTM(input_size=feat_dim, hidden_size=hidden_dim,
                                    num_layers=num_layers, batch_first=True)
        self.enc_fc = nn.Linear(hidden_dim, latent_dim)
        self.dec_fc = nn.Linear(latent_dim, hidden_dim)
        self.decoder_lstm = nn.LSTM(input_size=hidden_dim, hidden_size=feat_dim,
                                    num_layers=num_layers, batch_first=True)
        # We'll run a simple decoder: map latent to hidden and repeat across time
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim

    def forward(self, x):
        # x: (B, W, feat_dim)
        enc_out, (h_n, c_n) = self.encoder_lstm(x)  # enc_out: (B, W, hidden_dim)
        # use last time step hidden
        last_hidden = enc_out[:, -1, :]  # (B, hidden_dim)
        z = self.enc_fc(last_hidden)     # (B, latent_dim)

        # decode: expand z to sequence
        dec_hidden = torch.relu(self.dec_fc(z))  # (B, hidden_dim)
        # replicate to W steps as "inputs" for decoder LSTM
        B, W, _ = x.shape
        dec_in = dec_hidden.unsqueeze(1).repeat(1, W, 1)  # (B, W, hidden_dim)
        dec_out, _ = self.decoder_lstm(dec_in)  # (B, W, feat_dim)
        # For simplicity, decoder LSTM's output is final reconstruction
        return dec_out, z

# -------------------------------
# 6) Train function
# -------------------------------
def train_autoencoder(model: nn.Module, dataloader: DataLoader, epochs: int = 30, lr: float = 1e-3, device: str = "cpu"):
    model.to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    for epoch in range(1, epochs + 1):
        model.train()
        total_loss = 0.0
        for batch in dataloader:
            x = batch[0].to(device)  # (B, W, feat)
            recon, z = model(x)
            loss = criterion(recon, x)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item() * x.size(0)
        avg = total_loss / len(dataloader.dataset)
        if epoch % 5 == 0 or epoch == 1:
            print(f"[Epoch {epoch:03d}] loss={avg:.6f}")
    return model

# -------------------------------
# 7) Extract embeddings
# -------------------------------
def extract_embeddings(model: nn.Module, dataloader: DataLoader, device: str = "cpu") -> np.ndarray:
    model.to(device)
    model.eval()
    embeddings = []
    with torch.no_grad():
        for batch in dataloader:
            x = batch[0].to(device)
            _, z = model(x)
            embeddings.append(z.cpu().numpy())
    embeddings = np.vstack(embeddings)  # (B, latent_dim)
    return embeddings

# -------------------------------
# 8) Unsupervised clustering -> tokens
# -------------------------------
def cluster_embeddings(embeddings: np.ndarray, n_clusters: int = 16) -> np.ndarray:
    k = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    labels = k.fit_predict(embeddings)
    return labels  # (B,)

# -------------------------------
# 9) Simple Sequitur grammar induction (very minimal)
#    Not a full-featured, but finds repeated digrams and replaces them.
# -------------------------------
def sequitur_from_tokens(tokens: List[int]) -> Dict[str, List]:
    """
    Very small Sequitur-like implementation:
    - tokens: sequence of integers (token ids)
    - returns a grammar as dict: {'R1': [symbols...], 'R0': [axiom symbols...]}
    Note: symbols are ints or rule names 'R#'
    """
    # Start with axiom as token list
    grammar = {}
    axiom = list(tokens)
    rule_id = 1

    def find_repeated_digram(seq):
        counts = {}
        for i in range(len(seq) - 1):
            dg = (seq[i], seq[i+1])
            counts[dg] = counts.get(dg, 0) + 1
        # pick a digram with count >= 2
        for dg, c in counts.items():
            if c >= 2:
                return dg
        return None

    # iteratively replace repeated digrams
    while True:
        dg = find_repeated_digram(axiom)
        if dg is None:
            break
        # create rule for dg
        rule_name = f"R{rule_id}"
        grammar[rule_name] = [dg[0], dg[1]]
        # replace all occurrences
        i = 0
        new_axiom = []
        while i < len(axiom):
            if i < len(axiom) - 1 and (axiom[i], axiom[i+1]) == dg:
                new_axiom.append(rule_name)
                i += 2
            else:
                new_axiom.append(axiom[i])
                i += 1
        axiom = new_axiom
        rule_id += 1
        # safety break
        if rule_id > 200:
            break

    grammar["R0"] = axiom  # R0 is axiom
    return grammar

# -------------------------------
# 10) Convert grammar -> L-system JSON
# -------------------------------
def grammar_to_lsystem_json(grammar: Dict[str, List], token_to_symbol: Dict[int, str] = None) -> Dict:
    """
    Convert grammar (with tokens and rule names) into a JSON-friendly L-system:
    - rules: mapping from symbol to expansion (list)
    - axiom: initial sequence
    token_to_symbol: map token int -> textual symbol (like 'A','B',...)
    """
    # create mapping for token ints
    if token_to_symbol is None:
        token_to_symbol = {}
    def sym(x):
        if isinstance(x, str) and x.startswith("R"):
            return x  # rule reference stays as is
        elif isinstance(x, int):
            return token_to_symbol.get(x, f"T{x}")
        else:
            return str(x)

    rules_json = {}
    for rule, expansion in grammar.items():
        rules_json[rule] = [sym(s) for s in expansion]
    axiom = [sym(s) for s in grammar.get("R0", [])]
    return {"axiom": axiom, "rules": rules_json}

# -------------------------------
# 11) Example main pipeline
# -------------------------------
def main_pipeline(csv_path: str,
                  window: int = 48,
                  step: int = 24,
                  lstm_hidden: int = 128,
                  latent_dim: int = 32,
                  cluster_k: int = 16,
                  device: str = "cpu"):
    # 1. load
    frames = load_track_csv(csv_path)
    print("frames.shape =", frames.shape)

    # 2. preprocess
    frames_p = preprocess_pose(frames, hip_idx=0, left_sh_idx=11, right_sh_idx=12)
    print("preprocessed:", frames_p.shape)

    # 3. sliding window
    slices = sliding_window(frames_p, window=window, step=step)
    print("slices.shape =", slices.shape)  # (B, W, J, C)

    # 4. flatten
    slices_flat = flatten_slices(slices)  # (B, W, feat)
    B, W, feat = slices_flat.shape
    print("flattened:", slices_flat.shape)

    # 5. dataloader
    dataset = TensorDataset(torch.tensor(slices_flat, dtype=torch.float32))
    loader = DataLoader(dataset, batch_size=16, shuffle=True)

    # 6. model
    model = LSTMAutoencoder(feat_dim=feat, hidden_dim=lstm_hidden, latent_dim=latent_dim)
    # 7. train
    model = train_autoencoder(model, loader, epochs=30, lr=1e-3, device=device)

    # 8. extract embeddings
    infer_loader = DataLoader(dataset, batch_size=32, shuffle=False)
    embeddings = extract_embeddings(model, infer_loader, device=device)
    print("embeddings.shape =", embeddings.shape)

    # 9. clustering -> tokens
    labels = cluster_embeddings(embeddings, n_clusters=cluster_k)
    print("cluster labels unique:", np.unique(labels))

    # 10. tokens -> grammar (Sequitur)
    tokens = labels.tolist()
    grammar = sequitur_from_tokens(tokens)
    print("grammar rules:", grammar.keys())

    # 11. create token->symbol map (A,B,C...)
    symbols = [chr(ord('A') + i) for i in range(26)]
    token_to_symbol = {i: symbols[i % len(symbols)] + (str(i//26) if i>=26 else "") for i in range(cluster_k)}

    lsys = grammar_to_lsystem_json(grammar, token_to_symbol=token_to_symbol)
    print("L-system JSON preview:", json.dumps(lsys, indent=2)[:400])

    # save JSON
    with open("l_system.json", "w") as f:
        json.dump(lsys, f, indent=2)
    print("Saved l_system.json")
    return lsys

# -------------------------------
# If running as script:
# -------------------------------
if __name__ == "__main__":
    # 把路徑換成你的檔案
    lsys = main_pipeline("track_2.csv", window=48, step=24,
                         lstm_hidden=128, latent_dim=32, cluster_k=16, device="cpu")


  parsed = df.applymap(parse_tuple).to_numpy()  # (T, J) of tuples


frames.shape = (2657, 33, 3)
preprocessed: (2657, 33, 3)
slices.shape = (109, 48, 33, 3)
flattened: (109, 48, 99)
[Epoch 001] loss=1.557305
[Epoch 005] loss=0.896720
[Epoch 010] loss=0.883558
[Epoch 015] loss=0.860917
[Epoch 020] loss=0.850787
[Epoch 025] loss=0.835136
[Epoch 030] loss=0.830535
embeddings.shape = (109, 32)
cluster labels unique: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15]
grammar rules: dict_keys(['R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9', 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R0'])
L-system JSON preview: {
  "axiom": [
    "J",
    "A",
    "R2",
    "R4",
    "R6",
    "L",
    "R4",
    "R7",
    "G",
    "R8",
    "R8",
    "F",
    "F",
    "R10",
    "A",
    "R8",
    "A",
    "P",
    "A",
    "K",
    "R11",
    "G",
    "C",
    "I",
    "R1",
    "R1",
    "G",
    "E",
    "C",
    "R7",
    "E",
    "I",
    "R2",
    "R12",
    "R13",
    "M",
    "J",
    "M",
    "R14",
    "R14",
 
Saved l_system.json


