# 4.5 - PySpark

$$$$

![pyspark](images/pyspark.jpg)

$$$$

Apache Spark es un framework de computación en clúster open-source. Fue desarrollada originariamente en la Universidad de California, en el AMPLab de Berkeley. El código base del proyecto Spark fue donado más tarde a la Apache Software Foundation que se encarga de su mantenimiento desde entonces. Spark proporciona una interfaz para la programación de clusters completos con Paralelismo de Datos implícito y tolerancia a fallos.

Apache Spark se puede considerar un sistema de computación en clúster de propósito general y orientado a la velocidad. Proporciona APIs en Java, Scala, Python y R. También proporciona un motor optimizado que soporta la ejecución de grafos en general. También soporta un conjunto extenso y rico de herramientas de alto nivel entre las que se incluyen Spark SQL (para el procesamiento de datos estructurados basada en SQL), MLlib para implementar machine learning, GraphX para el procesamiento de grafos y Spark Streaming.

In [2]:
%pip install findspark
%pip install pyspark

Collecting findspark
  Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=0f3901037435448d4f724d30a3a5c58ce525b0e6f0a105858bd17daa0f7561f8
  Stored in directory: /Users/iudh/Library/Caches/pip/wheels/96/f2/8a/b6465f53b3c21a58960d31ae544861ec2076fc5ae0249ffae8
Su

In [4]:
import warnings
warnings.simplefilter('ignore')

import findspark
findspark.init() 

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark=SparkSession.builder.appName('Nombre').getOrCreate()  # inicia la sesion de spark

path='../data/student-por.csv'

22/11/10 11:53:59 WARN Utils: Your hostname, iudh.local resolves to a loopback address: 127.0.0.1; using 192.168.97.66 instead (on interface en0)
22/11/10 11:53:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/10 11:53:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
data=spark.read.csv(path, header=True, inferSchema=True, sep=';')

data.show(5)

                                                                                

22/11/10 11:54:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+------+---+---+-------+-------+-------+----+----+-------+--------+------+--------+----------+---------+--------+---------+------+----+----------+-------+------+--------+--------+------+--------+-----+----+----+------+--------+---+---+---+
|school|sex|age|address|famsize|Pstatus|Medu|Fedu|   Mjob|    Fjob|reason|guardian|traveltime|studytime|failures|schoolsup|famsup|paid|activities|nursery|higher|internet|romantic|famrel|freetime|goout|Dalc|Walc|health|absences| G1| G2| G3|
+------+---+---+-------+-------+-------+----+----+-------+--------+------+--------+----------+---------+--------+---------+------+----+----------+-------+------+--------+--------+------+--------+-----+----+----+------+--------+---+---+---+
|    GP|  F| 18|      U|    GT3|      A|   4|   4|at_home| teacher|course|  mother|         2|      

In [8]:
display(data.show(5))

+------+---+---+-------+-------+-------+----+----+-------+--------+------+--------+----------+---------+--------+---------+------+----+----------+-------+------+--------+--------+------+--------+-----+----+----+------+--------+---+---+---+
|school|sex|age|address|famsize|Pstatus|Medu|Fedu|   Mjob|    Fjob|reason|guardian|traveltime|studytime|failures|schoolsup|famsup|paid|activities|nursery|higher|internet|romantic|famrel|freetime|goout|Dalc|Walc|health|absences| G1| G2| G3|
+------+---+---+-------+-------+-------+----+----+-------+--------+------+--------+----------+---------+--------+---------+------+----+----------+-------+------+--------+--------+------+--------+-----+----+----+------+--------+---+---+---+
|    GP|  F| 18|      U|    GT3|      A|   4|   4|at_home| teacher|course|  mother|         2|        2|       0|      yes|    no|  no|        no|    yes|   yes|      no|      no|     4|       3|    4|   1|   1|     3|       4|  0| 11| 11|
|    GP|  F| 17|      U|    GT3|      T|

None

In [9]:
drop_cols=['school', 'sex', 'age', 'Mjob', 'Fjob', 'reason', 'guardian']

data=data.select([c for c in data.columns if c not in drop_cols])

data

DataFrame[address: string, famsize: string, Pstatus: string, Medu: int, Fedu: int, traveltime: int, studytime: int, failures: int, schoolsup: string, famsup: string, paid: string, activities: string, nursery: string, higher: string, internet: string, romantic: string, famrel: int, freetime: int, goout: int, Dalc: int, Walc: int, health: int, absences: int, G1: int, G2: int, G3: int]

In [10]:
non_numeric_columns=[item[0] for item in data.dtypes if item[1].startswith('string')]

non_numeric_columns

['address',
 'famsize',
 'Pstatus',
 'schoolsup',
 'famsup',
 'paid',
 'activities',
 'nursery',
 'higher',
 'internet',
 'romantic']

In [11]:
struct_data=data.select('*')

struct_data

DataFrame[address: string, famsize: string, Pstatus: string, Medu: int, Fedu: int, traveltime: int, studytime: int, failures: int, schoolsup: string, famsup: string, paid: string, activities: string, nursery: string, higher: string, internet: string, romantic: string, famrel: int, freetime: int, goout: int, Dalc: int, Walc: int, health: int, absences: int, G1: int, G2: int, G3: int]

In [12]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType


indexers=StringIndexer(inputCols=non_numeric_columns, 
                       outputCols=[c+'_' for c in non_numeric_columns],
                       stringOrderType='alphabetAsc')

struct_data=indexers.fit(struct_data).transform(struct_data)

struct_data=struct_data.select([c for c in struct_data.columns if c not in non_numeric_columns])

for c in struct_data.columns:
    struct_data=struct_data.withColumn(c, struct_data[c].cast(IntegerType()))

    
struct_data.toPandas()

                                                                                

Unnamed: 0,Medu,Fedu,traveltime,studytime,failures,famrel,freetime,goout,Dalc,Walc,...,famsize_,Pstatus_,schoolsup_,famsup_,paid_,activities_,nursery_,higher_,internet_,romantic_
0,4,4,2,2,0,4,3,4,1,1,...,0,0,1,0,0,0,1,1,0,0
1,1,1,1,2,0,5,3,3,1,1,...,0,1,0,1,0,0,0,1,1,0
2,1,1,1,2,0,4,3,2,2,3,...,1,1,1,0,0,0,1,1,1,0
3,4,2,1,3,0,3,2,2,1,1,...,0,1,0,1,0,1,1,1,1,1
4,3,3,1,2,0,4,3,2,1,2,...,0,1,0,1,0,0,1,1,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
644,2,3,1,3,1,5,4,2,1,2,...,0,1,0,0,0,1,0,1,1,0
645,3,1,1,2,0,4,3,4,1,1,...,1,1,0,1,0,0,1,1,1,0
646,1,1,2,2,0,1,1,1,1,1,...,0,1,0,0,0,1,1,1,0,0
647,3,1,2,1,0,2,4,5,3,4,...,1,1,0,0,0,0,0,1,1,0


### Ejemplo: aproximando $\pi$

Utilizaremos el Método de MonteCarlo para aproximar el número $\pi$. El método Monte Carlo es un método en el que por medio de la estadística y la probabilidad podemos determinar valores o soluciones de ecuaciones que calculados con exactitud son muy complejas, pero que mediante este método resulta sencillo calcular una aproximación al resultado que buscamos.

$$$$

![pi](images/pi.png)

$$$$

Lo primero construir el entorno de trabajo. Este sería:

+ Construiremos un cuadrado de lado 1.
+ Construimos un círculo inscrito en el cuadrado, que tiene de centro, el centro del cuadrado y de radio 1. Su área será $\pi$.
+ Generaremos puntos al azar dentro del cuadrado. Para entenderlo mejor es como lanzar dardos sobre una diana con los ojos vendados, de tal forma que siempre acertamos dentro de los límites de ese cuadrado. 

Aplicamos ahora el Método MonteCarlo:
+ Contaremos el total de puntos generados.
+ Contaremos el total de puntos que cayeron dentro del círculo.
+ Realizaremos el siguiente razonamiento:

$$A0 =  Área_{cuadrado} = N_{puntos} $$
$$$$
$$A1 = Área_{círculo} = \pi · r^{2}$$

Ahora:

$$\frac{\pi · r^{2}}{N_{puntos}} = \frac{Área_{círculo}}{Área_{cuadrado}}$$

Resumiremos en un cuadrante, y los que nos queda es que:

$$\pi=4·Área_{cuadrante}$$

El valor de $\pi$ es 4 veces la probabilidad de que el punto caiga en la zona roja.

In [13]:
import numpy as np
from pyspark import SparkContext

In [14]:
# puntos aleatorios dentro del círculo

def dentro(punto):
    x, y = np.random.random(), np.random.random()
    return x*x + y*y < 1

In [15]:
def estimar_pi(n_total):
    print('Proceso normal...')

    puntos=list(filter(dentro, list(range(n_total)))) 
    
    cuenta=len(puntos)
  
    return 4. * cuenta/n_total

In [16]:
%%time
display(estimar_pi(5000))
display(estimar_pi(50000))
display(estimar_pi(5000000))
        
display('Valor real pi: ' ,np.pi)

Proceso normal...


3.172

Proceso normal...


3.1432

Proceso normal...


3.1414896

'Valor real pi: '

3.141592653589793

CPU times: user 4.67 s, sys: 99.7 ms, total: 4.77 s
Wall time: 4.79 s


**con spark**

In [17]:
sesion=SparkContext.getOrCreate()

In [18]:
def estimar_pi_paralelo(n_total):
    print('Proceso con Spark..')

    cuenta=sesion.parallelize(range(0, n_total)).filter(dentro).count()

    return 4. * cuenta/n_total

In [20]:
%%time
display(estimar_pi_paralelo(5000))
display(estimar_pi_paralelo(50000))
display(estimar_pi_paralelo(5000000))
        
display('Valor real pi: ' ,np.pi)

Proceso con Spark..


3.1488

Proceso con Spark..


3.14624

Proceso con Spark..


                                                                                

3.144416

'Valor real pi: '

3.141592653589793

CPU times: user 21.9 ms, sys: 12.1 ms, total: 34 ms
Wall time: 2.11 s
