In [9]:
import os
import cv2
from glob import glob
import numpy as np
import pandas as pd
from tqdm import tqdm
from tensorflow.keras.preprocessing.image import img_to_array, array_to_img

In [10]:
# Set input and output directories
input_csv = '../data/raw/track1/driving_log.csv'
output_csv = '../data/processed/track1/driving_log.csv'
output_dir = '../data/processed/track1'

# Make sure the output tree exists before anything is written into it:
# cv2.imwrite reports a missing target directory by returning False instead of
# raising, so on a fresh checkout no image would be saved and nothing would say so.
os.makedirs(os.path.join(output_dir, 'IMG'), exist_ok=True)

# Clear the output directory.
# WARNING: this deletes every regular file in output_dir and output_dir/IMG.
# Sub-directories are skipped (os.remove cannot delete them and would raise).
print("Clearing output directory...")
for pattern in ('IMG/*', '*'):
    for file in glob(os.path.join(output_dir, pattern)):
        if os.path.isfile(file):
            os.remove(file)

Clearing output directory...


In [11]:
# Load the image
def load_image(path):
    """Read an image file and return it in RGB channel order.

    Args:
        path: Location of the image file; passed straight to ``cv2.imread``.

    Returns:
        The image converted from BGR to RGB, or ``None`` when ``cv2.imread``
        could not read the file.
    """
    image = cv2.imread(path)
    if image is None:
        # cv2.imread signals a missing or unreadable file by returning None rather
        # than raising; cv2.cvtColor would fail on it, so hand None back and let the
        # caller decide how to treat the missing frame.
        return None
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

In [12]:
# Crop and resize the image
def preprocess_image(image):
    """Cut a horizontal band out of the frame and scale it to a fixed size.

    Args:
        image: Three-dimensional image array (rows, columns, channels).

    Returns:
        Rows 60 to 139 of ``image`` resized by ``cv2.resize`` to a target size
        of ``(200, 66)`` using ``cv2.INTER_AREA`` interpolation.
    """
    target_size = (200, 66)  # dsize argument of cv2.resize, i.e. (width, height)
    # Drop the rows above 60 and from 140 downwards (top and bottom of the frame)
    cropped = image[60:140, :, :]
    return cv2.resize(cropped, target_size, interpolation=cv2.INTER_AREA)

In [13]:
# Apply augmentations
def augment_image(image, steering_angle):
    """Build augmented variants of one frame with their matching steering angles.

    Args:
        image: RGB image array (it is converted with ``cv2.COLOR_RGB2HSV``).
        steering_angle: Steering angle associated with ``image``.

    Returns:
        ``(images, angles)``: two parallel lists with four entries each, in
        this order: the original frame, a horizontal flip (negated angle), a
        brightness-scaled copy (same angle) and a randomly translated copy
        (angle shifted by 0.002 per pixel of horizontal translation).
    """
    images, angles = [image], [steering_angle]

    # Horizontal flip: mirroring the frame also mirrors the steering direction
    flipped_image = cv2.flip(image, 1)
    images.append(flipped_image)
    angles.append(-steering_angle)

    # Brightness adjustment: scale the V channel by a factor in [0.2, 1.2)
    hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
    brightness_scale = 0.2 + np.random.uniform()
    scaled_value = hsv_image[:, :, 2] * brightness_scale
    if np.issubdtype(hsv_image.dtype, np.integer):
        # Factors above 1 push bright pixels past the integer range; without the
        # clip the cast back to the image dtype overflows (bright areas turn dark)
        # instead of saturating.
        value_limits = np.iinfo(hsv_image.dtype)
        scaled_value = np.clip(scaled_value, value_limits.min, value_limits.max)
    hsv_image[:, :, 2] = scaled_value
    bright_image = cv2.cvtColor(hsv_image, cv2.COLOR_HSV2RGB)
    images.append(bright_image)
    angles.append(steering_angle)

    # Translation: random shift of [-20, 20) px horizontally and [-10, 10) px vertically
    trans_x = np.random.randint(-20, 20)
    trans_y = np.random.randint(-10, 10)
    trans_matrix = np.float32([[1, 0, trans_x], [0, 1, trans_y]])
    translated_image = cv2.warpAffine(image, trans_matrix, (image.shape[1], image.shape[0]))
    images.append(translated_image)
    # Compensate the steering angle for the horizontal shift
    angles.append(steering_angle + trans_x * 0.002)

    return images, angles

In [14]:
# Process each image and save results
def process_and_save_images(row):
    """Preprocess, augment and save the three camera frames of one log row.

    Args:
        row: Mapping-like record with the keys ``center``, ``left``, ``right``
            (image paths), ``steering_angle``, ``throttle``, ``brake`` and
            ``speed``.

    Returns:
        A list of ``[save_path, steering_angle, throttle, brake, speed]``
        entries, one per image written to ``<output_dir>/IMG/``. Frames for
        which ``load_image`` returns ``None`` contribute no entries.

    Raises:
        OSError: If ``cv2.imwrite`` reports that an image could not be written.
    """
    camera_paths = [row['center'], row['left'], row['right']]
    # The left/right camera frames get a fixed +0.2 / -0.2 steering offset
    angles = [row['steering_angle'], row['steering_angle'] + 0.2, row['steering_angle'] - 0.2]
    throttle, brake, speed = row['throttle'], row['brake'], row['speed']

    # cv2.imwrite does not create missing directories, so make sure the target exists
    image_dir = os.path.join(output_dir, "IMG/")
    os.makedirs(image_dir, exist_ok=True)

    processed_entries = []

    for path, angle in zip(camera_paths, angles):
        image = load_image(path)
        if image is None:
            continue

        image = preprocess_image(image)
        augmented_images, augmented_angles = augment_image(image, angle)

        # splitext strips only the extension, so names containing extra dots stay unique
        stem = os.path.splitext(os.path.basename(path))[0]

        for idx, (aug_img, aug_angle) in enumerate(zip(augmented_images, augmented_angles)):
            save_path = os.path.join(image_dir, f"{stem}_aug_{idx}.jpg")
            # imwrite returns False on failure instead of raising; do not record a
            # CSV row for a file that was never written.
            if not cv2.imwrite(save_path, cv2.cvtColor(aug_img, cv2.COLOR_RGB2BGR)):
                raise OSError(f"Could not write augmented image to {save_path}")

            processed_entries.append([save_path, aug_angle, throttle, brake, speed])

    return processed_entries

In [15]:
# Function to clean up the paths
def clean_path(path, project_folder):
    """Rewrite a path so that it is relative to the project folder.

    Everything up to and including the first path component that equals
    ``project_folder`` is replaced with ``..``. Only whole components match
    (delimited by ``/`` or ``\\`` or the ends of the string), so a folder
    called ``car`` does not match inside ``/home/carlos/``.

    Args:
        path: Path string to rewrite.
        project_folder: Name of the project folder to look for.

    Returns:
        ``".."`` followed by the part of ``path`` after the matched component,
        or ``path`` unchanged when ``project_folder`` is empty or does not
        occur as a component.
    """
    if not project_folder:
        # An empty name would match at index 0 and prefix ".." onto every path
        return path

    separators = "/\\"
    search_from = 0
    while True:
        start_index = path.find(project_folder, search_from)
        if start_index == -1:
            return path
        end_index = start_index + len(project_folder)
        starts_component = start_index == 0 or path[start_index - 1] in separators
        ends_component = end_index == len(path) or path[end_index] in separators
        if starts_component and ends_component:
            return ".." + path[end_index:]
        # Partial match inside a longer name: keep scanning further right
        search_from = start_index + 1

# Load the CSV file
data = pd.read_csv(input_csv, names=["center", "left", "right", "steering_angle", "throttle", "brake", "speed"])

# Apply the clean_path function to each path column.
# The project folder is taken to be the parent of the current working directory;
# os.path is used instead of splitting on '/' so this also works with Windows
# path separators.
project_folder = os.path.basename(os.path.dirname(os.getcwd()))
for camera_column in ('center', 'left', 'right'):
    data[camera_column] = data[camera_column].apply(clean_path, project_folder=project_folder)

# Save the modified data back to a new CSV file (the target directory must exist
# for to_csv to succeed)
os.makedirs(os.path.dirname(output_csv), exist_ok=True)
data.to_csv(output_csv, index=False, header=False)
print(f"Paths have been updated and saved to {output_csv}")

Paths have been updated and saved to ../data/processed/track1/driving_log.csv


In [16]:
# Main processing loop
data = pd.read_csv(input_csv, names=["center", "left", "right", "steering_angle", "throttle", "brake", "speed"])

# Re-reading the raw log discards the path clean-up applied in the previous cell,
# so normalise the camera paths again before any image is loaded. This relies on
# clean_path and project_folder defined in that cell.
for camera_column in ("center", "left", "right"):
    data[camera_column] = data[camera_column].apply(clean_path, project_folder=project_folder)

processed_data = []

for _, row in tqdm(data.iterrows(), total=len(data), desc="Processing Images"):
    processed_data.extend(process_and_save_images(row))

# Save new CSV with processed data.
# NOTE: this overwrites the cleaned-path CSV that the previous cell wrote to output_csv.
processed_df = pd.DataFrame(processed_data, columns=["image_path", "steering_angle", "throttle", "brake", "speed"])
processed_df.to_csv(output_csv, index=False)
print(f"Processing complete. Processed data saved to {output_csv}.")

Processing Images: 100%|██████████| 8959/8959 [00:28<00:00, 311.43it/s]


Processing complete. Processed data saved to ../data/processed/track1/driving_log.csv.
