In [None]:

import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import warnings
warnings.filterwarnings('ignore')

# Create (or reuse) the Spark session, with adaptive query execution and
# adaptive partition coalescing switched on.
spark = (
    SparkSession.builder
    .appName("AudioAnalysis")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

print("üîÑ INICIANDO COMPARACI√ìN PANDAS vs PYSPARK")
print("=" * 50)

# ---- Benchmark 1: time needed to load the CSV with each library ----
print("\nüìä COMPARACI√ìN DE CARGA DE DATOS")

# Pandas side: read_csv is eager, so the elapsed time covers the whole read.
start_time = time.time()
df_pandas = pd.read_csv("resultados_voz.csv")
pandas_load_time = time.time() - start_time

# PySpark side: the reader is lazy, so an action (count) is triggered inside
# the timed region to make Spark actually go through the file.
start_time = time.time()
df_spark = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("resultados_voz.csv")
)
df_spark.count()
spark_load_time = time.time() - start_time

# Report both timings and their quotient (pandas seconds / PySpark seconds).
print(f"Pandas - Tiempo de carga: {pandas_load_time:.4f} segundos")
print(f"PySpark - Tiempo de carga: {spark_load_time:.4f} segundos")
print(f"Ratio Pandas/PySpark: {pandas_load_time/spark_load_time:.2f}x")

ModuleNotFoundError: No module named 'pyspark'

In [None]:

print("\nüìà COMPARACI√ìN DE OPERACIONES B√ÅSICAS")

# ---- Pandas: descriptive statistics, shape and per-column null counts ----
start_time = time.time()
pandas_stats = df_pandas.describe()
pandas_shape = df_pandas.shape
pandas_null_count = df_pandas.isna().sum()
pandas_basic_ops_time = time.time() - start_time

# ---- PySpark: the same three pieces of information ----
start_time = time.time()
spark_stats = df_spark.describe()
spark_shape = (df_spark.count(), len(df_spark.columns))
# One aggregate per column: `when` yields a value only for null cells, and
# `count` counts the non-null results, i.e. the number of null cells.
spark_null_count = df_spark.select(
    *(
        count(when(col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in df_spark.columns
    )
)
# describe() and select() are lazy; show() triggers the actual computation
# so that it is included in the measured time.
spark_stats.show()
spark_null_count.show()
spark_basic_ops_time = time.time() - start_time

# Report both timings and their quotient (pandas seconds / PySpark seconds).
print(f"Pandas - Tiempo operaciones b√°sicas: {pandas_basic_ops_time:.4f} segundos")
print(f"PySpark - Tiempo operaciones b√°sicas: {spark_basic_ops_time:.4f} segundos")
print(f"Ratio Pandas/PySpark: {pandas_basic_ops_time/spark_basic_ops_time:.2f}x")

In [None]:
print("\nüîß COMPARACI√ìN DE TRANSFORMACIONES COMPLEJAS")

# Work on a copy so the DataFrame loaded earlier is not modified
df_pandas_processed = df_pandas.copy()

# ---- Pandas: (simulated) audio-quality analysis ----
start_time = time.time()
# Derived metrics: z-score of F0, jitter/shimmer quotient and a quality label
# based on HNR thresholds (> 15 dB, > 10 dB, otherwise).
df_pandas_processed['F0_normalized'] = (df_pandas_processed['F0_Hz'] - df_pandas_processed['F0_Hz'].mean()) / df_pandas_processed['F0_Hz'].std()
df_pandas_processed['Jitter_Shimmer_ratio'] = df_pandas_processed['Jitter_porcentaje'] / df_pandas_processed['Shimmer_porcentaje']
df_pandas_processed['calidad_categoria'] = np.where(df_pandas_processed['HNR_dB'] > 15, 'Buena',
                                                   np.where(df_pandas_processed['HNR_dB'] > 10, 'Aceptable', 'Mala'))

# Group by quality label and aggregate
pandas_grouped = df_pandas_processed.groupby('calidad_categoria').agg({
    'F0_Hz': ['mean', 'std', 'count'],
    'HNR_dB': 'mean',
    'Jitter_porcentaje': 'mean'
})
pandas_transform_time = time.time() - start_time

# ---- PySpark: same operations ----
start_time = time.time()
# A window with no partition columns spans the whole DataFrame, so mean/stddev
# over it are the global statistics needed for the z-score.
# This uses the `Window` class imported at the top of the notebook. The
# original code referenced `window.Window` before the `window` module had been
# imported; with `from pyspark.sql.functions import *` in effect, `window` is
# a function at that point, so the cell failed on a fresh kernel.
whole_frame_window = Window.partitionBy()
df_spark_processed = df_spark.withColumn(
    'F0_normalized',
    (col('F0_Hz') - mean('F0_Hz').over(whole_frame_window)) / stddev('F0_Hz').over(whole_frame_window)
).withColumn(
    'Jitter_Shimmer_ratio',
    col('Jitter_porcentaje') / col('Shimmer_porcentaje')
).withColumn(
    'calidad_categoria',
    when(col('HNR_dB') > 15, 'Buena')
    .when(col('HNR_dB') > 10, 'Aceptable')
    .otherwise('Mala')
)

# Group by quality label and aggregate
spark_grouped = df_spark_processed.groupBy('calidad_categoria').agg(
    mean('F0_Hz').alias('F0_mean'),
    stddev('F0_Hz').alias('F0_std'),
    count('F0_Hz').alias('count'),
    mean('HNR_dB').alias('HNR_mean'),
    mean('Jitter_porcentaje').alias('Jitter_mean')
)
# Transformations are lazy; show() forces execution inside the timed region
spark_grouped.show()
spark_transform_time = time.time() - start_time

# Report both timings and their quotient (pandas seconds / PySpark seconds)
print(f"Pandas - Tiempo transformaciones: {pandas_transform_time:.4f} segundos")
print(f"PySpark - Tiempo transformaciones: {spark_transform_time:.4f} segundos")
print(f"Ratio Pandas/PySpark: {pandas_transform_time/spark_transform_time:.2f}x")

In [None]:
print("\nüéØ COMPARACI√ìN DE FILTRADO Y ORDENAMIENTO")

# ---- Pandas: filter rows, then sort by F0 descending ----
start_time = time.time()
pandas_filtered = df_pandas[
    (df_pandas['HNR_dB'] > 12) &
    (df_pandas['Jitter_porcentaje'] < 2.0)
].sort_values('F0_Hz', ascending=False)
pandas_filter_sort_time = time.time() - start_time

# ---- PySpark: same filter and ordering ----
start_time = time.time()
spark_filtered = df_spark.filter(
    (col('HNR_dB') > 12) &
    (col('Jitter_porcentaje') < 2.0)
).orderBy(col('F0_Hz').desc())
# Force execution by materializing the ordered rows on the driver.
# A plain count() is not enough here: the row count does not depend on row
# order, so Spark's optimizer is free to drop the sort and the timing would
# only cover the filter. collect() needs the rows in order, which keeps the
# sort in the plan and mirrors pandas holding the sorted result in memory.
spark_filtered.collect()
spark_filter_sort_time = time.time() - start_time

# Report both timings and their quotient (pandas seconds / PySpark seconds)
print(f"Pandas - Tiempo filtrado/ordenamiento: {pandas_filter_sort_time:.4f} segundos")
print(f"PySpark - Tiempo filtrado/ordenamiento: {spark_filter_sort_time:.4f} segundos")
print(f"Ratio Pandas/PySpark: {pandas_filter_sort_time/spark_filter_sort_time:.2f}x")

In [None]:
print("\nüìä RESUMEN DE RESULTADOS COMPARATIVOS")

# Gather the timings measured by the previous benchmark cells
operaciones = ['Carga de Datos', 'Operaciones B√°sicas', 'Transformaciones', 'Filtrado/Ordenamiento']
tiempos_pandas = [pandas_load_time, pandas_basic_ops_time, pandas_transform_time, pandas_filter_sort_time]
tiempos_spark = [spark_load_time, spark_basic_ops_time, spark_transform_time, spark_filter_sort_time]

# Two side-by-side panels: absolute times (left) and time ratios (right)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

# Left panel: grouped bars, one pair per operation
x = np.arange(len(operaciones))
width = 0.35

ax1.bar(x - width/2, tiempos_pandas, width, label='Pandas', alpha=0.8, color='blue')
ax1.bar(x + width/2, tiempos_spark, width, label='PySpark', alpha=0.8, color='red')
ax1.set_xlabel('Operaciones')
ax1.set_ylabel('Tiempo (segundos)')
ax1.set_title('Comparaci√≥n de Tiempos: Pandas vs PySpark')
ax1.set_xticks(x)
ax1.set_xticklabels(operaciones, rotation=45)
ax1.legend()
ax1.grid(True, alpha=0.3)

# Right panel: ratio = pandas seconds / PySpark seconds for each operation.
# A ratio above 1 means pandas needed MORE time, i.e. PySpark was faster.
ratios = [t_pandas / t_spark for t_pandas, t_spark in zip(tiempos_pandas, tiempos_spark)]

colors = ['green' if ratio > 1 else 'red' for ratio in ratios]
ax2.bar(operaciones, ratios, color=colors, alpha=0.7)
ax2.axhline(y=1, color='black', linestyle='--', alpha=0.5)
ax2.set_xlabel('Operaciones')
ax2.set_ylabel('Ratio (Pandas/PySpark)')
# The legend text in the title was inverted in the original; it now matches
# the definition of the ratio (time quotient, not speed quotient).
ax2.set_title('Ratio de Velocidad: Pandas/PySpark\n>1 = PySpark m√°s r√°pido\n<1 = Pandas m√°s r√°pido')
# Rotate the existing categorical tick labels instead of overwriting them
# with set_xticklabels, which is only reliable after fixing tick positions.
ax2.tick_params(axis='x', labelrotation=45)
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Numeric summary
print("\n" + "="*60)
print("üìã RESUMEN NUM√âRICO DE COMPARACI√ìN")
print("="*60)

for op, t_pandas, t_spark, ratio in zip(operaciones, tiempos_pandas, tiempos_spark, ratios):
    print(f"\n{op}:")
    print(f"  Pandas: {t_pandas:.4f}s")
    print(f"  PySpark: {t_spark:.4f}s")
    print(f"  Ratio: {ratio:.2f}x")

# Average of the per-operation ratios
avg_ratio = np.mean(ratios)
print(f"\nüìà RATIO PROMEDIO: {avg_ratio:.2f}x")

# avg_ratio > 1 means pandas took longer on average, so PySpark is the faster
# one; the original branches reported the opposite conclusion.
if avg_ratio > 1:
    print("‚úÖ CONCLUSI√ìN: PySpark es m√°s r√°pido en promedio")
else:
    print("‚úÖ CONCLUSI√ìN: Pandas es m√°s r√°pido en promedio")

# Close the Spark session
spark.stop()

In [None]:
print("\nüíæ AN√ÅLISIS DE USO DE MEMORIA")

import psutil
import os

def get_memory_usage(include_children=False):
    """Return the resident set size (RSS) of this Python process in MB.

    Args:
        include_children: when True, the RSS of every descendant process is
            added to the total. Summing RSS across processes can count shared
            pages more than once, so treat the result as an approximation.

    Returns:
        Memory usage in megabytes as a float.
    """
    process = psutil.Process(os.getpid())
    rss_bytes = process.memory_info().rss
    if include_children:
        for child in process.children(recursive=True):
            try:
                rss_bytes += child.memory_info().rss
            except psutil.Error:
                # The child exited or is not accessible; skip it.
                continue
    return rss_bytes / 1024 / 1024  # bytes -> MB

# ---- Pandas: RSS growth caused by building a 100x larger frame ----
mem_before_pandas = get_memory_usage()
df_pandas_large = pd.concat([df_pandas] * 100, ignore_index=True)  # simulate a larger dataset
pandas_mem_usage = get_memory_usage() - mem_before_pandas

# Restart Spark for a clean measurement
spark.stop()
spark = SparkSession.builder.appName("MemoryTest").getOrCreate()

# ---- PySpark: RSS growth caused by creating and caching the same data ----
# Spark stores cached data in the JVM, not in the Python process, so the
# measurement includes descendant processes.
# NOTE(review): this assumes the JVM runs as a child of this Python process
# (typical for a local session); confirm for other deployment modes.
mem_before_spark = get_memory_usage(include_children=True)
# Reuse the enlarged pandas frame instead of concatenating a second copy;
# otherwise that temporary copy would be attributed to PySpark.
df_spark_large = spark.createDataFrame(df_pandas_large)
# cache() is lazy; count() materializes the cached data so it is measurable
df_spark_large.cache()
df_spark_large.count()
spark_mem_usage = get_memory_usage(include_children=True) - mem_before_spark

print(f"Pandas - Uso de memoria: {pandas_mem_usage:.2f} MB")
print(f"PySpark - Uso de memoria: {spark_mem_usage:.2f} MB")
# RSS deltas can be zero or negative (e.g. memory returned to the OS between
# samples); avoid a ZeroDivisionError or a meaningless negative ratio.
if spark_mem_usage > 0:
    print(f"Ratio de memoria Pandas/PySpark: {pandas_mem_usage/spark_mem_usage:.2f}x")
else:
    print("Ratio de memoria Pandas/PySpark: no disponible (delta de memoria de PySpark <= 0)")

# Final cleanup
spark.stop()