# **Data ingestion**

Dataset: https://www.kaggle.com/datasets/maharshipandya/-spotify-tracks-dataset

Architecture Decision: Batch vs. Streaming

Ingestion Strategy: Batch ingestion (spark.read)

Justification: The source data is a static CSV dataset (dataset.csv) provided via Kaggle. Because the data does not arrive continuously in real time and only needs a one-time load for analysis, batch ingestion is the most efficient and appropriate method.

In [0]:
# Configuration
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType
from pyspark.sql.functions import current_timestamp, col

# Input CSV location and the two Delta tables this notebook writes to.
source_path = "/Workspace/Shared/bricks_breaks_project/spotify-pipeline/notebooks/data/dataset.csv"
bronze_table_path = "default.spotify_bronze"
quarantine_table_path = "default.spotify_quarantine"

# Define Schema
# Each entry is (column name, Spark type, nullable), listed in schema order.
_spotify_columns = [
    ("row_id", IntegerType(), True),
    # Only column declared non-nullable.
    # NOTE(review): the validation rule later in this file still checks
    # track_id for NULL -- confirm whether the CSV reader enforces this flag.
    ("track_id", StringType(), False),
    ("artists", StringType(), True),
    ("album_name", StringType(), True),
    ("track_name", StringType(), True),
    ("popularity", IntegerType(), True),
    ("duration_ms", IntegerType(), True),
    ("explicit", BooleanType(), True),
    ("danceability", FloatType(), True),
    ("energy", FloatType(), True),
    ("key", IntegerType(), True),
    ("loudness", FloatType(), True),
    ("mode", IntegerType(), True),
    ("speechiness", FloatType(), True),
    ("acousticness", FloatType(), True),
    ("instrumentalness", FloatType(), True),
    ("liveness", FloatType(), True),
    ("valence", FloatType(), True),
    ("tempo", FloatType(), True),
    ("time_signature", IntegerType(), True),
    ("track_genre", StringType(), True),
]

spotify_schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in _spotify_columns]
)

# Ingestion (Batch Mode)
# One-shot batch read of the CSV using the explicit schema (no inference).
# The first line is treated as a header and the parser runs in PERMISSIVE mode.
csv_reader = (
    spark.read
    .format("csv")
    .schema(spotify_schema)
    .options(header="true", mode="PERMISSIVE")
)
raw_df = csv_reader.load(source_path)

# Attach lineage columns: the load time and the path of the originating file
# (taken from the reader's hidden `_metadata` column).
df_with_meta = (
    raw_df
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", col("_metadata.file_path"))
)

# Validation rule: a record is valid only when it has a track_id AND a
# strictly positive duration.
#
# The rule must never evaluate to NULL. `duration_ms > 0` yields NULL (not
# False) when duration_ms is NULL, and both filter(rule) and filter(~rule)
# discard rows whose predicate is NULL. Without the explicit isNotNull() guard
# on duration_ms, such rows would be written to neither bronze nor quarantine.
# With the guard the predicate is strictly True/False (False AND NULL is
# False), so every input row lands in exactly one of the two DataFrames.
is_valid_rule = (
    col("track_id").isNotNull()
    & col("duration_ms").isNotNull()
    & (col("duration_ms") > 0)
)

valid_df = df_with_meta.filter(is_valid_rule)
invalid_df = df_with_meta.filter(~is_valid_rule)

# Storage (Bronze & Quarantine)

# Save valid records to the bronze Delta table.
# NOTE(review): mode("append") adds rows on every execution, so re-running
# this cell against the same file appends the same records again -- confirm
# that is intended for a one-time load.
(
    valid_df.write
    .format("delta")
    .mode("append")
    .saveAsTable(bronze_table_path)
)

# Save invalid records to quarantine.
# count() is a Spark action that re-evaluates the whole read/filter plan, so
# take it once and reuse the number for both the branch and the message. This
# also guarantees the reported figure is the one the decision was based on.
invalid_count = invalid_df.count()
if invalid_count > 0:
    (
        invalid_df.write
        .format("delta")
        .mode("append")
        .saveAsTable(quarantine_table_path)
    )
    print(f"WARNING: {invalid_count} invalid records quarantined to {quarantine_table_path}")
else:
    print("Success: No invalid records found.")

valid_count = valid_df.count()
print(f"Success: {valid_count} valid records ingested to {bronze_table_path}")