# Módulo 47 - Big Data 1

## Spark
Spark es una herramienta de apache para la gestión y manejo de Big Data. Es un framework de procesamiento de información que puede generar acciones rápidas sobre cojuntos de datos muy grandes. Tiene capacidades de procesamiento distribuido en varios servidores; ideal para tratamiento de datos de machine learning.

La arquitectura esta diseñada para escalar plataformas. Tiene un **Driver** que es el que convierte en código del usuario en tareas, que son a su vez ejecutadas por los **Executors** que habitan en los *Work Nodes* y que pueden estar distribuidos en los distintos servers. Generalmente existe una capa de manejo de Clusters para que  los Workers se auto adapten a la demanda del sistema. Esta capa puede ser Hadoop Yarn, Kubernetes, Docker Swarm, etc.

### ¿Por qué Spark?
1. Velocidad - Tiene un data-engine en memoria que puede reducir mucho su tiempo de ejecución de hasta 10x en tareas complejas respecto a su tecnología más parecida
2. Spark API Amigable - La facilidad de llamar a la API de Spark lo hace muy atractivo. La implementación de la API esconde mucho del a complejidad de configurar y ejecutar un sistema tan complejo

### Conceptos importantes Spark
1. RDD (Resilient Distributed Dataset) Una abstracción para una colección de objetos que puede estar distribbuida en un cluster de servers. Pueden ser archivos de texto, bases de datos SQL, buckets S3, etc. El API de Spark es´ta construido sobre este concepto, donde se pueden hacer operaciones de unir data sets, filtrar, samplear datos, agregarlos, etc.
2. Spark SQL - Originalmente llamdo Shark. Es la interfaz más conocida para tratar datos en Spark. Se usa el concepto de dataframe, implementación de SQL compliant con todos los estándares.

### Otras librerias de Spark
- Mllib - Framework para pipelines de Machine Learning - K-means, random forests, clasificación, regresión, etc.
- GraphX - Procesamiento de estructura de imágenes Big Data. Se traducen los gráficos a dataframes, además de usar Catalyst
- Streaming - Procesamiento de información en tiempo real. Se divide la información en "micro-batches" orquestados por la API de spark

In [1]:
# Librerias a importar
import sys
from random import random
from operator import add
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark 
print(pyspark.__version__)

3.5.1


In [2]:
spark = SparkSession\
            .builder\
            .master("local")\
            .appName("PythonPi")\
            .getOrCreate()

In [3]:
# Out of stack overflow forum
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable

import os
import sys
spark_path = "C:\Spark\spark-3.5.1-bin-hadoop3" # spark installed folder
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.9.7-src.zip")

In [4]:
#Ejemplo Simple 
partitions = 4
n = 100000 * partitions

def f(_):
    x = random()*2 - 1
    y = random()*2 - 1
    return 1 if x**2 + y**2 <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n+1), partitions).map(f).reduce(add)
print('Pi is roughly %f' % (4.0 * count / n))

spark.stop()

Pi is roughly 3.141200


## Operaciones básicas de Spark
- Lectura de Archivos
- Análisis inicial de datos
    - Crear sesión de Spark
    - Generar un Schema y llenarlo de datos de un CSV
    - Contar el número de filas, etc.

In [5]:
# Importar sesión de Spark
from pyspark.sql import SparkSession

# Generar sesión
spark = SparkSession\
            .builder\
            .appName("LecturaArchivoCSV")\
            .getOrCreate()

In [6]:
# Leer archivo de videojuegos
path_archivo = "D:/DataAnalysis_EBAC/ebac/Python/Modulo19/vgsales.csv"
df = spark.read.csv(path_archivo, header=True)

In [7]:
# Nota como es un dataframe de Spark, no pandas
type(df)

pyspark.sql.dataframe.DataFrame

In [8]:
df.show(10)

+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing| Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports| Nintendo|   15.75|   11.01|    3.28|       2.96|          33|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing| Nintendo|   11.27|    8.89|   10.22|          1|       31.37|
|   6|              Tetris|      GB|1989|      Puzzle| Nintendo|

In [9]:
# Typos de datos
df.dtypes

[('Rank', 'string'),
 ('Name', 'string'),
 ('Platform', 'string'),
 ('Year', 'string'),
 ('Genre', 'string'),
 ('Publisher', 'string'),
 ('NA_Sales', 'string'),
 ('EU_Sales', 'string'),
 ('JP_Sales', 'string'),
 ('Other_Sales', 'string'),
 ('Global_Sales', 'string')]

In [10]:
# Cambiar números de string a float
from pyspark.sql.types import IntegerType, FloatType

df = df.withColumn('NA_Sales', df.NA_Sales.cast(FloatType()))
df = df.withColumn('EU_Sales', df.EU_Sales.cast(FloatType()))
df = df.withColumn('JP_Sales', df.JP_Sales.cast(FloatType()))
df = df.withColumn('Other_Sales', df.Other_Sales.cast(FloatType()))
df = df.withColumn('Global_Sales', df.Global_Sales.cast(FloatType()))

df.dtypes


[('Rank', 'string'),
 ('Name', 'string'),
 ('Platform', 'string'),
 ('Year', 'string'),
 ('Genre', 'string'),
 ('Publisher', 'string'),
 ('NA_Sales', 'float'),
 ('EU_Sales', 'float'),
 ('JP_Sales', 'float'),
 ('Other_Sales', 'float'),
 ('Global_Sales', 'float')]

In [11]:
# Imprime la información de columnas de una manera jerárquica
df.printSchema()

root
 |-- Rank: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: float (nullable = true)
 |-- EU_Sales: float (nullable = true)
 |-- JP_Sales: float (nullable = true)
 |-- Other_Sales: float (nullable = true)
 |-- Global_Sales: float (nullable = true)



In [12]:
# Ordenamiento de Resuldados con .sort()
import pyspark.sql.functions as F1

# Se ordena por una columna
df.sort(F1.col("Platform")).show(10)


+----+---------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+
|Rank|           Name|Platform|Year|   Genre|   Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+---------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+
| 545|Missile Command|    2600|1980| Shooter|       Atari|    2.56|    0.17|     0.0|       0.03|        2.76|
|1431|      Centipede|    2600|1981| Shooter|       Atari|    1.26|    0.08|     0.0|       0.01|        1.36|
| 608| Space Invaders|    2600| N/A| Shooter|       Atari|    2.36|    0.14|     0.0|       0.03|        2.53|
| 240|       Pitfall!|    2600|1981|Platform|  Activision|    4.21|    0.24|     0.0|       0.05|         4.5|
| 736|        Frogger|    2600|1981|  Action|Parker Bros.|    2.06|    0.12|     0.0|       0.02|         2.2|
|1108|    Ms. Pac-Man|    2600|1981|  Puzzle|       Atari|    1.54|     0.1|     0.0|       0.02|        1.65|
|

In [13]:
df.count()

16598

In [14]:
# Ordenar por dos columnas
df.sort(F1.col("Platform"), F1.col('Year').desc()).show(10)

# Nota como no se usan corchetes, solamente se listan las columnas por las que se quiere ordenar

+----+--------------------+--------+----+---------+-----------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|    Genre|  Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+---------+-----------+--------+--------+--------+-----------+------------+
|4380|Maze Craze: A Gam...|    2600| N/A|   Action|      Atari|    0.42|    0.02|     0.0|        0.0|        0.45|
|4471|      Super Breakout|    2600| N/A|   Puzzle|      Atari|    0.41|    0.03|     0.0|        0.0|        0.44|
|5063|             Hangman|    2600| N/A|   Puzzle|      Atari|    0.35|    0.02|     0.0|        0.0|        0.38|
|1515|           Adventure|    2600| N/A|Adventure|      Atari|    1.21|    0.08|     0.0|       0.01|         1.3|
|5659|            Dragster|    2600| N/A|   Racing| Activision|     0.3|    0.02|     0.0|        0.0|        0.32|
|2115|      Air-Sea Battle|    2600| N/A|  Shooter|      Atari|    0.91|

In [15]:
# Order by
df.orderBy("Year", 'Platform').show(10)

+----+---------------+--------+----+--------+----------+--------+--------+--------+-----------+------------+
|Rank|           Name|Platform|Year|   Genre| Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+---------------+--------+----+--------+----------+--------+--------+--------+-----------+------------+
| 259|      Asteroids|    2600|1980| Shooter|     Atari|     4.0|    0.26|     0.0|       0.05|        4.31|
| 545|Missile Command|    2600|1980| Shooter|     Atari|    2.56|    0.17|     0.0|       0.03|        2.76|
|1768|        Kaboom!|    2600|1980|    Misc|Activision|    1.07|    0.07|     0.0|       0.01|        1.15|
|1971|       Defender|    2600|1980|    Misc|     Atari|    0.99|    0.05|     0.0|       0.01|        1.05|
|2671|         Boxing|    2600|1980|Fighting|Activision|    0.72|    0.04|     0.0|       0.01|        0.77|
|4027|     Ice Hockey|    2600|1980|  Sports|Activision|    0.46|    0.03|     0.0|       0.01|        0.49|
|5368|        Freew

In [16]:
# Agrupación de resultados
df.groupBy('Platform', "Genre").sum('Global_Sales', 'NA_Sales').show()

+--------+----------+--------------------+------------------+
|Platform|     Genre|   sum(Global_Sales)|     sum(NA_Sales)|
+--------+----------+--------------------+------------------+
|      PS|    Action|  127.05000039748847| 62.87999978289008|
|     NES|    Puzzle|  20.999999906867743|  9.13999992236495|
|    WiiU|  Strategy|  1.2400000132620335|0.5099999867379665|
|     GBA|  Platform|   78.29999975115061| 45.81000040844083|
|     Wii|  Fighting|  23.859999924898148|13.149999974295497|
|    XOne|      Misc|   6.860000053420663| 4.439999930560589|
|    XOne|  Fighting|  2.3100000210106373| 1.580000001937151|
|     3DO|Simulation|0.019999999552965164|               0.0|
|     PS4|    Action|   87.05999964103103|29.699999878183007|
|      PS|    Puzzle|  12.080000067129731| 5.289999892935157|
|      GB|    Puzzle|  47.470000356435776|29.350000709295273|
|    X360|  Strategy|  10.130000134930015| 6.489999949932098|
|      GC|    Racing|   21.88999987952411|14.929999867454171|
|    XOn

### Uso de SQL y tablas temporales dentro de PySpark

In [17]:
# Crear tabla temporal
df.createOrReplaceTempView('VGSALES')

# Comando SQL
sql_str = 'select Genre, Name, Rank, Publisher, NA_Sales, Global_Sales from VGSALES limit 20'

# Ejecutar SQL
spark.sql(sql_str).show()

+------------+--------------------+----+--------------------+--------+------------+
|       Genre|                Name|Rank|           Publisher|NA_Sales|Global_Sales|
+------------+--------------------+----+--------------------+--------+------------+
|      Sports|          Wii Sports|   1|            Nintendo|   41.49|       82.74|
|    Platform|   Super Mario Bros.|   2|            Nintendo|   29.08|       40.24|
|      Racing|      Mario Kart Wii|   3|            Nintendo|   15.85|       35.82|
|      Sports|   Wii Sports Resort|   4|            Nintendo|   15.75|        33.0|
|Role-Playing|Pokemon Red/Pokem...|   5|            Nintendo|   11.27|       31.37|
|      Puzzle|              Tetris|   6|            Nintendo|    23.2|       30.26|
|    Platform|New Super Mario B...|   7|            Nintendo|   11.38|       30.01|
|        Misc|            Wii Play|   8|            Nintendo|   14.03|       29.02|
|    Platform|New Super Mario B...|   9|            Nintendo|   14.59|      

In [18]:
df.createOrReplaceGlobalTempView('VGSALES')
sql_str =  """
                select  Genre, 
                        Publisher,
                        round(sum(Global_Sales),2),
                        round(sum(Global_Sales),2) 
                from VGSALES 
                group by Genre, Publisher 
                order by Genre, Publisher desc
"""

spark.sql(sql_str).show()

+------+--------------------+---------------------------+---------------------------+
| Genre|           Publisher|round(sum(Global_Sales), 2)|round(sum(Global_Sales), 2)|
+------+--------------------+---------------------------+---------------------------+
|Action|           mixi, Inc|                       0.86|                       0.86|
|Action|     dramatic create|                       0.01|                       0.01|
|Action|         Zushi Games|                       0.02|                       0.02|
|Action|           Zoo Games|                       0.27|                       0.27|
|Action|Zoo Digital Publi...|                       0.78|                       0.78|
|Action|                Yeti|                       0.01|                       0.01|
|Action|         Xseed Games|                       0.11|                       0.11|
|Action|  Wizard Video Games|                       0.62|                       0.62|
|Action|Warner Bros. Inte...|                     118.

# Big Data Parte 2  



In [19]:
df.createOrReplaceGlobalTempView('VGSALES')
sql_str =  """
                select  Genre, 
                        Publisher,
                        round(sum(Global_Sales),2),
                        round(sum(Global_Sales),2) 
                from VGSALES 
                group by Genre, Publisher 
                order by Genre, Publisher desc
"""

# El 10 limita  mis resultados a 10 líneas, el 80 hace que el máximo de caracteres que se muestran en una celda se extienda hasta 80
spark.sql(sql_str).show(10, 80)

+------+--------------------------------------+---------------------------+---------------------------+
| Genre|                             Publisher|round(sum(Global_Sales), 2)|round(sum(Global_Sales), 2)|
+------+--------------------------------------+---------------------------+---------------------------+
|Action|                             mixi, Inc|                       0.86|                       0.86|
|Action|                       dramatic create|                       0.01|                       0.01|
|Action|                           Zushi Games|                       0.02|                       0.02|
|Action|                             Zoo Games|                       0.27|                       0.27|
|Action|                Zoo Digital Publishing|                       0.78|                       0.78|
|Action|                                  Yeti|                       0.01|                       0.01|
|Action|                           Xseed Games|                 

In [20]:
df.createOrReplaceGlobalTempView('VGSALES')
sql_str =  """
                select  Genre, 
                        Publisher,
                        round(sum(Global_Sales),2),
                        round(sum(Global_Sales),2) 
                from VGSALES 
                group by Genre, Publisher 
                order by Genre, Publisher desc
"""

# El True hace que el display sea en forma de tarjeta
spark.sql(sql_str).show(5, 80, True)

-RECORD 0---------------------------------------------
 Genre                       | Action                 
 Publisher                   | mixi, Inc              
 round(sum(Global_Sales), 2) | 0.86                   
 round(sum(Global_Sales), 2) | 0.86                   
-RECORD 1---------------------------------------------
 Genre                       | Action                 
 Publisher                   | dramatic create        
 round(sum(Global_Sales), 2) | 0.01                   
 round(sum(Global_Sales), 2) | 0.01                   
-RECORD 2---------------------------------------------
 Genre                       | Action                 
 Publisher                   | Zushi Games            
 round(sum(Global_Sales), 2) | 0.02                   
 round(sum(Global_Sales), 2) | 0.02                   
-RECORD 3---------------------------------------------
 Genre                       | Action                 
 Publisher                   | Zoo Games              
 round(sum

## Particionamiento de Datos - partitionBy()
Debido a la cantidad de información que podemos estar manejando en Spark, podemos dividir la data en pedazos más pequeños. La partición de datos consiste en dividir esa información en porciones más pequeñas pbasadas en algún criterio. Esto se usa para mejorar el perfomrmance en general. Para poder paralelizar el procesamiento.

### Tres Tips:
- No usar tamaños extremos - No particionar en datasets ni muy grandes ni muy pequeñas. Tratar de hacer que todas las particiones tengan aproximadamente el mismo tamaño. Se recomiendan tamaños de 256Mb a 1Gb por particion
- No usar IDs para hacer la partición puesto que esto resultaría en muchas particiones de un registro
- Usar columnas para particionar la data que van a ser usadas en sentencias groupBy

### Tres tipos de particiones:
- PartitionBy - Va a cambiar la estructura del folder uysando el criterio que se le especifique. "Cortame por Género o por país o por plataforma".  No controla cuántos archivos se generan debajo de cada folder. se puede combinar repartition + partitionBy
- Repartition - Este método puede incrementar o reducir el # de particiones. Aqui se especifíca el número de particiones y los registros que hay en cada una se distribuyen al azar. 
- Coalesce - Reduce el número de particiones en un DataFrame. En vez de crear nuevas particiones mueve la data y la ajusta a las particiones existentes, por lo que solamente puede reducir el número de particiones. Es más barato computacionalmente.

En términos prácticos se verá la creación de muchos archivos en los que cada método genera la data. Estos podrán ser varios volders, varios archiovs o una combinación de ambos

In [21]:
df.printSchema()

root
 |-- Rank: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: float (nullable = true)
 |-- EU_Sales: float (nullable = true)
 |-- JP_Sales: float (nullable = true)
 |-- Other_Sales: float (nullable = true)
 |-- Global_Sales: float (nullable = true)



In [22]:
# Darse cuenta de que el campo por el que se hace partitionBy no aparece dentro de los datasets exportados
df.write.option('header', True).partitionBy('Platform').mode('overwrite').csv('./partitions/platform')
# df.write.option('header', True).partitionBy('Platform').mode('overwrite').format('csv').save('partitions')

In [23]:
# Se puede hacer por multiples columnas
df.write.option('header', True).partitionBy('Platform', 'Year').mode('overwrite').csv('./partitions/platform_year')

In [24]:
# Otro ejemplo, pero ahora controlando el numero de particiones
df.write.option('header', True).option('maxRecordsPerFile', 10).partitionBy('Genre').mode('overwrite').csv('./partitions/yearMaxRows10')

In [25]:
# Repartition
df.repartition(20).write.mode("overwrite").option('header', True).csv('./partitions/repartition') # Aqui pasamos el número de particiones que le queremos pasar (20)
# Fíjate como en cada uno de los archivos, los indexes salen desordenados puesto que acomoda al azar en cada uno de los archivos

In [26]:
# Coalesce
# A diferencia del repartition, coalesce sirve para disminuir el número de las particiones. Coalesce no aplica un full reshuffle a menos que sea necesario
df.write.mode('overwrite').option('header', True).csv('./partitions/coalesce')  # not sure about this