<a href="https://colab.research.google.com/github/avalosmarco/portafolioMarco/blob/main/modulo7_actividad3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

###1. **Configuración del entorno**:

*   Crear un entorno de Apache Spark local o en la nube.
*   Documentar los pasos seguidos para la configuración.

In [None]:
# Instalando SPARK
try:
    import pyspark
    print("✅ Spark ya está disponible en el sistema")
except ImportError:
    print("⚠️  Instalando Spark...")
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !pip install -q pyspark==3.5.0 findspark
    print("✅ Spark instalado")

# Configurar e inicializar
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
print("✅ Spark listo")

✅ Spark ya está disponible en el sistema
✅ Spark listo


In [None]:
# Celda de verificación
print("1. ¿Está findspark instalado y disponible?")
try:
    import findspark
    print("   ✅ Sí, findspark está instalado.")
except ImportError:
    print("   ❌ No, findspark no está instalado.")

print("\n2. ¿Cuál es la ruta de SPARK_HOME?")
import os
spark_home = os.environ.get("SPARK_HOME", "No está configurada")
print(f"   SPARK_HOME = {spark_home}")

print("\n3. Información de la sesión de Spark activa:")
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print(f"   Versión de Spark: {spark.version}")
print(f"   URL de la interfaz web: {spark.sparkContext.uiWebUrl}")
print(f"   Modo de ejecución: {spark.sparkContext.master}")

print("\n4. ¡Prueba final! Ejecutando un cálculo simple con Spark:")
data = [1, 2, 3, 4, 5]
distData = spark.sparkContext.parallelize(data)
suma = distData.sum()
print(f"   La suma de {data} es: {suma}")

1. ¿Está findspark instalado y disponible?
   ✅ Sí, findspark está instalado.

2. ¿Cuál es la ruta de SPARK_HOME?
   SPARK_HOME = /content/spark-3.5.0-bin-hadoop3

3. Información de la sesión de Spark activa:
   Versión de Spark: 3.5.0
   URL de la interfaz web: http://26b5c38cb517:4040
   Modo de ejecución: local[*]

4. ¡Prueba final! Ejecutando un cálculo simple con Spark:
   La suma de [1, 2, 3, 4, 5] es: 15


###2. **Obtención de datos**:


*   Elegir una API pública (por ejemplo, OpenWeather, CoinGecko, etc.) para obtener datos en formato JSON o CSV
*   Extraer y cargar los datos en Apache Spark.

In [None]:
# Importar librerías necesarias
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# URL de la API de CoinGecko para obtener los mercados de cripto
url = "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=10&page=1&sparkline=false"

# Hacer la solicitud a la API
response = requests.get(url)
data = response.json() # Decodificar la respuesta JSON

# Convertir los datos JSON a un DataFrame de Pandas para facilitar su manejo
pandas_df = pd.DataFrame(data)

# Ahora, mostramos las primeras filas del DataFrame de Pandas para verificar
print("Datos obtenidos de la API (Pandas DataFrame):")
print(pandas_df.head())

Datos obtenidos de la API (Pandas DataFrame):
            id symbol      name  \
0      bitcoin    btc   Bitcoin   
1     ethereum    eth  Ethereum   
2       ripple    xrp       XRP   
3       tether   usdt    Tether   
4  binancecoin    bnb       BNB   

                                               image  current_price  \
0  https://coin-images.coingecko.com/coins/images...      111335.00   
1  https://coin-images.coingecko.com/coins/images...        4510.85   
2  https://coin-images.coingecko.com/coins/images...           2.98   
3  https://coin-images.coingecko.com/coins/images...           1.00   
4  https://coin-images.coingecko.com/coins/images...         856.33   

      market_cap  market_cap_rank  fully_diluted_valuation  total_volume  \
0  2216112820801                1            2216112820801   38032879153   
1   544090381581                2             544090381581   36003577227   
2   177301473452                3             298032251412    6165646145   
3   16722385

In [None]:
# ----------------------------------------------------
#---------Inicialización de la Sesión de Spark--------
# ----------------------------------------------------
# Crear una SparkSession, que es el punto de entrada para todas las funcionalidades de Spark.
# 'appName' define el nombre de nuestra aplicación que aparecerá en la UI del cluster.
spark = SparkSession.builder \
    .appName("AnalisisCriptomonedas") \
    .getOrCreate()

# Configurar el nivel de log para que solo se muestren errores, evitando advertencias innecesarias.
spark.sparkContext.setLogLevel("ERROR")

# ----------------------------------------------------
# -------Carga de Datos en un DataFrame de Spark------
# ----------------------------------------------------
# Crear un DataFrame de Spark a partir del DataFrame de Pandas que obtuvimos de la API.
# Esto es útil para datos que ya están en memoria en nuestro driver program.
df = spark.createDataFrame(pandas_df)

# Imprimir el esquema del DataFrame. Es crucial para entender la estructura de los datos:
# los nombres de las columnas y sus tipos (StringType, DoubleType, etc.).
print("Esquema del DataFrame de Spark:")
df.printSchema()

# Mostrar las primeras 5 filas del DataFrame para una inspección visual rápida.
# 'truncate=False' asegura que se muestren todos los caracteres de cada celda.
print("Muestra de los datos crudos (primeras 5 filas):")
df.show(5, truncate=False)

Esquema del DataFrame de Spark:
root
 |-- id: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- image: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- market_cap: long (nullable = true)
 |-- market_cap_rank: long (nullable = true)
 |-- fully_diluted_valuation: long (nullable = true)
 |-- total_volume: long (nullable = true)
 |-- high_24h: double (nullable = true)
 |-- low_24h: double (nullable = true)
 |-- price_change_24h: double (nullable = true)
 |-- price_change_percentage_24h: double (nullable = true)
 |-- market_cap_change_24h: double (nullable = true)
 |-- market_cap_change_percentage_24h: double (nullable = true)
 |-- circulating_supply: double (nullable = true)
 |-- total_supply: double (nullable = true)
 |-- max_supply: double (nullable = true)
 |-- ath: double (nullable = true)
 |-- ath_change_percentage: double (nullable = true)
 |-- ath_date: string (nullable = true)
 |-- atl: double (nullable

###3. **Procesamiento de datos en Spark**:

*   Aplicar al menos tres transformaciones sobre los datos (ejemplo: filtrado, agrupación, mapeo,
conversión de tipos, etc.).
*   Aplicar al menos dos acciones para obtener resultados procesados (ejemplo: mostrar datos, contar
registros, guardar resultados en un archivo, etc.).

In [None]:
# ---------------------------------------------
# --------------Transformaciones---------------
# ---------------------------------------------
print("\n--- Aplicando Transformaciones ---")

# Transformación 1: Selección y Renombrado de Columnas.
# Propósito: Crear un nuevo DataFrame con solo las columnas relevantes para nuestro análisis,
# utilizando nombres más descriptivos y fáciles de usar.
df_selected = df.select(
    col("id").alias("Cripto_ID"),
    col("symbol").alias("Simbolo"),
    col("name").alias("Nombre"),
    col("current_price").alias("Precio_Actual_USD"),
    col("price_change_percentage_24h").alias("Cambio_24h_%"),
    col("market_cap").alias("Capitalizacion_Mercado_USD")
)
print("Transformación 1 aplicada: Selección de columnas.")
df_selected.show(5)

# Transformación 2: Filtrado.
# Propósito: Aplicar un filtro para encontrar sólo las criptomonedas que han tenido
# una ganancia en las últimas 24 horas (cambio porcentual > 0).
df_ganancias = df_selected.filter(col("Cambio_24h_%") > 0)
print("Transformación 2 aplicada: Filtrado por ganancias.")
df_ganancias.show()

# Transformación 3: Conversión de Tipo de Datos y Cálculo.
# Propósito: La capitalización de mercado viene como un número muy largo (double).
# La convertimos a billones (trillions en escala corta) de USD para que sea más legible.
# Creamos una nueva columna 'Market_Cap_Trillions' con este valor calculado.
df_final = df_ganancias.withColumn(
    "Capitalizacion_Miles_Millones",
    round(col("Capitalizacion_Mercado_USD") / 1e9, 2)  # Convertir y redondear a 2 decimales
).withColumn(
    "Cap_Mercado_Formateada",
    format_number(col("Capitalizacion_Miles_Millones"), 0)  # Formatear con separadores de miles
)
print("Transformación 3 aplicada: Conversión a miles de millones con formateo.")
df_final.select(
    "Nombre",
    "Precio_Actual_USD",
    "Cambio_24h_%",
    "Capitalizacion_Miles_Millones",
    "Cap_Mercado_Formateada"
).show(truncate=False)

# -----------------------------------------
# ----------------- Acciones --------------
# -----------------------------------------
print("\n--- Aplicando Acciones ---")

# Acción 1: count() - Desencadena un job de Spark para calcular y traer el número total de filas
# en el DataFrame resultante (las criptomonedas en ganancia) al driver program.
conteo = df_final.count()
print(f"Acción 1 (count): Número de criptomonedas en ganancia: {conteo}")

# Acción 2: show() - Desencadena un job para recolectar los datos y mostrarlos.
# Ya hemos usado show() arriba, pero es la acción principal para ver resultados.
print("Acción 2 (show): Mostrando el resultado final ordenado por ganancia:")
df_final.orderBy(col("Cambio_24h_%").desc()).show()

# Acción 3: Escribir resultados - Una acción muy común es guardar el resultado
# en un archivo (por ejemplo, para ser consumido por otro sistema).
# Esto escribe el DataFrame como un único archivo CSV en la carpeta 'resultados_cripto'.
print("Acción 3 (write): Guardando resultados en '/resultados_cripto'...")
df_final.write.mode("overwrite").format("csv").option("header", "true").save("resultados_cripto")
print("¡Escritura completada!")

# ----------------------------------------------
# ------------------ Finalización --------------
# ----------------------------------------------
# Es una buena práctica detener la SparkSession para liberar todos los recursos asociados.
spark.stop()
print("Sesión de Spark finalizada.")


--- Aplicando Transformaciones ---
Transformación 1 aplicada: Selección de columnas.
+-----------+-------+--------+-----------------+------------+--------------------------+
|  Cripto_ID|Simbolo|  Nombre|Precio_Actual_USD|Cambio_24h_%|Capitalizacion_Mercado_USD|
+-----------+-------+--------+-----------------+------------+--------------------------+
|    bitcoin|    btc| Bitcoin|         111335.0|    -0.41261|             2216112820801|
|   ethereum|    eth|Ethereum|          4510.85|    -1.73092|              544090381581|
|     ripple|    xrp|     XRP|             2.98|    -1.26705|              177301473452|
|     tether|   usdt|  Tether|              1.0|    -0.00551|              167223859955|
|binancecoin|    bnb|     BNB|           856.33|    -0.87325|              119267296969|
+-----------+-------+--------+-----------------+------------+--------------------------+
only showing top 5 rows

Transformación 2 aplicada: Filtrado por ganancias.
+---------+-------+------+-----------

###4. **Explicación del código**:

*   Incluir comentarios en el código para describir el propósito de cada bloque de instrucciones.
*   Adjuntar un breve informe explicando el flujo del procesamiento de datos en Spark.

## Explicación del Flujo de Procesamiento

**Inicialización:** Todo comienza con la SparkSession, el coordinador de nuestro trabajo en Spark.

**Carga de Datos:** Los datos JSON obtenidos de la API se convierten en un DataFrame de Spark, la estructura de datos fundamental organizada en columnas.

**Transformaciones:** Definimos una serie de operaciones:
- `select()`: Especificamos qué columnas nos interesan y les damos nombres claros.
- `filter()`: Eliminamos filas que no cumplen con nuestro criterio (criptos con pérdidas).
- `withColumn()`: Creamos una nueva columna derivada de una existente (cálculo en miles de millones).

Estas transformaciones son **perezosas (lazy)**. Spark construye un **plan lógico (Directed Acyclic Graph - DAG)** pero no ejecuta ningún cálculo real hasta que se llama a una acción.

**Acciones:** Las acciones como `count()`, `show()`, y `write()` son las que **desencadenan la ejecución real** del plan lógico. Spark divide el trabajo en **tareas** que son distribuidas y ejecutadas en paralelo en el cluster (o en tu máquina local).

**Resultado:** El producto final es un conjunto de datos transformado que mostramos en pantalla y guardamos en disco, obteniendo insights de los datos originales.

###5. **Análisis de opciones en la nube**:

*   Investigar y seleccionar un proveedor de cloud computing (Amazon Web Services, Microsoft Azure,
Google Cloud, etc.) que permita ejecutar Apache Spark.
*   Analizar y comparar las características del servicio, incluyendo costos, ventajas y limitaciones.
*   Presentar la información en un cuadro comparativo o en un documento estructurado.

**Proveedor Seleccionado:** Amazon Web Services (AWS)

*Servicio Principal para Spark: Amazon EMR (Elastic MapReduce)*

EMR es un servicio gestionado de cluster que simplifica enormemente la ejecución de frameworks de procesamiento de datos distribuidos como Apache Spark, Hadoop, Hive, y Presto.

In [None]:
# @title Análisis Comparativo de Amazon EMR
from IPython.display import HTML

html_table = """
<table border="1" style="border-collapse: collapse; width: 100%;">
<thead>
<tr style="background-color: #f2f2f2;">
<th style="padding: 8px; text-align: left;">Característica</th>
<th style="padding: 8px; text-align: left;">Descripción en AWS EMR</th>
<th style="padding: 8px; text-align: left;">Ventajas</th>
<th style="padding: 8px; text-align: left;">Limitaciones</th>
</tr>
</thead>
<tbody>
<tr>
<td style="padding: 8px;"><strong>Configuración</strong></td>
<td style="padding: 8px;">Totalmente gestionado. Puedes crear un cluster Spark en minutos desde la consola, CLI o SDK.</td>
<td style="padding: 8px;"><strong>Ahorro de tiempo:</strong> Sin necesidad de instalar, configurar o mantener software manualmente. Escalado automático.</td>
<td style="padding: 8px;">Menor control detallado sobre la configuración del cluster en comparación con una configuración auto-gestionada en EC2.</td>
</tr>
<tr>
<td style="padding: 8px;"><strong>Costo</strong></td>
<td style="padding: 8px;">Modelo de pago por uso. Se paga por las instancias EC2 y el almacenamiento EBS utilizadas. El costo del servicio EMR es adicional por hora.</td>
<td style="padding: 8px;"><strong>Coste-efectivo:</strong> Puedes usar <strong>Spot Instances</strong> para cargas de trabajo tolerantes a fallos con descuentos de hasta 90%. Sólo pagas mientras el cluster esté corriendo.</td>
<td style="padding: 8px;">Puede volverse costoso si los clusters se dejan corriendo de forma inadvertida ("cluster sprawl"). Los precios de las instancias EC2 varían por región.</td>
</tr>
<tr>
<td style="padding: 8px;"><strong>Rendimiento</strong></td>
<td style="padding: 8px;">Optimizado para cargas de datos grandes. Se integra profundamente con otros servicios AWS como S3 (almacenamiento), Glacier, y Redshift (data warehouse).</td>
<td style="padding: 8px;"><strong>Alto rendimiento:</strong> Instancias optimizadas para computación y memoria. Lectura/escritura directa y muy rápida desde/hacia Amazon S3.</td>
<td style="padding: 8px;">La latencia de red puede ser un factor si tus datos de origen no están en AWS.</td>
</tr>
<tr>
<td style="padding: 8px;"><strong>Escalabilidad</strong></td>
<td style="padding: 8px;">Escalado vertical (aumentar el tamaño de la instancia) y horizontal (añadir más nodos al cluster) de forma automática o manual.</td>
<td style="padding: 8px;"><strong>Escalado elástico:</strong> Puedes empezar con un solo nodo y escalar a cientos de instancias según la carga.</td>
<td style="padding: 8px;">El escalado automático requiere configuración y monitoreo para optimizar costos/rendimiento.</td>
</tr>
<tr>
<td style="padding: 8px;"><strong>Confiabilidad</strong></td>
<td style="padding: 8px;">Alta disponibilidad integrada. Puede configurar clusters multi-AZ (Availability Zones). Reintentos automáticos de tareas fallidas.</td>
<td style="padding: 8px;"><strong>Gestionado:</strong> AWS se encarga de la salud de los nodos, reemplazándolos automáticamente si fallan.</td>
<td style="padding: 8px;">La terminación de Spot Instances puede interrumpir jobs largos si no se maneja correctamente.</td>
</tr>
<tr>
<td style="padding: 8px;"><strong>Seguridad</strong></td>
<td style="padding: 8px;">Integración con IAM (Identity and Access Management) para control de acceso. Encriptación de datos en tránsito y en reposo. VPC networking.</td>
<td style="padding: 8px;"><strong>Seguridad empresarial:</strong> Cumple con varios estándares de compliance (PCI DSS, HIPAA, etc.).</td>
<td style="padding: 8px;">La configuración de seguridad (IAM, VPC) añade complejidad inicial.</td>
</tr>
</tbody>
</table>
"""

display(HTML(html_table))

Característica,Descripción en AWS EMR,Ventajas,Limitaciones
Configuración,"Totalmente gestionado. Puedes crear un cluster Spark en minutos desde la consola, CLI o SDK.","Ahorro de tiempo: Sin necesidad de instalar, configurar o mantener software manualmente. Escalado automático.",Menor control detallado sobre la configuración del cluster en comparación con una configuración auto-gestionada en EC2.
Costo,Modelo de pago por uso. Se paga por las instancias EC2 y el almacenamiento EBS utilizadas. El costo del servicio EMR es adicional por hora.,Coste-efectivo: Puedes usar Spot Instances para cargas de trabajo tolerantes a fallos con descuentos de hasta 90%. Sólo pagas mientras el cluster esté corriendo.,"Puede volverse costoso si los clusters se dejan corriendo de forma inadvertida (""cluster sprawl""). Los precios de las instancias EC2 varían por región."
Rendimiento,"Optimizado para cargas de datos grandes. Se integra profundamente con otros servicios AWS como S3 (almacenamiento), Glacier, y Redshift (data warehouse).",Alto rendimiento: Instancias optimizadas para computación y memoria. Lectura/escritura directa y muy rápida desde/hacia Amazon S3.,La latencia de red puede ser un factor si tus datos de origen no están en AWS.
Escalabilidad,Escalado vertical (aumentar el tamaño de la instancia) y horizontal (añadir más nodos al cluster) de forma automática o manual.,Escalado elástico: Puedes empezar con un solo nodo y escalar a cientos de instancias según la carga.,El escalado automático requiere configuración y monitoreo para optimizar costos/rendimiento.
Confiabilidad,Alta disponibilidad integrada. Puede configurar clusters multi-AZ (Availability Zones). Reintentos automáticos de tareas fallidas.,"Gestionado: AWS se encarga de la salud de los nodos, reemplazándolos automáticamente si fallan.",La terminación de Spot Instances puede interrumpir jobs largos si no se maneja correctamente.
Seguridad,Integración con IAM (Identity and Access Management) para control de acceso. Encriptación de datos en tránsito y en reposo. VPC networking.,"Seguridad empresarial: Cumple con varios estándares de compliance (PCI DSS, HIPAA, etc.).","La configuración de seguridad (IAM, VPC) añade complejidad inicial."
