# **Karaoke Scoring System**

### **Overview:**
The Karaoke Scoring System evaluates a user's singing performance against an original track. It combines audio preprocessing, alignment, and several scoring metrics to give users feedback on their performance.

### **KaraokeData:**
At the core of our system is the `KaraokeData` class, the single access point for a song's essential data: the original singer's audio, the instrumental track, and the synchronized lyrics. Beyond storage, the class parses the lyrics into a structured format so that lyrics can be extracted for a specific time span, which is needed to align user feedback with distinct moments in the song.

#### **Utilization Within KaraokeData:**
- The **original singer's audio** sets the standard for user performance comparisons.
- The **instrumental track** supports audio preprocessing by helping to identify and attenuate background noise.
- **Synchronized lyrics** enhance the user experience, providing context to the feedback and ensuring precision in alignment.

### **AudioPreprocessor:**
The `AudioPreprocessor` class refines the user's audio through:
1. **Normalization**: Adjusting the audio to have zero mean and unit variance.
2. **Silence Trimming**: Removing any leading and trailing silences from the user's audio.
3. **Spectral Gate**: Zeroing spectral components whose magnitude falls below a threshold, which reduces low-level noise.
4. **Adaptive Noise Reduction**: Harnessing the instrumental track to pinpoint and eliminate background noise from the user's audio.
5. **Voice Activity Detection (VAD)**: Spotting segments where the user is actively singing, ensuring the vocal's prominence over potential background disturbances.

### **Scoring Mechanisms:**
Our system leverages diverse metrics to deliver a well-rounded evaluation:
1. **Linguistic Accuracy Score**: Employs Google's Speech Transcription service to transcribe the user's audio to text. This transcribed text is then matched with the original lyrics, determining pronunciation and word accuracy.
2. **Amplitude Matching Score**: Utilizes Dynamic Time Warping (DTW) to compare amplitude profiles between the user's audio and the original.
3. **Pitch Matching Score**: Investigates the fundamental frequency contours of both the user's and original audio, ensuring tonal alignment.
4. **Rhythm Score**: Contrasts onset patterns between the user's performance and the original, assessing synchronization and timing.

In [None]:
import os
import librosa
import numpy as np
import pandas as pd


In [None]:
from audio_vis import AudioVis
from karaoke_data import KaraokeData
from audio_scorer import AudioScorer
from audio_preprocessor import AudioPreprocessor
from google_speech_transcription import GoogleSpeechTranscription

av = AudioVis()


## Load data

In [None]:
# Paths
base_dir = "data"
lyrics_dir = os.path.join(base_dir, "SongsLyrics", "Lyrics")
track_dir = os.path.join(base_dir, "SongsLyrics", "Track")
voice_dir = os.path.join(base_dir, "SongsLyrics", "Voice")

# Generate the dictionary
data_dict = {}

# Fill attempted songs
for song_file in os.listdir(base_dir):
    if song_file.endswith(".wav"):
        song_id = os.path.splitext(song_file)[0]
        data_dict[song_id] = {"Attempted": os.path.join(base_dir, song_file)}

# Fill lyrics
for lyrics_file in os.listdir(lyrics_dir):
    song_id = lyrics_file.split('_')[0]
    if song_id in data_dict:
        data_dict[song_id]["Lyrics"] = os.path.join(lyrics_dir, lyrics_file)

# Fill tracks
for track_file in os.listdir(track_dir):
    song_id = os.path.splitext(track_file)[0]
    if song_id in data_dict:
        data_dict[song_id]["Track"] = os.path.join(track_dir, track_file)

# Fill voices
for voice_file in os.listdir(voice_dir):
    if "voice_1" in voice_file:
        song_id = voice_file.split('_')[0]
        if song_id in data_dict:
            data_dict[song_id]["Original"] = os.path.join(voice_dir, voice_file)
    elif "voice_2" in voice_file:
        song_id = voice_file.split('_')[0]
        if song_id in data_dict:
            data_dict[song_id]["Original Second"] = os.path.join(voice_dir, voice_file)
    else:
        song_id = os.path.splitext(voice_file)[0]
        if song_id in data_dict:
            data_dict[song_id]["Original"] = os.path.join(voice_dir, voice_file)

# Print a sample
print(data_dict.get("42029", {}))


In [None]:
# Collect the IDs of songs that have every file needed for scoring
usable_ids = []

for song_id, song_data in data_dict.items():
    has_original = "Original" in song_data or "Original Second" in song_data
    if "Attempted" in song_data and "Lyrics" in song_data and "Track" in song_data and has_original:
        usable_ids.append(song_id)

# Print the results
print(f"Number of IDs with all files: {len(usable_ids)}")
print(usable_ids)


In [None]:
def get_song_data(song_id):
    """Load the original, attempted, and track audio plus the lyrics path for a song."""
    song_data = data_dict.get(song_id, {})
    # Fall back to the second voice when no primary original vocal exists
    original_path = song_data.get("Original", song_data.get("Original Second"))
    if original_path and "Attempted" in song_data and "Lyrics" in song_data and "Track" in song_data:
        original_audio, sr = librosa.load(original_path, sr=None, mono=True)
        # Resample the other files to the original's rate so one sr describes all three
        attempted_audio, _ = librosa.load(song_data['Attempted'], sr=sr, mono=True)
        track_audio, _ = librosa.load(song_data['Track'], sr=sr, mono=True)
        return original_audio, attempted_audio, track_audio, song_data['Lyrics'], sr
    else:
        return None

# Get the song data
song_data = get_song_data("44957")
print(song_data)


In [None]:
original_audio, attempted_audio, track_audio, raw_lyrics_data, sr = get_song_data("27256")


In [None]:
av = AudioVis()

av.wav_plot(original_audio, sr, title="Original Audio")
av.play_audio(original_audio, sr)

av.wav_plot(attempted_audio, sr, title="Attempted Audio")
av.play_audio(attempted_audio, sr)

av.wav_plot(track_audio, sr, title="Track Audio")
av.play_audio(track_audio, sr)


In [None]:
# To simulate receiving audio in chunks, split the signal into equal-sized pieces
def split_into_chunks(audio, num_chunks=5):
    """Splits the audio into num_chunks equal chunks; leftover trailing samples are dropped."""
    chunk_size = len(audio) // num_chunks
    if chunk_size == 0:
        raise ValueError("audio is shorter than num_chunks samples")
    chunks = [audio[i:i + chunk_size] for i in range(0, len(audio), chunk_size)]
    return chunks[:num_chunks]


In [None]:
chunks = split_into_chunks(attempted_audio, 10)

chunk = chunks[0]
av.wav_plot(chunk, sr, title="Attempted Audio Chunk")
av.play_audio(chunk, sr)


## KaraokeData

In [None]:
# Initialize KaraokeData
karaoke_data = KaraokeData(original_audio=original_audio, track_audio=track_audio, raw_lyrics_data=raw_lyrics_data, sampling_rate=sr)


### Parsing Lyrics:


In [None]:
parsed_lyrics = karaoke_data.lyrics_data
print(parsed_lyrics[:5])  # Displaying the first 5 parsed lyric entries for brevity


### Audio Alignment

In [None]:
karaoke_data.reset_alignment()  # Resetting any prior alignments
karaoke_data.align_audio(chunk, method="start")
print(f"Position after start alignment: {karaoke_data.current_position}")


Align Using Lyrics Data: This method uses the first entry in the parsed lyrics data to align the audio.

In [None]:
karaoke_data.reset_alignment()  # Resetting any prior alignments
karaoke_data.align_audio(chunk, method="lyrics_data")
print(f"Position after lyrics data alignment: {karaoke_data.current_position}")


Align Using Onset Detection:
This method aligns the audio by detecting onsets in both the original audio and the provided audio chunk. It then attempts to align the first onset of the chunk with the corresponding onset in the original.
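
As a rough illustration of this idea, the sketch below finds the first onset in each signal with a simple frame-energy detector written in NumPy and returns the offset between them. It is not the `KaraokeData` implementation: `first_onset`, `onset_offset`, the frame length, and the threshold ratio are all assumptions made for the example, and a real detector (such as librosa's onset functions) would be more robust.

```python
import numpy as np

def first_onset(audio, frame_length=512, threshold_ratio=0.1):
    """Sample index of the first frame whose RMS exceeds threshold_ratio * loudest frame RMS.

    Hypothetical helper: a crude energy-based stand-in for a real onset detector.
    """
    audio = np.asarray(audio, dtype=float)
    n_frames = len(audio) // frame_length
    if n_frames == 0:
        return 0
    frames = audio[:n_frames * frame_length].reshape(n_frames, frame_length)
    rms = np.sqrt(np.mean(frames ** 2, axis=1))
    if rms.max() == 0:
        return 0  # entirely silent, nothing to detect
    active = np.flatnonzero(rms > threshold_ratio * rms.max())
    return int(active[0]) * frame_length

def onset_offset(original, chunk, **kwargs):
    """Offset in samples that lines up the chunk's first onset with the original's."""
    return first_onset(original, **kwargs) - first_onset(chunk, **kwargs)

# Toy signals: silence followed by a tone, starting at different times
sr = 8000
tone = np.sin(2 * np.pi * 440 * np.arange(sr) / sr)
original = np.concatenate([np.zeros(4096), tone])
chunk = np.concatenate([np.zeros(1024), tone[:2048]])
offset = onset_offset(original, chunk)
print(offset)
```

The sketch only pairs the first onsets of both signals; matching a chunk from the middle of a song would need a search over all detected onsets.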

In [None]:
karaoke_data.reset_alignment()  # Resetting any prior alignments
karaoke_data.align_audio(chunk, method="onset_detection")
print(f"Position after onset detection alignment: {karaoke_data.current_position}")


Align Using Cross-Correlation:
This method computes the cross-correlation between the original audio and the provided audio chunk to find the best alignment.
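
A minimal sketch of cross-correlation alignment, assuming both signals are 1-D NumPy arrays and the chunk is no longer than the original. The helper name `best_offset` is hypothetical and does not come from `KaraokeData`.

```python
import numpy as np

def best_offset(original, chunk):
    """Return the sample offset in `original` where `chunk` correlates most strongly."""
    original = np.asarray(original, dtype=float)
    chunk = np.asarray(chunk, dtype=float)
    # mode="valid" slides the chunk only over positions where it fits fully inside the original
    correlation = np.correlate(original, chunk, mode="valid")
    return int(np.argmax(correlation))

# Toy data: the chunk is an exact excerpt of a random "original"
rng = np.random.default_rng(0)
original = rng.standard_normal(1000)
chunk = original[300:400]
offset = best_offset(original, chunk)
print(offset)
```

`np.correlate` evaluates the correlation directly, which is slow for full-length songs; an FFT-based correlation (for example `scipy.signal.correlate` with `method="fft"`) would likely be a more practical choice there.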

In [None]:
karaoke_data.reset_alignment()  # Resetting any prior alignments
karaoke_data.align_audio(chunk, method="cross_correlation")
print(f"Position after cross-correlation alignment: {karaoke_data.current_position}")


### Audio Segment Retrieval:

In [None]:
segment_length = len(chunk)  # Using the length of the first audio chunk
retrieved_original_segment, retrieved_track_segment = karaoke_data.get_next_segment(segment_length)


In [None]:
av.wav_plot(chunk, sr, title="Chunk Audio")
av.play_audio(chunk, sr)

av.wav_plot(retrieved_original_segment, sr, title="Original Audio")
av.play_audio(retrieved_original_segment, sr)

av.wav_plot(retrieved_track_segment, sr, title="Track Audio")
av.play_audio(retrieved_track_segment, sr)


In [None]:
segment_lyrics = karaoke_data.get_lyrics()
print(segment_lyrics)


## Preprocessing Audio Chunks

In [None]:
ap = AudioPreprocessor()


In [None]:
def demonstrate_effect(before, after, sr, effect_name, visualization_functions):
    """
    Demonstrates the effect of a preprocessing function by playing and visualizing:
    - The original audio
    - The processed audio
    - (Optional) The removed audio (difference between the original and processed audio)
    - Visualizations specified in visualization_functions for each of the audios
    """
    # Play original audio
    print(f"Original Audio ({effect_name}):")
    av.play_audio(before, sr)

    # Play processed audio
    print(f"\nTransformed Audio ({effect_name}):")
    av.play_audio(after, sr)

    same_length = len(before) == len(after)

    # If the lengths are the same, play the difference audio
    if same_length:
        difference = before - after
        print(f"\nRemoved Audio ({effect_name}):")
        av.play_audio(difference, sr)

    # Display visualizations
    for viz_func in visualization_functions:
        print(f"\nOriginal Audio - {effect_name}:")
        viz_func(before, sr)

        print(f"\nTransformed Audio - {effect_name}:")
        viz_func(after, sr)

        # If the lengths are the same, visualize the difference audio
        if same_length:
            print(f"\nDifference - {effect_name}:")
            viz_func(difference, sr)


### Trim Audio

Description: Trimming silences involves removing any leading or trailing silent parts from an audio signal. This can be useful to eliminate unnecessary silent portions which don't contribute to the actual content.

Implementation: The `trim_audio` function uses `librosa.effects.trim` to achieve this. The `top_db` parameter defines how many decibels below the reference level (in librosa, the signal's peak by default) a frame must fall to be considered silent.
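
To make the role of `top_db` concrete, here is a simplified NumPy sketch of the same idea. It is an approximation for illustration, not librosa's algorithm: it uses non-overlapping frames, and `trim_silence` and `frame_length` are hypothetical names.

```python
import numpy as np

def trim_silence(audio, top_db=60.0, frame_length=512):
    """Drop leading/trailing frames whose RMS is more than top_db below the loudest frame.

    Uses non-overlapping frames; a partial frame at the very end is ignored.
    """
    audio = np.asarray(audio, dtype=float)
    n_frames = len(audio) // frame_length
    if n_frames == 0:
        return audio
    frames = audio[:n_frames * frame_length].reshape(n_frames, frame_length)
    rms = np.sqrt(np.mean(frames ** 2, axis=1))
    if rms.max() == 0:
        return audio[:0]  # entirely silent
    # Level of each frame in dB relative to the loudest frame
    db = 20.0 * np.log10(np.maximum(rms, 1e-10) / rms.max())
    loud = np.flatnonzero(db > -top_db)
    start = loud[0] * frame_length
    end = (loud[-1] + 1) * frame_length
    return audio[start:end]

# Toy signal: silence, a tone, then silence again
tone = 0.5 * np.sin(2 * np.pi * 440 * np.arange(4096) / 8000)
audio = np.concatenate([np.zeros(2048), tone, np.zeros(1024)])
trimmed = trim_silence(audio)
print(len(audio), len(trimmed))
```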

In [None]:
# vf = [av.wav_plot, av.plot_spectrogram, av.plot_mfcc]
vf = [av.wav_plot]
trimmed_chunk = ap.trim_audio(chunk)
# demonstrate_effect(chunk, trimmed_chunk, sr, "Trimming", vf)


### Normalize Audio

Description: Normalization adjusts the audio amplitude so that its average amplitude is zero, and its standard deviation is one. This ensures that the audio's loudness is relatively consistent, which can be beneficial for further processing or analysis.

Implementation: The `_normalize_segment` function subtracts the mean from the audio segment and then divides by the standard deviation. The `normalize_audio` function can normalize the entire audio or perform segment-wise normalization if a `segment_length` is provided.
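
The description above can be sketched as follows. This is an illustrative version, not the `AudioPreprocessor` source: the function names and the small `eps` guard against division by zero on silent segments are assumptions.

```python
import numpy as np

def normalize_segment(segment, eps=1e-8):
    """Shift to zero mean and scale to unit standard deviation."""
    segment = np.asarray(segment, dtype=float)
    return (segment - segment.mean()) / (segment.std() + eps)

def normalize(audio, segment_length=None):
    """Normalize the whole signal, or each block of segment_length samples independently."""
    audio = np.asarray(audio, dtype=float)
    if len(audio) == 0:
        return audio
    if segment_length is None:
        return normalize_segment(audio)
    pieces = [normalize_segment(audio[i:i + segment_length])
              for i in range(0, len(audio), segment_length)]
    return np.concatenate(pieces)

# Toy signal with a DC offset and non-unit variance
rng = np.random.default_rng(0)
audio = 3.0 + 2.0 * rng.standard_normal(1000)
whole = normalize(audio)
per_segment = normalize(audio, segment_length=250)
print(round(whole.mean(), 6), round(whole.std(), 6))
```

Segment-wise normalization evens out loudness changes within a recording, at the cost of small discontinuities at segment boundaries.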

In [None]:
# vf = [av.wav_plot, av.plot_spectrogram, av.plot_mfcc]
normalized_chunk = ap.normalize_audio(chunk)
demonstrate_effect(chunk, normalized_chunk, sr, "Normalization", vf)


### Spectral Gate

Description: A spectral gate suppresses the components of the signal whose magnitude falls below a certain threshold. It helps in reducing low-level noise.

Implementation: In the `spectral_gate` function, an STFT (Short-Time Fourier Transform) is computed, and any time-frequency bins whose magnitude is below the threshold are set to zero. The processed signal is then reconstructed using the inverse STFT.
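
A minimal sketch of magnitude gating, assuming the crudest possible STFT (non-overlapping rectangular frames) so that it runs with NumPy alone. The function below is a hypothetical stand-in, not the `AudioPreprocessor.spectral_gate` source, and its threshold scale depends on `n_fft`, so values are not comparable with the `threshold=0.1` used in this notebook.

```python
import numpy as np

def gate_spectrum(audio, threshold, n_fft=256):
    """Zero every time-frequency bin whose magnitude is below threshold, then resynthesize."""
    audio = np.asarray(audio, dtype=float)
    n_frames = int(np.ceil(len(audio) / n_fft))
    padded = np.zeros(n_frames * n_fft)
    padded[:len(audio)] = audio
    frames = padded.reshape(n_frames, n_fft)
    spectrum = np.fft.rfft(frames, axis=1)             # forward transform per frame
    spectrum[np.abs(spectrum) < threshold] = 0         # the gate
    gated = np.fft.irfft(spectrum, n=n_fft, axis=1)    # inverse transform per frame
    return gated.reshape(-1)[:len(audio)]

def rms(x):
    return float(np.sqrt(np.mean(np.square(x))))

# Toy signal: a strong sine plus weak broadband noise
rng = np.random.default_rng(0)
t = np.arange(2048)
clean = np.sin(2 * np.pi * 8 * t / 256)                # exactly 8 cycles per frame
noisy = clean + 0.01 * rng.standard_normal(len(t))
gated = gate_spectrum(noisy, threshold=1.0)
err_before = rms(noisy - clean)
err_after = rms(gated - clean)
print(err_before, err_after)
```

A production version would normally use overlapping windowed frames (as `librosa.stft` does) to avoid artifacts at frame boundaries.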

In [None]:
spectral_gated_chunk = ap.spectral_gate(chunk, threshold=0.1)
demonstrate_effect(chunk, spectral_gated_chunk, sr, "Spectral Gating", vf)


### Adaptive Noise Reduction

Description: Adaptive noise reduction aims to reduce background noise from the user's audio using a reference (typically the instrumental track). By comparing the reference track with the user's audio, it identifies and subtracts common background elements, reducing interference or bleed from the instrumental.

Implementation: This step relies on the `spectral_masking` method. It calculates a mask based on the ratio of the user audio's magnitudes to the combined magnitudes of the user and reference audios. This mask, when applied to the user's audio STFT, emphasizes the parts where the user's audio is dominant (like vocals) and suppresses the parts that are common with the reference (like instrumental bleed).

In [None]:
adaptively_reduced_chunk = ap.adaptive_noise_reduction(chunk, retrieved_track_segment, sr)
demonstrate_effect(chunk, adaptively_reduced_chunk, sr, "Adaptive Noise Reduction", vf)


### Voice Activity Detection

Description: VAD is employed to detect when a person is speaking/singing in an audio clip. This is valuable when you want to separate or focus on vocal content and exclude long silences or background noise.

Implementation: The `voice_activity_detection` function uses the `librosa.effects.split` function, which identifies segments of the signal that are above a certain loudness threshold.
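
The interval-finding idea can be sketched in NumPy as below. This is a simplified illustration with non-overlapping frames, not librosa's implementation; `active_intervals` and its defaults are hypothetical.

```python
import numpy as np

def active_intervals(audio, top_db=30.0, frame_length=512):
    """Return (start, end) sample ranges whose frame RMS is within top_db of the loudest frame."""
    audio = np.asarray(audio, dtype=float)
    n_frames = len(audio) // frame_length
    if n_frames == 0:
        return []
    frames = audio[:n_frames * frame_length].reshape(n_frames, frame_length)
    rms = np.sqrt(np.mean(frames ** 2, axis=1))
    if rms.max() == 0:
        return []
    db = 20.0 * np.log10(np.maximum(rms, 1e-10) / rms.max())
    active = db > -top_db
    intervals, start = [], None
    for i, is_active in enumerate(active):
        if is_active and start is None:
            start = i * frame_length                   # an active run begins
        elif not is_active and start is not None:
            intervals.append((start, i * frame_length))  # the run ends
            start = None
    if start is not None:
        intervals.append((start, n_frames * frame_length))
    return intervals

# Toy signal: tone, silence, tone
tone = np.sin(2 * np.pi * 440 * np.arange(1024) / 8000)
audio = np.concatenate([tone, np.zeros(1024), tone])
intervals = active_intervals(audio)
voiced = np.concatenate([audio[s:e] for s, e in intervals])
print(intervals, len(voiced))
```

Note that `top_db` is relative to the loudest frame, so a small value such as the `top_db=5` used in this notebook keeps only the loudest passages.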

In [None]:
vad_chunk = ap.voice_activity_detection(chunk, sr, top_db=5)  # Adjust the top_db value as needed
demonstrate_effect(chunk, vad_chunk, sr, "Voice Activity Detection", vf)


### Source Separation

Description: Source separation is the process of separating the main audio source from the rest of the audio. The NMF-based method described here applies Non-negative Matrix Factorization (NMF) to the Mel spectrogram of the audio chunk. NMF factorizes the spectrogram into two matrices: the components matrix and the activations matrix. Each component can be thought of as a "template" spectrum, and its corresponding activation tells when that template is active.

Implementation: In the `source_separation` method, the code computes the Mel spectrogram of the input audio chunk, then performs NMF to get the components and activations. The main audio source is identified as the component with the highest sum of activations, and it is then synthesized back into the time domain to produce the separated main audio. Note that the standalone `source_separation` function defined in the next cell takes a different, simpler route: it uses harmonic/percussive source separation (`librosa.effects.hpss`) and keeps only the harmonic component.

In [None]:
def source_separation(audio_chunk: np.ndarray, sr: int = 22050) -> np.ndarray:
    """Returns the harmonic component via harmonic/percussive source separation (HPSS)."""
    # hpss returns (harmonic, percussive); sr is unused but kept so existing calls still work
    harmonic, _ = librosa.effects.hpss(audio_chunk)
    return harmonic


In [None]:
source_separated_chunk = source_separation(chunk, sr)
demonstrate_effect(chunk, source_separated_chunk, sr, "Source Separation", vf)


### Spectral Masking

Description: Spectral masking emphasizes certain frequency components based on a reference signal. This can help in reducing interference or background sounds.

Implementation: The `spectral_masking` function calculates a mask based on the ratio of magnitudes of the user audio to the sum of magnitudes of the user and reference audios. This mask is then applied to the user's audio STFT, and the processed audio is reconstructed.
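
The mask described above is |U| / (|U| + |R|), where U and R are the spectra of the user and reference audio. The sketch below is an illustration under simplifying assumptions (non-overlapping rectangular frames, NumPy only); `frame_spectra`, `ratio_mask`, and the `eps` term are hypothetical and not taken from `AudioPreprocessor`.

```python
import numpy as np

def frame_spectra(audio, n_fft=256):
    """Split audio into non-overlapping zero-padded frames and return their FFTs."""
    audio = np.asarray(audio, dtype=float)
    n_frames = int(np.ceil(len(audio) / n_fft))
    padded = np.zeros(n_frames * n_fft)
    padded[:len(audio)] = audio
    return np.fft.rfft(padded.reshape(n_frames, n_fft), axis=1)

def ratio_mask(user, reference, n_fft=256, eps=1e-10):
    """Apply mask = |U| / (|U| + |R|) to the user's spectrum and resynthesize."""
    n = min(len(user), len(reference))                 # compare equal-length signals
    U = frame_spectra(user[:n], n_fft)
    R = frame_spectra(reference[:n], n_fft)
    mask = np.abs(U) / (np.abs(U) + np.abs(R) + eps)   # eps avoids 0/0 in empty bins
    masked = np.fft.irfft(mask * U, n=n_fft, axis=1)
    return masked.reshape(-1)[:n]

def rms(x):
    return float(np.sqrt(np.mean(np.square(x))))

# Toy signals: a "voice" tone with half-level bleed from a "backing" tone
t = np.arange(2048)
voice = np.sin(2 * np.pi * 8 * t / 256)
backing = np.sin(2 * np.pi * 32 * t / 256)
user = voice + 0.5 * backing
cleaned = ratio_mask(user, backing)
err_before = rms(user - voice)
err_after = rms(cleaned - voice)
print(err_before, err_after)
```

Because the mask is a soft ratio, the bleed is attenuated rather than removed entirely; bins where only the user has energy pass through almost unchanged.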

In [None]:
masked_chunk = ap.spectral_masking(chunk, retrieved_track_segment)
demonstrate_effect(chunk, masked_chunk, sr, "Spectral Masking", vf)


### Pipeline

In [None]:
def demonstrate_pipeline(audio_chunk, pipeline, sr, **kwargs):
    """Demonstrates the effect of a preprocessing pipeline."""
    # Call through the instance created above, matching how the full pipeline calls it
    processed_audio = ap.preprocess_audio(audio_chunk, pipeline, **kwargs)
    pipeline_name = " -> ".join(pipeline)
    vf = [av.wav_plot]
    demonstrate_effect(audio_chunk, processed_audio, sr, pipeline_name, vf)

# Define the pipelines
pipeline_1 = ["normalize"]
pipeline_2 = ["adaptive_noise_reduction", "normalize"]
pipeline_3 = ["adaptive_noise_reduction", "source_separation", "normalize"]

# Additional arguments for the pipelines
pipeline_args = {
    "adaptive_noise_reduction": {"reference_audio": retrieved_track_segment}
}

# Apply and demonstrate each pipeline
demonstrate_pipeline(chunk, pipeline_1, sr)
demonstrate_pipeline(chunk, pipeline_2, sr, **pipeline_args)
demonstrate_pipeline(chunk, pipeline_3, sr, **pipeline_args)


## AudioScorer

**Linguistic Accuracy**: The transcription is used to determine how closely the sung content matches the actual lyrics. This is a `qualitative measure`.

**Amplitude, Pitch, and Rhythm Matching**: These are `quantitative measures`. They compare the user's sung audio features with the reference (original) audio. 
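
The text-matching step behind the linguistic accuracy score could look roughly like the sketch below. It is a guess at the general approach, not the `AudioScorer` implementation: `tokenize` and `word_accuracy` are hypothetical helpers, the tokenizer assumes English-like text, and the score is simply the fraction of lyric words found in order in the transcript.

```python
import re
from difflib import SequenceMatcher

def tokenize(text):
    """Lowercase and split into words, dropping punctuation."""
    return re.findall(r"[a-z0-9']+", text.lower())

def word_accuracy(transcript, lyrics):
    """Fraction of lyric words that appear, in order, in the transcript."""
    expected = tokenize(lyrics)
    heard = tokenize(transcript)
    if not expected:
        return 0.0
    matcher = SequenceMatcher(None, expected, heard, autojunk=False)
    matched = sum(block.size for block in matcher.get_matching_blocks())
    return matched / len(expected)

score = word_accuracy("twinkle twinkle little car", "Twinkle, twinkle, little star")
print(f"{score:.2f}")
```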

In [None]:
transcriber = GoogleSpeechTranscription()

# fastdtw is supposed to be much faster but has a bug
audio_scorer = AudioScorer(transcriber, 'dtaidistance_fast')


### Linguistic Accuracy Score

In [None]:
print(karaoke_data.get_lyrics())


In [None]:
transcriber.transcribe(chunk, sr)


The problem here is that the audio is long; for short audio this works fine.

In [None]:
linguistic_score = audio_scorer.linguistic_accuracy_score(chunk, sr, segment_lyrics)
print(f"Linguistic Accuracy Score: {linguistic_score:.2f}")


### Rhythm Score:

**Explanation**: Rhythm score quantifies how closely the rhythm of a user's audio matches a reference audio. It can be computed using onset strength, which is a measure of the abruptness of sound changes.

**Implementation**: Onset strength is computed for both the user audio and the reference audio using the `librosa.onset.onset_strength` function. The Dynamic Time Warping (DTW) similarity between these onset strength sequences then gives the rhythm score.
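
For readers unfamiliar with DTW, here is a plain reference implementation applied to two toy onset-strength envelopes. It is a sketch only: this notebook delegates DTW to the `'dtaidistance_fast'` backend, and the `1 / (1 + distance)` mapping from distance to similarity is an assumption for illustration, not necessarily what `AudioScorer` does.

```python
import numpy as np

def dtw_distance(a, b):
    """Classic O(len(a) * len(b)) dynamic time warping distance between 1-D sequences."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    cost = np.full((len(a) + 1, len(b) + 1), np.inf)
    cost[0, 0] = 0.0
    for i in range(1, len(a) + 1):
        for j in range(1, len(b) + 1):
            step = abs(a[i - 1] - b[j - 1])
            # Extend the cheapest of the three allowed predecessor paths
            cost[i, j] = step + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])
    return float(cost[len(a), len(b)])

def dtw_similarity(a, b):
    """Map distance to (0, 1]; this particular mapping is an illustrative assumption."""
    return 1.0 / (1.0 + dtw_distance(a, b))

envelope = [0, 1, 0, 0, 1, 0]              # two onsets
stretched = [0, 0, 1, 1, 0, 0, 0, 1, 0]    # same pattern, sung slower
different = [1, 0, 1, 0, 1, 0]             # a different rhythm
print(dtw_similarity(envelope, stretched), dtw_similarity(envelope, different))
```

DTW tolerates tempo differences, which is why the stretched envelope still matches perfectly while the different rhythm does not.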

In [None]:
rhythm_score = audio_scorer.rhythm_score(np.array(chunk), retrieved_original_segment)
print("Rhythm Score:", rhythm_score)


### Pitch Matching Score:

**Explanation**: Pitch matching score assesses how closely the pitch contour of a user's audio aligns with that of a reference audio. Pitch contour is the variation of pitch over time.

**Implementation**: The `librosa.pyin` function extracts pitch sequences from the user audio and the reference audio. The DTW similarity between these pitch sequences then yields the pitch matching score.

In [None]:
pitch_score = audio_scorer.pitch_matching_score(chunk, retrieved_original_segment)
print("Pitch Matching Score:", pitch_score)


### Amplitude Matching Score

**Explanation**: Amplitude matching score evaluates how well the amplitude envelope of a user's audio matches that of a reference audio.

**Implementation**: The multi-dimensional audio arrays are flattened to 1D using `numpy.ndarray.flatten`, and the DTW similarity between these 1D amplitude sequences gives the amplitude matching score.

In [None]:
audio_scorer = AudioScorer(transcriber, 'dtaidistance_fast')


In [None]:
amplitude_score = audio_scorer.amplitude_matching_score(chunk, retrieved_original_segment, sr)
print("Amplitude Matching Score:", amplitude_score)


## Full Pipeline

In [None]:
transcriber = GoogleSpeechTranscription()

class AudioProcessingPipeline:
    def __init__(self, karaoke_data, attempted_audio, sr):
        self.sr = sr
        self.attempted_audio = attempted_audio
        self.karaoke_data = karaoke_data
        self.ap = AudioPreprocessor()
        self.audio_scorer = AudioScorer(transcriber, 'dtaidistance_fast')

    def process_and_score(self, original_pipeline, chunk_audio_pipeline):
        total_scores = {
            "linguistic_score": 0,
            "amplitude_score": 0,
            "pitch_score": 0,
            "rhythm_score": 0,
            }
        num_chunks = 0
        for chunk in split_into_chunks(self.attempted_audio, 20):
            if num_chunks == 0:
                self.karaoke_data.reset_alignment()
                self.karaoke_data.align_audio(chunk, method="start")
            original_segment, track_segment = self.karaoke_data.get_next_segment(len(chunk))
            original_processed = self.ap.preprocess_audio(original_segment, original_pipeline)
            chunk_processed = self.ap.preprocess_audio(chunk, chunk_audio_pipeline, reference_audio=track_segment)

            scores = self.audio_scorer.process_audio_chunk(
                chunk_processed, original_processed, self.karaoke_data.get_lyrics(), self.sr
            )
            num_chunks += 1
            for score_name, score_value in scores.items():
                total_scores[score_name] += score_value

            print(scores)

        # For now just computing average score
        average_scores = {score_name: score_value / num_chunks for score_name, score_value in total_scores.items()}
        return average_scores


In [None]:
def process_all_songs(usable_ids):
    scores = {}
    for song_id in usable_ids:
        original_audio, attempted_audio, track_audio, raw_lyrics_data, sr = get_song_data(song_id)
        karaoke_data = KaraokeData(
            original_audio=original_audio,
            track_audio=track_audio,
            raw_lyrics_data=raw_lyrics_data,
            sampling_rate=sr
        )
        audio_pipeline = AudioProcessingPipeline(karaoke_data, attempted_audio, sr)
        average_score = audio_pipeline.process_and_score(
            original_pipeline=[],
            chunk_audio_pipeline=[]
            # If any preprocessing is applied, Google returns an empty transcription
            # original_pipeline=["spectral_gate", "normalize"],
            # chunk_audio_pipeline=["adaptive_noise_reduction", "spectral_gate", "normalize"]
        )
        scores[song_id] = average_score
    return scores


In [None]:
all_scores = process_all_songs(['27256', '58659'])
for song_id, score in all_scores.items():
    print(f"Song ID: {song_id}, Average Score: {score}")

    # Playing the song
    original_audio, _, _, _, sr = get_song_data(song_id)
    AudioVis().play_audio(original_audio, sr)
