<a href="https://colab.research.google.com/github/friedelj/Capstone/blob/main/Assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

CAPSTONE   Assignment 3      7_14_25

In [None]:
import os
import shutil
from PIL import Image, ImageEnhance, ImageOps
import random
import traceback
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Input and output directories
# input_dir is scanned for one sub-folder per class; output_dir receives the
# standardized, balanced copy of each class folder.
input_dir = r'C:\Users\josep\RF Signals'
output_dir = r'C:\Users\josep\RF Signals Balanced'
# Number of images each class folder should hold after balancing.
target_count = 2000
# Size passed to Image.resize() for every saved image.
resize_dim = (224, 224)

In [None]:
# Failure log: one (class_name, file_name, error_message) tuple is appended
# for every image that could not be processed.
failed_images = list()

In [None]:
# === Data Augmentation Function ===
def augment_image(img):
    """Apply one randomly chosen augmentation to ``img`` and return the result.

    The candidates are: rotation by a random angle in [-15, 15] degrees,
    horizontal mirror, vertical flip, and a brightness or contrast change by
    a random factor in [0.7, 1.3].
    """
    def _rotate(image):
        return image.rotate(random.uniform(-15, 15))

    def _mirror(image):
        return ImageOps.mirror(image)

    def _flip(image):
        return ImageOps.flip(image)

    def _brightness(image):
        return ImageEnhance.Brightness(image).enhance(random.uniform(0.7, 1.3))

    def _contrast(image):
        return ImageEnhance.Contrast(image).enhance(random.uniform(0.7, 1.3))

    candidates = [_rotate, _mirror, _flip, _brightness, _contrast]
    # Pick first, then apply: any random factor is drawn only by the chosen step.
    chosen = random.choice(candidates)
    return chosen(img)

In [None]:
# === Histogram Equalization ===
def equalize_histogram(img):
    """Equalize the histogram of each RGB band separately and re-merge them.

    An image whose mode is not "RGB" is converted to RGB first, so the
    returned image is always in RGB mode.
    """
    rgb = img if img.mode == "RGB" else img.convert("RGB")
    equalized_bands = tuple(ImageOps.equalize(band) for band in rgb.split())
    return Image.merge("RGB", equalized_bands)

In [None]:
# === Process Each Class Folder ===
# Create the output root first; exist_ok=True means an already existing
# folder is not treated as an error.
os.makedirs(output_dir, exist_ok=True)

In [None]:
# === Balance every class folder to `target_count` standardized images ===
_IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png')


def _standardize_and_save(src_path, dst_path, augment=False):
    """Load one image, standardize it and write it to ``dst_path`` as JPEG.

    Standardizing means: convert to RGB, optionally apply one random
    augmentation, resize to ``resize_dim`` and equalize the histogram.
    Any error raised while opening, transforming or saving propagates to
    the caller.
    """
    with Image.open(src_path) as img:
        img = img.convert("RGB")
        if augment:
            img = augment_image(img)
        img = img.resize(resize_dim)
        img = equalize_histogram(img)
        img.save(dst_path, format='JPEG')


def _count_images(folder):
    """Return how many entries in ``folder`` have an image file extension."""
    return sum(1 for name in os.listdir(folder) if name.lower().endswith(_IMAGE_SUFFIXES))


# os.listdir() returns entries in arbitrary order; sorting makes the choice of
# which originals are copied/augmented reproducible between runs.
for class_name in sorted(os.listdir(input_dir)):
    class_path = os.path.join(input_dir, class_name)
    if not os.path.isdir(class_path):
        continue

    print(f'🔍 Processing class: {class_name}')
    orig_images = sorted(f for f in os.listdir(class_path) if f.lower().endswith(_IMAGE_SUFFIXES))

    if not orig_images:
        print(f'⚠️ Skipping {class_name} — no images found.')
        continue

    new_class_path = os.path.join(output_dir, class_name)
    os.makedirs(new_class_path, exist_ok=True)

    # Copy and standardize up to target_count original images
    unreadable = set()
    for orig_name in orig_images[:target_count]:
        base_name = os.path.splitext(orig_name)[0]
        try:
            _standardize_and_save(
                os.path.join(class_path, orig_name),
                os.path.join(new_class_path, f"{base_name}.jpg"),
            )
        except Exception as e:
            # Best-effort batch job: record the failure and keep going.
            print(f"❌ Failed to process {orig_name} in {class_name}")
            failed_images.append((class_name, orig_name, str(e)))
            unreadable.add(orig_name)

    # Augment until the class holds target_count images.
    # Only image files are counted, so stray files (e.g. thumbnails caches)
    # cannot make a class look fuller than it is.
    needed = target_count - _count_images(new_class_path)
    if needed > 0:
        print(f'➕ Augmenting {needed} images for {class_name}')
        # Sources that already failed once are not worth cycling through again.
        pool = [name for name in orig_images if name not in unreadable]
        created = 0
        index = 0
        while created < needed and pool:
            orig_name = pool[index % len(pool)]
            aug_name = f"aug_{index}_{os.path.splitext(orig_name)[0]}.jpg"
            aug_path = os.path.join(new_class_path, aug_name)
            index += 1
            if os.path.exists(aug_path):
                # Left over from an earlier run: overwriting it would not
                # raise the image count, so move on to the next name.
                continue
            try:
                _standardize_and_save(os.path.join(class_path, orig_name), aug_path, augment=True)
                created += 1
            except Exception as e:
                print(f"❌ Failed to augment {orig_name} in {class_name}")
                failed_images.append((class_name, orig_name, str(e)))
                # Drop the failing source so the loop always terminates and
                # the remaining sources make up the shortfall.
                pool.remove(orig_name)
        if created < needed:
            print(f'⚠️ {class_name}: only {created} of {needed} augmented images could be created.')

    final_count = _count_images(new_class_path)
    print(f'✅ {class_name}: {final_count} images total.')

# === Log Any Failures ===
if failed_images:
    log_path = os.path.join(output_dir, 'failed_images_log.txt')
    # Explicit UTF-8: file names may contain characters that the platform's
    # default text encoding cannot represent.
    with open(log_path, 'w', encoding='utf-8') as f:
        f.writelines(
            f"Class: {entry[0]} | File: {entry[1]} | Error: {entry[2]}\n"
            for entry in failed_images
        )
    print(f"\n⚠️ Logged {len(failed_images)} failed image(s) to {log_path}")

print(f"\n🎉 Dataset balancing to {target_count} images per class complete.")

In [None]:
# Root folder of the balanced dataset
balanced_dir = r'C:\Users\josep\RF Signals Balanced'

# Supported image formats
extensions = ('.jpg', '.jpeg', '.png')

print(f"\n📊 Image counts in each class under: {balanced_dir}\n")

# One report line per class sub-folder, in sorted order; plain files are skipped
for class_name in sorted(os.listdir(balanced_dir)):
    class_path = os.path.join(balanced_dir, class_name)
    if not os.path.isdir(class_path):
        continue
    files = [entry for entry in os.listdir(class_path) if entry.lower().endswith(extensions)]
    count = len(files)
    print(f"{class_name:35} {count:5} images")

In [None]:
# Path to the balanced image dataset
balanced_dir = r'C:\Users\josep\RF Signals Balanced'

# Modulation mapping: class folder name -> class number (1-based, in the order listed)
modulation_map = {
    name: number
    for number, name in enumerate([
        "16QAM", "2ASK", "32QAM", "4FSK", "8PSK", "RS41-Radiosonde",
        "Radioteletype", "ads-b", "airband", "ais", "am", "atsc",
        "automatic-picture-transmission", "bluetooth", "cellular",
        "digital-audio-broadcasting", "digital-speech-decoder", "drone-video",
        "fm", "hdmi", "lora", "morse", "on-off-keying", "packet",
        "pocsag", "remote-keyless-entry", "sstv", "uav-video",
        "vor", "wifi", "z-wave",
    ], start=1)
}

# One dict per image; 'Line Number' is a running 1-based counter over all classes
rows = []
line_number = 1

for class_name, class_number in modulation_map.items():
    class_dir = os.path.join(balanced_dir, class_name)
    if not os.path.isdir(class_dir):
        print(f"⚠️ Skipping missing folder: {class_dir}")
        continue

    image_files = [
        name for name in sorted(os.listdir(class_dir))
        if name.lower().endswith(('.jpg', '.jpeg', '.png'))
    ]
    for file in image_files:
        # Stored path is relative to balanced_dir: <class folder>/<file name>
        file_path = os.path.join(class_name, file)
        rows.append({
            "Line Number": line_number,
            "Modulation Type": class_number,
            "Image File": file_path
        })
        line_number += 1

# Create the DataFrame (explicit columns keep the layout even when rows is empty)
index_columns = ["Line Number", "Modulation Type", "Image File"]
df = pd.DataFrame(rows, columns=index_columns)

# Save to CSV next to the class folders
csv_path = os.path.join(balanced_dir, "modulation_dataset_index.csv")
df.to_csv(csv_path, index=False)

print(f"✅ DataFrame created with {len(df)} entries.")
print(f"📄 CSV saved to: {csv_path}")

In [None]:
# Preview the first five rows of the index
print(df.head(5))

In [None]:
# Keep only the first row for every distinct 'Image File' path, then renumber the index
unique_rows = df.drop_duplicates(subset=["Image File"])
df_cleaned = unique_rows.reset_index(drop=True)

# Report how many rows were dropped and how many remain
removed = len(df) - len(df_cleaned)
print(f"✅ Removed {removed} duplicate image entries.")
print(f"📊 Cleaned DataFrame now has {len(df_cleaned)} unique images.")

# Preview the cleaned table
print(df_cleaned.head())

In [None]:
# Add helper columns: 'Base Name' (file name without extension) and
# 'Extension' (lower-cased, including the leading dot)
df['Base Name'] = df['Image File'].apply(lambda x: os.path.splitext(os.path.basename(x))[0])
df['Extension'] = df['Image File'].apply(lambda x: os.path.splitext(x)[1].lower())

# A duplicate is the same base name *within the same class*. The class must be
# part of the key: files in different class folders can share a base name
# (e.g. "0001"), and keying on the base name alone would discard all but one
# of them across the whole dataset.
dedupe_keys = ['Modulation Type', 'Base Name']

# Extensions are sorted in descending order so that '.png' comes before
# '.jpg'/'.jpeg' and is therefore the row kept by keep='first'.
df_sorted = df.sort_values(by=dedupe_keys + ['Extension'], ascending=[True, True, False])

# Drop duplicates per (class, base name), keeping the .png version when both exist
df_deduped = df_sorted.drop_duplicates(subset=dedupe_keys, keep='first').reset_index(drop=True)

# Drop helper columns
df_deduped = df_deduped.drop(columns=['Base Name', 'Extension'])

# Reassign new line numbers (1-based, following the sorted order)
df_deduped['Line Number'] = range(1, len(df_deduped) + 1)

# Save to CSV
csv_path = os.path.join(balanced_dir, "modulation_dataset_index_nodupes.csv")
df_deduped.to_csv(csv_path, index=False)

print(f"✅ Removed .jpg duplicates — only .png versions retained when both exist.")
print(f"📊 Final count: {len(df_deduped)} unique image entries.")

In [None]:
# Number of index rows per modulation class
rows_by_class = df_deduped.groupby('Modulation Type')
modulation_counts = rows_by_class['Image File'].count()

# Display the counts ordered by class number
print(modulation_counts.sort_index())

In [None]:
# Preview the first five rows of the de-duplicated index
print(df_deduped.head(5))

In [None]:
# Class number -> class name lookup (numbers are 1-based, in the order listed)
modulation_name_map = dict(enumerate([
    "16QAM", "2ASK", "32QAM", "4FSK", "8PSK", "RS41-Radiosonde",
    "Radioteletype", "ads-b", "airband", "ais", "am", "atsc",
    "automatic-picture-transmission", "bluetooth", "cellular",
    "digital-audio-broadcasting", "digital-speech-decoder", "drone-video",
    "fm", "hdmi", "lora", "morse", "on-off-keying", "packet",
    "pocsag", "remote-keyless-entry", "sstv", "uav-video",
    "vor", "wifi", "z-wave",
], start=1))

# Turn the counts Series into a table, naming the count column up front
modulation_counts_df = modulation_counts.reset_index().rename(columns={'Image File': 'Image Count'})
modulation_counts_df['Modulation Name'] = modulation_counts_df['Modulation Type'].map(modulation_name_map)

# Put the readable name next to the class number
modulation_counts_df = modulation_counts_df[['Modulation Type', 'Modulation Name', 'Image Count']]

# Display ordered by class number
print(modulation_counts_df.sort_values(by='Modulation Type'))

In [None]:
# Count index rows per modulation type
modulation_counts = df_deduped.groupby('Modulation Type')['Image File'].count()

# Class number -> class name lookup (numbers are 1-based, in the order listed)
modulation_name_map = dict(enumerate([
    "16QAM", "2ASK", "32QAM", "4FSK", "8PSK", "RS41-Radiosonde",
    "Radioteletype", "ads-b", "airband", "ais", "am", "atsc",
    "automatic-picture-transmission", "bluetooth", "cellular",
    "digital-audio-broadcasting", "digital-speech-decoder", "drone-video",
    "fm", "hdmi", "lora", "morse", "on-off-keying", "packet",
    "pocsag", "remote-keyless-entry", "sstv", "uav-video",
    "vor", "wifi", "z-wave",
], start=1))

# Build the table: rename the count column, attach the name, order the columns
modulation_counts_df = modulation_counts.reset_index().rename(columns={'Image File': 'Image Count'})
modulation_counts_df['Modulation Name'] = modulation_counts_df['Modulation Type'].map(modulation_name_map)
modulation_counts_df = modulation_counts_df[['Modulation Type', 'Modulation Name', 'Image Count']]

# Save to CSV
csv_output_path = os.path.join(balanced_dir, "modulation_image_counts.csv")
modulation_counts_df.to_csv(csv_output_path, index=False)
print(f"✅ CSV saved to: {csv_output_path}")

# Plot bar chart: one bar per class name, height = image count
class_labels = modulation_counts_df['Modulation Name']
image_totals = modulation_counts_df['Image Count']

plt.figure(figsize=(14, 7))
plt.bar(class_labels, image_totals, color='steelblue')
plt.xticks(rotation=90)
plt.title("Image Count per Modulation Type")
plt.xlabel("Modulation Type")
plt.ylabel("Image Count")
plt.tight_layout()
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

In [None]:
# Show up to the first 31 rows (the name mapping has 31 modulation types)
print(modulation_counts_df.head(31))

In [None]:
# Flag each row as augmented (True) or original (False): augmented files are
# the ones whose file name starts with 'aug_'
def _is_augmented(image_path):
    return os.path.basename(image_path).startswith('aug_')


df_deduped['Augmented'] = df_deduped['Image File'].apply(_is_augmented)

# Optional: how many rows carry each flag value
print(df_deduped['Augmented'].value_counts())

In [None]:
# Preview the first five rows, now including the 'Augmented' flag
print(df_deduped.head(5))

In [None]:
# Alias, not a copy: df_signals and df_deduped refer to the same DataFrame
# object, so an in-place change made through one name is visible through the other.
df_signals = df_deduped

In [None]:
# Pairwise Pearson correlation (the pandas default) over the numeric columns only
correlation_matrix = df_signals.corr(method='pearson', numeric_only=True)
print(correlation_matrix)

In [None]:
# Annotated heatmap of the numeric-column correlations
heatmap_options = dict(annot=True, cmap='coolwarm', fmt=".2f")

plt.figure(figsize=(8, 6))
sns.heatmap(df_signals.corr(numeric_only=True), **heatmap_options)
plt.title("Correlation Matrix of df_signals")
plt.tight_layout()
plt.show()

In [None]:
# Correlation matrix over the numeric columns
corr_matrix = df_signals.corr(numeric_only=True)

# Heatmap with square cells, thin cell borders and a colour scale centred on 0
heatmap_style = dict(
    annot=True,
    cmap='coolwarm',
    center=0,
    fmt=".2f",
    linewidths=0.5,
    square=True,
)
plt.figure(figsize=(8, 6))
sns.heatmap(corr_matrix, **heatmap_style)

# Title, layout, display
plt.title("Correlation Matrix of df_signals")
plt.tight_layout()
plt.show()

In [None]:
# Create the summary table: one row per modulation type, one column per value
# of the 'Augmented' flag (False = original, True = augmented)
summary = df_signals.groupby(['Modulation Type', 'Augmented'])['Image File'].count().unstack(fill_value=0)
# Guarantee both flag columns exist, in this order. When the data holds only
# originals or only augmented images, unstack() yields a single column and
# the two-name assignment below would fail with a length mismatch.
summary = summary.reindex(columns=[False, True], fill_value=0)
summary.columns = ['Original Images', 'Augmented Images']
summary['Total Images'] = summary['Original Images'] + summary['Augmented Images']
summary = summary.reset_index()

# Add modulation names (class number -> class name)
modulation_name_map = {
    1: "16QAM", 2: "2ASK", 3: "32QAM", 4: "4FSK", 5: "8PSK", 6: "RS41-Radiosonde",
    7: "Radioteletype", 8: "ads-b", 9: "airband", 10: "ais", 11: "am", 12: "atsc",
    13: "automatic-picture-transmission", 14: "bluetooth", 15: "cellular",
    16: "digital-audio-broadcasting", 17: "digital-speech-decoder", 18: "drone-video",
    19: "fm", 20: "hdmi", 21: "lora", 22: "morse", 23: "on-off-keying", 24: "packet",
    25: "pocsag", 26: "remote-keyless-entry", 27: "sstv", 28: "uav-video",
    29: "vor", 30: "wifi", 31: "z-wave"
}
summary['Modulation Name'] = summary['Modulation Type'].map(modulation_name_map)
summary = summary[['Modulation Type', 'Modulation Name', 'Original Images', 'Augmented Images', 'Total Images']]

# Display the table
print(summary)

In [None]:
# Grouped bar chart: original and augmented counts side by side for each class
bar_width = 0.35
x = range(len(summary))
augmented_positions = [pos + bar_width for pos in x]
# Each tick sits midway between the two bars of its class
tick_positions = [pos + bar_width / 2 for pos in x]

plt.figure(figsize=(14, 7))
plt.bar(x, summary['Original Images'], width=bar_width, label='Original', color='steelblue')
plt.bar(augmented_positions, summary['Augmented Images'], width=bar_width, label='Augmented', color='salmon')

plt.xlabel('Modulation Type')
plt.ylabel('Number of Images')
plt.title('Original vs Augmented Image Counts by Modulation Type')
plt.xticks(tick_positions, summary['Modulation Name'], rotation=90)
plt.legend()
plt.tight_layout()
plt.grid(axis='y', linestyle='--', alpha=0.6)
plt.show()

In [None]:
# Save table to CSV
summary.to_csv("modulation_image_summary.csv", index=False)

# Save chart as PNG
# A figure that has already been passed to plt.show() is no longer the current
# pyplot figure, so a bare plt.savefig() here would write a new, empty image.
# The grouped bar chart is therefore redrawn and saved through its own handle.
bar_width = 0.35
positions = range(len(summary))

fig, ax = plt.subplots(figsize=(14, 7))
ax.bar(positions, summary['Original Images'], width=bar_width, label='Original', color='steelblue')
ax.bar([i + bar_width for i in positions], summary['Augmented Images'],
       width=bar_width, label='Augmented', color='salmon')
ax.set_xlabel('Modulation Type')
ax.set_ylabel('Number of Images')
ax.set_title('Original vs Augmented Image Counts by Modulation Type')
ax.set_xticks([i + bar_width / 2 for i in positions])
ax.set_xticklabels(summary['Modulation Name'], rotation=90)
ax.legend()
ax.grid(axis='y', linestyle='--', alpha=0.6)
fig.tight_layout()
fig.savefig("modulation_image_bar_chart.png")
# Close the figure so it is released once the file has been written.
plt.close(fig)

In [None]:
# Order rows by class name so the x-axis reads alphabetically
summary_sorted = summary.sort_values("Modulation Name")
class_names = summary_sorted['Modulation Name']
original_counts = summary_sorted['Original Images']
augmented_counts = summary_sorted['Augmented Images']
total_counts = summary_sorted['Total Images']

plt.figure(figsize=(14, 7))

# Stacked bars: augmented counts sit on top of the original counts
plt.bar(class_names, original_counts, label='Original', color='steelblue')
plt.bar(class_names, augmented_counts, bottom=original_counts, label='Augmented', color='salmon')

# Totals as unconnected black dots
plt.plot(class_names, total_counts, marker='o', color='black', linestyle='None', label='Total')

# Labels, legend, grid, display
plt.xlabel('Modulation Type')
plt.ylabel('Number of Images')
plt.title('Image Count per Modulation Type (Original + Augmented)')
plt.xticks(rotation=90)
plt.legend()
plt.grid(axis='y', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()

In [None]:
# Group by modulation type and augmentation status: one row per modulation
# type, one column per value of the 'Augmented' flag
summary_table = df_signals.groupby(['Modulation Type', 'Augmented'])['Image File'].count().unstack(fill_value=0)

# Guarantee both flag columns (False = original, True = augmented) exist, in
# this order. When the data holds only originals or only augmented images,
# unstack() yields a single column and the two-name rename below would fail
# with a length mismatch.
summary_table = summary_table.reindex(columns=[False, True], fill_value=0)

# Rename columns for clarity
summary_table.columns = ['Original Images', 'Augmented Images']

# Add total column
summary_table['Total Images'] = summary_table['Original Images'] + summary_table['Augmented Images']

# Reset index to make 'Modulation Type' a column
summary_table = summary_table.reset_index()

# Add modulation name mapping (class number -> class name)
modulation_name_map = {
    1: "16QAM", 2: "2ASK", 3: "32QAM", 4: "4FSK", 5: "8PSK", 6: "RS41-Radiosonde",
    7: "Radioteletype", 8: "ads-b", 9: "airband", 10: "ais", 11: "am", 12: "atsc",
    13: "automatic-picture-transmission", 14: "bluetooth", 15: "cellular",
    16: "digital-audio-broadcasting", 17: "digital-speech-decoder", 18: "drone-video",
    19: "fm", 20: "hdmi", 21: "lora", 22: "morse", 23: "on-off-keying", 24: "packet",
    25: "pocsag", 26: "remote-keyless-entry", 27: "sstv", 28: "uav-video",
    29: "vor", 30: "wifi", 31: "z-wave"
}
summary_table['Modulation Name'] = summary_table['Modulation Type'].map(modulation_name_map)

# Rearrange columns
summary_table = summary_table[['Modulation Type', 'Modulation Name',
                               'Original Images', 'Augmented Images', 'Total Images']]

# Display the table
print(summary_table)

In [None]:
# Order rows by class name so the x-axis reads alphabetically
summary_sorted = summary_table.sort_values("Modulation Name")
class_names = summary_sorted['Modulation Name']

# One line per count column, drawn in this order: (column, marker, colour)
line_specs = [
    ('Original Images', 'o', 'steelblue'),
    ('Augmented Images', 's', 'salmon'),
    ('Total Images', '^', 'green'),
]

plt.figure(figsize=(14, 6))
for column, marker, colour in line_specs:
    plt.plot(class_names, summary_sorted[column], marker=marker, label=column, color=colour)

plt.xticks(rotation=90)
plt.xlabel('Modulation Type')
plt.ylabel('Number of Images')
plt.title('Image Distribution per Modulation Type')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()

In [None]:
# Distribution of the per-class totals: 20-bin histogram with a KDE overlay
histogram_options = dict(x='Total Images', bins=20, kde=True)

plt.figure(figsize=(12,6))
sns.histplot(data=summary_table, **histogram_options)
plt.title('Histogram of Total Image Counts per Modulation Type')
plt.xlabel('Number of Images')
plt.ylabel('Frequency')
plt.grid(True)
plt.show()

In [None]:
# One box per count column: original vs augmented images per class
count_columns = ['Original Images', 'Augmented Images']

plt.figure(figsize=(10,6))
sns.boxplot(data=summary_table[count_columns])
plt.title('Box Plot of Image Counts')
plt.ylabel('Image Count')
plt.grid(True)
plt.show()

In [None]:
# The ten classes with the highest total image count, largest first
ranked_by_total = summary_table.sort_values('Total Images', ascending=False)
top_n = ranked_by_total.head(10)

plt.figure(figsize=(12,6))
sns.barplot(data=top_n, x='Modulation Name', y='Total Images', palette='viridis')
plt.title('Top 10 Modulation Types by Total Image Count')
plt.ylabel('Total Images')
plt.xticks(rotation=45)
plt.show()