# PySpark y Pandas laboratorio Data Engineering / Data Science



## Introducción
Este proyecto combina el poder de PySpark y Pandas para abordar tareas de análisis de datos desde dos enfoques complementarios: el procesamiento distribuido a gran escala y la manipulación eficiente en memoria. A través de esta integración, se busca aprovechar lo mejor de ambos mundos para construir flujos de trabajo robustos, escalables y ágiles.
- PySpark es la interfaz de Python para Apache Spark, diseñada para ejecutar operaciones sobre grandes volúmenes de datos distribuidos en múltiples nodos. Es ideal para entornos donde los datos superan la capacidad de una sola máquina, como en escenarios de Big Data.
- Pandas, por su parte, es una biblioteca especializada en el manejo de datos estructurados en memoria. Ofrece herramientas versátiles para la limpieza, transformación y análisis exploratorio, siendo especialmente útil en etapas de prototipado y validación rápida.

## Objectivos
- Comprender PySpark y Pandas
Dando caso de uso a sus aplicaciones en procesamiento distribuido y manipulación de datos.
- Configurar el entorno
Instalar y preparar PySpark y Pandas para trabajar de forma conjunta en un entorno Python.
- Cargar y explorar datos
Importar conjuntos de datos en DataFrames de Pandas y PySpark, y realizar exploraciones básicas para entender su estructura.
- Convertir entre DataFrames
Transformar un DataFrame de Pandas en uno de PySpark para aprovechar el procesamiento distribuido.
- Manipular datos con PySpark
Crear columnas nuevas, aplicar filtros y realizar agregaciones utilizando las funciones propias de PySpark.
- Ejecutar consultas SQL
Utilizar Spark SQL para realizar consultas sobre los datos y aplicar funciones definidas por el usuario (UDFs) para extender la lógica de análisis.
- Visualizacion y Mineria de datos
Realizar procesamiento y toma de decisiones en funcion de mineria de datos




Data -> COVID-19


### Preparacion del entorno
Instalacion de librerias necesarias


In [4]:
!pip install pyspark
!pip install findspark
!pip install pandas

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


## Inicializacion de Spark
Configuracion del entorno para empezar a trabajar con Apache Spark desde python usando PySpark y pandas para analisis de los datos. Se debe de preparar una sesion de Spark para poder empezar a procesar los datos.
- findspark -> Permite que Python encuentre la instalación de Spark en tu sistema, especialmente útil si Spark no está en el PATH del sistema. 
- findspark.init() -> Inicializa findspark para que Spark esté disponible / Inicializa findspark para que Spark esté disponible
- SparkSession -> Punto de entrada principal para trabajar con DataFrames en PySpark.
- SparkSession.builder -> Crea una nueva sesión de Spark.
- .appName("COVID-19 Data Processing") -> Asigna un nombre identificador a la aplicación Spark.
- .config(...) -> Activa Arrow para mejorar el rendimiento al convertir entre Pandas y PySpark.
- .getOrCreate() -> Obtiene una sesión existente o crea una nueva si no hay ninguna activa.


In [2]:
import findspark
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType # Importa tipos de datos
import pandas as pd  
findspark.init()
# Initialize a Spark Session
spark = SparkSession \
    .builder \
    .appName("COVID-19 Data Processing") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Check if the Spark Session is active
if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession esta activa.")
else:
    print("SparkSession no esta activa.")

SparkSession esta activa.


## Lectura de los datos con Pandas


In [3]:
data_covid = pd.read_csv('covid-latest.csv')
if data_covid is not None:
    print("Datos cargados correctamente en el DataFrame de Pandas.")
else:
    print("Error al cargar los datos.")

Datos cargados correctamente en el DataFrame de Pandas.


## Visualizacion y entendimiento de los datos


In [4]:
print("Mostrando los primeros 5 registros de la base de datos:")
# Funcion para mostrar las primeras 5 filas del DataFrame
data_covid.head()

Mostrando los primeros 5 registros de la base de datos:


Unnamed: 0,iso_code,continent,location,last_updated_date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,population,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality,excess_mortality_cumulative_per_million
0,AFG,Asia,Afghanistan,2024-08-04,235214.0,0.0,0.0,7998.0,0.0,0.0,...,,37.746,0.5,64.83,0.511,41128770.0,,,,
1,OWID_AFR,,Africa,2024-08-04,13145380.0,36.0,5.143,259117.0,0.0,0.0,...,,,,,,1426737000.0,,,,
2,ALB,Europe,Albania,2024-08-04,335047.0,0.0,0.0,3605.0,0.0,0.0,...,51.2,,2.89,78.57,0.795,2842318.0,,,,
3,DZA,Africa,Algeria,2024-08-04,272139.0,18.0,2.571,6881.0,0.0,0.0,...,30.4,83.741,1.9,76.88,0.748,44903230.0,,,,
4,ASM,Oceania,American Samoa,2024-08-04,8359.0,0.0,0.0,34.0,0.0,0.0,...,,,,73.74,,44295.0,,,,


## Pandas DataFrame a Spark DataFrame


- The resulting spark_df will have the defined schema, which ensures consistency and compatibility with Spark's data processing capabilities.


### Storing the result:


In [None]:
# Pasando a Spark DataFrame
# Definiendo el schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

# Pasando el DataFrame de Pandas a Spark DataFrame con el schema definido
# Asegurando que los tipos de datos coincidan con el schema
data_covid['continent'] = data_covid['continent'].astype(str)  # Ensures continent is a string
data_covid['total_cases'] = data_covid['total_cases'].fillna(0).astype('int64')  # Fill NaNs and convert to int
data_covid['total_deaths'] = data_covid['total_deaths'].fillna(0).astype('int64')  # Fill NaNs and convert to int
data_covid['total_vaccinations'] = data_covid['total_vaccinations'].fillna(0).astype('int64')  # Fill NaNs and convert to int
data_covid['population'] = data_covid['population'].fillna(0).astype('int64')  # Fill NaNs and convert to int

spark_df = spark.createDataFrame(data_covid[schema.fieldNames()])  # Usando solo las columnas definidas en el schema
# Mostrando las primeras 5 filas del Spark DataFrame
spark_df.show()

+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|         Asia|     235214|        7998|                 0|  41128772|
|          nan|   13145380|      259117|                 0|1426736614|
|       Europe|     335047|        3605|                 0|   2842318|
|       Africa|     272139|        6881|                 0|  44903228|
|      Oceania|       8359|          34|                 0|     44295|
|       Europe|      48015|         159|                 0|     79843|
|       Africa|     107481|        1937|                 0|  35588996|
|North America|       3904|          12|                 0|     15877|
|North America|       9106|         146|                 0|     93772|
|South America|   10101218|      130663|                 0|  45510324|
|         Asia|     452273|        8777|                 0|   2780472|
|North

## Mostrando la estructura


In [None]:
print("Schema of the Spark DataFrame:")
spark_df.printSchema()

Schema of the Spark DataFrame:
root
 |-- continent: string (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_vaccinations: long (nullable = true)
 |-- population: long (nullable = true)



## EDA - Exploracion de los datos


In [7]:
# Lista de columnas a mostrar
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
spark_df.select(columns_to_display).show(5) #" Mostrar las primeras 5 filas de las columnas seleccionadas" con Spark DataFrame

+---------+-----------+------------+------------------+----------+
|continent|total_cases|total_deaths|total_vaccinations|population|
+---------+-----------+------------+------------------+----------+
|     Asia|     235214|        7998|                 0|  41128772|
|      nan|   13145380|      259117|                 0|1426736614|
|   Europe|     335047|        3605|                 0|   2842318|
|   Africa|     272139|        6881|                 0|  44903228|
|  Oceania|       8359|          34|                 0|     44295|
+---------+-----------+------------+------------------+----------+
only showing top 5 rows


### 9.2 Picking specific columns


In [11]:
print("Mostrando el total de casos por el continente:")
spark_df.select('continent', 'total_cases').show(10)

Mostrando el total de casos por el continente:
+-------------+-----------+
|    continent|total_cases|
+-------------+-----------+
|         Asia|     235214|
|          nan|   13145380|
|       Europe|     335047|
|       Africa|     272139|
|      Oceania|       8359|
|       Europe|      48015|
|       Africa|     107481|
|North America|       3904|
|North America|       9106|
|South America|   10101218|
+-------------+-----------+
only showing top 10 rows


In [15]:
print("Filtrando registros con más de 3 millónes de casos:")
spark_df.filter(spark_df['total_cases'] > 3000000).show(10) 

Filtrando registros con más de 3 millónes de casos:
+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|          nan|   13145380|      259117|                 0|1426736614|
|South America|   10101218|      130663|                 0|  45510324|
|          nan|  301499099|     1637249|        9104304615|4721383370|
|      Oceania|   11861161|       25236|                 0|  26177410|
|       Europe|    6082444|       22534|                 0|   8939617|
|       Europe|    4872829|       34339|                 0|  11655923|
|South America|   37511921|      702116|                 0| 215313504|
|North America|    4819055|       55282|         102877159|  38454328|
|South America|    5401126|       62730|                 0|  19603736|
|         Asia|   99373219|      122304|                 0|1425887360|
+-------------+--------

## Modificacion y creacion de nuevas columnas


In [None]:
from pyspark.sql import functions as F

spark_df_with_percentage = spark_df.withColumn(
    'death_percentage', 
    (spark_df['total_deaths'] / spark_df['population']) * 100
)
spark_df_with_percentage = spark_df_with_percentage.withColumn(
    'death_percentage',
    F.concat(
        # Aplicando formato de número con 2 decimales
        F.format_number(spark_df_with_percentage['death_percentage'], 2), 
        # Incluyendo el símbolo de porcentaje
        F.lit('%')  
    )
)
columns_to_display = ['total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', 'total_cases']
spark_df_with_percentage.select(columns_to_display).show(5)

+------------+----------+----------------+---------+------------------+-----------+
|total_deaths|population|death_percentage|continent|total_vaccinations|total_cases|
+------------+----------+----------------+---------+------------------+-----------+
|        7998|  41128772|           0.02%|     Asia|                 0|     235214|
|      259117|1426736614|           0.02%|      nan|                 0|   13145380|
|        3605|   2842318|           0.13%|   Europe|                 0|     335047|
|        6881|  44903228|           0.02%|   Africa|                 0|     272139|
|          34|     44295|           0.08%|  Oceania|                 0|       8359|
+------------+----------+----------------+---------+------------------+-----------+
only showing top 5 rows


## Agrupacion y sumatorias


In [17]:
print("Calculando el total de muertes por continente:")
spark_df.groupby(['continent']).agg({"total_deaths": "SUM"}).show()  

Calculando el total de muertes por continente:
+-------------+-----------------+
|    continent|sum(total_deaths)|
+-------------+-----------------+
|       Europe|          2102483|
|       Africa|           259117|
|          nan|         22430618|
|North America|          1671178|
|South America|          1354187|
|      Oceania|            32918|
|         Asia|          1637249|
+-------------+-----------------+



## UDFs
- Las UDFs (User Defined Functions) en PySpark son funciones personalizadas que tú defines en Python para aplicar transformaciones sobre columnas de un DataFrame distribuido. Son útiles cuando necesitas lógica que no está disponible en las funciones nativas de Spark.
- Spark tiene muchas funciones integradas (withColumn, filter, groupBy, etc.), pero a veces necesitas aplicar una lógica específica que no existe. Ahí es donde entran las UDFs.


In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
# Creacion de funcion UDF

def convert_total_deaths(x):
    return int(x) * 2 if x is not None else None

spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType())

<function __main__.convert_total_deaths(x)>

In [35]:
from pyspark.sql.functions import col

spark_df = spark_df.withColumn("total_deaths", col("total_deaths").cast("int"))


## Spark SQL


In [37]:
# Borrar la tabla temporal si ya existe
spark.sql("DROP VIEW IF EXISTS data_v")
# Crear de nuevo una vista temporal
spark_df.createOrReplaceTempView("data_v")
# Ejecutamos la consulta SQL usando la UDF
spark.sql("""
    SELECT continent, total_deaths, convert_total_deaths(total_deaths) AS converted_total_deaths
    FROM data_v
""")


DataFrame[continent: string, total_deaths: int, converted_total_deaths: int]

SQL queries


In [38]:
spark.sql('SELECT * FROM data_v').show()

+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|         Asia|     235214|        7998|                 0|  41128772|
|          nan|   13145380|      259117|                 0|1426736614|
|       Europe|     335047|        3605|                 0|   2842318|
|       Africa|     272139|        6881|                 0|  44903228|
|      Oceania|       8359|          34|                 0|     44295|
|       Europe|      48015|         159|                 0|     79843|
|       Africa|     107481|        1937|                 0|  35588996|
|North America|       3904|          12|                 0|     15877|
|North America|       9106|         146|                 0|     93772|
|South America|   10101218|      130663|                 0|  45510324|
|         Asia|     452273|        8777|                 0|   2780472|
|North

In [39]:
# SQL filtering
spark.sql("SELECT continent FROM data_v WHERE total_vaccinations > 1000000").show()

+-------------+
|    continent|
+-------------+
|          nan|
|North America|
|       Europe|
|       Europe|
|          nan|
|          nan|
|          nan|
|         Asia|
|         Asia|
|       Europe|
|          nan|
|         Asia|
|      Oceania|
|          nan|
|          nan|
|          nan|
|          nan|
+-------------+

