# In [2]:
import os
import librosa
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, rand
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Spark Session
# Local-mode session named "Audio Classification"; both memory settings are
# applied to the builder before the session is created (or reused).
_memory_settings = {
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
}
_session_builder = SparkSession.builder.master("local[*]").appName("Audio Classification")
for _option, _setting in _memory_settings.items():
    _session_builder = _session_builder.config(_option, _setting)
spark = _session_builder.getOrCreate()

# Base directory for audio files
base_dir = os.path.expanduser('~/Downloads/data')

# Keep only the entries of base_dir that are themselves directories; each
# one is later treated as a genre folder.
directories = []
for entry in os.listdir(base_dir):
    if os.path.isdir(os.path.join(base_dir, entry)):
        directories.append(entry)

# Function to extract MFCC features
def extract_features(file_path: str, n_mfcc: int = 13) -> list:
    """Summarize an audio file as the mean of each of its MFCC coefficients.

    The file is loaded with ``sr=None`` (so librosa keeps the file's own
    sampling rate instead of resampling), MFCCs are computed, and each
    coefficient is averaged along axis 1. Every file therefore yields a list
    of the same length regardless of its duration.

    Args:
        file_path: Path to an audio file readable by ``librosa.load``.
        n_mfcc: Number of MFCCs to compute. Defaults to 13, the value that
            was previously hard-coded, so existing callers are unaffected.

    Returns:
        A plain Python list with one mean value per MFCC coefficient.
    """
    audio, sample_rate = librosa.load(file_path, sr=None)
    mfcc = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=n_mfcc)
    # Collapse axis 1 so the result has a fixed length of n_mfcc.
    return mfcc.mean(axis=1).tolist()

# Load and preprocess data
# Build one (genre, file_path, features) row per WAV file found directly
# inside each genre directory.
audio_features = []
# os.listdir returns entries in arbitrary order; sorting makes the row order
# (and anything derived from it) repeatable between runs.
for directory in sorted(directories):
    genre = directory  # the directory name is used as the class label
    path = os.path.join(base_dir, directory)
    audio_files = sorted(
        os.path.join(path, f)
        for f in os.listdir(path)
        if f.lower().endswith('.wav')  # also accept ".WAV" / ".Wav"
    )
    for file in audio_files:
        try:
            features = extract_features(file)
        except Exception as exc:
            # The exception type depends on the audio decoding backend, so it
            # cannot be narrowed here. One unreadable clip should not abort
            # the whole run: report it and move on.
            print(f"Skipping {file}: {exc}")
            continue
        audio_features.append((genre, file, features))

# Fail early with a clear message rather than handing an empty dataset to the
# Spark steps that follow.
if not audio_features:
    raise RuntimeError(f"No usable .wav files found under {base_dir}")

# Create DataFrame
# Go through pandas first, then hand the frame to Spark.
df_audio = pd.DataFrame(
    data=audio_features,
    columns=['genre', 'file_path', 'audio_features'],
)
sdf_audio = spark.createDataFrame(df_audio)


# Convert array of doubles to Vector
def _to_dense_vector(values):
    """Wrap a list of numbers in a dense ML vector."""
    return Vectors.dense(values)


list_to_vector_udf = udf(_to_dense_vector, VectorUDT())
sdf_audio = sdf_audio.withColumn(
    "features",
    list_to_vector_udf(sdf_audio["audio_features"]),
)

# Index labels
# Fit the indexer on the "genre" column, then append its output as "label".
indexer = StringIndexer(inputCol="genre", outputCol="label")
indexer_model = indexer.fit(sdf_audio)
sdf_audio = indexer_model.transform(sdf_audio)

# Split the data
# 80/20 train/test split. The fixed seed makes the partition, and therefore
# the metrics reported below, repeatable for the same input DataFrame instead
# of changing on every run.
train_data, test_data = sdf_audio.randomSplit([0.8, 0.2], seed=42)

# Logistic Regression Model
# Multinomial logistic regression on the "features"/"label" columns,
# limited to 10 iterations.
lr_params = {
    "featuresCol": 'features',
    "labelCol": 'label',
    "maxIter": 10,
    "family": "multinomial",
}
lr = LogisticRegression(**lr_params)
model = lr.fit(train_data)

# Make predictions
# Apply the fitted model to the held-out split.
predictions = model.transform(test_data)

# Evaluate the model
def _build_evaluator(metric_name):
    """Return an evaluator comparing "label" with "prediction" for one metric."""
    return MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName=metric_name,
    )


evaluator_accuracy = _build_evaluator("accuracy")
evaluator_precision = _build_evaluator("weightedPrecision")
evaluator_recall = _build_evaluator("weightedRecall")
evaluator_f1 = _build_evaluator("f1")

# Score the test-set predictions once per metric.
accuracy = evaluator_accuracy.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

# Report the metrics in a fixed order.
for caption, value in (
    ("Accuracy: ", accuracy),
    ("Precision: ", precision),
    ("Recall: ", recall),
    ("F1 Score: ", f1_score),
):
    print(caption, value)

# Shut the Spark session down now that all results have been printed.
spark.stop()


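# Output of the cell above:
#   TypeError: 'bytes' object cannot be interpreted as an integer
# NOTE(review): this message is commonly reported when the installed PySpark
# release does not support the running Python version — confirm both versions.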