# Human Action Classifier
**Schuldt et al. 2004. Recognizing Human Actions: A Local SVM Approach**

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import cv2
import re

from pathlib import Path
from collections import Counter
from sklearn.cluster import MiniBatchKMeans
from sklearn.svm import LinearSVC
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from joblib import dump

In [None]:
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(device)

cpu


### 1. Data Loading

In [3]:
K = 400
ACTIONS = ["boxing", "handclapping", "handwaving", "jogging", "running", "walking" ]
action_to_id = {a: i for i, a in enumerate(ACTIONS)}
pattern = re.compile(r"person(?P<person>\d+)_(?P<action>[a-z]+)_d(?P<scene>\d)")

def index_data(root="Data"):
    samples = []
    for video in Path(root).glob("*.avi"):
        match = pattern.search(video.name)
        if match is None:
            continue
        samples.append({
            "path": video,
            "label": action_to_id[match.group("action")],
            "action": match.group("action"),
            "person": int(match.group("person")),
            "scene": int(match.group("scene")),
        })
    return samples

In [4]:
# Import all data
samples = index_data()
print("Total:", Counter(s["action"] for s in samples))

# Train/validation/test split (70-15-15)
train_samples = [s for s in samples if s["person"] <= 17]
val_samples = [s for s in samples if 17 < s["person"] <= 21]
test_samples = [s for s in samples if s["person"] > 21]
print("Train:", len(train_samples), "Validation:", len(val_samples), "Test:", len(test_samples))

Total: Counter({'boxing': 100, 'running': 100, 'handwaving': 100, 'walking': 100, 'jogging': 100, 'handclapping': 99})
Train: 407 Validation: 96 Test: 96


### 2. Data Processing
#### 2.1. Transform Video To Image Sequence

In [5]:
def load_video(path, resize=(160, 120)):
    cap = cv2.VideoCapture(str(path))
    frames = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame = cv2.resize(frame, resize)
        frame = frame.astype(np.float32) / 255.0
        frames.append(frame)

    cap.release()
    video = np.stack(frames)   # (T, H, W)
    return torch.from_numpy(video)

#### 2.2. Construct Gaussian Scale Space
$$L(路 , \sigma^2, \tau^2) = f \times g(路 , \sigma^2, \tau^2)$$

In [6]:
def gaussian_1d(kernel_size, sigma):
    x = torch.arange(kernel_size) - kernel_size // 2
    g = torch.exp(-(x ** 2) / (2 * (sigma ** 2)))
    return g / g.sum()

def gaussian_blur_3d(video, sigma, tau):
    # spatial blur
    g_xy = gaussian_1d(7, sigma)
    g_xy = g_xy[None, None, :, None] * g_xy[None, None, None, :]
    video = F.conv2d(video.unsqueeze(1), g_xy, padding=3).squeeze(1)

    # temporal blur
    g_t = gaussian_1d(7, tau)[None, None, :, None, None]
    video = F.conv3d(video.unsqueeze(0), g_t, padding=(3,0,0)).squeeze(0)

    return video

#### 2.3. Compute Second-moment Matrix From Spatio-temporal Gradients

$$\nabla L = (L_x, L_y, L_t)^T$$
$$\mu(路; \sigma^2, \tau^2) = g(路; s\sigma^2, s\tau^2) \times (\nabla L(\nabla L)^T)$$

In [7]:
def gradients_3d(L):
	Lx = L[:, :, 2:] - L[:, :, :-2]
	Ly = L[:, 2:, :] - L[:, :-2, :]
	Lt = L[2:, :, :] - L[:-2, :, :]

	T = min(Lx.shape[0], Ly.shape[0], Lt.shape[0])
	H = min(Lx.shape[1], Ly.shape[1], Lt.shape[1])
	W = min(Lx.shape[2], Ly.shape[2], Lt.shape[2])

	return Lx[:T, :H, :W], Ly[:T, :H, :W], Lt[:T, :H, :W]

def second_moment_matrix(Lx, Ly, Lt, sigma=2.0, tau=1.5):
    J_xx = Lx * Lx
    J_xy = Lx * Ly
    J_xt = Lx * Lt
    J_yy = Ly * Ly
    J_yt = Ly * Lt
    J_tt = Lt * Lt

    def smooth(x):
        return gaussian_blur_3d(x, sigma, tau)

    return smooth(J_xx), smooth(J_xy), smooth(J_xt), smooth(J_yy), smooth(J_yt), smooth(J_tt)

#### 2.4. Detect Interest Point Using Harris Response

$$H = det(\mu) - k * trace^3(\mu)$$

In [8]:
def harris_response(J, k=0.005):
    J_xx, J_xy, J_xt, J_yy, J_yt, J_tt = J

    det = (
        J_xx * (J_yy * J_tt - J_yt ** 2)
        - J_xy * (J_xy * J_tt - J_xt * J_yt)
        + J_xt * (J_xy * J_yt - J_xt * J_yy)
    )

    trace = J_xx + J_yy + J_tt
    return det - k * trace ** 3

def detect_interest_points(H, threshold_ratio=0.01):
    threshold = threshold_ratio * H.max()
    points = torch.nonzero(H > threshold)
    return points

#### 2.5. Extract Spatio-temporal Descriptors

$$\bold{l} = (L_x, L_y, L_t, L_{xx}, ..., L_{tttt}$$

In [9]:
def extract_jet(L, x, y, t, sigma, tau):
    # first order
    Lx = L[t, y, x+1] - L[t, y, x-1]
    Ly = L[t, y+1, x] - L[t, y-1, x]
    Lt = L[t+1, y, x] - L[t-1, y, x]

    # second order
    Lxx = L[t, y, x+1] - 2*L[t, y, x] + L[t, y, x-1]
    Lyy = L[t, y+1, x] - 2*L[t, y, x] + L[t, y-1, x]
    Ltt = L[t+1, y, x] - 2*L[t, y, x] + L[t-1, y, x]

    jet = torch.tensor([
        sigma * Lx, sigma * Ly, tau * Lt,
        sigma**2 * Lxx, sigma**2 * Lyy, tau**2 * Ltt
    ], device=L.device)

    return jet

def extract_descriptors(video, sigma=1.5, tau=1.0):
    L = gaussian_blur_3d(video, sigma, tau)
    Lx, Ly, Lt = gradients_3d(L)
    J = second_moment_matrix(Lx, Ly, Lt, 2*sigma, 2*tau)
    H = harris_response(J)

    points = detect_interest_points(H)
    desc = []

    for t, y, x in points:
        if (
            t > 1 and y > 1 and x > 1 and
            t < L.shape[0]-2 and
            y < L.shape[1]-2 and
            x < L.shape[2]-2
        ):
            desc.append(extract_jet(L, x, y, t, sigma, tau))

    if len(desc) == 0:
        return torch.empty((0, 6), device=device)

    return torch.stack(desc)

#### 2.6. Build Visual Vocabulary

In [None]:
def build_vocabulary(train_samples, K=400, max_samples=100_000):
    all_desc = []

    for s in train_samples:
        video = load_video(s["path"])
        video = video.to(device)
        desc = extract_descriptors(video)
        all_desc.append(desc)

    X = torch.cat(all_desc).cpu().numpy()

    if len(X) > max_samples:
        idx = np.random.choice(len(X), max_samples, replace=False)
        X = X[idx]

    kmeans = MiniBatchKMeans(
        n_clusters=K,
        batch_size=4096,
        random_state=0
    )
    kmeans.fit(X)
    return kmeans

In [None]:
print("Learning vocabulary...")
kmeans = build_vocabulary(train_samples, K)
dump(kmeans, "artifact/kmeans_K400.joblib")

Learning vocabulary...


[mpeg4 @ 0x311d45880] ac-tex damaged at 8 6
[mpeg4 @ 0x311d45880] Error at MB: 74


#### 2.7. Encode Video as Histogram

In [12]:
def encode_video(video, kmeans, K):
    desc = extract_descriptors(video)

    if desc.numel() == 0:
        return torch.zeros(K)

    labels = kmeans.predict(desc.cpu().numpy())
    hist = np.bincount(labels, minlength=K).astype(np.float32)
    hist /= (hist.sum() + 1e-8)

    return torch.from_numpy(hist)

def build_dataset(samples, kmeans, K):
    X, y = [], []

    for s in samples:
        # Load video
        video = load_video(s["path"])
        video = video.to(device)
        
		# Encode video
        hist = encode_video(video, kmeans, K)
        
		# Build dataset
        X.append(hist)
        y.append(s["label"])

    return torch.stack(X), torch.tensor(y)

In [13]:
X_train, y_train = build_dataset(train_samples, kmeans, K)
X_val, y_val = build_dataset(val_samples, kmeans, K)
X_test, y_test = build_dataset(test_samples, kmeans, K)

print("X_train", X_train[0], "y_train", y_train[0])
print("X_val", X_val[0], "y_val", y_val[0])
print("X_test", X_test[0], "y_test", y_test[0])

[mpeg4 @ 0x3057ca5b0] ac-tex damaged at 8 6
[mpeg4 @ 0x3057ca5b0] Error at MB: 74


X_train tensor([2.7688e-04, 6.7106e-03, 7.8764e-03, 1.2387e-04, 0.0000e+00, 2.9145e-05,
        7.2862e-06, 3.0675e-03, 4.3717e-04, 7.8764e-03, 3.1258e-03, 7.7234e-03,
        1.7392e-02, 3.4974e-04, 6.6086e-03, 2.1859e-05, 6.9729e-03, 1.9454e-03,
        4.7360e-04, 6.8126e-03, 7.0676e-04, 5.7051e-03, 1.9090e-03, 9.6906e-04,
        2.2150e-03, 6.3754e-03, 9.4720e-03, 5.8289e-05, 4.0074e-04, 7.2862e-06,
        3.6431e-05, 2.5647e-03, 6.6304e-04, 7.7234e-04, 7.3590e-03, 3.4318e-03,
        2.4773e-04, 1.9119e-02, 0.0000e+00, 1.1621e-02, 4.3717e-05, 8.7434e-05,
        1.4572e-05, 5.2461e-04, 3.6431e-05, 0.0000e+00, 1.1002e-03, 5.7852e-03,
        4.5174e-04, 3.4609e-03, 1.1665e-02, 0.0000e+00, 0.0000e+00, 6.9947e-04,
        0.0000e+00, 5.1805e-03, 4.9692e-03, 6.5576e-05, 7.2862e-05, 0.0000e+00,
        0.0000e+00, 7.2862e-06, 3.7087e-03, 3.6431e-05, 4.3717e-05, 1.3895e-02,
        4.2989e-03, 0.0000e+00, 1.4572e-05, 3.6431e-05, 2.4044e-03, 1.3115e-04,
        7.3590e-03, 1.7924e-03, 

In [None]:
save_dir = Path("artifact")
torch.save({
	"k": 400,
	"actions": ["boxing", "handclapping", "handwaving", "jogging", "running", "walking" ],
	"X_train": X_train, "y_train": y_train,
	"X_val": X_val, "y_val": y_val,
	"X_test": X_test, "y_test": y_test,
}, save_dir / "kth_features.pt")

### 3. Train Classifier

In [14]:
def train_svm(X, y):
    scaler = StandardScaler()
    Xs = scaler.fit_transform(X.numpy())

    clf = LinearSVC(C=1.0, max_iter=5000)
    clf.fit(Xs, y.numpy())

    return clf, scaler

In [15]:
print("Training classifier...")
clf, scaler = train_svm(X_train, y_train)

Training classifier...


### 4. Model Evaluation

In [16]:
def evaluate(clf, scaler, X, y):
    Xs = scaler.transform(X.numpy())
    y_pred = clf.predict(Xs)

    acc = accuracy_score(y.numpy(), y_pred)
    cm = confusion_matrix(y.numpy(), y_pred)

    return acc, cm

In [None]:
print("---- Validation ----")
val_acc, _ = evaluate(clf, scaler, X_val, y_val)
print(f"Val accuracy: {val_acc*100:.2f}%")

---- Validation ----
Val accuracy: 79.17%


In [18]:
print("---- Test ----")
test_acc, cm = evaluate(clf, scaler, X_test, y_test)
print(f"Test accuracy: {test_acc*100:.2f}%")
print(cm)

---- Test ----
Test accuracy: 71.88%
[[ 7  6  3  0  0  0]
 [ 1 14  0  1  0  0]
 [ 2  0 12  0  1  1]
 [ 0  0  1 13  2  0]
 [ 0  0  0  6  9  1]
 [ 0  1  0  1  0 14]]
