In [24]:
# Standard library imports
import logging
import random
import shutil
from argparse import Namespace
from pathlib import Path
from typing import List, Tuple

# Third-party imports
import librosa
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

In [28]:

def create_config() -> Namespace:
    """
    Create configuration namespace with all constants grouped.
    
    Returns:
        Namespace containing all configuration parameters
    """
    return Namespace(
        # Directory paths
        source_dir=Path("../audio_data/gun_sound_v2"),
        target_dir=Path("../audio_data/gun_sound_final"),
        dataset_dir=Path("../dataset"),
        metadata_dir=Path("../metadata"),
        
        # Filtering criteria
        distance_filter="0m",
        direction_filter="center",
        min_filename_parts=4,
        
        # File patterns
        audio_extension="*.mp3",
        metadata_filename="gun_sound_final_metadata.parquet",
        
        # Operation settings
        use_move=False,  # Set to True to move instead of copy
        skip_existing=True,
        
        # Progress and logging
        log_level=logging.INFO,
        show_examples=5,
        
        # Compression settings
        compression="gzip",
        parquet_engine="pyarrow",
        
        FEATURE_DIR=Path("../features"),
        DATASET_DIR=Path("../dataset"),
        SAMPLE_RATE=22050,
        N_MELS=64,
        HOP_LENGTH=512,
    )

# Create configuration
ARGS = create_config()

In [2]:
# Define base directory
AUDIO_DIR = Path("../audio_data/gun_sound_v2")

# Filtered list
filtered_files = []

# Loop through all .mp3 files
for file in AUDIO_DIR.glob("*.mp3"):
    parts = file.stem.split("_")  # Remove ".mp3" and split by "_"
    
    if len(parts) >= 4:
        gun_type, distance, direction, clip_id = parts
        if distance == "0m" and direction == "center":
            filtered_files.append(file)

# Result
print(f"✅ Found {len(filtered_files)} files matching 0m/center")
print("🔫 Example files:")
for f in filtered_files[:5]:
    print("-", f.name)

✅ Found 1013 files matching 0m/center
🔫 Example files:
- s12k_0m_center_1986.mp3
- s12k_0m_center_1992.mp3
- s12k_0m_center_1979.mp3
- pp_0m_center_1894.mp3
- tomy_0m_center_0812.mp3


In [11]:
file_components = [f.name.split("_") for f in filtered_files]

df = pd.DataFrame({
    "file_name": [f.name for f in filtered_files],
    "label": [components[0] for components in file_components],
    "id": [components[3].split(".")[0] for components in file_components]
})
# df.to_csv("filtered_bgg_0m_center.csv", index=False)

In [12]:
df

Unnamed: 0,file_name,label,id
0,s12k_0m_center_1986.mp3,s12k,1986
1,s12k_0m_center_1992.mp3,s12k,1992
2,s12k_0m_center_1979.mp3,s12k,1979
3,pp_0m_center_1894.mp3,pp,1894
4,tomy_0m_center_0812.mp3,tomy,0812
...,...,...,...
1008,ak_0m_center_0008.mp3,ak,0008
1009,kar_0m_center_0552.mp3,kar,0552
1010,m249_0m_center_0588.mp3,m249,0588
1011,m24_0m_center_1822.mp3,m24,1822


In [13]:
df.to_parquet(Path("../metadata/gun_sound_final_metadata.paquet"), index=False, engine="pyarrow", compression="gzip")

In [None]:
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def filter_audio_files(
    source_dir: Path, 
    distance_filter: str = "0m", 
    direction_filter: str = "center",
    min_parts: int = 4,
    audio_pattern: str = "*.mp3"
) -> List[Path]:
    """
    Filter audio files based on distance and direction criteria.
    
    Args:
        source_dir: Source directory containing audio files
        distance_filter: Distance criteria to filter by
        direction_filter: Direction criteria to filter by
        min_parts: Minimum number of filename parts required
        audio_pattern: File pattern to match
        
    Returns:
        List of filtered file paths
    """
    filtered_files = []
    
    if not source_dir.exists():
        logger.error(f"Source directory does not exist: {source_dir}")
        return filtered_files
    
    # Get all audio files first to show progress
    all_files = list(source_dir.glob(audio_pattern))
    
    for file in tqdm(all_files, desc="Filtering audio files", unit="file"):
        try:
            parts = file.stem.split("_")
            if len(parts) >= min_parts:
                gun_type, distance, direction, clip_id = parts[:4]
                if distance == distance_filter and direction == direction_filter:
                    filtered_files.append(file)
        except Exception as e:
            logger.warning(f"Error processing file {file.name}: {e}")
    
    logger.info(f"Found {len(filtered_files)} files matching {distance_filter}/{direction_filter}")
    return filtered_files

def copy_files_with_progress(
    files: List[Path], 
    target_dir: Path, 
    use_move: bool = False,
    skip_existing: bool = True
) -> Tuple[List[str], List[str]]:
    """
    Copy or move files to target directory with error handling.
    
    Args:
        files: List of source files to copy/move
        target_dir: Destination directory
        use_move: If True, move files instead of copying
        skip_existing: If True, skip files that already exist
        
    Returns:
        Tuple of (successful_files, failed_files)
    """
    target_dir.mkdir(parents=True, exist_ok=True)
    
    successful_files = []
    failed_files = []
    
    operation = "Moving" if use_move else "Copying"
    logger.info(f"{operation} {len(files)} files to {target_dir}")
    
    for file in tqdm(files, desc=f"{operation} files", unit="file"):
        try:
            dest_path = target_dir / file.name
            
            if dest_path.exists() and skip_existing:
                logger.warning(f"File already exists, skipping: {file.name}")
                continue
                
            if use_move:
                shutil.move(str(file), str(dest_path))
            else:
                shutil.copy2(file, dest_path)
                
            successful_files.append(file.name)
            
        except Exception as e:
            logger.error(f"Failed to {operation.lower()} {file.name}: {e}")
            failed_files.append(file.name)
    
    return successful_files, failed_files

def create_metadata_dataframe(
    files: List[Path],
    metadata_dir: Path,
    filename: str,
    compression: str = "gzip",
    engine: str = "pyarrow"
) -> pd.DataFrame:
    """
    Create metadata dataframe and save to parquet file.
    
    Args:
        files: List of audio files
        metadata_dir: Directory to save metadata
        filename: Metadata filename
        compression: Compression method
        engine: Parquet engine
        
    Returns:
        DataFrame with file metadata
    """
    metadata_dir.mkdir(parents=True, exist_ok=True)
    
    file_components = [f.name.split("_") for f in files]
    
    df = pd.DataFrame({
        "file_name": [f.name for f in files],
        "label": [components[0] for components in file_components],
        "id": [components[3].split(".")[0] for components in file_components]
    })
    
    # Save metadata
    metadata_path = metadata_dir / filename
    df.to_parquet(metadata_path, index=False, engine=engine, compression=compression)
    logger.info(f"Saved metadata to {metadata_path}")
    
    return df

def process_audio_files(args: Namespace) -> None:
    """
    Main processing function using configuration from Namespace.
    
    Args:
        args: Configuration namespace
    """
    try:
        # Step 1: Filter files
        filtered_files = filter_audio_files(
            args.source_dir, 
            args.distance_filter, 
            args.direction_filter,
            args.min_filename_parts,
            args.audio_extension
        )
        
        if not filtered_files:
            logger.warning("No files found matching the criteria")
            return
        
        # Step 2: Copy/move files
        successful_files, failed_files = copy_files_with_progress(
            filtered_files, 
            args.target_dir, 
            args.use_move,
            args.skip_existing
        )
        
        # Step 3: Create metadata
        if successful_files:
            # Filter to only successfully processed files
            processed_files = [f for f in filtered_files if f.name in successful_files]
            create_metadata_dataframe(
                processed_files,
                args.metadata_dir,
                args.metadata_filename,
                args.compression,
                args.parquet_engine
            )
        
        # Step 4: Summary
        logger.info(f"✅ Successfully processed {len(successful_files)} files")
        if failed_files:
            logger.error(f"❌ Failed to process {len(failed_files)} files")
        
        # Show examples
        print(f"\n✅ Processed {len(successful_files)} files to {args.target_dir}")
        if failed_files:
            print(f"❌ Failed to process {len(failed_files)} files")
        
        print(f"\n🔫 Example files (showing first {args.show_examples}):")
        for f in successful_files[:args.show_examples]:
            print(f"   - {f}")
            
    except Exception as e:
        logger.error(f"Critical error during processing: {e}")
        raise

# Execute the processing
process_audio_files(ARGS)

### Step - Shuffle and split dataset to (train/test/val)

In [21]:
# Split ratio
SPLIT_RATIOS = {
    "train": 0.7,
    "val": 0.15,
    "test": 0.15
}

# Shuffle seed
random.seed(42)

# Step 1: Gather all files
all_files = list(ARGS.source_dir.glob("*.mp3"))
random.shuffle(all_files)

# Step 2: Split files
total = len(all_files)
n_train = int(total * SPLIT_RATIOS["train"])
n_val = int(total * SPLIT_RATIOS["val"])
n_test = total - n_train - n_val

splits = {
    "train": all_files[:n_train],
    "val": all_files[n_train:n_train + n_val],
    "test": all_files[n_train + n_val:]
}

# Step 3: Copy files to target folders
for split_name, split_files in splits.items():
    split_dir = ARGS.dataset_dir / split_name
    split_dir.mkdir(parents=True, exist_ok=True)

    print(f"📁 Copying {len(split_files)} files to: {split_dir}")
    for file in split_files:
        shutil.copy(file, split_dir / file.name)

print("✅ Dataset split complete.")

📁 Copying 1536 files to: ../dataset/train
📁 Copying 329 files to: ../dataset/val
📁 Copying 330 files to: ../dataset/test
✅ Dataset split complete.


### Step - Convert audio to Log-Mel dataset

In [30]:
# 🔖 Auto-generate label map
LABELS = sorted({f.stem.split("_")[0] for f in (ARGS.DATASET_DIR / "train").glob("*.mp3")})
LABEL_MAP = {label: idx for idx, label in enumerate(LABELS)}
print("🔖 Gun type labels:", LABEL_MAP)

def extract_logmel(audio_path):
    y, _ = librosa.load(audio_path, sr=ARGS.SAMPLE_RATE)
    mel = librosa.feature.melspectrogram(y=y, sr=ARGS.SAMPLE_RATE, n_mels=ARGS.N_MELS, hop_length=ARGS.HOP_LENGTH)
    log_mel = librosa.power_to_db(mel, ref=np.max)
    return log_mel

def process_split(split):
    input_dir = ARGS.DATASET_DIR / split
    output_dir = ARGS.FEATURE_DIR / split
    output_dir.mkdir(parents=True, exist_ok=True)

    for audio_file in tqdm(list(input_dir.glob("*.mp3")), desc=f"🔁 Processing {split}"):
        gun_type = audio_file.stem.split("_")[0]
        label = LABEL_MAP[gun_type]

        try:
            logmel = extract_logmel(audio_file)
            out_name = f"{audio_file.stem}.npy"
            np.save(output_dir / out_name, {
                "features": logmel.astype(np.float32),
                "label": label
            })
        except Exception as e:
            print(f"⚠️ Failed to process {audio_file.name}: {e}")

# Run all splits
for split in ["train", "val", "test"]:
    process_split(split)

# Save label map
import json
with open(ARGS.FEATURE_DIR / "labels.json", "w") as f:
    json.dump(LABEL_MAP, f)

print("✅ Feature extraction complete and saved to:", ARGS.FEATURE_DIR.resolve())

🔖 Gun type labels: {'ak': 0, 'aug': 1, 'awm': 2, 'dbs': 3, 'deagle': 4, 'dp': 5, 'g36c': 6, 'gro': 7, 'k2': 8, 'kar': 9, 'm16': 10, 'm24': 11, 'm249': 12, 'm4': 13, 'mini': 14, 'mk': 15, 'nogun': 16, 'p18c': 17, 'p1911': 18, 'p90': 19, 'p92': 20, 'pp': 21, 'pump': 22, 'qbu': 23, 'qbz': 24, 'r1895': 25, 'r45': 26, 's12k': 27, 'scar': 28, 'sks': 29, 'slr': 30, 'tomy': 31, 'ump': 32, 'uzi': 33, 'vec': 34, 'verl': 35, 'vss': 36, 'win': 37}


🔁 Processing train:   0%|          | 0/1536 [00:00<?, ?it/s]

🔁 Processing val:   0%|          | 0/329 [00:00<?, ?it/s]

🔁 Processing test:   0%|          | 0/330 [00:00<?, ?it/s]

✅ Feature extraction complete and saved to: /Users/ittichaiboonyarakthunya/Documents/WorkDir/ai-ml/labs-gunshot-classification/features
