In [None]:
import os
import wave
import numpy as np
import math
import csv

def load_wav(filename):
    """Read a PCM WAV file and return its first channel.

    Args:
        filename: Path of the WAV file to read.

    Returns:
        A ``(signal, frame_rate)`` tuple. ``signal`` is a 1-D numpy array
        holding the samples of the first channel in the file's native sample
        type (uint8, int16 or int32); ``frame_rate`` is the sampling rate
        reported by the file header.

    Raises:
        ValueError: If the file uses a sample width other than 1, 2 or 4 bytes.
    """
    # WAV stores little-endian PCM; 8-bit samples are unsigned, wider ones signed.
    # Decoding every file as int16 would silently scramble 8-bit and 32-bit audio.
    pcm_dtypes = {1: 'u1', 2: '<i2', 4: '<i4'}

    with wave.open(filename, 'rb') as wav_file:
        num_channels = wav_file.getnchannels()
        sample_width = wav_file.getsampwidth()
        frame_rate = wav_file.getframerate()
        frames = wav_file.readframes(wav_file.getnframes())

    if sample_width not in pcm_dtypes:
        raise ValueError(
            f"Unsupported sample width of {sample_width} bytes in {filename!r}"
        )

    signal = np.frombuffer(frames, dtype=pcm_dtypes[sample_width])

    # Samples are interleaved per channel; keep only the first channel.
    if num_channels > 1:
        signal = signal[::num_channels]

    return signal, frame_rate

def split_frames(signal, frame_size):
    """Cut ``signal`` into consecutive, non-overlapping frames.

    Only complete frames of exactly ``frame_size`` items are kept; a shorter
    remainder at the end of the signal is discarded.
    """
    complete_frames = []
    for start in range(0, len(signal), frame_size):
        chunk = signal[start:start + frame_size]
        if len(chunk) == frame_size:
            complete_frames.append(chunk)
    return complete_frames


def extract_features(frames):
    """Compute the mean and population variance of every frame.

    Args:
        frames: Iterable of frames, each a sequence (list or numpy array) of
            numeric samples.

    Returns:
        A list with one ``[mean, variance]`` pair of Python floats per frame.
        The variance divides by the frame length (population variance).

    Raises:
        ValueError: If a frame contains no samples.
    """
    features = []
    for frame in frames:
        # Work in float64: summing narrow integer samples (e.g. int16) one by
        # one with the builtin sum() can wrap around under NumPy 2 scalar
        # promotion, and is far slower than a vectorised reduction.
        samples = np.asarray(frame, dtype=np.float64)
        if samples.size == 0:
            raise ValueError("Cannot extract features from an empty frame")
        features.append([float(samples.mean()), float(samples.var())])
    return features


def mean_column(matrix):
    """Return the arithmetic mean of each column of ``matrix``.

    The number of columns is taken from the first row, and every column total
    is divided by the number of rows.
    """
    width = len(matrix[0])
    height = len(matrix)
    averages = []
    for col in range(width):
        total = 0
        for row in matrix:
            total = total + row[col]
        averages.append(total / height)
    return averages


def center_matrix(matrix, means):
    """Subtract ``means`` element-wise from every row of ``matrix``.

    Returns a new list of rows; values are paired positionally with ``zip``.
    """
    shifted_rows = []
    for row in matrix:
        shifted_rows.append([value - offset for value, offset in zip(row, means)])
    return shifted_rows


def covariance_matrix(matrix):
    """Build the sample covariance matrix of already-centered data.

    ``matrix`` holds one observation per row. Entry ``[i][j]`` of the result
    is the sum over rows of ``column_i * column_j`` divided by
    ``(number of rows - 1)``.
    """
    row_count = len(matrix)
    width = len(matrix[0])
    cov = []
    for i in range(width):
        cov_row = []
        for j in range(width):
            cross = 0
            for k in range(row_count):
                cross = cross + matrix[k][i] * matrix[k][j]
            cov_row.append(cross / (row_count - 1))
        cov.append(cov_row)
    return cov


def eigen_2x2(matrix):
    """Compute the real eigenpairs of a 2x2 matrix in closed form.

    Args:
        matrix: Nested sequence ``[[a, b], [c, d]]``.

    Returns:
        A list ``[(lambda1, vec1), (lambda2, vec2)]`` with
        ``lambda1 >= lambda2``. Eigenvectors are not normalized.

    Raises:
        ValueError: If the matrix has complex eigenvalues.
    """
    a, b = matrix[0][0], matrix[0][1]
    c, d = matrix[1][0], matrix[1][1]

    # Diagonal matrix: the eigenvalues are exactly the diagonal entries and the
    # eigenvectors lie on the coordinate axes. Handling it up front also gives
    # two independent vectors when a == d (a repeated eigenvalue).
    if b == 0 and c == 0:
        along_x = (a, [1, 0])
        along_y = (d, [0, d - a] if d != a else [0, 1])
        return [along_x, along_y] if a >= d else [along_y, along_x]

    trace = a + d
    # Same value as trace**2 - 4*det, but written as (a - d)**2 + 4bc it cannot
    # round below zero for a symmetric matrix (where b*c == b**2 >= 0).
    disc = (a - d) ** 2 + 4 * b * c
    if disc < 0:
        raise ValueError("eigen_2x2 requires real eigenvalues; got a negative discriminant")
    sqrt_disc = math.sqrt(disc)
    lambda1 = (trace + sqrt_disc) / 2
    lambda2 = (trace - sqrt_disc) / 2

    def eigenvector(lmbd):
        # Both forms satisfy (A - lmbd*I) v = 0 for any 2x2 matrix; pick the
        # one that is guaranteed to be non-zero.
        if c != 0:
            return [lmbd - d, c]
        return [b, lmbd - a]

    return [(lambda1, eigenvector(lambda1)), (lambda2, eigenvector(lambda2))]


def normalize(X):
    """Standardize ``X`` to zero mean and unit standard deviation along axis 0.

    Args:
        X: List (or numpy array) of numeric rows, or ``None``.

    Returns:
        The standardized data as nested Python lists, or ``[]`` when ``X`` is
        ``None`` or empty. Columns with zero standard deviation come back as
        zeros instead of NaN/inf.
    """
    # len() works for lists and arrays alike; truth-testing an ndarray raises.
    if X is None or len(X) == 0:
        return []
    data = np.asarray(X, dtype=float)
    means = data.mean(axis=0)
    stds = data.std(axis=0)

    # A constant column has std == 0; divide by 1 so it becomes all zeros.
    safe_stds = np.where(stds == 0, 1.0, stds)

    return ((data - means) / safe_stds).tolist()


def pca_on_audio(signal, frame_size=1024, n_components=3):
    """Project per-frame (mean, variance) features onto their principal axes.

    The signal is split into complete frames of ``frame_size`` samples, each
    frame is reduced to a ``[mean, variance]`` pair, and the centered pairs
    are projected onto the unit-length eigenvectors of their covariance
    matrix, ordered by decreasing eigenvalue.

    Args:
        signal: Sequence of audio samples.
        frame_size: Number of samples per frame.
        n_components: Maximum number of components to keep. Only two features
            are extracted per frame, so at most two components are produced.

    Returns:
        A list with one row per frame, each row a list of projected
        coordinates. Returns ``[]`` when the signal yields fewer than two
        complete frames, because a covariance cannot be estimated then.
    """
    frames = split_frames(signal, frame_size)
    # The sample covariance divides by (n - 1); with 0 or 1 frames there is
    # nothing to estimate.
    if len(frames) < 2:
        return []

    features = extract_features(frames)
    means = mean_column(features)
    centered = center_matrix(features, means)
    cov = covariance_matrix(centered)
    eigens = sorted(eigen_2x2(cov), key=lambda pair: pair[0], reverse=True)

    # Scale each eigenvector to unit length. Z-scoring a 2-element vector
    # would collapse it to [+1, -1] or [-1, +1] and lose the direction.
    components = []
    for _, vec in eigens[:n_components]:
        length = math.sqrt(sum(v * v for v in vec))
        components.append([v / length for v in vec] if length else list(vec))

    projected = [
        [sum(x * y for x, y in zip(row, comp)) for comp in components]
        for row in centered
    ]

    return projected


In [None]:

def process_audio_folder_and_save(input_folder, output_csv, frame_size=1024, n_components=3):
    """Run ``pca_on_audio`` on every WAV file in a folder and save the rows to CSV.

    Args:
        input_folder: Directory scanned (non-recursively) for ``.wav`` files.
        output_csv: Path of the CSV file to create or overwrite.
        frame_size: Frame length forwarded to ``pca_on_audio``.
        n_components: Component count forwarded to ``pca_on_audio``.

    The CSV has an ``Audio File`` column followed by ``Component 1`` ..
    ``Component N`` columns, one row per projected frame. Components a row
    does not have are written as empty cells.
    """
    # Keep at least the three historical columns so existing readers of the
    # file keep working, and add more when more components are requested.
    num_columns = max(n_components, 3)
    component_columns = [f'Component {i}' for i in range(1, num_columns + 1)]
    fieldnames = ['Audio File'] + component_columns

    with open(output_csv, mode='w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        # os.listdir returns entries in arbitrary order; sort so that the CSV
        # is reproducible from run to run.
        for filename in sorted(os.listdir(input_folder)):
            # Match the extension case-insensitively so "X.WAV" is not skipped.
            if not filename.lower().endswith('.wav'):
                continue

            audio_path = os.path.join(input_folder, filename)
            print(f"Processing {audio_path}...")

            signal, _frame_rate = load_wav(audio_path)
            projected_data = pca_on_audio(signal, frame_size, n_components)

            for row in projected_data:
                record = {'Audio File': filename}
                for index, column in enumerate(component_columns):
                    record[column] = row[index] if index < len(row) else None
                writer.writerow(record)

            print(f"Finished processing {audio_path}")

# Batch export: process the .wav files found in the "Data" folder and write
# their projected rows to "projected_data.csv" (overwritten on each run).
input_folder = "Data"
output_csv = "projected_data.csv"
process_audio_folder_and_save(input_folder, output_csv, frame_size=1024, n_components=3)

Processing Data\blues.00000.wav...
Finished processing Data\blues.00000.wav
Processing Data\blues.00001.wav...
Finished processing Data\blues.00001.wav
Processing Data\blues.00002.wav...
Finished processing Data\blues.00002.wav
Processing Data\blues.00003.wav...
Finished processing Data\blues.00003.wav
Processing Data\blues.00004.wav...
Finished processing Data\blues.00004.wav
Processing Data\blues.00005.wav...
Finished processing Data\blues.00005.wav
Processing Data\blues.00006.wav...
Finished processing Data\blues.00006.wav
Processing Data\blues.00007.wav...
Finished processing Data\blues.00007.wav
Processing Data\blues.00008.wav...
Finished processing Data\blues.00008.wav
Processing Data\blues.00009.wav...
Finished processing Data\blues.00009.wav
Processing Data\blues.00010.wav...
Finished processing Data\blues.00010.wav
Processing Data\blues.00011.wav...
Finished processing Data\blues.00011.wav
Processing Data\blues.00012.wav...
Finished processing Data\blues.00012.wav
Processing D

In [None]:
import os
import wave
import numpy as np
import math
import csv

def euclidean_distance(vec1, vec2):
    """Return the Euclidean distance between two vectors.

    Coordinates are paired with ``zip``, so any extra trailing items of the
    longer vector are ignored.
    """
    squared_total = 0
    for left, right in zip(vec1, vec2):
        squared_total = squared_total + (left - right) ** 2
    return math.sqrt(squared_total)

def knn_classify(audio_file, csv_file, frame_size=1024, n_components=3, k=1):
    """Label an audio file by k-nearest-neighbour vote over stored projections.

    The file is projected with ``pca_on_audio``; every projected frame is
    compared with every row of ``csv_file`` and the ``k`` smallest distances
    vote for their ``Audio File`` label.

    Args:
        audio_file: Path of the WAV file to classify.
        csv_file: CSV with an ``Audio File`` column and ``Component 1``,
            ``Component 2`` and (optionally) ``Component 3`` columns.
        frame_size: Frame length forwarded to ``pca_on_audio``.
        n_components: Component count forwarded to ``pca_on_audio``.
        k: Number of nearest neighbours that vote; must be at least 1.

    Returns:
        The ``Audio File`` value that occurs most often among the ``k``
        nearest neighbours. Ties go to the label whose neighbour is nearest.

    Raises:
        ValueError: If ``k`` is below 1, if a projected row is not a list, or
            if either the CSV or the projection of ``audio_file`` is empty.
    """
    import heapq
    from collections import Counter

    if k < 1:
        raise ValueError("k must be at least 1")

    signal, _frame_rate = load_wav(audio_file)
    # Forward the caller's settings instead of falling back to the defaults.
    projected_data = pca_on_audio(signal, frame_size, n_components)

    # Accept a single flat vector of floats by wrapping it into one row.
    if isinstance(projected_data, list) and len(projected_data) > 0 and isinstance(projected_data[0], float):
        projected_data = [projected_data]

    for test_components in projected_data:
        if not isinstance(test_components, list):
            raise ValueError("Both components and test_components must be lists.")

    training_data = []
    with open(csv_file, mode='r', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            label = row['Audio File']
            # Empty cells (components that were not available) count as 0.0.
            component_1 = float(row['Component 1']) if row['Component 1'] else 0.0
            component_2 = float(row['Component 2']) if row['Component 2'] else 0.0
            component_3 = float(row['Component 3']) if row.get('Component 3') else 0.0
            training_data.append((label, [component_1, component_2, component_3]))

    if not training_data or not projected_data:
        raise ValueError("No data to classify: the CSV or the projected audio is empty.")

    # Stream the candidate pairs and keep only the k best rather than
    # materialising and sorting every distance.
    candidates = (
        (euclidean_distance(components, test_components[:len(components)]), label)
        for label, components in training_data
        for test_components in projected_data
    )
    nearest_neighbors = heapq.nsmallest(k, candidates, key=lambda pair: pair[0])

    # Counter keeps first-seen order among equal counts, so a tied vote goes
    # to the nearest neighbour instead of depending on set ordering.
    votes = Counter(label for _, label in nearest_neighbors)
    return votes.most_common(1)[0][0]



# Classify one example recording against the rows stored in the CSV file.
audio_file = "Examples/56.wav"
csv_file = "projected_data.csv"

genre = knn_classify(audio_file, csv_file, frame_size=1024, n_components=3, k=3)
# knn_classify returns a value of the 'Audio File' column; [:-10] drops its
# last ten characters before printing.
# NOTE(review): presumably this strips a ".NNNNN.wav"-style suffix to leave
# the genre prefix — confirm against the file naming in the data folder.
print(f"Predicted genre: {genre[:-10]}")


Projected data: [43045132481431.52, 3374.9140625, 43045132484806.44]
Predicted genre: pop
