# Large-Scale Data Engineering for AI
## The Data Engineering Pipeline

In [None]:
# Imports and requirements
import os
import datetime
import kaggle
import shutil
import pandas as pd
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, regexp_replace, trim, lower, upper, to_date, year, month, dayofmonth, explode, split


#### Logging

In [8]:
# Root logging setup: INFO and above, with every record stamped with the
# time, the logger name and the level. Note that basicConfig() leaves an
# already-configured root logger untouched.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by the Spark helper functions of this notebook.
logger = logging.getLogger(__name__)

#### Create SparkSession

#### Function definition

In [10]:
def create_spark_session(app_name="Spotify_ETL"):
    """
    Build the Spark session used by the pipeline, or reuse an existing one.

    Args:
        app_name (str): Name given to the Spark application.

    Returns:
        SparkSession: The session returned by ``getOrCreate()``.

    Raises:
        Exception: Any error raised while building the session is logged
            and re-raised unchanged.
    """
    logger.info(f"Creating Spark session with app name: {app_name}")

    # Configuration entries handed to the builder, applied in this order.
    session_settings = (
        ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ("spark.sql.shuffle.partitions", "10"),
        ("spark.driver.memory", "2g"),
        ("spark.executor.memory", "4g"),
        ("spark.default.parallelism", "4"),
        ("spark.sql.adaptive.enabled", "true"),
    )

    try:
        builder = SparkSession.builder.appName(app_name)
        for setting, value in session_settings:
            builder = builder.config(setting, value)
        session = builder.getOrCreate()

        # Keep Spark's own output down to ERROR-level messages.
        session.sparkContext.setLogLevel("ERROR")

        logger.info("Spark session created successfully")
        return session

    except Exception as e:
        logger.error(f"Error creating Spark session: {e}")
        raise

#### Execution

In [11]:

# Build the shared Spark session (default app name "Spotify_ETL") that the
# formatted-zone processing calls further down receive as their first argument.
spark = create_spark_session()

2025-04-12 12:44:16,129 - __main__ - INFO - Creating Spark session with app name: Spotify_ETL
2025-04-12 12:44:21,013 - __main__ - INFO - Spark session created successfully
2025-04-12 12:44:21,013 - __main__ - INFO - Spark session created successfully


### Landing Zone

In [5]:
# Landing Zone location: <BASE_DIR>/landing_zone, created if it does not exist yet
BASE_DIR = "../../data"
LANDING_ZONE_DIR = os.path.join(BASE_DIR, "landing_zone")
os.makedirs(LANDING_ZONE_DIR, exist_ok=True)

# Authenticate the Kaggle API client before any download is attempted
kaggle.api.authenticate()

# Kaggle datasets to ingest. Each entry provides:
#   kaggle_id    - dataset identifier handed to the Kaggle API
#   dataset_name - base name of the Parquet file written to the Landing Zone
#   update       - whether the dataset is downloaded again when the
#                  ingestion runs with update=True
datasets = [
    dict(
        kaggle_id="asaniczka/top-spotify-songs-in-73-countries-daily-updated",
        dataset_name="top-spotify-songs-by-country",
        update=True,
    ),
    dict(
        kaggle_id="maharshipandya/-spotify-tracks-dataset",
        dataset_name="spotify-tracks-dataset",
        update=False,
    ),
    dict(
        kaggle_id="terminate9298/songs-lyrics",
        dataset_name="songs-lyrics",
        update=False,
    ),
]

In [12]:
def data_collector_kaggle(kaggle_dataset: dict) -> None:
    """
    Downloads a dataset from Kaggle and saves it to the landing zone as Parquet.

    The archive is unzipped into a temporary ``temp_<dataset_name>`` folder
    inside ``LANDING_ZONE_DIR``. Every CSV found there, except the files listed
    in ``skipped_files``, is read with pandas and written to
    ``<LANDING_ZONE_DIR>/<dataset_name>.parquet``. The temporary folder is
    removed whether the download succeeds or fails.

    Parameters:
    kaggle_dataset (dict): Dataset description; must provide the keys
        ``kaggle_id`` and ``dataset_name``.

    Raises:
    KeyError: If ``kaggle_id`` or ``dataset_name`` is missing.
    Exception: Any error raised while downloading or converting is logged and
        re-raised, so the caller can report the dataset as failed instead of
        logging a false success.
    """
    # Extract dataset information
    kaggle_id = kaggle_dataset["kaggle_id"]
    dataset_name = kaggle_dataset["dataset_name"]

    # CSV files that are deliberately excluded from the conversion
    skipped_files = {"songs_details.csv", "album_details.csv"}

    # Temporary download folder and final Parquet location
    dataset_folder = os.path.join(LANDING_ZONE_DIR, f"temp_{dataset_name}")
    final_path = os.path.join(LANDING_ZONE_DIR, f"{dataset_name}.parquet")
    os.makedirs(dataset_folder, exist_ok=True)

    try:
        logger.info(f"Downloading dataset: {kaggle_id}")

        kaggle.api.dataset_download_files(
            kaggle_id,
            path=dataset_folder,
            unzip=True
        )

        # Sorted so that the file which ends up in the Parquet output does not
        # depend on the arbitrary order returned by os.listdir().
        csv_files = sorted(
            filename
            for filename in os.listdir(dataset_folder)
            if filename.endswith(".csv") and filename not in skipped_files
        )

        if not csv_files:
            logger.warning("No CSV file found in the downloaded dataset. Check the contents of the download.")
        elif len(csv_files) > 1:
            # All CSVs share one output path, so make the overwrite visible.
            logger.warning(
                f"Dataset '{kaggle_id}' contains {len(csv_files)} CSV files {csv_files}; "
                f"they all map to '{final_path}', so only the last one is kept."
            )

        for filename in csv_files:
            # Read CSV with Pandas and write it as a single Parquet file
            df = pd.read_csv(os.path.join(dataset_folder, filename))
            df.to_parquet(final_path, index=False)

            logger.info(f"CSV '{filename}' converted to single Parquet file and saved as '{final_path}'.")

    except Exception as e:
        # Same convention as the Spark helpers in this file: log, then re-raise.
        logger.error(f"Error downloading dataset '{kaggle_id}': {e}")
        raise

    finally:
        # Best-effort removal of the temporary folder on every exit path
        shutil.rmtree(dataset_folder, ignore_errors=True)

    # Log the successful download
    logger.info(f"Dataset '{dataset_name}' downloaded successfully.")

def download_and_store_datasets(update: bool = False) -> None:
    """
    Downloads and stores datasets from Kaggle into the landing zone.

    Iterates over the module-level ``datasets`` list and hands each entry to
    ``data_collector_kaggle``. An exception raised for one dataset is logged
    and does not stop the remaining datasets from being processed.

    Parameters:
    update (bool): When True, only entries whose ``update`` flag is truthy are
        downloaded and the others are skipped. When False (the default) every
        entry is downloaded.
    """
    logger.info("Starting the creation of the Landing Zone using Kaggle API")

    for kaggle_dataset in datasets:
        # Resolve a label before anything can fail, so the skip message and the
        # error handler can always name the dataset they are talking about.
        dataset_name = kaggle_dataset.get(
            "dataset_name", kaggle_dataset.get("kaggle_id", "<unknown>")
        )

        # An entry without an "update" flag is treated as "do not refresh".
        if update and not kaggle_dataset.get("update", False):
            logger.info(f"Skipping dataset '{dataset_name}' as update is set to False.")
            continue

        try:
            data_collector_kaggle(kaggle_dataset)
            logger.info(f"Dataset '{dataset_name}' processed successfully.")
        except Exception as e:
            logger.error(f"Error processing dataset '{dataset_name}': {e}")

    logger.info("All datasets have been processed.")
    logger.info("Landing Zone creation completed.")

# Populate the Landing Zone. With update=False no dataset is skipped, so every
# entry of `datasets` is downloaded.
download_and_store_datasets(update=False)

2025-04-12 12:45:53,082 - root - INFO - Starting the creation of the Landing Zone using Kaggle API
2025-04-12 12:45:53,083 - root - INFO - Downloading dataset: asaniczka/top-spotify-songs-in-73-countries-daily-updated
2025-04-12 12:45:53,083 - root - INFO - Downloading dataset: asaniczka/top-spotify-songs-in-73-countries-daily-updated
2025-04-12 12:46:08,296 - root - INFO - CSV 'universal_top_spotify_songs.csv' converted to single Parquet file and saved as '../../data\landing_zone\top-spotify-songs-by-country.parquet'.
2025-04-12 12:46:08,296 - root - INFO - CSV 'universal_top_spotify_songs.csv' converted to single Parquet file and saved as '../../data\landing_zone\top-spotify-songs-by-country.parquet'.
2025-04-12 12:46:08,354 - root - INFO - Dataset 'top-spotify-songs-by-country' downloaded successfully.
2025-04-12 12:46:08,354 - root - INFO - Dataset 'top-spotify-songs-by-country' downloaded successfully.
2025-04-12 12:46:08,404 - root - INFO - Dataset 'top-spotify-songs-by-country' 

### Formatted Zone

#### Function definition

In [18]:
def process_spotify_tracks(spark, input_path, output_path):
    """
    Format the Spotify tracks dataset and store it as Parquet.

    Reads the Parquet data at ``input_path``, keeps the track identification
    and audio-feature columns (``artists`` is renamed to ``artist_name`` and
    the numeric/boolean columns are cast to explicit types) and overwrites
    ``output_path`` with the result. No rows are filtered here.

    Args:
        spark (SparkSession): The Spark session.
        input_path (str): The input file path.
        output_path (str): The output directory path.

    Returns:
        str: ``output_path``.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    logger.info(f"Processing Spotify tracks dataset from {input_path}")

    # (source column, output name or None to keep it, cast type or None)
    column_plan = [
        ("track_id", "track_id", None),
        ("track_name", "track_name", None),
        ("artists", "artist_name", None),
        ("album_name", None, None),
        ("popularity", "popularity", "integer"),
        ("duration_ms", "duration_ms", "long"),
        ("explicit", "explicit", "boolean"),
        ("danceability", "danceability", "double"),
        ("energy", "energy", "double"),
        ("key", "key", "integer"),
        ("loudness", "loudness", "double"),
        ("mode", "mode", "integer"),
        ("speechiness", "speechiness", "double"),
        ("acousticness", "acousticness", "double"),
        ("instrumentalness", "instrumentalness", "double"),
        ("liveness", "liveness", "double"),
        ("valence", "valence", "double"),
        ("tempo", "tempo", "double"),
    ]

    def build_column(source, target, dtype):
        """Turn one plan entry into a Spark column expression."""
        expression = col(source)
        if dtype is not None:
            expression = expression.cast(dtype)
        if target is not None:
            expression = expression.alias(target)
        return expression

    try:
        raw_df = spark.read.parquet(input_path)

        # Schema and row count of the input, for the run log
        logger.info("Original schema:")
        raw_df.printSchema()
        count_before = raw_df.count()
        logger.info(f"Count before processing: {count_before}")

        # Project, rename and cast the selected columns
        processed_df = raw_df.select(*[build_column(*entry) for entry in column_plan])

        # NOTE: dropping rows with a null track_id / track_name is not done
        # in this zone; that filter is earmarked for the Trusted Zone.

        # Schema and row count of the output, for the run log
        logger.info("Processed schema:")
        processed_df.printSchema()
        count_after = processed_df.count()
        logger.info(f"Count after processing: {count_after}")
        logger.info(f"Removed {count_before - count_after} rows during processing")

        # Persist the result, replacing any previous output
        processed_df.write.mode("overwrite").parquet(output_path)
        logger.info(f"Processed Spotify tracks data saved to {output_path}")

        return output_path

    except Exception as e:
        logger.error(f"Error processing Spotify tracks dataset: {e}")
        raise

def process_top_songs(spark, input_path, output_path):
    """
    Format the "top Spotify songs by country" dataset and store it as Parquet.

    Reads the Parquet data at ``input_path`` and produces a DataFrame where:

    - ``name`` is renamed to ``track_name`` and ``artists`` to ``artist_name``;
    - ``daily_rank`` and ``popularity`` are cast to integer, ``duration_ms``
      to long, and ``acousticness``, ``instrumentalness``, ``liveness``,
      ``valence``, ``tempo`` and ``time_signature`` to double;
    - ``snapshot_date`` and ``album_release_date`` are parsed as dates with
      the pattern ``yyyy-MM-dd``;
    - ``snapshot_year``, ``snapshot_month`` and ``snapshot_day`` are derived
      from ``snapshot_date``;
    - ``country`` has runs of whitespace collapsed, is trimmed and upper-cased.

    No rows are filtered and no nulls are filled here.

    Args:
        spark (SparkSession): The Spark session.
        input_path (str): Path of the input Parquet data.
        output_path (str): Path of the output directory.

    Returns:
        str: ``output_path``.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    logger.info(f"Procesando datos de Spotify desde {input_path}")

    # (source column, output name, cast type or None to keep the stored type)
    column_plan = [
        ("spotify_id", "spotify_id", None),
        ("name", "track_name", None),
        ("artists", "artist_name", None),
        ("daily_rank", "daily_rank", "integer"),
        ("daily_movement", "daily_movement", None),
        ("weekly_movement", "weekly_movement", None),
        ("country", "country", None),
        ("snapshot_date", "snapshot_date", None),
        ("popularity", "popularity", "integer"),
        ("is_explicit", "is_explicit", None),
        ("duration_ms", "duration_ms", "long"),
        ("album_name", "album_name", None),
        ("album_release_date", "album_release_date", None),
        ("danceability", "danceability", None),
        ("energy", "energy", None),
        ("key", "key", None),
        ("loudness", "loudness", None),
        ("mode", "mode", None),
        ("speechiness", "speechiness", None),
        ("acousticness", "acousticness", "double"),
        ("instrumentalness", "instrumentalness", "double"),
        ("liveness", "liveness", "double"),
        ("valence", "valence", "double"),
        ("tempo", "tempo", "double"),
        ("time_signature", "time_signature", "double"),
    ]

    def build_column(source, target, dtype):
        """Turn one plan entry into a Spark column expression."""
        expression = col(source)
        if dtype is not None:
            expression = expression.cast(dtype)
        return expression.alias(target)

    try:
        raw_df = spark.read.parquet(input_path)

        # Schema and row count of the input, for the run log
        logger.info("Schema original:")
        raw_df.printSchema()
        count_before = raw_df.count()
        logger.info(f"Filas antes del procesamiento: {count_before}")

        # Project, rename and cast the columns
        processed_df = raw_df.select(*[build_column(*entry) for entry in column_plan])

        # NOTE: filling nulls in the numeric columns is not done in this
        # zone; it is earmarked for the Trusted Zone.

        # Parse both date columns (adjust the pattern if the source changes)
        for date_column in ("snapshot_date", "album_release_date"):
            processed_df = processed_df.withColumn(
                date_column, to_date(col(date_column), "yyyy-MM-dd")
            )

        # Year, month and day of the snapshot, for later analysis
        snapshot = col("snapshot_date")
        processed_df = processed_df.withColumn("snapshot_year", year(snapshot))
        processed_df = processed_df.withColumn("snapshot_month", month(snapshot))
        processed_df = processed_df.withColumn("snapshot_day", dayofmonth(snapshot))

        # Clean the country field: collapse extra spaces, trim, upper-case
        collapsed_spaces = regexp_replace(col("country"), "\\s+", " ")
        processed_df = processed_df.withColumn("country", upper(trim(collapsed_spaces)))

        # NOTE: dropping rows with a null track_name is likewise left to the
        # Trusted Zone.

        # Schema and row count of the output, for the run log
        logger.info("Schema procesado:")
        processed_df.printSchema()
        count_after = processed_df.count()
        logger.info(f"Filas después del procesamiento: {count_after}")
        logger.info(f"Se removieron {count_before - count_after} filas durante el procesamiento")

        # Persist the result as Parquet, replacing any previous output
        processed_df.write.mode("overwrite").parquet(output_path)
        logger.info(f"Datos procesados de Spotify guardados en {output_path}")

        return output_path

    except Exception as e:
        logger.error(f"Error al procesar los datos de Spotify: {e}")
        raise

def process_song_lyrics(spark, input_path, output_path):
    """
    Format the song lyrics dataset and store it as Parquet.

    Reads the Parquet data at ``input_path``, keeps four columns
    (``Unnamed: 0`` cast to integer as ``song_id``, ``artist`` as
    ``artist_name``, ``song_name``, and ``lyrics`` as ``song_lyrics``),
    trims ``artist_name`` and ``song_name``, and overwrites ``output_path``
    with the result. No rows are filtered here.

    Args:
        spark (SparkSession): The Spark session.
        input_path (str): The input file path.
        output_path (str): The output directory path.

    Returns:
        str: ``output_path``.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    logger.info(f"Processing song lyrics from {input_path}")

    try:
        # The landing zone stores this dataset as Parquet
        raw_df = spark.read.parquet(input_path)

        # Schema and row count of the input, for the run log
        logger.info("Original schema:")
        raw_df.printSchema()
        count_before = raw_df.count()
        logger.info(f"Count before processing: {count_before}")

        # Project and rename the columns that are kept
        selected_columns = [
            col("Unnamed: 0").cast("integer").alias("song_id"),
            col("artist").alias("artist_name"),
            col("song_name").alias("song_name"),
            col("lyrics").alias("song_lyrics"),
        ]
        processed_df = raw_df.select(*selected_columns)

        # Strip surrounding whitespace from the artist and song names
        for name_column in ("artist_name", "song_name"):
            processed_df = processed_df.withColumn(name_column, trim(col(name_column)))

        # NOTE: dropping rows with a null song_id or song name is not done
        # in this zone; that filter is earmarked for the Trusted Zone.

        # Schema and row count of the output, for the run log
        logger.info("Processed schema:")
        processed_df.printSchema()
        count_after = processed_df.count()
        logger.info(f"Count after processing: {count_after}")
        logger.info(f"Removed {count_before - count_after} rows during processing")

        # Persist the result, replacing any previous output
        processed_df.write.mode("overwrite").parquet(output_path)
        logger.info(f"Processed song lyrics data saved to {output_path}")

        return output_path

    except Exception as e:
        logger.error(f"Error processing song lyrics dataset: {e}")
        raise

#### Execution

In [19]:
# Landing Zone Parquet inputs, keyed by dataset
input_paths = dict(
    spotify_tracks="../../data/landing_zone/spotify-tracks-dataset.parquet",
    top_songs="../../data/landing_zone/top-spotify-songs-by-country.parquet",
    song_lyrics="../../data/landing_zone/songs-lyrics.parquet",
)

# Formatted Zone output directories, keyed the same way
output_paths = dict(
    spotify_tracks="../../data/formatted_zone/spotify-tracks-dataset",
    top_songs="../../data/formatted_zone/top-spotify-songs-by-country",
    song_lyrics="../../data/formatted_zone/songs-lyrics",
)

# Format each dataset in turn; the last call's return value is the cell result
process_spotify_tracks(spark, input_paths["spotify_tracks"], output_paths["spotify_tracks"])
process_top_songs(spark, input_paths["top_songs"], output_paths["top_songs"])
process_song_lyrics(spark, input_paths["song_lyrics"], output_paths["song_lyrics"])

2025-04-12 12:54:32,291 - __main__ - INFO - Processing Spotify tracks dataset from ../../data/landing_zone/spotify-tracks-dataset.parquet
2025-04-12 12:54:32,388 - __main__ - INFO - Original schema:
2025-04-12 12:54:32,388 - __main__ - INFO - Original schema:
2025-04-12 12:54:32,486 - __main__ - INFO - Count before processing: 114000
2025-04-12 12:54:32,486 - __main__ - INFO - Count before processing: 114000
2025-04-12 12:54:32,521 - __main__ - INFO - Processed schema:
2025-04-12 12:54:32,521 - __main__ - INFO - Processed schema:


root
 |-- Unnamed: 0: long (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: long (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: long (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: long (nullable = true)
 |-- track_genre: string (nullable = true)

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- album_n

2025-04-12 12:54:32,614 - __main__ - INFO - Count after processing: 114000
2025-04-12 12:54:32,616 - __main__ - INFO - Removed 0 rows during processing
2025-04-12 12:54:32,616 - __main__ - INFO - Removed 0 rows during processing
2025-04-12 12:54:33,555 - __main__ - INFO - Processed Spotify tracks data saved to ../../data/formatted_zone/spotify-tracks-dataset
2025-04-12 12:54:33,556 - __main__ - INFO - Procesando datos de Spotify desde ../../data/landing_zone/top-spotify-songs-by-country.parquet
2025-04-12 12:54:33,555 - __main__ - INFO - Processed Spotify tracks data saved to ../../data/formatted_zone/spotify-tracks-dataset
2025-04-12 12:54:33,556 - __main__ - INFO - Procesando datos de Spotify desde ../../data/landing_zone/top-spotify-songs-by-country.parquet
2025-04-12 12:54:33,625 - __main__ - INFO - Schema original:
2025-04-12 12:54:33,625 - __main__ - INFO - Schema original:
2025-04-12 12:54:33,757 - __main__ - INFO - Filas antes del procesamiento: 1919457
2025-04-12 12:54:33,757 

root
 |-- spotify_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- daily_rank: long (nullable = true)
 |-- daily_movement: long (nullable = true)
 |-- weekly_movement: long (nullable = true)
 |-- country: string (nullable = true)
 |-- snapshot_date: string (nullable = true)
 |-- popularity: long (nullable = true)
 |-- is_explicit: boolean (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_release_date: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: long (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- tim

2025-04-12 12:54:33,877 - __main__ - INFO - Schema procesado:
2025-04-12 12:54:34,025 - __main__ - INFO - Filas después del procesamiento: 1919457
2025-04-12 12:54:34,026 - __main__ - INFO - Se removieron 0 filas durante el procesamiento
2025-04-12 12:54:34,025 - __main__ - INFO - Filas después del procesamiento: 1919457
2025-04-12 12:54:34,026 - __main__ - INFO - Se removieron 0 filas durante el procesamiento


root
 |-- spotify_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- daily_rank: integer (nullable = true)
 |-- daily_movement: long (nullable = true)
 |-- weekly_movement: long (nullable = true)
 |-- country: string (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- is_explicit: boolean (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_release_date: date (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: long (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = tr

2025-04-12 12:54:42,016 - __main__ - INFO - Datos procesados de Spotify guardados en ../../data/formatted_zone/top-spotify-songs-by-country
2025-04-12 12:54:42,017 - __main__ - INFO - Processing song lyrics from ../../data/landing_zone/songs-lyrics.parquet
2025-04-12 12:54:42,017 - __main__ - INFO - Processing song lyrics from ../../data/landing_zone/songs-lyrics.parquet
2025-04-12 12:54:42,098 - __main__ - INFO - Original schema:
2025-04-12 12:54:42,098 - __main__ - INFO - Original schema:
2025-04-12 12:54:42,196 - __main__ - INFO - Count before processing: 25742
2025-04-12 12:54:42,213 - __main__ - INFO - Processed schema:
2025-04-12 12:54:42,196 - __main__ - INFO - Count before processing: 25742
2025-04-12 12:54:42,213 - __main__ - INFO - Processed schema:


root
 |-- Unnamed: 0: long (nullable = true)
 |-- link: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- song_name: string (nullable = true)
 |-- lyrics: string (nullable = true)

root
 |-- song_id: integer (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_name: string (nullable = true)
 |-- song_lyrics: string (nullable = true)



2025-04-12 12:54:42,305 - __main__ - INFO - Count after processing: 25742
2025-04-12 12:54:42,306 - __main__ - INFO - Removed 0 rows during processing
2025-04-12 12:54:42,306 - __main__ - INFO - Removed 0 rows during processing
2025-04-12 12:54:42,749 - __main__ - INFO - Processed song lyrics data saved to ../../data/formatted_zone/songs-lyrics
2025-04-12 12:54:42,749 - __main__ - INFO - Processed song lyrics data saved to ../../data/formatted_zone/songs-lyrics


'../../data/formatted_zone/songs-lyrics'

### The Trusted Zone

### The Exploitation Zone

## The Data Analysis Pipelines