# Procesamiento de datos con Spark

### Importar módulos

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, round as spark_round, mean
from pyspark.sql.functions import max as spark_max, min as spark_min
from pyspark.sql.functions import stddev

### Crear la sesión de Spark y cargar la base de datos

In [None]:
# Local Spark session for this notebook. The SQLite JDBC driver jar is placed on
# the driver classpath so the JDBC read below can load it (adjust the path if the
# jar lives elsewhere). getOrCreate() reuses a session that is already running.
spark = (
    SparkSession.builder
    .appName("SpotifyFromDB_Metrics")
    .master("local[*]")
    .config("spark.security.manager", "false")
    .config("spark.hadoop.security.manager", "None")
    .config("spark.driver.extraClassPath", "./Code/libs/sqlite-jdbc.jar")
    .getOrCreate()
)

# Same SQLite file that the Flask ingestion server writes to.
jdbc_url = "jdbc:sqlite:./Code/Data/spotify_data.db"

# Load the whole `tracks` table as a DataFrame through JDBC.
df = spark.read.jdbc(url=jdbc_url, table="tracks")

### Métricas

In [None]:

# Confirm the structure of the table as read from SQLite.
df.printSchema()
df.select("track_name", "energy", "tempo", "loudness", "danceability", "valence").show(10)

# Keep only the features used below and drop rows with a null in any of them.
features = ['energy', 'tempo', 'loudness', 'danceability', 'valence']
df = df.select(*features).na.drop()

# Clipping: force each feature into a fixed [low, high] range. The tempo and
# loudness bounds are reused by the min-max scaling below.
clip_bounds = {
    "tempo": (20, 200),
    "loudness": (-60, 0),
    "energy": (0, 1),
    "danceability": (0, 1),
}
for name, (low, high) in clip_bounds.items():
    df = df.withColumn(
        name,
        when(col(name) < low, low).when(col(name) > high, high).otherwise(col(name)),
    )

# Min-max scaling of tempo and loudness to [0, 1] using their clipping bounds,
# i.e. (tempo - 20) / 180 and (loudness + 60) / 60.
for name in ("tempo", "loudness"):
    low, high = clip_bounds[name]
    df = df.withColumn(name, (col(name) - low) / (high - low))

# 'arousal': weighted sum of the (clipped/scaled) features, rounded to 3 decimals.
alpha, beta, gamma, delta = 0.5, 0.25, 0.2, 0.05

df = df.withColumn("arousal",
    spark_round(
        alpha * col("energy") +
        beta * col("tempo") +
        gamma * col("loudness") +
        delta * col("danceability"),
        3
    )
)

# Columns reported by every metric below.
metric_cols = ['energy', 'tempo', 'loudness', 'danceability', 'arousal', 'valence']

# Round every reported column to 3 decimals.
for feature in metric_cols:
    df = df.withColumn(feature, spark_round(col(feature), 3))

# New structure (each column shown once).
df.printSchema()
df.select("arousal", "valence", "energy", "tempo", "loudness", "danceability").show(10)

# 1. **Metrics for All Columns**
print("\n📊 Metrics for All Columns:")
df.describe().show()

# 2. **Metrics for Selected Features**
print("\n📊 Metrics for Selected Features:")
df.select(*metric_cols).describe().show()

# 3. **Correlation between energy and arousal**
# stat.corr returns a single Pearson coefficient (a float), not a matrix.
correlation_matrix = df.stat.corr("energy", "arousal")
print(f"Correlation between energy and arousal: {correlation_matrix}\n")

# 4. **Skewness of each feature**
skewness = df.select(*metric_cols).agg({c: "skewness" for c in metric_cols})
print("\nSkewness of each feature:")
skewness.show()

# 5. **Quantiles of each feature**
# approxQuantile accepts a list of columns and returns one [q25, q50, q75] list
# per column (relative error 0.05).
quantiles = df.approxQuantile(metric_cols, [0.25, 0.5, 0.75], 0.05)
for name, q in zip(metric_cols, quantiles):
    print(f"{name} quantiles (25%, 50%, 75%): {q}")
print()

# 6. **Standard Deviation of each feature**
# Stored as `stddev_df` so the imported `stddev` function is not shadowed
# (otherwise re-running this cell would call a DataFrame).
stddev_df = df.select(*metric_cols).agg(
    *[stddev(c).alias(f"stddev_{c}") for c in metric_cols]
)
print("\nStandard Deviation of each feature:")
stddev_df.show()

# 7. **Kurtosis of each feature**
kurtosis = df.select(*metric_cols).agg({c: "kurtosis" for c in metric_cols})
print("\nKurtosis of each feature:")
kurtosis.show()

# 8. **Variance of each feature**
variance = df.select(*metric_cols).agg({c: "variance" for c in metric_cols})
print("\nVariance of each feature:")
variance.show()

# 9. **Min and Max values of each feature**
# A dict cannot map one column to both "min" and "max" (the later key wins and
# the minimum is silently lost), so build explicit aggregate columns instead.
min_max_values = df.select(*metric_cols).agg(
    *[agg_fn(c) for c in metric_cols for agg_fn in (spark_min, spark_max)]
)
print("\nMin and Max values of each feature:")
min_max_values.show()

# 10. **Final Statistics for arousal and valence**
print("\n📈 Final Statistics for Arousal and Valence:")
df.select("arousal", "valence").describe().show()

### Guardar en CSV y cerrar la sesión

In [None]:
# Optional: persist only the arousal/valence pair. Spark writes this path as a
# directory of part files (each with a header row), replacing earlier output.
df.select("arousal", "valence").write.csv(
    "./Code/Data/spark_arousal_valence_output.csv",
    mode="overwrite",
    header=True,
)

# Shut down the Spark session.
spark.stop()

# Simulación de envío (REST) de datos en tiempo real (ingesta)

In [None]:
import pandas as pd
import requests
import time
import logging

logging.basicConfig(level=logging.DEBUG)

# Relative path: resolved against the directory the script is launched from
# (executed from PowerShell).
df = pd.read_csv('./Code/Source/cancionesSpotify.csv')

# Send the first 20 rows to the ingestion endpoint, one per second.
for index, row in df.head(20).iterrows():
    # Missing cells come back as NaN, which is not valid JSON (recent versions of
    # requests refuse to encode it); send them as null instead.
    json_data = {key: (None if pd.isna(value) else value) for key, value in row.items()}

    print(f"Enviando registro {index + 1}...")
    # A timeout keeps the loop from hanging forever if the server never answers.
    response = requests.post("http://localhost:8888/ingest", json=json_data, timeout=10)

    try:
        print(response.json())
    except ValueError:
        # requests' JSON decode error is a ValueError subclass; a bare `except`
        # would also swallow KeyboardInterrupt.
        print("Error al interpretar la respuesta del servidor.")

    time.sleep(1)

# Simulación de recepción (REST) de datos y creación de la base de datos con SQLite

In [None]:

from flask import Flask, request, jsonify
import sqlite3
import os
import json
import logging

# SQLite file used by the server (the Spark cells read the same file via JDBC).
DB_PATH = './Code/Data/spotify_data.db'

# Verbose logging for the local ingestion server.
logging.basicConfig(level=logging.DEBUG)

# Flask application that serves the /ingest endpoint.
app = Flask(import_name=__name__)

def init_db():
    """Create the ``tracks`` table in the SQLite file at ``DB_PATH`` if it is missing.

    ``track_id`` is declared UNIQUE, so the database itself rejects a second row
    with the same track id. Safe to call repeatedly (CREATE TABLE IF NOT EXISTS).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS tracks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                track_id TEXT UNIQUE,
                artists TEXT,
                album_name TEXT,
                track_name TEXT,
                popularity INTEGER,
                duration_ms INTEGER,
                explicit BOOLEAN,
                danceability REAL,
                energy REAL,
                key INTEGER,
                loudness REAL,
                mode INTEGER,
                speechiness REAL,
                acousticness REAL,
                instrumentalness REAL,
                liveness REAL,
                valence REAL,
                tempo REAL,
                time_signature INTEGER,
                track_genre TEXT
            )
        ''')
        conn.commit()
    finally:
        # A sqlite3 connection used as a context manager only commits/rolls back;
        # it never closes, so close it explicitly to avoid leaking the handle.
        conn.close()

@app.route('/ingest', methods=['POST'])
def ingest_data():
    """Insert one track, sent as a JSON object, into the ``tracks`` table.

    Responses (JSON with ``status`` and ``message`` keys):
      * 400 ``error`` -- empty or malformed JSON body, a non-object body, or no
        ``track_id`` key.
      * 200 ``info`` -- a track with the same ``track_id`` is already stored.
      * 500 ``error`` -- the INSERT failed (e.g. a column placeholder has no
        matching key in the payload).
      * 200 ``success`` -- the track was inserted.

    Payload keys that are not column placeholders in the INSERT are ignored.
    """
    # silent=True turns malformed / non-JSON bodies into None so they get the
    # same JSON 400 response instead of Flask's default error page.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"status": "error", "message": "No se recibieron datos"}), 400
    if not isinstance(data, dict) or 'track_id' not in data:
        return jsonify({"status": "error", "message": "Se esperaba un objeto JSON con 'track_id'"}), 400

    # Use the shared DB_PATH, and close the connection explicitly: sqlite3's
    # connection context manager only commits/rolls back, it never closes.
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM tracks WHERE track_id = ?", (data['track_id'],))
        if cursor.fetchone():
            return jsonify({"status": "info", "message": "Canción ya existe, no se insertó"}), 200

        try:
            cursor.execute('''
                INSERT INTO tracks (
                    track_id, artists, album_name, track_name, popularity,
                    duration_ms, explicit, danceability, energy, key,
                    loudness, mode, speechiness, acousticness, instrumentalness,
                    liveness, valence, tempo, time_signature, track_genre
                ) VALUES (
                    :track_id, :artists, :album_name, :track_name, :popularity,
                    :duration_ms, :explicit, :danceability, :energy, :key,
                    :loudness, :mode, :speechiness, :acousticness, :instrumentalness,
                    :liveness, :valence, :tempo, :time_signature, :track_genre
                )
            ''', data)
            conn.commit()
        except sqlite3.IntegrityError:
            # With the init_db schema this is the UNIQUE(track_id) constraint: a
            # concurrent request stored the same track between the SELECT and INSERT.
            conn.rollback()
            return jsonify({"status": "info", "message": "Canción ya existe, no se insertó"}), 200
        except Exception as e:
            conn.rollback()
            logging.exception("Error al insertar en la base de datos: %s", e)
            return jsonify({"status": "error", "message": str(e)}), 500
    finally:
        conn.close()

    return jsonify({"status": "success", "message": "Canción insertada correctamente"}), 200


if __name__ == '__main__':
    # sqlite3.connect() does not create missing directories, so create the
    # database's parent folder (the directory part of DB_PATH) before init_db().
    os.makedirs(os.path.dirname(DB_PATH) or '.', exist_ok=True)
    init_db()
    # debug=True enables the reloader and the interactive Werkzeug debugger;
    # no host is given, so Flask listens on localhost only — keep it that way.
    app.run(debug=True, port=8888)