In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
def fix_labels(label):
    """Map an action label to its canonical spelling.

    The short form ``'Throwing'`` is rewritten to ``'Throwing things'``;
    any other value is returned unchanged.
    """
    return 'Throwing things' if label == 'Throwing' else label

# Label Encoder

In [None]:
# Fit the label encoder on the labels of the reference recording (file 1).
from sklearn.preprocessing import LabelEncoder
data = pd.read_csv('./real_data/keypoints_with_labels_1.csv')
data = data.dropna(subset=['Action Label'])
# Normalise label spellings BEFORE fitting: every later `label_encoder.transform`
# call receives labels that went through `fix_labels`, so the encoder has to be
# fitted on the same normalised vocabulary or it raises on an "unseen" label.
y = data['Action Label'].apply(fix_labels).to_numpy()
label_encoder = LabelEncoder()
# `data` now carries the integer-encoded labels.
data['Action Label'] = label_encoder.fit_transform(y)
# data['Action Label'].value_counts().plot(kind='bar')



# Load dataset

In [None]:
# Load every CSV in ./real_data/ into one DataFrame per recording.
dfs = []
import os
# os.listdir returns entries in arbitrary order; sorting makes the row order of
# the concatenated frame (and everything windowed from it) reproducible.
for file in sorted(os.listdir('./real_data/')):
    print('file ', file)
    if file.endswith('.csv'):
        # Rows without an action label cannot be used for supervised training.
        df = pd.read_csv(os.path.join('./real_data/', file)).dropna(subset=['Action Label'])
        print(df['Action Label'].unique())
        # Normalise spellings, then encode with the already-fitted encoder.
        df['Action Label'] = df['Action Label'].apply(fix_labels)
        df['Action Label'] = label_encoder.transform(df['Action Label'])
        df['Action Label'] =df['Action Label'].astype('category')
        # The ID is the last underscore-separated token of the file name stem,
        # kept as a string (e.g. 'keypoints_with_labels_1.csv' -> '1').
        df['ID']= file.split('.')[0].split('_')[-1]
        df['ID']=df['ID'].astype('category')
        dfs.append(df)

In [None]:
# Stack all recordings into one frame with a fresh 0..n-1 index.
final_df = pd.concat(dfs, ignore_index=True)
# Bar chart of the number of rows per encoded action label.
label_counts = final_df['Action Label'].value_counts()
label_counts.plot(kind='bar')

# Data Visualization

In [None]:
# One box plot per column of `data` (label and frame index excluded), showing
# how that column's values in `final_df` are distributed for each action label.
for col in (name for name in data.columns if name not in ('Action Label', 'frame_id')):
    sns.boxplot(x='Action Label', y=col, data=final_df)
    plt.xticks(rotation=90)

    plt.show()

# Split train-test

In [None]:
# Hold out one recording as the test set; all other recordings form the training set.
# IDs are stored as strings, hence the string comparison.
TEST_SUBJECT_ID = '1'
# .copy() makes both splits independent frames, so later in-place column
# assignments do not operate on a slice of `final_df` (SettingWithCopyWarning).
final_df_train = final_df[final_df['ID'] != TEST_SUBJECT_ID].copy()
final_df_test = final_df[final_df['ID'] == TEST_SUBJECT_ID].copy()

In [None]:
# Display the held-out test split for a quick visual check.
final_df_test

# Extract time series domain features
## This tutorial uses TSFEL for the demonstration, but you can use any other library or even write the feature-extraction code yourself :)

In [None]:
# JSON file passed to TSFEL as `json_path` (feature configuration incl. custom features).
personal_dir='./tsfel_feat.json'
# Python module passed to TSFEL as `features_path` (implementation of the custom features).
feature_path='./custom_features.py'

In [None]:
from tsfel.utils.add_personal_features import add_feature_json
# Use the path variables defined in the configuration cell instead of repeating
# the literal, so the module path is defined in exactly one place.
# NOTE(review): presumably this registers the features from `feature_path`
# in the JSON at `personal_dir` -- confirm against the TSFEL docs.
add_feature_json(feature_path, personal_dir)

In [None]:
import tsfel

# Feature configuration built from the selected domains, using the JSON at `personal_dir`.
# You can choose from the ['statistical', 'temporal', 'spectral', 'fractal', 'Custom'] domains.
cfg = tsfel.get_features_by_domain(['statistical', 'Custom', 'temporal'], json_path=personal_dir)

# Extraction settings shared by the train and the test split.
extractor_options = dict(window_size=30, overlap=0.5, fs=30, features_path=feature_path)

# Extract time series domain features using TSFEL
final_df_train_tsfel = tsfel.time_series_features_extractor(cfg, final_df_train, **extractor_options)
final_df_test_tsfel = tsfel.time_series_features_extractor(cfg, final_df_test, **extractor_options)



In [None]:
# Keep the per-window label: the 'Action Label_Mode' column produced by the extractor.
label_train = final_df_train_tsfel['Action Label_Mode']
label_test = final_df_test_tsfel['Action Label_Mode']

# Columns derived from the label or from an identifier signal must not be model inputs.
# `id` / `ID` is matched only as a whole underscore-delimited token
# (e.g. 'frame_id_...', 'ID_...'). A bare substring match on 'id' would also
# drop unrelated features whose name merely contains those letters, such as
# '..._Centroid'.
non_feature_pattern = r'Action Label|(?:^|_)(?:id|ID)(?:_|$)'

# remove Action Label and id columns from train set
train_keep = ~final_df_train_tsfel.columns.str.contains(non_feature_pattern)
# .copy() so the column assignment below does not write into a slice.
final_df_train_tsfel = final_df_train_tsfel.loc[:, train_keep].copy()
final_df_train_tsfel['Action Label'] = label_train  # add Action Label column back

# remove Action Label and id columns from test set
test_keep = ~final_df_test_tsfel.columns.str.contains(non_feature_pattern)
final_df_test_tsfel = final_df_test_tsfel.loc[:, test_keep].copy()
final_df_test_tsfel['Action Label'] = label_test  # add Action Label column back

In [None]:
# Split each extracted table into a float feature matrix and an integer label vector.
label_col = 'Action Label'
X_train = final_df_train_tsfel.drop(columns=[label_col]).astype(float)
X_test = final_df_test_tsfel.drop(columns=[label_col]).astype(float)
y_train = final_df_train_tsfel[label_col].astype(int)
y_test = final_df_test_tsfel[label_col].astype(int)

In [None]:
# Sanity check: shapes of the feature matrices and label vectors.
tuple(part.shape for part in (X_train, y_train, X_test, y_test))

# Machine learning models

In [None]:
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
# model = XGBClassifier(eval_metric='mlogloss',base_score=0.5, use_label_encoder=True)
# Fixed seed so that repeated runs of the notebook give the same model.
model = HistGradientBoostingClassifier(random_state=42)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:\n", classification_report(y_test, y_pred))

# plt.subplots returns (figure, axes) in that order; draw the heatmap on
# exactly that axes object instead of relying on the implicit current axes.
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(confusion_matrix(y_test, y_pred), annot=True, fmt='d', cmap='Blues', ax=ax)
# confusion_matrix puts true labels on rows and predictions on columns.
ax.set_xlabel('Predicted label')
ax.set_ylabel('True label')
plt.show()

In [None]:
# Original label names known to the encoder; position i is the name of encoded class i.
labels= label_encoder.classes_
labels

## Why are the results so poor?

In [None]:
# Same per-column box plots as before, now split by recording ID (hue) so that
# differences between recordings become visible for each action label.
for col in (name for name in data.columns if name not in ('Action Label', 'frame_id')):
    sns.boxplot(x='Action Label', y=col, data=final_df, hue='ID')
    plt.xticks(rotation=90)

    plt.show()

# Relative features extraction

## This tutorial presents the relative-position feature: the position of one body part relative to another. The left ear is used as the base point for this feature.

### $$ relative\_position = position_A - position_B $$

### where $A$ and $B$ are body parts (keypoints)
#### <center>____________________!!!You should note that x, y axis matters!!!____________________</center>

In [None]:

def to_relative_positions(frame, base='left_ear', skip=('Action Label', 'frame_id', 'ID')):
    """Return a copy of ``frame`` with coordinates expressed relative to a base keypoint.

    Every column ending in ``_x`` becomes ``<base>_x - column`` and every column
    ending in ``_y`` becomes ``<base>_y - column``. Columns listed in ``skip``
    and columns with any other suffix are left untouched.

    The base coordinates are always read from the unmodified input. Updating the
    columns in place one by one would zero out ``<base>_x`` / ``<base>_y`` as
    soon as the loop reaches them, so every column processed afterwards would be
    computed against 0 instead of against the base keypoint.

    Args:
        frame: DataFrame holding ``<keypoint>_x`` / ``<keypoint>_y`` columns.
        base: name of the keypoint used as origin.
        skip: column names that must never be transformed.

    Returns:
        A new DataFrame; ``frame`` itself is not modified.

    Raises:
        KeyError: if ``<base>_x`` or ``<base>_y`` is missing from ``frame``.
    """
    relative = frame.copy()
    base_x = frame[f'{base}_x']
    base_y = frame[f'{base}_y']
    for col in frame.columns:
        if col in skip:
            continue
        if col.endswith('_x'):
            relative[col] = base_x - frame[col]
        elif col.endswith('_y'):
            relative[col] = base_y - frame[col]
    return relative


# NOTE: this rebinds the split variables, so run the cell only once per split;
# re-run the train/test split cell first if it has to be executed again.
final_df_train = to_relative_positions(final_df_train)
final_df_test = to_relative_positions(final_df_test)

# You may define functions to extract more features, such as angle, velocity, or acceleration:
### $$\theta=\arccos\left(\frac{\vec{x}\cdot\vec{y}}{\|\vec{x}\|\,\|\vec{y}\|}\right) $$
### $$ Vel=\sqrt{\Delta x^2+\Delta y^2} $$
### $$ Acc=\Delta Vel $$

#### this part is for you :))))))))

In [9]:
import tsfel

# Feature configuration built from the selected domains, using the JSON at `personal_dir`.
# You can choose from the ['statistical', 'temporal', 'spectral', 'fractal', 'Custom'] domains.
cfg = tsfel.get_features_by_domain(['statistical', 'Custom', 'temporal'], json_path=personal_dir)

# Extraction settings shared by the train and the test split.
extractor_options = dict(window_size=30, overlap=0.5, fs=30, features_path=feature_path)

# Extract time series domain features using TSFEL (now on the relative-position data)
final_df_train_tsfel = tsfel.time_series_features_extractor(cfg, final_df_train, **extractor_options)
final_df_test_tsfel = tsfel.time_series_features_extractor(cfg, final_df_test, **extractor_options)



NameError: name 'personal_dir' is not defined

In [None]:
# Keep the per-window label: the 'Action Label_Mode' column produced by the extractor.
label_train = final_df_train_tsfel['Action Label_Mode']
label_test = final_df_test_tsfel['Action Label_Mode']

# Columns derived from the label or from an identifier signal must not be model inputs.
# `id` / `ID` is matched only as a whole underscore-delimited token
# (e.g. 'frame_id_...', 'ID_...'). A bare substring match on 'id' would also
# drop unrelated features whose name merely contains those letters, such as
# '..._Centroid'.
non_feature_pattern = r'Action Label|(?:^|_)(?:id|ID)(?:_|$)'

# remove Action Label and id columns from train set
train_keep = ~final_df_train_tsfel.columns.str.contains(non_feature_pattern)
# .copy() so the column assignment below does not write into a slice.
final_df_train_tsfel = final_df_train_tsfel.loc[:, train_keep].copy()
final_df_train_tsfel['Action Label'] = label_train  # add Action Label column back

# remove Action Label and id columns from test set
test_keep = ~final_df_test_tsfel.columns.str.contains(non_feature_pattern)
final_df_test_tsfel = final_df_test_tsfel.loc[:, test_keep].copy()
final_df_test_tsfel['Action Label'] = label_test  # add Action Label column back

In [None]:
# Split each extracted table into a float feature matrix and an integer label vector.
label_col = 'Action Label'
X_train = final_df_train_tsfel.drop(columns=[label_col]).astype(float)
X_test = final_df_test_tsfel.drop(columns=[label_col]).astype(float)
y_train = final_df_train_tsfel[label_col].astype(int)
y_test = final_df_test_tsfel[label_col].astype(int)

In [None]:
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
# model = XGBClassifier(eval_metric='mlogloss',base_score=0.5, use_label_encoder=True)
# Fixed seed so that repeated runs of the notebook give the same model.
model = HistGradientBoostingClassifier(random_state=42)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:\n", classification_report(y_test, y_pred))

# plt.subplots returns (figure, axes) in that order; draw the heatmap on
# exactly that axes object instead of relying on the implicit current axes.
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(confusion_matrix(y_test, y_pred), annot=True, fmt='d', cmap='Blues', ax=ax)
# confusion_matrix puts true labels on rows and predictions on columns.
ax.set_xlabel('Predicted label')
ax.set_ylabel('True label')
plt.show()

In [None]:
# Original label names known to the encoder; position i is the name of encoded class i.
labels= label_encoder.classes_
labels

In [None]:

# import pandas as pd
# import matplotlib.pyplot as plt
# import os
# import numpy as np

# # Define the keypoints and their connections for drawing the skeleton.
# # This assumes a 17-point model similar to COCO.
# # Adjust KEYPOINT_NAMES and SKELETON_CONNECTIONS if your CSV uses a different set of keypoints.
# KEYPOINT_NAMES = [
#     'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
#     'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
#     'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
#     'left_knee', 'right_knee', 'left_ankle', 'right_ankle'
# ]

# SKELETON_CONNECTIONS = [
#     # Head
#     ('nose', 'left_eye'), ('nose', 'right_eye'),
#     ('left_eye', 'left_ear'), ('right_eye', 'right_ear'),
#     # Torso
#     ('left_shoulder', 'right_shoulder'), ('left_hip', 'right_hip'),
#     ('left_shoulder', 'left_hip'), ('right_shoulder', 'right_hip'),
#     # Left arm
#     ('left_shoulder', 'left_elbow'), ('left_elbow', 'left_wrist'),
#     # Right arm
#     ('right_shoulder', 'right_elbow'), ('right_elbow', 'right_wrist'),
#     # Left leg
#     ('left_hip', 'left_knee'), ('left_knee', 'left_ankle'),
#     # Right leg
#     ('right_hip', 'right_knee'), ('right_knee', 'right_ankle')
# ]

# # --- Configuration ---



# def draw_pose(pose_data_row, frame_id, action_label):
#     """
#     Draws a single pose and saves it as an image.

#     Args:
#         pose_data_row (pd.Series): A row from the DataFrame containing keypoint coordinates and label.
#         frame_id (int): The identifier for this frame (e.g., row index).
#         action_label (str): The action label for this pose.
#     """
#     if frame_id % 100 != 0:  # Only draw every 100th frame; skip all others
#         return
#     keypoints = {}
#     valid_keypoints_present = False
#     for kp_name in KEYPOINT_NAMES:
#         x_col, y_col = f'{kp_name}_x', f'{kp_name}_y'
#         if x_col in pose_data_row and y_col in pose_data_row:
#             x, y = pose_data_row[x_col], pose_data_row[y_col]
#             # Check if coordinates are valid numbers (not NaN)
#             if pd.notna(x) and pd.notna(y):
#                 keypoints[kp_name] = (float(x), float(y))
#                 valid_keypoints_present = True
#         else:
#             # print(f"Warning: Keypoint columns {x_col} or {y_col} not found for frame {frame_id}.")
#             pass # Silently skip if columns are missing

#     if not valid_keypoints_present:
#         print(f"Skipping frame {frame_id} for label '{action_label}' due to no valid keypoints.")
#         return

#     fig, ax = plt.subplots(figsize=IMAGE_FIGSIZE)

#     # Plot connections
#     for kp_name1, kp_name2 in SKELETON_CONNECTIONS:
#         if kp_name1 in keypoints and kp_name2 in keypoints:
#             x1, y1 = keypoints[kp_name1]
#             x2, y2 = keypoints[kp_name2]
#             ax.plot([x1, x2], [y1, y2], color=SKELETON_LINE_COLOR, linewidth=SKELETON_LINE_WIDTH, zorder=1)

#     # Plot keypoints
#     for kp_name, (x, y) in keypoints.items():
#         ax.plot(x, y, 'o', color=JOINT_COLOR, markersize=JOINT_SIZE, zorder=2)
#     # hide x and y ticks
#     ax.set_xticks([])
#     ax.set_yticks([])
#     # hide the border
#     for spine in ax.spines.values():
#         spine.set_visible(False)

#     # Set plot appearance
#     # ax.set_title(f'Action: {action_label} - Frame: {frame_id}')
#     # ax.set_xlabel('X coordinate')
#     # ax.set_ylabel('Y coordinate')
    
#     # Invert Y axis to match typical image coordinate systems (origin at top-left)
#     ax.invert_yaxis()
#     # Ensure aspect ratio is equal so the skeleton is not distorted
#     ax.set_aspect('equal', adjustable='box')
    
#     # Determine plot limits based on keypoint data
#     all_x = [kp[0] for kp in keypoints.values()]
#     all_y = [kp[1] for kp in keypoints.values()]

#     if all_x and all_y: # Ensure there are points to calculate limits from
#         min_x, max_x = min(all_x), max(all_x)
#         min_y, max_y = min(all_y), max(all_y)
        
#         # Add some padding
#         padding_x = (max_x - min_x) * 0.1 if (max_x - min_x) > 0 else 10
#         padding_y = (max_y - min_y) * 0.1 if (max_y - min_y) > 0 else 10
        
#         ax.set_xlim(min_x - padding_x, max_x + padding_x)
#         ax.set_ylim(max_y + padding_y, min_y - padding_y) # Y-axis is inverted
#     else: # Default view if no keypoints or single keypoint
#         ax.autoscale_view()


#     # Create directory and save image
#     output_dir = os.path.join(OUTPUT_BASE_DIR, str(action_label))
#     os.makedirs(output_dir, exist_ok=True)
    
#     # Sanitize frame_id for filename (if it could be non-integer or problematic)
#     safe_frame_id = str(frame_id).replace('/', '_').replace('\\', '_')
#     output_path = os.path.join(output_dir, f'{safe_frame_id}.png')
    
#     try:
#         plt.savefig(output_path)
#         # print(f'Saved: {output_path}')
#     except Exception as e:
#         print(f"Error saving figure {output_path}: {e}")
#     finally:
#         plt.close(fig) # Close the figure to free memory

# def main():
#     # Check if CSV file exists
#     if not os.path.exists(CSV_FILE_PATH):
#         print(f"Error: CSV file not found at '{CSV_FILE_PATH}'")
#         print("Please make sure the file is in the same directory as the script, or provide the full path.")
#         return

#     # Load the dataset
#     try:
#         df = pd.read_csv(CSV_FILE_PATH)
#     except Exception as e:
#         print(f"Error reading CSV file: {e}")
#         return

#     # Ensure the 'label' column exists
#     if 'Action Label' not in df.columns:
#         print("Error: 'label' column not found in the CSV file.")
#         print(f"Available columns are: {df.columns.tolist()}")
#         return
        
#     print(f"Processing {len(df)} poses from '{CSV_FILE_PATH}'...")

#     # Iterate over each row (pose) in the DataFrame
#     for index, row in df.iterrows():
#         action_label = row['Action Label']
#         # Sanitize action_label for directory name
#         # Replace characters that are problematic for directory names
#         safe_action_label = str(action_label).replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_').strip()
#         if not safe_action_label: # Handle empty or whitespace-only labels
#             safe_action_label = "unknown_action"
            
#         draw_pose(row, index, safe_action_label)
#         if (index + 1) % 50 == 0: # Print progress every 50 frames
#              print(f"Processed {index + 1}/{len(df)} frames...")


#     print(f"Finished processing. Skeleton images saved in '{OUTPUT_BASE_DIR}'")
# for i in os.listdir('./real_data'):
#     id=i.split('.')[0].split('_')[-1]
#     CSV_FILE_PATH = f'./real_data/keypoints_with_labels_{id}.csv'
#     OUTPUT_BASE_DIR = f'./skeleton_frames/{id}'
#     IMAGE_FIGSIZE = (8, 8)  # Width, Height in inches for the output image
#     SKELETON_LINE_COLOR = 'cornflowerblue'
#     SKELETON_LINE_WIDTH = 2
#     JOINT_COLOR = 'orangered'
#     JOINT_SIZE = 5
#     main()