In [35]:
import cv2
import mediapipe as mp
import csv
import pandas as pd
import os
import numpy as np

In [None]:
# Initialize MediaPipe Pose and Drawing utilities.
# mp_pose is the pose-estimation solution module (provides Pose and POSE_CONNECTIONS);
# mp_drawing is the drawing helper module (provides draw_landmarks).
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [62]:
# Root folder of the input videos, with one sub-folder per class (see create_dataset).
# Alternative dataset folder, kept for reference:
#DATASET_DIR = 'Single_person_violent'
DATASET_DIR = 'Final_Dataset'
# Class names; each one is used as a sub-folder name under DATASET_DIR and OUTPUT_DIR.
CLASSES_LIST = ["Kicking","Punching","Block","Idle"]
# Root folder for the generated landmark CSVs, with one sub-folder per class.
OUTPUT_DIR = 'Output'
# Column names applied to the header-less landmark CSVs.
# NOTE(review): several functions in this notebook define their own list that spells
# the first column 'FrameNumber' (no space) - confirm which spelling is intended.
custom_headers = ['Frame Number', 'x','y','z','Visibility']

In [None]:
# Flatten one frame's MediaPipe landmarks into CSV-ready rows
def write_landmarks_to_csv(landmarks, frame_number, csv_data):
    """Append one row per landmark to ``csv_data`` (modified in place).

    Each appended row is ``[frame_number, x, y, z, visibility]``. Nothing is
    written to disk here; rows are only accumulated in the given list.
    """
    csv_data.extend(
        [frame_number, point.x, point.y, point.z, point.visibility]
        for point in landmarks
    )

In [None]:
# Convert a video into landmarks using mediapipe
def convert_video_to_landmark_csv(video_path):
    """Run MediaPipe Pose over every frame of a video and save the landmarks as CSV.

    The CSV is written to ``OUTPUT_DIR/<class>/<video name>.csv``, where
    ``<class>`` is the name of the folder that contains the video. Each row is
    ``[frame_number, x, y, z, visibility]`` and there is no header row. Frame
    numbers start at 1; frames in which no pose is detected contribute no rows.

    No file is written (and a message is printed) when the video cannot be
    opened or when no pose is detected in any frame, because an empty CSV
    would make the later ``pd.read_csv`` calls fail.

    Args:
        video_path: Path to the video file. The name of its parent folder is
            used as the class name.
    """
    frame_number = 0
    csv_data = []

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Skipping {video_path}: could not open video")
            return

        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frame_number += 1

                # MediaPipe expects RGB input, OpenCV decodes frames as BGR.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Performance hint used in MediaPipe's own examples: a
                # read-only array can be passed by reference.
                image.flags.writeable = False

                # Perform pose detection
                results = pose.process(image)

                # pose_landmarks is None when nothing was detected in this
                # frame, so only collect rows when a pose is present.
                if results.pose_landmarks:
                    write_landmarks_to_csv(results.pose_landmarks.landmark, frame_number, csv_data)
    finally:
        # Release the capture even if decoding or detection raised.
        cap.release()

    if not csv_data:
        print(f"Skipping {video_path}: no pose landmarks detected")
        return

    # Extract file and class names from the video path
    file_name = os.path.splitext(os.path.basename(video_path))[0]
    class_name = os.path.basename(os.path.dirname(video_path))

    # Ensure the output directory exists before building the file path in it
    output_file_dir = os.path.join(OUTPUT_DIR, class_name)
    os.makedirs(output_file_dir, exist_ok=True)
    output_file = os.path.join(output_file_dir, f'{file_name}.csv')

    with open(output_file, mode='w', newline='') as file:
        csv.writer(file).writerows(csv_data)

In [None]:
# Run landmark extraction over every video of every requested class
def create_dataset(CLASSES_LIST):
    """Convert each entry of ``DATASET_DIR/<class>`` into a landmark CSV.

    Every directory entry of each class folder is handed to
    ``convert_video_to_landmark_csv``; a progress line is printed per class.
    """
    print('List: ',CLASSES_LIST)
    for class_name in CLASSES_LIST:
        print(f"Extracting data from {class_name}")
        class_dir = os.path.join(DATASET_DIR, class_name)
        for entry in os.listdir(class_dir):
            # Full path of the video inside its class folder
            convert_video_to_landmark_csv(os.path.join(class_dir, entry))

In [None]:
# Turn one landmark CSV into a labelled sequence (one row of the final dataframe)
def createSequence(csv_path, label):
    """Build ``[label, frame_1, frame_2, ...]`` from a header-less landmark CSV.

    Each ``frame_i`` is a list of ``[x, y, z, Visibility]`` rows, one per CSV
    row that shares the same frame number; frames follow ascending frame
    number.
    """
    column_names = ['Frame Number', 'x', 'y', 'z', 'Visibility']
    value_columns = ['x', 'y', 'z', 'Visibility']

    frame_table = pd.read_csv(csv_path, header=None, names=column_names)

    # One group per frame number, restricted to the landmark value columns
    per_frame = frame_table.groupby('Frame Number')[value_columns]

    return [label] + [rows.values.tolist() for _, rows in per_frame]


In [None]:
# Quick check: build a one-row dataframe from a single landmark CSV.
# NOTE(review): 'output.csv' is not written by any cell shown in this notebook
# (convert_video_to_landmark_csv targets paths under OUTPUT_DIR) - confirm the file exists.
sequence = createSequence('output.csv','Kicking')
# Wrapping the sequence in a list makes it a single dataframe row.
processed_df = pd.DataFrame([sequence])
#processed_df2 = processed_df.append(pd.DataFrame(frame, columns=['x', 'y', 'z', 'Visibility']), ignore_index=True)
processed_df.head()

In [None]:
# Peek at the x coordinates of the first frame in the CSV.
# The grouping column must be addressed as 'Frame Number' (with a space),
# because that is the name custom_headers assigns to it.
dat = pd.read_csv('output.csv',header=None,names=custom_headers)
# .iloc[0] selects the first frame by position, so this also works when the
# first frame that has landmarks is not numbered 1.
print(dat.groupby('Frame Number')['x'].apply(lambda x: x.tolist()).iloc[0])

In [None]:
# Collect the CSV paths of every requested class
def getAllCSV(CLASSES_LIST):
    """Map each class name to the paths of the entries in ``OUTPUT_DIR/<class>``.

    A class whose folder is empty gets no key in the returned dict.
    """
    paths_by_class = {}
    for class_name in CLASSES_LIST:
        class_dir = os.path.join(OUTPUT_DIR, class_name)
        entries = os.listdir(class_dir)
        if entries:
            # Build the complete path of every entry in the class folder
            paths_by_class.setdefault(class_name, []).extend(
                os.path.join(class_dir, entry) for entry in entries
            )
    return paths_by_class

In [None]:
# Combine the sequences of all CSV files into one dataframe
def createDataframe():
    """Return a dataframe with one row per entry of ``OUTPUT_DIR/<class>``.

    Each row is the sequence built by ``createSequence`` for that file, with
    the class name passed as its label. Classes are visited in the order of
    ``CLASSES_LIST``.
    """
    sequences = [
        createSequence(os.path.join(OUTPUT_DIR, class_name, entry), class_name)
        for class_name in CLASSES_LIST
        for entry in os.listdir(os.path.join(OUTPUT_DIR, class_name))
    ]
    return pd.DataFrame(sequences)
            

In [None]:
# Calculate the mean and standard deviation of z values (depth values)
def calculateZParamsForNormalization(CLASSES_LIST,csv_files):
    """Return ``(mean, std)`` of the z column over the CSVs of the given classes.

    Args:
        CLASSES_LIST: Class names to include.
        csv_files: Dict mapping a class name to a list of CSV paths, as
            returned by ``getAllCSV``. Classes missing from the dict are
            skipped.

    Returns:
        Tuple ``(z_mean, z_std)`` computed with ``np.mean`` and ``np.std``
        (population standard deviation). Both are NaN when no z values are
        found.
    """
    custom_headers = ['FrameNumber','x','y','z','Visibility']
    z_values = []
    for key in CLASSES_LIST:
        # getAllCSV leaves out classes whose folder is empty, so a plain
        # csv_files[key] would raise KeyError for them.
        for file in csv_files.get(key, []):
            df = pd.read_csv(file,header=None,names=custom_headers)
            z_values.extend(df['z'].values)
    z_mean = np.mean(z_values)
    z_std = np.std(z_values)
    return z_mean, z_std

In [None]:
# Calculate the min and max of z values (depth values)
def calculateZMinMaxForNormalization(CLASSES_LIST,csv_files):
    """Return ``(min, max)`` of the z column over the CSVs of the given classes.

    Args:
        CLASSES_LIST: Class names to include.
        csv_files: Dict mapping a class name to a list of CSV paths, as
            returned by ``getAllCSV``. Classes missing from the dict are
            skipped.

    Returns:
        Tuple ``(z_min, z_max)``.

    Raises:
        ValueError: Raised by numpy when no z values are found at all.
    """
    custom_headers = ['FrameNumber','x','y','z','Visibility']
    z_values = []
    for key in CLASSES_LIST:
        # getAllCSV leaves out classes whose folder is empty, so a plain
        # csv_files[key] would raise KeyError for them.
        for file in csv_files.get(key, []):
            df = pd.read_csv(file,header=None,names=custom_headers)
            z_values.extend(df['z'].values)
    z_min = np.min(z_values)
    z_max = np.max(z_values)
    return z_min, z_max

In [None]:
# Normalize all the z values (depth values) in the CSV files
def normalizeZValuesInCSV(files,z_min, z_max):
    """Min-max scale the z column of every listed CSV, overwriting each file.

    Every z value becomes ``(z - z_min) / (z_max - z_min)``. The files are
    rewritten in place without header or index, so calling this again with
    the same ``z_min`` / ``z_max`` rescales the already-scaled values.

    Args:
        files: Dict mapping a class name to a list of CSV paths.
        z_min: Lower bound used for scaling.
        z_max: Upper bound used for scaling.

    Raises:
        ValueError: If ``z_max - z_min`` is zero or not finite. This is
            checked before any file is touched, because the division would
            otherwise write NaN/inf over the stored z values.
    """
    z_range = z_max - z_min
    if not np.isfinite(z_range) or z_range == 0:
        raise ValueError(
            f"Cannot normalize z values: invalid range (z_min={z_min}, z_max={z_max})"
        )

    custom_headers = ['FrameNumber','x','y','z','Visibility']
    for key in files:
        for file in files[key]:
            df = pd.read_csv(file,header=None,names=custom_headers)
            # Normalize the 'z' column using Min-Max scaling
            df['z'] = (df['z'] - z_min) / z_range
            df.to_csv(file, index=False, header=False)

In [None]:
# Extract pose landmarks from every video under DATASET_DIR, class by class.
create_dataset(CLASSES_LIST)

In [None]:
# Load every landmark CSV under OUTPUT_DIR into one dataframe:
# one row per CSV file, with the class label in the first column.
complete_df = createDataframe()

In [None]:
# Show the first five rows whose label (first column) is 'Punching'
complete_df.loc[complete_df.iloc[:, 0] == 'Punching'].head(5)

In [None]:
# Map each class name to the list of paths found in its OUTPUT_DIR sub-folder.
files = getAllCSV(CLASSES_LIST)

In [None]:
# Inspect the raw z (depth) values of a single CSV.
# NOTE(review): 'output.csv' is not produced by any cell shown in this notebook - confirm it exists.
df2 = pd.read_csv('output.csv',header=None,names=custom_headers)
arr = []
arr.extend(df2['z'].values)
# Displays the complete list of z values as the cell output.
arr

In [48]:
# The function takes the class list first and the per-class CSV mapping second;
# passing only `files` raises TypeError for the missing argument.
z_mean, z_std = calculateZParamsForNormalization(CLASSES_LIST, files)

In [60]:
# The function takes the class list first and the per-class CSV mapping second;
# passing only `files` raises TypeError for the missing argument.
z_min, z_max = calculateZMinMaxForNormalization(CLASSES_LIST, files)

In [49]:
z_mean, z_std
# Value recorded from a run of this cell:
#(-0.10812381639618134, 0.44725407070063944)

(-0.10812381639618134, 0.44725407070063944)

In [61]:
z_min, z_max
# Value recorded from an earlier run; it differs from the (0.0, 1.0) output shown for this cell.
# NOTE(review): presumably this was the range before normalizeZValuesInCSV rewrote the CSVs in place - confirm.
#(-1.6407425403594973, 1.754867672920227)

(0.0, 1.0)

In [59]:
# Min-max scale the z column of every CSV in `files`, overwriting the files in place.
# NOTE(review): run this once per freshly generated dataset; running it again with the
# same z_min/z_max rescales values that were already scaled.
normalizeZValuesInCSV(files,z_min, z_max)