In [1]:
import os

# Función para instalar OpenJDK 8 si no está instalado previamente.
def install_java():
    # Verifica si la ruta del JDK existe.
    if not os.path.exists('/usr/lib/jvm/java-8-openjdk-amd64'):
        print("Instalando OpenJDK 8...")
        # Comando para instalar Java en modo silencioso.
        !apt-get install openjdk-8-jdk-headless -qq > /dev/null
        print("OpenJDK 8 instalado correctamente.")
    else:
        print("OpenJDK 8 ya está instalado.")



# Función para descargar Apache Spark solo si no está ya descargado.
def download_spark():
    # URL y nombre del archivo comprimido de Spark.
    spark_url = "https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz"
    spark_tar = "spark-3.4.3-bin-hadoop3.tgz"

    # Verifica si el archivo comprimido ya existe.
    if not os.path.exists(spark_tar):
        print("Descargando Apache Spark...")
        # Comando para descargar el archivo desde la URL.
        !wget -q $spark_url
        print("Descarga completa.")
    else:
        print("El archivo de Apache Spark ya está descargado.")



# Función para descomprimir el archivo de Apache Spark solo si no está descomprimido.
def extract_spark():
    # Nombre de la carpeta descomprimida.
    spark_dir = "spark-3.4.3-bin-hadoop3"
    spark_tar = "spark-3.4.3-bin-hadoop3.tgz"

    # Verifica si la carpeta descomprimida ya existe.
    if not os.path.exists(spark_dir):
        print("Descomprimiendo Apache Spark...")
        # Comando para descomprimir el archivo.
        !tar xf $spark_tar
        print("Apache Spark descomprimido.")
    else:
        print("La carpeta de Apache Spark ya existe.")



# Función para configurar las variables de entorno necesarias para Spark.
def set_environment_variables():
    # Establece la ruta de JAVA_HOME y SPARK_HOME.
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"
    print("Variables de entorno configuradas.")



# Función para verificar si un paquete de Python ya está instalado.
import subprocess
import sys

def is_package_installed(package_name):
    try:
        # Ejecuta el comando `pip show` para verificar si el paquete está instalado.
        subprocess.check_call(
            [sys.executable, "-m", "pip", "show", package_name],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
        return True
    except subprocess.CalledProcessError:
        return False



# Función para instalar librerías de Python necesarias (findspark y pyspark).
def install_python_libraries():
    # Verifica e instala findspark.
    if not is_package_installed("findspark"):
        print("Instalando findspark...")
        !pip install -q findspark
    else:
        print("findspark ya está instalado.")

    # Verifica e instala pyspark.
    if not is_package_installed("pyspark"):
        print("Instalando pyspark...")
        !pip install -q pyspark
    else:
        print("pyspark ya está instalado.")



install_java()                # Instala OpenJDK 8
download_spark()              # Descarga Apache Spark
extract_spark()               # Descomprime Apache Spark
set_environment_variables()   # Configura las variables de entorno
install_python_libraries()    # Instala las librerías de Python

OpenJDK 8 ya está instalado.
El archivo de Apache Spark ya está descargado.
La carpeta de Apache Spark ya existe.
Variables de entorno configuradas.
findspark ya está instalado.
pyspark ya está instalado.


# Persistencia en Apache Spark

La **persistencia** en Apache Spark es un mecanismo que permite almacenar un **RDD** o un **DataFrame** en memoria, disco, o una combinación de ambos, para evitar recalcularlos desde cero cada vez que se necesitan. Esto es útil para mejorar el rendimiento y optimizar los recursos en aplicaciones que reutilizan datos.

---

## **¿Por qué usar la persistencia?**

1. **Evitar cálculos innecesarios:**
   - Spark recalcula un RDD desde su origen cada vez que es necesario.
   - Persistir almacena el resultado intermedio, eliminando la necesidad de realizar los mismos cálculos varias veces.

2. **Ahorro de recursos computacionales:**
   - Reducir la carga en el clúster al evitar recalculaciones.

3. **Mejor rendimiento:**
   - Las operaciones sobre datos ya almacenados son más rápidas.

---

## **Tipos de persistencia**

1. **`MEMORY_ONLY`:**
   - Guarda los datos en la memoria RAM.
   - Si no cabe todo en memoria, las particiones sobrantes no se almacenan.
   - Es el nivel más rápido, pero puede ocasionar pérdida de datos si hay poca memoria.

2. **`MEMORY_AND_DISK`:**
   - Guarda los datos en memoria si es posible.
   - Si no hay suficiente memoria, las particiones restantes se guardan en el disco.
   - Es el nivel más común por su balance entre velocidad y estabilidad.

3. **`DISK_ONLY`:**
   - Guarda los datos exclusivamente en disco.
   - Es más lento, pero asegura que los datos siempre estén disponibles.

4. **`MEMORY_ONLY_SER`:**
   - Similar a `MEMORY_ONLY`, pero comprime los datos en memoria para ahorrar espacio.
   - Requiere más CPU para descomprimir.

5. **`OFF_HEAP`:**
   - Guarda los datos fuera de la pila de memoria JVM.
   - Es útil para integraciones avanzadas con otros sistemas y evita que la JVM consuma demasiada memoria.

---

## **Cache vs Persistencia**

- **`cache()`**:
  - Alias de `persist(StorageLevel.MEMORY_AND_DISK)`.
  - Práctico para la mayoría de los casos.

- **`persist()`**:
  - Permite elegir niveles de almacenamiento personalizados, como solo memoria, solo disco o combinaciones.

---

In [2]:
# Importar la biblioteca findspark, que ayuda a configurar PySpark correctamente en el entorno de ejecución
import findspark

# Inicializar findspark para establecer las variables necesarias de PySpark
findspark.init()

# Importar SparkSession desde la biblioteca pyspark.sql
# SparkSession es la entrada principal para trabajar con DataFrames en PySpark
from pyspark.sql import SparkSession

# Crear o obtener una SparkSession
# Esto inicializa un entorno de Spark si no existe uno, o reutiliza uno existente
spark = SparkSession.builder.getOrCreate()

# Obtener el contexto de Spark (SparkContext) desde la SparkSession
# SparkContext es la entrada principal para trabajar con RDDs en PySpark
sc = spark.sparkContext

In [3]:
# !pip install --upgrade pyspark
# !pip install --upgrade cloudpickle

# **Ejemplos Practicos**

### **Ejemplo 1: Usar `MEMORY_ONLY` para datos reutilizados en memoria**

In [4]:
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

data = sc.parallelize(range(1, 1000000))  # Crear un RDD con un rango de números

# Persistir en memoria
filtered_data = data.filter(lambda x: x % 2 == 0).persist(StorageLevel.MEMORY_ONLY)

# Primera acción (genera los datos y los guarda en memoria)
print(filtered_data.count())

# Segunda acción (reutiliza los datos desde la memoria)
print(filtered_data.take(5))

499999
[2, 4, 6, 8, 10]


- **Resultado esperado:** El filtrado se calcula una sola vez y las operaciones posteriores son más rápidas porque reutilizan los datos en memoria.

---

### **Ejemplo 2: Usar `MEMORY_AND_DISK` para manejar grandes volúmenes de datos**

In [5]:
from pyspark.storagelevel import StorageLevel

# Persistir en memoria y disco
large_data = sc.parallelize(range(1, 10000000))  # RDD con muchos datos

# Almacenarlo en memoria, y si no cabe, usar el disco
persisted_data = large_data.persist(StorageLevel.MEMORY_AND_DISK)

# Acción que dispara el almacenamiento
print(persisted_data.sum())

49999995000000


- **Resultado esperado:** Los datos que no caben en memoria se almacenan en disco, asegurando que todas las operaciones funcionen sin errores.

---

### **Ejemplo 3: Cache simple con `cache()`**

In [6]:
# Cachear un conjunto de datos
text_data = sc.textFile("/content/sample_data/california_housing_test.csv")

# Cachear para reutilización
cached_text = text_data.cache()

# Primera acción (lectura del archivo y almacenamiento en memoria y disco)
print(cached_text.count())

# Segunda acción (usa el caché y es mucho más rápida)
print(cached_text.first())

3001
"longitude","latitude","housing_median_age","total_rooms","total_bedrooms","population","households","median_income","median_house_value"


- **Resultado esperado:** La primera acción guarda los datos en memoria/disco, y las acciones posteriores son más rápidas porque reutilizan el caché.

---

## **Consideraciones finales**
- **Liberar recursos:** Usa `unpersist()` para liberar memoria o disco cuando ya no necesites los datos.

In [7]:
cached_text.unpersist()

/content/sample_data/california_housing_test.csv MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

- Persistir o cachear es esencial en aplicaciones donde se procesan datos reutilizables, mejorando tanto el rendimiento como la eficiencia de los recursos en Spark.

Si tienes preguntas o necesitas más ejemplos, ¡estaré encantado de ayudarte! 😊

