<a href="https://colab.research.google.com/github/Walter7419/Adivina-Quien-Archivos/blob/main/21_Introduccion_a_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Programación Concurrente
## 21. Introducción a Spark

Spark es la herramienta, dentro del ecosistema de Hadoop, que permite trabajar con Cómputo Paralelo y Distribuido.

Para ello, un ambiente de Spark necesita contar con los siguientes elementos.


![Spark Nodes](https://www.edureka.co/blog/wp-content/uploads/2018/09/Picture6-2.png)

- El Master Node contiene el Drive: distribuye y asigna tareas a los Workers.
- Los Worker Nodes ejecutan las tareas.

(Nota: estos elementos son Procesos al fin y al cabo, entonces pueden ser ejecutados en una sola computadora, si trabajamos de manera local).

- El Cluster Manager administra los recursos, para que las tareas puedan ser completados.



### Particiones

Para poder paralelizar tareas, nuestros DataFrames de Spark se guardan con particiones: es decir, un grupo de hileras serán procesados un Worker nodo, otras por otro nodo, etc. Las particiones normalmente se crean en automático.

### Lazy Evaluation

Además, Spark no ejecuta las tareas de inmediato. En realidad, se espera a tener un grupo de tareas y se espera hasta que estas realmente tengan que ser ejecutadas (por ejemplo, al mandar a guardar un output), para empezar el procesamiento.

Esto lo hace porque se espera a tener un set de instrucciones y así poder optimizarlas.

### Iniciar un programa con Spark

Para empezar a trabajar con Spark, necesitamos forzosamente crear una SparkSession. Además, si no lo hemos hecho ya, es necesario installar e importar la librería `pyspark`.



In [9]:
!pip install pyspark==3.4.0



In [10]:
import os
java_version = os.popen('java -version').read()
print(java_version)





In [12]:
!apt-get install openjdk-11-jdk -y


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
  x11-utils
Suggested packages:
  default-jre pcscd libxt-doc openjdk-11-demo openjdk-11-source visualvm
  libnss-mdns fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  | fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk openjdk-11-jdk-headless openjdk-11-jre
  openjdk-11-jre-headless x11-utils
0 upgraded, 15 newly installed, 0 to remove and 41 not upgraded.
Need to get 122 MB of archives.


In [16]:
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!mv spark-3.3.1-bin-hadoop3 /opt/spark

In [17]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark"
os.environ["PATH"] = f"{os.environ['SPARK_HOME']}/bin:{os.environ['PATH']}"

In [18]:
!pip install pyspark




In [19]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Colab PySpark") \
    .getOrCreate()

Vamos a trabajar con datos del Covid-19 en México. Revísalos y descárgalos de la siguiente página web:

https://www.kaggle.com/datasets/tavoglc/covid19-data-from-mexico?resource=download

In [23]:
df = spark.read.csv('covidmex.csv',header=True)
df.show(5)

+-----------+-----------+-------------+-------------+--------------+------+-----+------------------+-------------------+------------------+--------------------+---------+------------------+
|ID_REGISTRO|ENTIDAD_RES|MUNICIPIO_RES|FECHA_INGRESO|FECHA_SINTOMAS|covidt|delta|               lat|               long|               alt|                 qry|dayofyear|       lengthofday|
+-----------+-----------+-------------+-------------+--------------+------+-----+------------------+-------------------+------------------+--------------------+---------+------------------+
|     z526b3|          9|           12|   2020-12-21|    2020-12-18|     1|    3| 19.20155345588235| -99.20180252450989|3008.9460784313724|Cve_Ent==9 & Cve_...|      356|11.768371962176587|
|     z3d1e2|          9|            5|   2020-04-22|    2020-04-20|     1|    2|         19.482945|         -99.113471|            2229.0|Cve_Ent==9 & Cve_...|      113|13.441805501598786|
|     z21f6f|          7|            9|   2020-04-

### Análisis exploratorio usando SparkDataFrames

Manipular datos y hacer cálculos a veces es similar, pero normalmente es distinto a como lo haríamos en Pandas. De hecho, ya viste una diferencia: el uso del método `.show(n)`

Otras diferencias son:

In [24]:
# Conocer la dimension de mi DataFrame

print((df.count(), len(df.columns)))

(23375, 13)


In [25]:
# ¿Qué tipos de datos tengo?

df.printSchema()

root
 |-- ID_REGISTRO: string (nullable = true)
 |-- ENTIDAD_RES: string (nullable = true)
 |-- MUNICIPIO_RES: string (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- FECHA_SINTOMAS: string (nullable = true)
 |-- covidt: string (nullable = true)
 |-- delta: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- alt: string (nullable = true)
 |-- qry: string (nullable = true)
 |-- dayofyear: string (nullable = true)
 |-- lengthofday: string (nullable = true)



### Tarea

Aplica los cambios siguientes:
1. Modifica tu DataFrame: agrega una columna nueva llamada "PAIS", en donde todas las hileras tendrán el valor "Mexico".

2. Elimina la columna *lengthofday* de tu DataFrame.

3. Crea un nuevo DataFrame, llamado `ubicacion`, que tendrá solamente los valores de *lat*, *lon*, *ENTIDAD_RES* y *PAIS* de `df`.

4. Filtra la columna *ENTIDAD_RES* dentro de `ubicacion` para quedarte con solo los valores = 9 (Ciudad de México). Guarda el resultado en un DataFrame llamado `cdmx`

5. Selecciona los primeros 5 valores de `cdmx` y guárdalos como un DataFrame de Pandas, con el nombre *df_pandas*.

####Tip
Métodos y Funciones que podrían servirte:

- .select()
- .drop()
- .withcolumn()
- lit()
- .filter()
- .limit()
- toPandas()



In [28]:
from pyspark.sql import functions as F


# 1. Agregar la columna "PAIS" con el valor "Mexico"
df = df.withColumn("PAIS", F.lit("Mexico"))

# 2. Eliminar la columna "lengthofday"
df = df.drop("lengthofday")

# 3. Crear un nuevo DataFrame 'ubicacion' con las columnas deseadas
ubicacion = df.select("lat", "long", "ENTIDAD_RES", "PAIS")

# 4. Filtrar 'ubicacion' para quedarte solo con los valores de ENTIDAD_RES == 9
cdmx = ubicacion.filter(ubicacion.ENTIDAD_RES == 9)

# 5. Seleccionar los primeros 5 valores y convertirlos a un DataFrame de Pandas
df_pandas = cdmx.limit(5).toPandas()

# Mostrar el DataFrame de Pandas
print(df_pandas)

                 lat                long ENTIDAD_RES    PAIS
0  19.20155345588235  -99.20180252450989           9  Mexico
1          19.482945          -99.113471           9  Mexico
2          19.359004          -99.092623           9  Mexico
3          19.395901          -99.097613           9  Mexico
4  19.20155345588235  -99.20180252450989           9  Mexico
