In [None]:
import pandas as pd
import numpy as np
import torch
import lightning.pytorch as pl
import matplotlib.pyplot as plt
# import plotly
import plotly.express as px

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# !rm -rf saved_model

In [None]:
import os
import torch
import pytorch_lightning as pl
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split
from src.modules.lifter_2d_3d.model.linear_model.linear_model import BaselineModel
from src.modules.lifter_2d_3d.dataset.simple_keypoint_dataset import SimpleKeypointDataset
from src.modules.lifter_2d_3d.model.linear_model.lit_linear_model import LitSimpleBaselineLinear
from IPython.display import display

pl.seed_everything(1234)

# ------------
# args
# ------------
# parser = ArgumentParser()
# parser.add_argument('--batch_size', default=32, type=int)
# parser.add_argument('--hidden_dim', type=int, default=128)
# parser = pl.Trainer.add_argparse_args(parser)
# parser = LitClassifier.add_model_specific_args(parser)
# args = parser.parse_args()

# ------------
# data
# ------------
# dataset = MNIST('', train=True, download=True, transform=transforms.ToTensor())
# mnist_test = MNIST('', train=False, download=True, transform=transforms.ToTensor())
# mnist_train, mnist_val = random_split(dataset, [55000, 5000])

train_dataset = SimpleKeypointDataset(
    prediction_file="/root/data/processed/synthetic_cabin_bw/A_Pillar_Codriver/keypoint_detection_results/keypoint_detection_train.json",
    annotation_file="/root/data/processed/synthetic_cabin_bw/A_Pillar_Codriver/annotations/person_keypoints_train.json",
    image_width=1280,
    image_height=1024,
    exclude_ankle=True
)
val_dataset = SimpleKeypointDataset(
    prediction_file="/root/data/processed/synthetic_cabin_bw/A_Pillar_Codriver/keypoint_detection_results/keypoint_detection_val.json",
    annotation_file="/root/data/processed/synthetic_cabin_bw/A_Pillar_Codriver/annotations/person_keypoints_val.json",
    image_width=1280,
    image_height=1024,
    exclude_ankle=True
)
print('train_dataset', len(train_dataset), 'val_dataset', len(val_dataset))
train_loader = DataLoader(train_dataset, batch_size=512, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=512, drop_last=True)
# test_loader = DataLoader(mnist_test, batch_size=args.batch_size)

# ------------
# model
# ------------
# model = LitClassifier(Backbone(hidden_dim=args.hidden_dim), args.learning_rate)
lit_model = LitSimpleBaselineLinear(exclude_ankle=True)
# ------------
# training
# ------------
saved_model_path = './saved_lifter_2d_3d_model/synthetic_cabin_bw/A_Pillar_Codriver/prediction/linear_model/'
if not os.path.exists(saved_model_path):
    os.makedirs(saved_model_path)

trainer = pl.Trainer(
    # max_steps=10,
    max_epochs=500,
    # callbacks=[TQDMProgressBar(refresh_rate=5)],
    # val_check_interval=10,
    # accelerator='gpu' if torch.cuda.is_available() else 'cpu',
    accelerator='cpu',
    check_val_every_n_epoch=5,
    default_root_dir=saved_model_path,
    # gradient_clip_val=1.0
)
trainer.fit(lit_model, train_loader, val_loader)
# ------------
# testing
# ------------
# result = trainer.test(test_dataloaders=test_loader)
# print(result)

In [None]:
!ls /root/data/processed/synthetic_cabin_bw/A_Pillar_Codriver/annotations

In [None]:
def generate_connection_line(vals):
    L = 0
    C = 1
    R = 2
    connections = [
        (0, 1, L, 'nose_left_eye'), # nose & left_eye
        (0, 2, R, 'nose_right_eye'), # nose & right_eye
        (1, 2, C, 'left_right_eye'), # left & right eyes
        (1, 3, L, 'left_ear_left_eye'), # left ear & eye
        (2, 4, R, 'right_ear_right_eye'), # right ear & eye
        # (0, 5, L, 'nose_left_shoulder'), # nose & left shoulder
        # (0, 6, R, 'nose_right_shoulder'), # nose & right shoulder
        # (3, 5, L, 'left_ear_shoulder'), # left ear & shoulder
        # (4, 6, R, 'right_ear_shoulder'), # right ear & shoulder
        (5, 6, C, 'left_shoulder_right_sholder'), # left & right shoulder
        (5, 7, L, 'left_sholder_left_elbow'), # left shoulder & elbow
        (5, 11, L, 'left_shoulder_left_hip'), # left shoulder & hip
        (6, 8, R, 'right_shoulder_right_elbow'), # right shoulder & elbow
        (6, 12, R, 'right_shoulder_right_hip'), # right shoulder & hip
        (7, 9, L, 'left_elbow_left_wrist'), # left elbow & wrist
        (8, 10, R, 'right_elbow_right_wrist'), # right elbow & wrist
        (11, 12, C, 'left_hip_right_hip'), # left & right hip
        (11, 13, L, 'left_hip_left_knee'), # left hip & knee
        (12, 14, R, 'right_hip_right_knee'), # right hip & knee
        # (13, 15, L, 'left_knee_left_ankle'), # left knee & ankle
        # (14, 16, R, 'right_knee_right_ankle') # right knee & ankle
    ]
    connection_lines = []

    connection_count = 0
    for i, connection in enumerate(connections):
        x, y, z = [np.array([vals[connection[0], j], vals[connection[1], j]]) for j in range(3)]
        for px, py, pz in zip(x, y, z):
            connection_lines.append({
                # "line": connection_count,
                "line": connection[3],
                "left_right": connection[2],
                "x": px,
                "y": py,
                "z": pz
            })
        connection_count += 1
    return connection_lines

In [None]:
# loader = train_loader
loader = val_loader
sample = None
count = 0
item_index = 5
for item in iter(loader):
    sample = item
    if (count + 1) % item_index == 0:
        break
    count += 1

In [None]:
# keypoint_df = pd.DataFrame({
#     'name': train_dataset.metadata['keypoints'],
#     'x': train_dataset.raw_data[0]['keypoints3D'][:, 0],
#     'y': train_dataset.raw_data[0]['keypoints3D'][:, 1],
#     'z': train_dataset.raw_data[0]['keypoints3D'][:, 2],
# })
results = generate_connection_line(sample[1][0].detach().numpy().reshape(-1, 3))
pose_df = pd.DataFrame(results)

fig = px.line_3d(pose_df, x="z", y="x", z="y", color="line")
fig.update_layout(
    scene={
        'xaxis': {'autorange': 'reversed'}, # reverse automatically
        'zaxis': {'autorange': 'reversed'},
    }
)
fig.show()

In [None]:
model = LitSimpleBaselineLinear.load_from_checkpoint(
    'saved_lifter_2d_3d_model/linear_model/lightning_logs/version_12/checkpoints/epoch=99-step=7300.ckpt'
)
model.eval()
estimated_pose = model(sample[0].float().squeeze(2), 0)
estimated_pose_df = pd.DataFrame(generate_connection_line(estimated_pose[0].reshape([-1, 3]).detach().numpy()))
fig = px.line_3d(estimated_pose_df, x="z", y="x", z="y", color="line")
fig.update_layout(
    scene={
        'xaxis': {'autorange': 'reversed'}, # reverse automatically
        'zaxis': {'autorange': 'reversed'},
    }
)
fig.show()

In [None]:
# pd.DataFrame({
#     'name': train_dataset.metadata['keypoints'],
#     'keypoint3D_1': train_dataset[0]['keypoints3D'][:, 0],
#     'keypoint3D_2': train_dataset[0]['keypoints3D'][:, 1],
#     'keypoint3D_3': train_dataset[0]['keypoints3D'][:, 2],
# })
# left_eye = train_dataset[0]['keypoints3D'][2]
# left_sholder = train_dataset[0]['keypoints3D'][5]
# left_hip = train_dataset[0]['keypoints3D'][11]
# left_knee = train_dataset[0]['keypoints3D'][13]
# left_ankle = train_dataset[0]['keypoints3D'][15]

# bone_length = lambda row1, row2: np.sqrt(np.sum((row2 - row1) ** 2))

# ( 
#     bone_length(left_ankle, left_knee)
#     + bone_length(left_knee, left_hip)
#     + bone_length(left_hip, left_sholder)
#     + bone_length(left_sholder, left_eye)
# )