In [None]:
#Import Libraries
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
import yt_dlp
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from scipy.signal import find_peaks
from scipy.spatial.distance import cosine
from googleapiclient.discovery import build
import time
import random 
from googleapiclient.errors import HttpError
import isodate 
from tempfile import NamedTemporaryFile #for Streamlit app
import streamlit as st #streamlit app
import joblib #streamlit/parallel jobs if needed

In [None]:
# Make sure the download target folder is there before anything gets written to it.
os.makedirs("film_scores", exist_ok=True)

# Archive file that keeps track of videos that were already downloaded.
archive_file = 'downloaded_videos.txt'

# Download options (presumably handed to yt_dlp later; the call is not in this cell).
download_options = dict(
    format='bestaudio/best',
    # After download, have FFmpeg extract the audio track as WAV.
    postprocessors=[
        dict(
            key='FFmpegExtractAudio',
            preferredcodec='wav',
            preferredquality='192',
        ),
    ],
    # Files land in film_scores/ and are named after the video title.
    outtmpl='film_scores/%(title)s.%(ext)s',
    ffmpeg_location='C:\\ffmpeg\\bin',
    # Prevents re-downloading items already listed in the archive file.
    download_archive=archive_file,
    # False because some of the sources are playlists.
    noplaylist=False,
)

In [None]:
#Extracting Features
# Folder holding the .wav tracks; this is a relative path, so it is resolved
# against the current working directory.
audio_folder = "film_scores/"

# Output header: file name, 13 MFCC means, then the two scalar features.
columns = ["File_name"] + [f"mfcc_{i}" for i in range(13)] + ["rms_energy", "zcr"]

# One row per track is collected here.
dataset = []

print("Starting feature extraction...")

for file in os.listdir(audio_folder):
    # Only .wav files are analysed; everything else in the folder is skipped.
    if not file.endswith(".wav"):
        continue

    print(f"Processing {file}...")  # progress trace, useful when a file fails to load
    file_path = os.path.join(audio_folder, file)
    y, sr = librosa.load(file_path, sr=None)

    # Each frame-level feature is averaged down to one value per track:
    #   mfccs      -> mean of each of the 13 MFCC coefficients
    #   rms_energy -> mean RMS energy (loudness)
    #   zcr        -> mean zero-crossing rate
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13).mean(axis=1)
    rms_energy = librosa.feature.rms(y=y).mean()
    zcr = librosa.feature.zero_crossing_rate(y).mean()

    row = [file]
    row.extend(mfccs)
    row.append(rms_energy)
    row.append(zcr)
    dataset.append(row)

# Assemble the table and write it next to the notebook.
df = pd.DataFrame(dataset, columns=columns)
df.to_csv("Film_Scores_Dataset.csv", index=False)

print("Dataset saved as Film_Scores_Dataset.csv")

In [None]:
# Print every entry of the audio folder so the available tracks can be checked by eye.
print("Files in the audio folder:")
folder_entries = os.listdir(audio_folder)
for file in folder_entries:
    print(file)

In [None]:
# Load one track and compute the frame-level features used by the plots below.
# file_name is the track we want to transition FROM.
file_name = "Star Wars Episode IV A New Hope (1977) Soundtrack 23 The Battle of Yavin.wav"
file_path = os.path.join(audio_folder, file_name.strip())

# Stop here if the track is missing. Silently skipping the load would leave
# y / sr / zcr / rms_energy either undefined or still holding values from an
# earlier cell, and the following cells would then fail or plot the wrong data.
if not os.path.exists(file_path):
    raise FileNotFoundError(f"Audio file not found: {file_path}")

print("Loading and Processing File")
y, sr = librosa.load(file_path, sr=None)

#Spectrogram: STFT magnitude converted to dB relative to its maximum
D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)

#MFCCs per frame, plus the per-coefficient mean over all frames
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
mfccs_mean = np.mean(mfccs, axis=1)

#Zero-Crossing Rate (ZCR) per frame
zcr = librosa.feature.zero_crossing_rate(y)[0]

#RMS Energy per frame (loudness)
rms_energy = librosa.feature.rms(y=y)[0]

In [None]:
 #Plotting the  Spectrogram
spectrogram_size = (10, 4)  # figure width and height
plt.figure(figsize=spectrogram_size)
# D holds the dB-scaled STFT magnitudes; time goes on x, frequency on a log-scaled y axis.
librosa.display.specshow(D, sr=sr, y_axis='log', x_axis='time')
plt.title('Spectrogram')
plt.colorbar(format='%+2.0f dB')  # colour scale annotated in dB
plt.show()

In [None]:
#Plotting MFCCs
mfcc_plot_size = (10, 4)  # figure width and height
plt.figure(figsize=mfcc_plot_size)
# mfccs is the per-frame coefficient matrix (not the averaged vector); time goes on x.
librosa.display.specshow(mfccs, sr=sr, x_axis='time')
plt.title('MFCCs')
plt.colorbar()
plt.show()

In [None]:
#Plotting Zero-Crossing Rate
# Put seconds on the x axis instead of raw frame indices. frames_to_time is
# called with its default hop length, matching the default hop length that
# zero_crossing_rate used when zcr was computed.
zcr_times = librosa.frames_to_time(np.arange(len(zcr)), sr=sr)

plt.figure(figsize=(10, 4))
plt.plot(zcr_times, zcr, label="Zero Crossing Rate")
plt.title('Zero Crossing Rate')
plt.xlabel('Time (s)')
plt.ylabel('Zero crossing rate')
plt.legend()  # the label passed to plot() is only shown once a legend is drawn
plt.show()

In [None]:
#Plotting RMS Energy
# Put seconds on the x axis instead of raw frame indices. frames_to_time is
# called with its default hop length, matching the default hop length that
# librosa.feature.rms used when rms_energy was computed.
rms_times = librosa.frames_to_time(np.arange(len(rms_energy)), sr=sr)

plt.figure(figsize=(10, 4))
plt.plot(rms_times, rms_energy, label="RMS Energy")
plt.title('RMS Energy')
plt.xlabel('Time (s)')
plt.ylabel('RMS energy')
plt.legend()  # the label passed to plot() is only shown once a legend is drawn
plt.show()

In [None]:
#Find Peaks in RMS Energy
# Local maxima of the loudness curve that reach at least the track's mean RMS
# are treated as relative climaxes, i.e. candidate transition points.
energy_floor = np.mean(rms_energy)
peaks, _ = find_peaks(rms_energy, height=energy_floor)

# Peak positions are frame indices; convert them to seconds for display.
peak_times = librosa.frames_to_time(peaks, sr=sr)
print("Recommended Transition Times:", peak_times)

In [None]:
#Building a Model
# Build a small matrix with one row per MFCC coefficient and three columns:
# the coefficient's mean, the track's mean RMS energy and the track's mean ZCR.
# The two scalar means are broadcast to one value per row so the columns line up.
# NOTE(review): every row carries the same RMS/ZCR value and the rows are MFCC
# coefficients rather than separate observations - confirm this is the intended
# layout before relying on a model trained on it.
n_coeffs = len(mfccs_mean)
rms_energy_mean = np.full(n_coeffs, rms_energy.mean())
zcr_mean = np.full(n_coeffs, zcr.mean())

#Stack the three columns side by side
features = np.column_stack((mfccs_mean, rms_energy_mean, zcr_mean))
print("Feature Matrix (first 5 rows):", features[:5])

In [None]:
#Training the model
print("Splitting data for training...")
X = features  # Feature matrix (MFCCs + RMS Energy + ZCR)

# Placeholder labels: random 0/1 values, one per row of X. They carry no real
# signal, so the accuracy printed below only exercises the pipeline. A seeded
# generator keeps the labels (and therefore the accuracy) identical across runs.
label_rng = np.random.default_rng(42)
Y = label_rng.integers(0, 2, X.shape[0])

#Actual splits at 80% train 20% test
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

#Training the model (binary classifier on the placeholder labels)
print("Training RandomForestClassifier...")
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, Y_train)

#Predict with the model and check accuracy
Y_pred = model.predict(X_test)
print("Accuracy:", accuracy_score(Y_test, Y_pred))

#Recommend transitions: RMS peaks at or above the mean energy, converted to seconds
print("Recommending transition times...")
rms_energy = librosa.feature.rms(y=y)[0]
peaks, _ = find_peaks(rms_energy, height=np.mean(rms_energy))
peak_times = librosa.frames_to_time(peaks, sr=sr)
print("Recommended Transition Times:", peak_times)

In [None]:
# Sanity check: show the directory the notebook is currently running from.
working_dir = os.getcwd()
print("Current working directory:", working_dir)

In [None]:

#Create a function to extract features
def extract_features_with_labels(file_path, window_size=2048, hop_size=512):
    """Extract per-frame audio features and a peak-based transition label.

    The file is loaded at its own sample rate (``sr=None``). For every analysis
    frame the 13 MFCCs, the RMS energy and the zero-crossing rate are computed,
    and frames that are RMS peaks of at least 1.2x the mean RMS are labelled 1.

    Parameters
    ----------
    file_path : str
        Path of the audio file to analyse.
    window_size : int, default 2048
        Analysis window length in samples. Passed to librosa as ``n_fft`` for
        the MFCCs and as ``frame_length`` for RMS and ZCR.
    hop_size : int, default 512
        Hop between consecutive frames in samples, passed to librosa as
        ``hop_length`` for all three features.

    Returns
    -------
    pandas.DataFrame
        One row per frame with columns ``MFCC_0`` .. ``MFCC_12``, ``RMS``,
        ``ZCR`` and ``Label`` (1.0 = transition, 0.0 = not), all float64.
        Row ``i`` belongs to frame ``i``; a frame index is converted to seconds
        with ``librosa.frames_to_time(i, sr=<file sample rate>, hop_length=hop_size)``.
    """
    y, sr = librosa.load(file_path, sr=None)

    # Use the same window and hop for every feature so the frames line up and
    # the window_size / hop_size arguments actually take effect.
    mfccs = librosa.feature.mfcc(
        y=y, sr=sr, n_mfcc=13, n_fft=window_size, hop_length=hop_size
    )
    rms = librosa.feature.rms(
        y=y, frame_length=window_size, hop_length=hop_size
    )[0]
    zcr = librosa.feature.zero_crossing_rate(
        y=y, frame_length=window_size, hop_length=hop_size
    )[0]

    #Detect peaks in RMS (possible transitions); threshold = 1.2x mean energy
    peaks, _ = find_peaks(rms, height=np.mean(rms) * 1.2)

    #Label each frame (1 = transition, 0 = not)
    labels = np.zeros_like(rms)
    labels[peaks] = 1

    # Assemble the whole table in one step: MFCCs transposed to frames x 13,
    # followed by the RMS, ZCR and label columns. Cast to float64 so the values
    # are stored at the same precision as when rows were built one by one.
    frame_table = np.column_stack([mfccs.T, rms, zcr, labels]).astype(np.float64)
    column_names = [f"MFCC_{i}" for i in range(13)] + ["RMS", "ZCR", "Label"]
    return pd.DataFrame(frame_table, columns=column_names)

#Folder containing the .wav files to turn into training data
audio_folder = r"G:\My Drive\Unstructured Data Analytics\film_scores"
dataframes = []

# Process each .wav file. The listing is sorted so the row order of the saved
# CSV (and with it the later train/test split) is the same on every run.
for file_name in sorted(os.listdir(audio_folder)):
    if file_name.endswith('.wav'):
        file_path = os.path.join(audio_folder, file_name)
        try:
            df = extract_features_with_labels(file_path)
            df["File"] = file_name  # Add file name for tracking
            dataframes.append(df)
            print(f"Processed: {file_name}")
        except Exception as e:
            # Best effort: report the failing file and carry on with the rest.
            print(f"Error processing {file_name}: {e}")

# pd.concat raises a ValueError with an unhelpful message on an empty list;
# raise the same exception type with a message that says what went wrong.
if not dataframes:
    raise ValueError(
        f"No .wav files could be processed in {audio_folder}; nothing to save."
    )

#Create a final dataframe with one row per frame across all processed files
final_df = pd.concat(dataframes, ignore_index=True)

#Save for model training
final_df.to_csv("transition_training_data.csv", index=False)
print("Data saved to transition_training_data.csv")

In [None]:
##Training Random Forest Binary Classification Model

df = pd.read_csv("transition_training_data.csv")

#Split into X and Y
X = df.drop(columns=["Label", "File"])  # Features (MFCCs, RMS, ZCR)
Y = df["Label"]  # Labels (1 = transition, 0 = no transition)

#Create an 80/20 train test split.
# Transition frames come from RMS peaks and are therefore a small share of all
# frames, so the split is stratified to keep the class ratio equal in both
# parts. Stratification needs at least two samples of every class; fall back to
# a plain split when that is not the case.
stratify_labels = Y if Y.value_counts().min() >= 2 else None
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.2, random_state=42, stratify=stratify_labels
)

#The model itself
print("Training RandomForestClassifier...")
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, Y_train)

#Predict with the Classification model
Y_pred = model.predict(X_test)
print("Accuracy:", accuracy_score(Y_test, Y_pred))
# With imbalanced labels, accuracy alone is misleading: also print the score a
# model would get by always predicting the most common class.
print("Majority-class baseline accuracy:", Y_test.value_counts(normalize=True).max())

#Save it for future (joblib is already imported in the notebook's import cell)
joblib.dump(model, "transition_model.pkl")
print("Model saved as transition_model.pkl")


In [None]:
#Create a new prediction on untrained data with the model
NEW_SONG_PATH = "new_song.wav"  #Insert song here
HOP_SIZE = 512  # must equal the default hop_size of extract_features_with_labels

model = joblib.load("transition_model.pkl")

#Load the new song's per-frame features
new_song = extract_features_with_labels(NEW_SONG_PATH)

#Predict transitions
X_new = new_song.drop(columns=["Label"])  # No labels needed for prediction
predictions = model.predict(X_new)

#Get transition times.
# The feature table has one row per frame and no time column, so the time of
# each row is derived from its frame index, the file's sample rate and the hop.
frame_times = pd.Series(
    librosa.frames_to_time(
        np.arange(len(new_song)),
        sr=librosa.get_samplerate(NEW_SONG_PATH),
        hop_length=HOP_SIZE,
    ),
    name="Time",
)
transition_times = frame_times[predictions == 1]

print("Predicted transition times:", transition_times)


In [None]:

# Load the trained model from "transition_model.pkl", the file the training
# cell of this notebook writes with joblib.dump.
# NOTE(review): joblib.load unpickles the file, so it should only ever be
# pointed at model files you created yourself.
model = joblib.load("transition_model.pkl")

# Feature extraction function
def extract_features_with_labels(file_path):
    """Summarise one audio file as a single feature row.

    Loads the file at its own sample rate and averages each feature over all
    frames: the 13 MFCC coefficients, the RMS energy and the zero-crossing rate.
    Despite the name, no labels are produced.

    Note: an earlier cell of this notebook defines a function with the same
    name that returns a per-frame DataFrame; once this cell runs, this
    definition replaces it.

    Parameters
    ----------
    file_path : str
        Path of the audio file to analyse.

    Returns
    -------
    numpy.ndarray
        Array of shape (1, 15): 13 MFCC means, then mean RMS, then mean ZCR.
    """
    y, sr = librosa.load(file_path, sr=None)

    mfcc_frames = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    mfcc_means = mfcc_frames.mean(axis=1)
    rms_mean = librosa.feature.rms(y=y).mean()
    zcr_mean = librosa.feature.zero_crossing_rate(y).mean()

    # Promote the two scalar means to 1-element arrays and join everything into
    # one flat vector, then add a leading axis so the result is a single row.
    flat = np.concatenate(np.atleast_1d(mfcc_means, rms_mean, zcr_mean))
    return flat.reshape(1, -1)

# Streamlit UI
st.title("Music Transition Prediction App")
st.write("Upload a .wav file to predict the best transition points.")

# File uploader
uploaded_file = st.file_uploader("Upload a .wav file", type=["wav"])

if uploaded_file:
    # Save uploaded file to a temporary location so librosa can read it from a path.
    # NamedTemporaryFile is imported by name in the notebook's import cell; the
    # `tempfile` module itself is not imported, so `tempfile.NamedTemporaryFile`
    # would raise NameError here.
    # delete=False keeps the file on disk after the `with` block closes it.
    with NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
        temp_file.write(uploaded_file.read())
        temp_file_path = temp_file.name

    # Extract features
    new_song_features = extract_features_with_labels(temp_file_path)