In [1]:
import librosa
import numpy as np
import os

def segment_song(file_path, segment_duration=40, overlap=0):
    """
    Segment a song into fixed-length windows.

    Args:
    - file_path: Path to the MP3 file.
    - segment_duration: Duration of each segment in seconds.
    - overlap: Overlap between segments in seconds.

    Returns:
    - segments: List of audio segments.
    """
    # Load the audio file
    y, sr = librosa.load(file_path, sr=None)  # Load with the native sampling rate
    
    # Calculate the number of samples per segment
    samples_per_segment = segment_duration * sr
    
    # Calculate the hop length if overlap is specified
    hop_length = int((1 - overlap) * samples_per_segment)
    
    # Initialize the list to hold segments
    segments = []
    
    # Generate segments with the specified overlap
    for start in range(0, len(y) - samples_per_segment + 1, hop_length):
        end = start + samples_per_segment
        segments.append(y[start:end])
    
    return segments

# Example: Process all MP3 files in a directory
directory = '/Users/chamudi/Desktop/songs/train_data/10.mp3'
for root, dirs, files in os.walk(directory):
    for file in files:
        if file.endswith('.mp3'):
            file_path = os.path.join(root, file)
            # Segment each song with a specific overlap, e.g., 50% overlap
            segments = segment_song(file_path, overlap=0.5)
            # Further processing such as feature extraction can go here

In [2]:
import librosa
import numpy as np

def preprocess_audio(file_path, target_sr=22050, mono=True):
    """
    Load an audio file, ensuring uniform sample rate and mono channel.

    Args:
    - file_path: Path to the audio file.
    - target_sr: Target sampling rate.
    - mono: Convert audio to mono.

    Returns:
    - y: Audio time series.
    - sr: Sampling rate of `y`.
    """
    y, sr = librosa.load(file_path, sr=target_sr, mono=mono)
    return y, sr

def extract_stft_features(audio_segments, sr, n_fft=2048, hop_length=512, win_length=None):
    """
    Extract STFT features from audio segments.

    Args:
    - audio_segments: List of audio segments.
    - sr: Sampling rate.
    - n_fft: Length of the FFT window.
    - hop_length: Number of samples between successive frames.
    - win_length: Each frame of audio is windowed by `window()` of length `win_length`.

    Returns:
    - stft_features: List of STFT matrices for each segment.
    """
    stft_features = [librosa.stft(segment, n_fft=n_fft, hop_length=hop_length, win_length=win_length) for segment in audio_segments]
    return stft_features

# Example usage for a single MP3 file
file_path = '/Users/chamudi/Desktop/songs/train_data/10.mp3'
audio, sr = preprocess_audio(file_path)

# Assuming you have segmented your audio as described in the previous step
# For demonstration, here's how you might segment the preprocessed audio
segment_duration = 40  # seconds
samples_per_segment = segment_duration * sr
audio_segments = [audio[i:i+samples_per_segment] for i in range(0, len(audio), samples_per_segment)]

# Extract STFT features for each segment
stft_features = extract_stft_features(audio_segments, sr)

# At this point, `stft_features` contains the STFT matrices for each audio segment
# You might want to further process these (e.g., magnitude, power spectrum) before storage or analysis

[src/libmpg123/id3.c:process_comment():584] error: No comment text / valid description?


In [3]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Lambda
from tensorflow.keras import backend as K

def initialize_base_network(input_shape):
    """
    Define the base network (convolutional neural network, for example) to be used within the Siamese architecture.
    """
    input = Input(shape=input_shape)
    x = Conv2D(64, (3, 3), activation='relu')(input)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Conv2D(128, (3, 3), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Flatten()(x)
    x = Dense(128, activation='relu')(x)
    return Model(input, x)

def euclidean_distance(vectors):
    """
    Compute Euclidean distance between two vectors.
    """
    vector1, vector2 = vectors
    sum_square = K.sum(K.square(vector1 - vector2), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

def euclidean_distance_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)

# Assuming fingerprint shape is (128, 128, 1) for example
input_shape = (128, 128, 1)

# Initialize base network
base_network = initialize_base_network(input_shape)

# Create the left input and point to the base network
input_a = Input(shape=input_shape)
processed_a = base_network(input_a)

# Create the right input and point to the base network
input_b = Input(shape=input_shape)
processed_b = base_network(input_b)

# Compute the Euclidean distance between the two vector outputs
distance = Lambda(euclidean_distance, output_shape=euclidean_distance_output_shape)([processed_a, processed_b])

# Define the model to take the two inputs and output their distance
model = Model([input_a, input_b], distance)

In [5]:
import numpy as np

# Example data structure for pairs_train
# pairs_train shape: (number_of_pairs, 2, height, width, channels)
# For simplicity, assuming each fingerprint is 128x128 with 1 channel (grayscale)
# This is just a conceptual structure; you'll need to replace it with your actual data loading logic

# Simulated data
number_of_pairs = 1000  # This should be the actual number of pairs you have
height, width, channels = 128, 128, 1  # Adjust based on your fingerprint dimensions
pairs_train = np.random.rand(number_of_pairs, 2, height, width, channels)

# Labels: 1 for matching pairs, 0 for non-matching pairs
labels_train = np.random.randint(2, size=(number_of_pairs,))

# Now you can use pairs_train and labels_train in model.fit()
# Ensure you replace the random data generation with your actual data loading and preprocessing
model.fit([pairs_train[:, 0], pairs_train[:, 1]], labels_train, batch_size=128, epochs=10)

Epoch 1/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 1s/step - loss: 0.3039
Epoch 2/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.1340
Epoch 3/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0410
Epoch 4/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0248
Epoch 5/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0130
Epoch 6/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0076
Epoch 7/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0042
Epoch 8/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0023
Epoch 9/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0015
Epoch 10/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0011


<keras.src.callbacks.history.History at 0x2c2b97e10>

In [6]:
def contrastive_loss(y_true, y_pred):
    """
    Contrastive loss function.
    """
    margin = 1
    square_pred = K.square(y_pred)
    margin_square = K.square(K.maximum(margin - y_pred, 0))
    return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

model.compile(optimizer='adam', loss=contrastive_loss)

# Assume `pairs_train` and `labels_train` contain your training data and labels, respectively
model.fit([pairs_train[:, 0], pairs_train[:, 1]], labels_train, batch_size=128, epochs=10)

Epoch 1/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 1s/step - loss: 0.0034
Epoch 2/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.1433
Epoch 3/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0499
Epoch 4/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0228
Epoch 5/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0113
Epoch 6/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0080
Epoch 7/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0054
Epoch 8/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0035
Epoch 9/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0021
Epoch 10/10
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 1s/step - loss: 0.0016


<keras.src.callbacks.history.History at 0x2c2c8bc90>

In [7]:
import librosa

def generate_query_fingerprint(file_path, sr=22050):
    y, _ = librosa.load(file_path, sr=sr, mono=True)
    S = librosa.stft(y)
    fingerprint = np.abs(S)  # Use magnitude for simplicity
    return fingerprint

In [8]:
def find_matching_song(query_fingerprint, siamese_model, threshold=0.5):
    # Placeholder for database retrieval logic
    stored_fingerprints = load_fingerprints_from_database()  # Implement this based on your database schema
    
    for stored_fingerprint in stored_fingerprints:
        # Assuming stored_fingerprint is preprocessed similarly to the training data
        # Calculate the distance or similarity score using the Siamese model
        distance = siamese_model.predict([np.expand_dims(query_fingerprint, axis=0), np.expand_dims(stored_fingerprint, axis=0)])
        
        if distance < threshold:  # Adjust threshold based on your validation results
            return True, stored_fingerprint['song_id']  # Assume each fingerprint includes song_id reference
    
    return False, None

In [11]:
from tensorflow.keras.models import Model
# Other necessary imports...

# Define your Siamese CNN architecture here...
def create_siamese_model():
    # Model definition...
    return model

# Instantiate the model
trained_siamese_model = create_siamese_model()

# Compile the model
trained_siamese_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Assume you have your training data prepared as pairs and labels
# pairs_train, labels_train = ...

# Train the model
# trained_siamese_model.fit([pairs_train[:, 0], pairs_train[:, 1]], labels_train, epochs=10, batch_size=32)

# Now trained_siamese_model is defined and can be used for predictions

In [14]:
pip install pymysql

Collecting pymysql
  Downloading PyMySQL-1.1.0-py3-none-any.whl.metadata (4.4 kB)
Downloading PyMySQL-1.1.0-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.8/44.8 kB[0m [31m181.9 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: pymysql
Successfully installed pymysql-1.1.0
Note: you may need to restart the kernel to use updated packages.


In [17]:
import sqlite3
import numpy as np
import pickle

def load_fingerprints_from_database_sqlite(db_path='your_database_file.db'):
    fingerprints = []
    # Connect to the SQLite database
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()

    # Adjust the SQL query for SQLite, if necessary
    cursor.execute("SELECT fingerprint, song_id FROM Fingerprints")

    rows = cursor.fetchall()
    for row in rows:
        # Deserialize fingerprint (if necessary)
        fingerprint = pickle.loads(row[0])
        song_id = row[1]
        fingerprints.append((fingerprint, song_id))

    connection.close()
    return fingerprints

In [22]:
import pymysql
import numpy as np
import pickle

def load_fingerprints_from_database():
    # Placeholder list to store the fingerprints retrieved from the database
    fingerprints = []
    
    # Connect to the database
    connection = pymysql.connect(host='your_host',
                                 user='your_user',
                                 password='your_password',
                                 database='your_database',
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)

    try:
        with connection.cursor() as cursor:
            # SQL query to select fingerprints (and any other relevant info)
            # Adjust the SQL based on your schema
            sql = "SELECT fingerprint, song_id FROM Fingerprints"
            cursor.execute(sql)
            
            # Fetch all the rows
            rows = cursor.fetchall()
            
            for row in rows:
                # Assuming the fingerprints are stored as BLOBs and need to be deserialized
                fingerprint = pickle.loads(row['fingerprint'])
                song_id = row['song_id']
                
                # Append a tuple of deserialized fingerprint and song_id to the list
                fingerprints.append((fingerprint, song_id))
                
    finally:
        connection.close()

    return fingerprints

In [25]:
def retrieve_song_details(song_id):
    # Placeholder for database query logic
    song_details = query_database_for_song_details(song_id)  # Implement this based on your database schema
    return song_details

def log_song_occurrence(song_details, timestamp, broadcast_details):
    # Implement logging logic here
    # This could involve writing to a file, sending to a logging service, etc.
    print(f"Match found: {song_details} at {timestamp} during {broadcast_details}")

# Example usage
query_fingerprint = generate_query_fingerprint('/Users/chamudi/Desktop/songs/train_data/7.mp3')
match_found, song_id = find_matching_song(query_fingerprint, trained_siamese_model)

if match_found:
    song_details = retrieve_song_details(song_id)
    log_song_occurrence(song_details, '2024-03-13 12:34:56', 'Example Radio Station')

[src/libmpg123/id3.c:process_comment():584] error: No comment text / valid description?


OperationalError: (2003, "Can't connect to MySQL server on 'your_host' ([Errno 8] nodename nor servname provided, or not known)")