<a href="https://colab.research.google.com/github/Nobobi-Hasan/PointNeXt-PartSegmentation-FallenTrees/blob/main/PointNeXt_01_03_Data_Preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mount Drive & Define Paths

In [None]:
from google.colab import drive
import os
import numpy as np
import shutil
import json
from tqdm import tqdm

In [None]:
# Mount Google Drive at /content/drive so the project folders under MyDrive are reachable
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# --- Google Drive side -------------------------------------------------------
# Everything stored in Drive hangs off the project root.
DRIVE_PROJECT_ROOT = "/content/drive/MyDrive/ML_Projects/PointNeXt"
DRIVE_DATA_DIR = os.path.join(DRIVE_PROJECT_ROOT, "Data")
DRIVE_ZIP_PATH = os.path.join(DRIVE_DATA_DIR, "data.zip")   # raw archive (input)

# Name and Drive destination of the processed archive (output)
FINAL_ZIP_NAME = "processed_data.zip"
FINAL_DRIVE_PATH = os.path.join(DRIVE_DATA_DIR, FINAL_ZIP_NAME)

# --- Colab local disk --------------------------------------------------------
LOCAL_DATA_DIR = "/content/data/data"             # extracted raw data
LOCAL_PROCESSED_DIR = "/content/processed_data"   # normalized output

print(f"Project Root: {DRIVE_PROJECT_ROOT}\nData Source: {DRIVE_ZIP_PATH}")

Project Root: /content/drive/MyDrive/ML_Projects/PointNeXt
Data Source: /content/drive/MyDrive/ML_Projects/PointNeXt/Data/data.zip


# Copy & Unzip Data

In [None]:
# Stage the raw archive on the Colab VM and extract it.
# Both steps are skipped when their result is already present, so the cell can be re-run safely.
if not os.path.exists(DRIVE_ZIP_PATH):
    print(f"Error: Could not find 'data.zip' at {DRIVE_ZIP_PATH}")
    print("Please upload data.zip to the 'Data' folder in Drive first.")
else:
    # Copy to colab local disk
    if not os.path.exists("/content/data.zip"):
        print("Copying data.zip from Drive to Colab local disk...")
        shutil.copy(DRIVE_ZIP_PATH, "/content/data.zip")

    # Unzip
    if not os.path.exists(LOCAL_DATA_DIR):
        print("Unzipping data...")
        # Pure-Python extraction: unlike the `!unzip` shell escape, a corrupt or
        # truncated archive raises here instead of failing unnoticed.
        shutil.unpack_archive("/content/data.zip", "/content/data", format="zip")

        # Only report success when the folder the later cells read from exists.
        if os.path.isdir(LOCAL_DATA_DIR):
            print("Unzip Complete.")
        else:
            print(f"Error: Archive extracted, but {LOCAL_DATA_DIR} was not created.")
            print("Check that data.zip contains a top-level 'data' folder.")
    else:
        print("Data already unzipped.")

Copying data.zip from Drive to Colab local disk...
Unzipping data...
Unzip Complete.


In [None]:
# Peek at one raw sample to see the array layout before preprocessing
raw_dir_0 = "/content/data/data/0/numpy"
raw_dir_1 = "/content/data/data/1/numpy"

# The first class folder that exists and is non-empty supplies the sample;
# None means neither folder had anything to offer.
target_file = next(
    (
        os.path.join(folder, os.listdir(folder)[0])
        for folder in (raw_dir_0, raw_dir_1)
        if os.path.exists(folder) and len(os.listdir(folder)) > 0
    ),
    None,
)

if not target_file:
    print("No raw files found. Check /content/data path.")
else:
    raw_data = np.load(target_file)
    print(f"Raw File: {os.path.basename(target_file)}")
    print(f"Raw Shape: {raw_data.shape}")

Raw File: standing_trees_30.npy
Raw Shape: (21921, 11)


# Define Normalization Functions

In [None]:
def pc_normalize(pc):
    """Center a point cloud on its centroid and scale it into the unit sphere.

    Args:
        pc: array of shape (N, D) with one point per row.

    Returns:
        A new array of the same shape whose rows have zero mean and whose
        farthest point lies at distance 1 from the origin. When every point
        coincides (radius 0) the centered, all-zero cloud is returned unscaled.
    """
    centroid = np.mean(pc, axis=0)
    pc = pc - centroid
    # Largest distance of any point from the (now zero) centroid
    m = np.max(np.sqrt(np.sum(pc ** 2, axis=1)))
    # Guard the degenerate case: dividing by a zero radius would fill the
    # result with NaNs that then end up in the saved dataset.
    if m > 0:
        pc = pc / m
    return pc

def rgb_normalize(points):
    """Min-max scale every column of ``points`` independently.

    Each column is shifted by its own minimum and divided by its own range
    (plus a small epsilon), so values end up in roughly [0, 1]. A constant
    column maps to all zeros.

    Args:
        points: 2-D array; statistics are taken along axis 0.

    Returns:
        A new array with the same shape as ``points``.
    """
    lowest = points.min(axis=0)                   # per-column minimum
    span = points.max(axis=0) - lowest + 1e-8     # epsilon keeps a flat column from dividing by zero
    return (points - lowest) / span

def intensity_normalize(points):
    """Min-max scale ``points`` using a single global minimum and maximum.

    Unlike a per-column scaling, the minimum and maximum here are scalars
    taken over the whole array. A constant input maps to all zeros.

    Args:
        points: array of values to rescale.

    Returns:
        A new array with the same shape as ``points``, in roughly [0, 1].
    """
    lowest = points.min()                  # scalar: minimum over every element
    span = points.max() - lowest + 1e-8    # epsilon keeps a constant input from dividing by zero
    return (points - lowest) / span

# Process, Normalize & Split

In [None]:
""" Using 15% for val """

import random

def _find_source_file(data_root, clean_filename):
    """Return ``(source_path, class_id)`` for a sample, or ``(None, None)`` if it is absent.

    Class folder "0" is checked before "1", so a name present in both resolves to "0".
    """
    for class_id in ("0", "1"):
        candidate = os.path.join(data_root, class_id, "numpy", clean_filename)
        if os.path.exists(candidate):
            return candidate, class_id
    return None, None


def _normalize_cloud(data):
    """Build the processed array for one sample: xyz, rgb, intensity and label columns."""
    xyz = pc_normalize(data[:, 0:3])
    rgb = rgb_normalize(data[:, 3:6])
    intensity = intensity_normalize(data[:, 6]).reshape(-1, 1)
    labels = data[:, 10].astype(np.int64).reshape(-1, 1)
    # NOTE: stacking float32 features with int64 labels makes NumPy promote the
    # whole array to float64. Kept as is so the files on disk do not change.
    return np.hstack((xyz, rgb, intensity, labels))


def process_dataset(train_fraction=0.85, seed=42):
    """Normalize every listed sample and write train/val/test split lists.

    Reads the train and test file lists from ``LOCAL_DATA_DIR/train_test_split``,
    carves a validation set out of the train list, normalizes each sample and
    saves it under ``LOCAL_PROCESSED_DIR/<class>/``. It then writes one
    ``shuffled_<split>_file_list.json`` per split plus a
    ``synsetoffset2category.txt`` file. Samples that cannot be found in either
    class folder are reported and left out of the written lists.

    Args:
        train_fraction: share of the original train list kept for training;
            the remainder becomes the validation split.
        seed: seed for the shuffle that precedes the train/val cut.

    Returns:
        None. Prints an error and returns early if a split list is missing.

    Raises:
        ValueError: if ``train_fraction`` is outside [0, 1].
    """
    if not 0.0 <= train_fraction <= 1.0:
        raise ValueError(f"train_fraction must be within [0, 1], got {train_fraction}")

    # Define split file paths
    split_src_dir = os.path.join(LOCAL_DATA_DIR, "train_test_split")
    train_json_path = os.path.join(split_src_dir, "shuffled_train_file_list.json")
    test_json_path = os.path.join(split_src_dir, "shuffled_test_file_list.json")

    # Validate BOTH inputs before creating any output, so a missing test list
    # cannot crash the run half-way with folders already created.
    missing_lists = [p for p in (train_json_path, test_json_path) if not os.path.exists(p)]
    if missing_lists:
        print(f"Error: Could not find train_test_split folder inside {LOCAL_DATA_DIR}")
        for missing_path in missing_lists:
            print(f"  Missing: {missing_path}")
        return

    # Create Official Folder Hierarchy
    class_dirs = {
        "0": os.path.join(LOCAL_PROCESSED_DIR, "0"),
        "1": os.path.join(LOCAL_PROCESSED_DIR, "1"),
    }
    for class_dir in class_dirs.values():
        os.makedirs(class_dir, exist_ok=True)

    # Destination for Split Files
    dest_split_dir = os.path.join(LOCAL_PROCESSED_DIR, "train_test_split")
    os.makedirs(dest_split_dir, exist_ok=True)

    # SPLIT LOGIC
    print("Loading and splitting data...")
    with open(train_json_path, 'r') as f:
        full_train = json.load(f)

    with open(test_json_path, 'r') as f:
        full_test = json.load(f)

    # Shuffle and split. A private generator yields the same order as seeding
    # the module-level one, without disturbing global random state.
    random.Random(seed).shuffle(full_train)
    split_idx = int(len(full_train) * train_fraction)

    train_list = full_train[:split_idx]
    val_list = full_train[split_idx:]

    print(f"Stats -> Train: {len(train_list)}, Val: {len(val_list)}, Test: {len(full_test)}")

    # Process 3 splits instead of 2
    splits_to_process = [("train", train_list), ("val", val_list), ("test", full_test)]

    for split_name, file_list in splits_to_process:
        print(f"\nProcessing {split_name} list...")

        new_json_list = []

        for filename in tqdm(file_list):
            # List entries may carry Windows-style separators; keep only the file name
            clean_filename = os.path.basename(filename.replace("\\", "/"))

            full_path, class_id = _find_source_file(LOCAL_DATA_DIR, clean_filename)
            if full_path is None:
                print(f"Warning: File {clean_filename} not found.")
                continue

            # Load & Normalize
            data = np.load(full_path).astype(np.float32)
            processed_data = _normalize_cloud(data)

            # Save File
            np.save(os.path.join(class_dirs[class_id], clean_filename), processed_data)

            # Add path to list ("<class>/numpy/" is a dummy prefix for the parser)
            new_json_list.append(f"{class_id}/numpy/{clean_filename}")

        # Save the JSON list
        dest_json_name = f"shuffled_{split_name}_file_list.json"
        with open(os.path.join(dest_split_dir, dest_json_name), 'w') as f:
            json.dump(new_json_list, f)

    # Create Dummy Metadata
    with open(os.path.join(LOCAL_PROCESSED_DIR, "synsetoffset2category.txt"), 'w') as f:
        f.write("Standing_Trees\t0\nFallen_Trees\t1\n")

    train_pct = round(train_fraction * 100)
    val_pct = 100 - train_pct
    print(f"\n\u2705 Processing Complete! {train_pct}/{val_pct} Train/Val split created.")

# Run the preprocessing: normalizes every listed sample and writes the split lists and metadata
process_dataset()

Loading and splitting data...
Stats -> Train: 721, Val: 128, Test: 213

Processing train list...


100%|██████████| 721/721 [00:01<00:00, 705.32it/s]



Processing val list...


100%|██████████| 128/128 [00:00<00:00, 796.11it/s]



Processing test list...


100%|██████████| 213/213 [00:00<00:00, 696.81it/s]


✅ Processing Complete! 85/15 Train/Val split created.





In [None]:
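# (Disabled alternative) Copy the split files directly from RAW to the processed data folder.
# NOTE: if re-enabled, change `if raw_split_dir:` to `if os.path.exists(raw_split_dir):` --
# a non-empty path string is always truthy, so the error branch below can never run.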

# print("Copying split files to processed directory...")

# raw_split_dir = os.path.join(LOCAL_DATA_DIR, "train_test_split")

# # Destination: Inside the processed data folder
# dest_split_dir = os.path.join(LOCAL_PROCESSED_DIR, "train_test_split")

# if raw_split_dir:
#     if os.path.exists(dest_split_dir):
#         shutil.rmtree(dest_split_dir) # Remove old copy if exists
#     shutil.copytree(raw_split_dir, dest_split_dir)
#     print(f"Success! Copied split files from '{raw_split_dir}' to '{dest_split_dir}'")
# else:
#     print("Error: Could not find 'train_test_split' folder in raw data.")

# Zip & Upload to Drive

In [None]:
# Package the processed dataset and store the archive in Drive.
if not os.path.isdir(LOCAL_PROCESSED_DIR):
    # Without this guard a failed processing step would upload a stale archive or crash on copy.
    print(f"Error: Nothing to zip, '{LOCAL_PROCESSED_DIR}' does not exist. Run the processing step first.")
else:
    print("Zipping processed data...")
    # Zip the folder. make_archive writes a fresh archive on every run (no stale
    # entries from earlier runs) and raises on failure. root_dir/base_dir keep
    # the same in-archive layout that `zip -r` produced for the absolute path:
    # entries are stored as "content/processed_data/...".
    local_zip_path = shutil.make_archive(
        "/content/processed_data",
        "zip",
        root_dir="/",
        base_dir=os.path.relpath(LOCAL_PROCESSED_DIR, "/"),
    )

    print(f"Uploading to Drive: {FINAL_DRIVE_PATH}...")
    shutil.copy(local_zip_path, FINAL_DRIVE_PATH)

    print(f"\nDONE! '{FINAL_ZIP_NAME}' is safely stored in Google Drive inside the '{DRIVE_DATA_DIR}' folder.")

Zipping processed data...
Uploading to Drive: /content/drive/MyDrive/ML_Projects/PointNeXt/Data/processed_data.zip...

DONE! 'processed_data.zip' is safely stored in Google Drive inside the '/content/drive/MyDrive/ML_Projects/PointNeXt/Data' folder.
