# Creating Fingerprints

In [None]:
pip install librosa

I initially wanted to stream the binary audio files into Kafka and transform them into fingerprints from there, so I wouldn't have to download them to disk. However, Kafka caps message size at roughly 1 MB by default (`message.max.bytes` on the broker, `max.request.size` on the producer), and the audio files were well over that.

In [None]:
import os

# The Kafka SQL connector has to be on the classpath before the first SparkSession starts.
os.environ.update(PYSPARK_SUBMIT_ARGS='--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2" pyspark-shell')

In [None]:
from pyspark.sql import SparkSession

# Silver-layer HDFS location holding the MP3 files as binary Parquet rows.
input_hdfs_path = "hdfs://localhost:9000/lakehouse/silver/mp3_binary/"

spark = SparkSession.builder.appName("MP3 Binary Kafka Producer").getOrCreate()

In [None]:
# Load the silver-layer MP3 table, then look at its schema and the files it contains.
binary_df = spark.read.parquet(input_hdfs_path)
binary_df.printSchema()

filename_df = binary_df.select("filename")
filename_df.show(truncate=False)

In [None]:
# The Kafka sink reads a `key` and a `value` column: filename -> key, raw MP3 bytes -> value.
kafka_columns = [
    "CAST(filename AS STRING) AS key",
    "CAST(content AS BINARY) AS value",
]
kafka_ready_df = binary_df.selectExpr(*kafka_columns)

In [None]:
# NOTE: Kafka rejects records above its ~1 MB default message size (see the note at
# the top of this notebook), which is why this route was abandoned in favour of disk.
kafka_sink_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "topic": "mp3_binary_stream",
}
kafka_ready_df.write.format("kafka").options(**kafka_sink_options).save()

Code for downloading the MP3s to disk and converting them to fingerprints

In [None]:
from pyspark.sql import SparkSession, Row
import os
import tempfile
import librosa
import numpy as np
from datetime import datetime

In [None]:
# getOrCreate() hands back an already-running session if there is one, in which
# case this app name is not applied.
spark = SparkSession.builder.appName("Audio Fingerprint Extraction").getOrCreate()

In [None]:
# Source: MP3 bytes in the silver layer. Sink: the extracted fingerprints.
hdfs_input_path, hdfs_output_path = (
    "hdfs://localhost:9000/lakehouse/silver/mp3_binary/",
    "hdfs://localhost:9000/warehouse/fingerprints_parquet/",
)

In [None]:
# Scratch directory for MP3 files materialised from the binary column.
temp_input_dir = tempfile.mkdtemp(prefix="mp3_input_")
# Fixed: previously printed the undefined name `temp_input_folder` (NameError).
print(f"[INFO] Temporary local input folder created at: {temp_input_dir}")

In [None]:
# Binary MP3 table (filename + raw bytes) from the silver layer.
binary_df = spark.read.format("parquet").load(hdfs_input_path)

In [None]:
# Bring every (filename, content) row back to the driver.
# NOTE: each row carries a whole MP3, so the driver must hold all of them at once.
records = binary_df.select(["filename", "content"]).collect()

In [None]:
import librosa
import numpy as np
from datetime import datetime
import os
import tempfile

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

# Explicit output layout, matching the fingerprint tables written later in this
# notebook: status/message describe the outcome and timestamp is always a timestamp.
fingerprint_schema = StructType([
    StructField("filename", StringType(), True),
    StructField("fingerprint", ArrayType(ArrayType(IntegerType())), True),
    StructField("status", StringType(), True),
    StructField("message", StringType(), True),
    StructField("timestamp", StringType(), True),
])

fingerprint_entries = []

for row in records:
    filename = row["filename"]
    content = row["content"]
    tmp_mp3_path = None

    try:
        # librosa decodes from a path, so spill the bytes to a unique temp file.
        # A fresh name per file avoids collisions and path separators in `filename`.
        fd, tmp_mp3_path = tempfile.mkstemp(prefix="mp3_input_", suffix=".mp3")
        with os.fdopen(fd, "wb") as f:
            f.write(content)

        # Peaks = (row, col) indices of dB-spectrogram bins above its 95th percentile.
        y, sr = librosa.load(tmp_mp3_path, sr=44100)
        D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)
        peaks = np.argwhere(D > np.percentile(D, 95)).tolist()

        fingerprint_entries.append(
            (filename, peaks, "Success", "Fingerprint generated", str(datetime.now()))
        )
    except Exception as e:
        # Record the failure as its own row instead of aborting the whole batch.
        fingerprint_entries.append((filename, [], "Error", str(e), str(datetime.now())))
    finally:
        # Don't leave one MP3 copy per song behind in the temp directory.
        if tmp_mp3_path is not None and os.path.exists(tmp_mp3_path):
            os.remove(tmp_mp3_path)

# Create DataFrame from results (previously referenced the undefined `fingerprint_rdd`
# with a 2-column schema for 3-element tuples). The explicit schema also covers
# the case where `records` is empty.
fingerprint_df = spark.createDataFrame(fingerprint_entries, schema=fingerprint_schema)

In [None]:
# Redistribute into 10 partitions (so at most 10 part files) and replace any previous output.
OUTPUT_PARTITIONS = 10
(
    fingerprint_df
    .repartition(OUTPUT_PARTITIONS)
    .write
    .mode("overwrite")
    .parquet(hdfs_output_path)
)

In [12]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType, IntegerType
import librosa
import numpy as np
import os
from datetime import datetime

In [13]:
spark = (
    SparkSession.builder
    .appName("MP3 Fingerprint Extraction")
    .getOrCreate()
)

In [18]:
# Local folder of source MP3s, and the HDFS folder the fingerprints are written to.
mp3_input_dir, hdfs_output_path = "songs_mp3/", "hdfs://localhost:9000/warehouse/fingerprints/"

In [15]:
def extract_fingerprint(file_path, sample_rate=44100, percentile=95):
    """Compute a peak-based fingerprint for one audio file.

    The audio is decoded with ``librosa.load`` at ``sample_rate`` and turned
    into a dB spectrogram (|STFT| via ``amplitude_to_db``, referenced to its
    max). Every bin strictly louder than the ``percentile``-th percentile of
    that spectrogram counts as a peak. With the default of 95, that is roughly
    the top 5% of bins. Each peak's ``(row, col)`` index pair is flattened into
    the string ``"row-col"`` so it fits an array<string> column.

    Args:
        file_path: Path to the audio file. When used inside ``rdd.map`` the path
            must be readable wherever the task runs.
        sample_rate: Target sample rate passed to ``librosa.load``.
        percentile: Loudness percentile (0-100) a bin must exceed to be kept.

    Returns:
        A ``Row`` with ``filename`` (basename of ``file_path``), ``fingerprint``
        (list of ``"row-col"`` strings), ``status`` ("Success" or "Error"),
        ``message`` and ``timestamp``. Exceptions raised while decoding or
        analysing the audio are not propagated. They are reported as a Row
        with status "Error", an empty fingerprint and the exception text as
        the message.
    """
    filename = os.path.basename(file_path)
    try:
        y, sr = librosa.load(file_path, sr=sample_rate)
        D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)
        peaks = np.argwhere(D > np.percentile(D, percentile))
        fingerprint = [f"{row}-{col}" for row, col in peaks]  # flatten into strings
        status, message = "Success", "Flattened fingerprint generated"
    except Exception as e:
        fingerprint, status, message = [], "Error", str(e)

    return Row(
        filename=filename,
        fingerprint=fingerprint,
        status=status,
        message=message,
        timestamp=str(datetime.now()),
    )

In [16]:
# Column layout of the Rows produced by extract_fingerprint; every column is nullable.
schema = (
    StructType()
    .add("filename", StringType(), True)
    .add("fingerprint", ArrayType(StringType()), True)
    .add("status", StringType(), True)
    .add("message", StringType(), True)
    .add("timestamp", StringType(), True)
)

In [19]:
# Full paths of every .mp3 in the local input folder (in os.listdir order).
mp3_files = [
    os.path.join(mp3_input_dir, entry)
    for entry in os.listdir(mp3_input_dir)
    if entry.endswith(".mp3")
]

# Distribute the paths and fingerprint each one. map() is lazy, so decoding
# only happens when a downstream action (e.g. the Parquet write) runs.
path_rdd = spark.sparkContext.parallelize(mp3_files)
fingerprint_rdd = path_rdd.map(extract_fingerprint)

In [None]:
# Materialise the RDD of Rows against the explicit schema and replace any earlier output.
fingerprint_df = spark.createDataFrame(fingerprint_rdd, schema=schema)
fingerprint_df.write.parquet(hdfs_output_path, mode="overwrite")

In [None]:
# `F` was used below but never imported (NameError on a fresh kernel).
from pyspark.sql import functions as F

# Read back what was just written to confirm the Parquet round-trip.
df_check = spark.read.parquet(hdfs_output_path)

# Each fingerprint entry is a "row-col" string (see extract_fingerprint);
# split it back into an integer (row, col) struct for inspection.
df_with_peaks = df_check.withColumn(
    "reconstructed_peaks",
    F.expr("transform(fingerprint, x -> struct(cast(split(x, '-')[0] as int) as row, cast(split(x, '-')[1] as int) as col))")
)

df_with_peaks.select("filename", "reconstructed_peaks").show(truncate=False)

Writing to Parquet Locally

In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
import numpy as np
import os
from datetime import datetime

# Initialize Spark
spark = SparkSession.builder.appName("Convert Fingerprints to Parquet").getOrCreate()

# Paths
fingerprint_folder = "fingerprints"
local_parquet_output = "parquet"

# Prepare data: one Row per .npy file, for successes and failures alike.
fingerprint_data = []

# Sorted so the row order (and therefore the output) is deterministic.
for npy_name in sorted(os.listdir(fingerprint_folder)):
    if not npy_name.endswith(".npy"):
        continue
    # Swap only the trailing extension (str.replace would rewrite every ".npy" in the name).
    mp3_name = npy_name[: -len(".npy")] + ".mp3"
    try:
        filepath = os.path.join(fingerprint_folder, npy_name)
        # SECURITY(review): allow_pickle=True lets a crafted .npy run arbitrary code
        # on load. Only use with trusted files; if they hold plain integer arrays,
        # allow_pickle=False is sufficient.
        peaks = np.load(filepath, allow_pickle=True)
        # The schema below is array<array<int>>. Reject anything else here so one
        # bad file becomes an Error row instead of failing createDataFrame for all.
        if peaks.ndim != 2 or not np.issubdtype(peaks.dtype, np.integer):
            raise ValueError(
                f"expected a 2-D integer array, got shape {peaks.shape} dtype {peaks.dtype}"
            )

        fingerprint_data.append(Row(
            filename=mp3_name,
            fingerprint=peaks.tolist(),
            status="Success",
            message="Converted from .npy",
            timestamp=str(datetime.now())
        ))
    except Exception as e:
        fingerprint_data.append(Row(
            filename=mp3_name,
            fingerprint=[],
            status="Error",
            message=str(e),
            timestamp=str(datetime.now())
        ))

# Define schema
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("fingerprint", ArrayType(ArrayType(IntegerType())), True),
    StructField("status", StringType(), True),
    StructField("message", StringType(), True),
    StructField("timestamp", StringType(), True)
])

# Create DataFrame
fingerprint_df = spark.createDataFrame(fingerprint_data, schema=schema)

# Write to local Parquet
fingerprint_df.write.mode("overwrite").parquet(local_parquet_output)

25/03/16 21:10:55 WARN Utils: Your hostname, osbdet resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
25/03/16 21:10:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/16 21:11:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/16 21:11:13 WARN TaskSetManager: Stage 0 contains a task of very large size (4106 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Example of writing the fingerprints to HDFS (Hadoop)

In [2]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
import numpy as np
import os
from datetime import datetime

# Same .npy -> Parquet conversion as the previous cell; only the destination
# differs (HDFS instead of the local "parquet" folder).

# Initialize Spark (getOrCreate reuses an already-running session if there is one)
spark = SparkSession.builder.appName("Convert Fingerprints to Parquet").getOrCreate()

# Paths
fingerprint_folder = "fingerprints"
local_parquet_output = "parquet"  # not used in this cell; output goes to HDFS below
hdfs_parquet_output = "hdfs://localhost:9000/warehouse/fingerprints_parquet/"

# Prepare data: one Row per .npy file, for successes and failures alike.
fingerprint_data = []

# Sorted so the row order (and therefore the output) is deterministic.
for npy_name in sorted(os.listdir(fingerprint_folder)):
    if not npy_name.endswith(".npy"):
        continue
    # Swap only the trailing extension (str.replace would rewrite every ".npy" in the name).
    mp3_name = npy_name[: -len(".npy")] + ".mp3"
    try:
        filepath = os.path.join(fingerprint_folder, npy_name)
        # SECURITY(review): allow_pickle=True lets a crafted .npy run arbitrary code
        # on load. Only use with trusted files; if they hold plain integer arrays,
        # allow_pickle=False is sufficient.
        peaks = np.load(filepath, allow_pickle=True)
        # The schema below is array<array<int>>. Reject anything else here so one
        # bad file becomes an Error row instead of failing createDataFrame for all.
        if peaks.ndim != 2 or not np.issubdtype(peaks.dtype, np.integer):
            raise ValueError(
                f"expected a 2-D integer array, got shape {peaks.shape} dtype {peaks.dtype}"
            )

        fingerprint_data.append(Row(
            filename=mp3_name,
            fingerprint=peaks.tolist(),
            status="Success",
            message="Converted from .npy",
            timestamp=str(datetime.now())
        ))
    except Exception as e:
        fingerprint_data.append(Row(
            filename=mp3_name,
            fingerprint=[],
            status="Error",
            message=str(e),
            timestamp=str(datetime.now())
        ))

# Define schema
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("fingerprint", ArrayType(ArrayType(IntegerType())), True),
    StructField("status", StringType(), True),
    StructField("message", StringType(), True),
    StructField("timestamp", StringType(), True)
])

# Create DataFrame
fingerprint_df = spark.createDataFrame(fingerprint_data, schema=schema)

# Write to Parquet on HDFS
fingerprint_df.write.mode("overwrite").parquet(hdfs_parquet_output)

25/03/16 21:15:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/03/16 21:15:02 WARN TaskSetManager: Stage 1 contains a task of very large size (4106 KiB). The maximum recommended task size is 1000 KiB.
                                                                                