In [1]:
import pandas as pd
import csv
import pyarrow.parquet as pq
import numpy as np
import os
import json


ModuleNotFoundError: No module named 'pyarrow'

In [None]:
# Highest zero-based landmark index exported for each landmark group.
# Both hands share the same limit.
max_left_hand_index = max_right_hand_index = 20
max_pose_index = 32

In [None]:
# Base landmark names, one per landmark index (limits are inclusive)
left_hand_columns = ["left_hand_{}".format(idx) for idx in range(max_left_hand_index + 1)]
right_hand_columns = ["right_hand_{}".format(idx) for idx in range(max_right_hand_index + 1)]
pose_columns = ["pose_{}".format(idx) for idx in range(max_pose_index + 1)]

# Full CSV header: every landmark expanded to its x, y, z columns, in the
# order left hand -> right hand -> pose, followed by the label column.
all_columns = [
    f"{landmark}_{axis}"
    for group in (left_hand_columns, right_hand_columns, pose_columns)
    for landmark in group
    for axis in ('x', 'y', 'z')
] + ['label']


In [None]:
# Load the training index; later cells read its 'sign' and 'path' columns.
train_df = pd.read_csv('Dataset_CSVs/train.csv')

In [None]:
# Signs to export
selected_words = [
    "TV", "after", "all", "alligator", "animal",
    "another", "any", "apple", "arm",
]
# selected_words = ["TV", "after", "airplane", "all", "alligator"]

# Keep only the rows whose sign is one of the selected words
filtered_df = train_df.loc[train_df['sign'].isin(selected_words)]

# Keep at most the first 15 rows of each sign
sub_df = filtered_df.groupby('sign').head(15)

In [None]:


# Build one flat feature row per frame for every selected sequence.
# Each row maps "<type>_<landmark_index>_<x|y|z>" to a coordinate for the
# left-hand, right-hand and pose landmarks of that frame, plus the sign label.
# Any other landmark type found in the Parquet file is ignored.
all_rows = []

# Landmark types that are exported; the type string is also the column prefix.
exported_types = ('left_hand', 'right_hand', 'pose')

for path, label in zip(sub_df['path'], sub_df['sign']):
    # Read the Parquet file using PyArrow and convert it to a pandas DataFrame
    df = pq.read_table(path).to_pandas()

    # One pass over the file instead of re-filtering it for every frame.
    # sort=False keeps frames in order of first appearance.
    for _, frame_df in df.groupby('frame', sort=False):
        row_data = {}

        # Iterating the columns directly yields plain Python scalars and
        # avoids the per-row Series construction of iterrows().
        landmarks = zip(
            frame_df['type'],
            frame_df['landmark_index'],
            frame_df['x'],
            frame_df['y'],
            frame_df['z'],
        )
        for landmark_type, landmark_index, x, y, z in landmarks:
            if landmark_type not in exported_types:
                continue
            prefix = f"{landmark_type}_{landmark_index}"
            row_data[f"{prefix}_x"] = x
            row_data[f"{prefix}_y"] = y
            row_data[f"{prefix}_z"] = z

        row_data['label'] = label
        all_rows.append(row_data)

# Define CSV file path
csv_file = 'Dataset_CSVs/ASL_word_data_xyz.csv'

# Write rows to CSV file
with open(csv_file, 'w', newline='') as f:
    # restval=0.0: a landmark that is absent from a frame is written as 0.0,
    # the same value used for NaN coordinates, instead of an empty cell.
    # A key that is not in all_columns still raises ValueError (DictWriter default).
    writer = csv.DictWriter(f, fieldnames=all_columns, restval=0.0)
    writer.writeheader()

    for row_data in all_rows:
        cleaned_row_data = {}
        for key, value in row_data.items():
            # Round numerical values to 6 decimal places (the label is left as is)
            if isinstance(value, (int, float)):
                value = round(value, 6)
            # Replace NaN values with 0.0
            cleaned_row_data[key] = 0.0 if pd.isna(value) else value

        writer.writerow(cleaned_row_data)

print(f"Data has been successfully written to {csv_file}")


In [5]:
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import mediapipe as mp
from mediapipe.framework.formats import landmark_pb2

# Load the per-frame landmark table from the CSV file
# (the same path the export cell earlier in this notebook writes to)
df = pd.read_csv('Dataset_CSVs/ASL_word_data_xyz.csv')

# Base landmark names: 21 per hand, 33 for the pose
left_hand_columns = ["left_hand_{}".format(idx) for idx in range(21)]
right_hand_columns = ["right_hand_{}".format(idx) for idx in range(21)]
pose_columns = ["pose_{}".format(idx) for idx in range(33)]

# Coordinate column names, landmark by landmark, each expanded to x, y, z
left_hand_coords = [f"{name}_{axis}" for name in left_hand_columns for axis in "xyz"]
right_hand_coords = [f"{name}_{axis}" for name in right_hand_columns for axis in "xyz"]
pose_coords = [f"{name}_{axis}" for name in pose_columns for axis in "xyz"]



In [6]:

# Row of the CSV (one frame) to visualise.
# The previous `.values.flatten()` concatenated every row of the table, giving
# n_rows * 63 (or n_rows * 99) values; convert_to_mediapipe_format only accepts
# exactly one frame's worth of coordinates and raised ValueError otherwise.
frame_index = 0

# 1-D float arrays for the chosen frame, laid out as x, y, z per landmark
left_hand_coords_array = df[left_hand_coords].iloc[frame_index].to_numpy(dtype=float)
right_hand_coords_array = df[right_hand_coords].iloc[frame_index].to_numpy(dtype=float)
pose_coords_array = df[pose_coords].iloc[frame_index].to_numpy(dtype=float)

In [7]:

def convert_to_mediapipe_format(coords_array, num_landmarks):
    """Pack a flat coordinate sequence into a MediaPipe ``NormalizedLandmarkList``.

    Args:
        coords_array: Sequence of ``num_landmarks * 3`` values laid out as
            ``x, y, z`` for the first landmark, then the second, and so on.
        num_landmarks: Number of landmarks encoded in ``coords_array``.

    Returns:
        A ``landmark_pb2.NormalizedLandmarkList`` with one ``NormalizedLandmark``
        per consecutive ``(x, y, z)`` triple, in input order.

    Raises:
        ValueError: If ``coords_array`` does not hold exactly
            ``num_landmarks * 3`` values.
    """
    expected_len = num_landmarks * 3  # Each landmark has x, y, z
    if len(coords_array) != expected_len:
        raise ValueError(f"Input array should contain {expected_len} values ({num_landmarks} landmarks * 3 coordinates)")

    mediapipe_landmarks = landmark_pb2.NormalizedLandmarkList()

    # Stride-3 slices line up the x, y and z value of each landmark
    triples = zip(coords_array[0::3], coords_array[1::3], coords_array[2::3])
    for x, y, z in triples:
        mediapipe_landmarks.landmark.append(
            landmark_pb2.NormalizedLandmark(x=x, y=y, z=z)
        )

    return mediapipe_landmarks

In [8]:
# Convert each flat coordinate array into a MediaPipe landmark list
# (21 landmarks per hand, 33 for the pose)
mediapipe_array_lefthand = convert_to_mediapipe_format(
    coords_array=left_hand_coords_array, num_landmarks=21
)
mediapipe_array_righthand = convert_to_mediapipe_format(
    coords_array=right_hand_coords_array, num_landmarks=21
)
mediapipe_array_pose = convert_to_mediapipe_format(
    coords_array=pose_coords_array, num_landmarks=33
)

In [9]:
import cv2
import numpy as np
import mediapipe as mp

def visualize_and_save_landmarks(landmarks_dict, connections, filename, show=True):
    """Draw landmarks on a blank white canvas, save the image, optionally show it.

    Args:
        landmarks_dict: Landmark list forwarded to MediaPipe's
            ``draw_landmarks`` as ``landmark_list``. Despite the name, the
            callers in this notebook pass a ``NormalizedLandmarkList``.
        connections: Landmark connections to draw as lines, e.g.
            ``mp.solutions.hands.HAND_CONNECTIONS``.
        filename: Output image path handed to ``cv2.imwrite``.
        show: If true (the default, matching the previous behaviour), open an
            OpenCV window and block until a key is pressed. Pass ``False`` to
            only save the file, e.g. on a kernel without a display.

    Raises:
        OSError: If ``cv2.imwrite`` reports that the image was not written.
    """
    # Initialize MediaPipe drawing utilities
    mp_drawing = mp.solutions.drawing_utils

    # Create a blank 480x640 image (white background)
    image = np.ones((480, 640, 3), dtype=np.uint8) * 255

    # Draw the landmarks on the image. OpenCV treats channels as BGR, so
    # (0, 0, 255) is red for the points and (0, 255, 0) is green for the lines.
    mp_drawing.draw_landmarks(
        image,
        landmark_list=landmarks_dict,
        connections=connections,
        landmark_drawing_spec=mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=1, circle_radius=2),
        connection_drawing_spec=mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1)
    )

    # Save the image. imwrite signals failure through its return value rather
    # than an exception, so check it instead of silently losing the output.
    if not cv2.imwrite(filename, image):
        raise OSError(f"Could not write image to {filename}")

    # Display the image; waitKey(0) blocks until a key is pressed in the window
    if show:
        cv2.imshow('Landmarks', image)
        cv2.waitKey(0)
        cv2.destroyAllWindows()




In [10]:
# Render each landmark set with its matching MediaPipe connection set
visualize_and_save_landmarks(
    landmarks_dict=mediapipe_array_lefthand,
    connections=mp.solutions.hands.HAND_CONNECTIONS,
    filename='left_hand_landmarks.png',
)
visualize_and_save_landmarks(
    landmarks_dict=mediapipe_array_righthand,
    connections=mp.solutions.hands.HAND_CONNECTIONS,
    filename='right_hand_landmarks.png',
)
visualize_and_save_landmarks(
    landmarks_dict=mediapipe_array_pose,
    connections=mp.solutions.pose.POSE_CONNECTIONS,
    filename='pose_landmarks.png',
)
