## 7.1.- Intro a PySpark

Spark, primero de todo, es un "framework" gratuito y de código abierto para la computación en "clusters" de ordenadores. 

¿Pero qué es un "cluster"? Un "cluster" es una agrupación o conglomerado de ordenadores con un mismo "hardware" común que se comportan como si fuesen un único ordenador, que se usan mayoritariamente para procesar cálculos complejos o grandes volúmenes de datos. 

Una de las principales ventajas de Spark es que combina esta capacidad de computación distribuida con un código sencillo y elegante, que además ofrece soporte a múltiples lenguajes de programación, entre ellos el Python. de la fusión entre Python y Spark nace el paquete PySpark. 

Spark fue creado en la Universidad de Berkeley en California en 2014 y es considerado el primer "software" de código abierto que hace la programación distribuida realmente accesible a los científicos de datos, no solo por su gratuidad, sino también por su relativa sencillez. 

El Spark mantiene la escalabilidad lineal y la tolerancia a fallos del MapReduce, pero a su vez amplia sus puntos fuertes gracias a dos nuevas funcionalidades. Aquí vamos a destacar dos: los grafos acíclicos dirigidos, del inglés **"directed acyclic graph"**, y las bases de datos distribuidas y resilientes, del inglés **"resilient distributed dataset"**. 

En primer lugar, el grafo acíclico dirigido, como su nombre indica, es un grafo dirigido que no tiene ciclos, es decir, que para cada nodo o elemento del grafo no hay ningún camino directo que comience y finalice en dicho nodo. En otras palabras, desde una etapa de trabajo o estado concreto de nuestro proyecto vamos a realizar múltiples procesos, transformaciones de nuestros datos, acciones, etc., algunos de los cuales van a realizarse de forma paralela, pero a lo largo de todos los procesos nunca vamos a regresar al estado en el que nos encontrábamos al principio. Esta es la clave. Spark soporta el flujo de datos acíclico. Cada tarea de Spark crea un grafo cíclico dirigido de etapas de trabajo para que se ejecuten en un determinado "cluster". En comparación con MapReduce, el cual crea un grafo cíclico dirigido con dos estados predefinidos, el 'map' y el 'reduce', los grafos acíclicos dirigidos creados por Spark pueden tener cualquier número de etapas. Spark, con esta estructura, es más rápido que MapReduce, por el hecho de que no tiene que escribir en disco los resultados obtenidos en las etapas intermedias del grafo. MapReduce, sin embargo, sí que debe escribir en disco los resultados entre las etapas 'map' y 'reduce'. 

Los "resilient distributed dataset" son una estructura de almacenaje distribuida en particiones, lo que permite a los programadores realizar operaciones sobre grandes cantidades de datos en "clusters" de manera rápida y a prueba de fallos. Esta nueva estructura de datos se originó en respuesta a los problemas que se generaban al manejar datos de manera ineficiente, especialmente a la hora de ejecutar algoritmos iterativos y procesos de minería de datos. En ambos casos, mantener los datos en memoria y de manera distribuida mejora el rendimiento considerablemente. 

Un programa típico de Spark se estructura de la siguiente forma:
 1.- se crea un "resilient distributed dataset" a partir de una variable de entorno llamada 'Context'. Esto se realiza leyendo datos de un fichero, bases de datos o cualquier otra fuente de información, ya sea estática o dinámica. 
 2.- se realizan las transformaciones deseadas para crear más objetos RDD a partir del primero. Estas transformaciones se expresan en términos de programación funcional y no eliminan el RDD original, sino que crean nuevos. 
 3.- se realizan acciones sobre estos RDD y posiblemente más transformaciones sobre los datos. 
 4.- una vez llevados a cabo los procesos de acción y transformación, los objetos RDD que hayamos creado deben converger para crear el RDD final, que es nuestro objetivo de análisis. Estos pueden tener la forma de un solo valor: un vector de resultados, un nuevo "dataset", etc., en función de nuestros objetivos, así que tendremos que ser conscientes de esto a la hora de orientar todo el conjunto de transformaciones y acciones. 
 
Es muy posible que los datos con los que se necesite trabajar estén en diferentes objetos RDD, por lo que Spark define dos tipos de opciones de transformación: la transformación estrecha, "narrow transformation", o la transformación ancha, "wide transformation". 

La **"narrow transformation"** se utiliza cuando los datos que se necesitan tratar están en la misma partición del RDD y no es necesario realizar una mezcla de dichos datos para obtenerlos todos. Algunos ejemplos son las funciones **'filter', 'sample', 'map' o 'flatmap'**. 

La **"wide transformation"** se utiliza cuando la lógica de nuestro proceso necesita datos que se encuentran en diferentes particiones de un RDD y es necesario mezclar dichas particiones para agrupar los datos necesarios en un RDD determinado. Ejemplos de "wide transformation" pueden ser el **'groupByKey' o 'reduceByKey'**. 

Planificación PySpark
 1.- Analiza GAD para optimizar transformaciones
 2.- Realiza transformaciones estrechas
 3.- Realiza transformaciones anchas
 4.- Realiza las acciones
 

## 7.2.- Sintaxis en PySpark

##### Nota las transformaciones no se ejecutan hasta que no se especifica una acción

In [1]:
from pyspark import SparkConf, SparkContext
#1.- Generar una variable de configuración
conf = SparkConf().setMaster("local").setAppName("Mi programa")
#2.- Generar un contexto
sc = SparkContext(conf = conf)

21/09/27 19:41:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/09/27 19:41:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
#Para ver el objeto creado
sc

In [8]:
lines = sc.textFile("ejemplo.txt")
lines

ejemplo.txt MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
lines.count()

71

In [10]:
lines.first()

'Apache Spark'

In [11]:
lines2 = lines.sample(fraction = 0.1, withReplacement = False)

In [12]:
lines2.first()

'Spark fue desarrollado en sus inicios por Matei Zaharia en el AMPLab de la UC Berkeley en 2009. Fue liberado como código abierto en 2010 bajo licencia BSD.'

## 7.3.- Qué son los RDD (Resilient Distributed Databases)

**Ventajas RDD**
 1.- Guardar en disco y recuperarlos siempre que queramos
 2.- No todos los RDD que generamos son guardados en disco
 3.- Evita leer/escribir en disco constantemente
 4.- Realiza particiones automáticamente
 5.- Crea objetos inmutables

**¿Cómo trabajan los RDD?**
 1.- Crea una secuencia determinista de acciones
 2.- Evaluación perezosa: espera a evaluar transformaciones
 3.- Optimización a través del GAD (estrechas - anchas)
 4.- Permite recuperar fácilmente estados intermedios
 5.- Generan el RDD deseado

## 7.4.- Funciones lambda

In [13]:
#función que detecta si encuentra en cada una de las líneas de este archivo de texto la palabra "Python".
resultado = lines.filter(lambda line: "Python" in line)

In [14]:
resultado

PythonRDD[10] at RDD at PythonRDD.scala:53

In [15]:
resultado.count()

1

In [16]:
resultado.take(3)

['Apache Spark se puede considerar un sistema de computación en clúster de propósito general y orientado a la velocidad. Proporciona APIs en Java, Scala, Python y R. También proporciona un motor optimizado que soporta la ejecución de grafos en general. También soporta un conjunto extenso y rico de herramientas de alto nivel entre las que se incluyen Spark SQL (para el procesamiento de datos estructurados basada en SQL), MLlib para implementar machine learning, GraphX para el procesamiento de grafos y Spark Streaming.']

In [17]:
lines.filter(lambda x: any(i.isdigit() for i in x)).take(5)

['Spark fue desarrollado en sus inicios por Matei Zaharia en el AMPLab de la UC Berkeley en 2009. Fue liberado como código abierto en 2010 bajo licencia BSD.',
 'En 2013, el proyecto fue donado a la Apache Software Foundation y se modificó su licencia a Apache 2.0. En febrero de 2014, Spark se convirtió en un Top-Level Apache Project.1\u200b',
 'En noviembre de 2014, la empresa de su fundador, M. Zaharia, Databricks obtuvo un nuevo récord mundial en la ordenación a gran escala usando Spark.2\u200b',
 'Hacia 2015, Spark tenía más de 1000 contribuidores3\u200b convirtiéndose en uno de los proyectos más activos de la Apache Software Foundation4\u200b y en uno de los proyectos de big data open source más activos.',
 'Dada la popularidad de la plataforma hacia el 2014, programas de pago como la General Assembly and free fellowships like The Data Incubator comenzaron a ofrecer cursos de formación personalizados5\u200b']

In [18]:
lines.filter(lambda x: not any(i.isdigit() for i in x)).take(5)

['Apache Spark',
 '',
 'Apache Spark es un framework de computación (entorno de trabajo) en clúster open-source. Fue desarrollada originariamente en la Universidad de California, en el AMPLab de Berkeley. El código base del proyecto Spark fue donado más tarde a la Apache Software Foundation que se encarga de su mantenimiento desde entonces. Spark proporciona una interfaz para la programación de clusters completos con Paralelismo de Datos implícito y tolerancia a fallos.',
 '',
 'Apache Spark se puede considerar un sistema de computación en clúster de propósito general y orientado a la velocidad. Proporciona APIs en Java, Scala, Python y R. También proporciona un motor optimizado que soporta la ejecución de grafos en general. También soporta un conjunto extenso y rico de herramientas de alto nivel entre las que se incluyen Spark SQL (para el procesamiento de datos estructurados basada en SQL), MLlib para implementar machine learning, GraphX para el procesamiento de grafos y Spark Stream

In [20]:
lines.filter(lambda x: any(i.isdigit() for i in x)).count()

35

In [22]:
numeros = lines.filter(lambda x: any(i.isdigit() for i in x))

In [23]:
#para que sea persistente en memoria
numeros.persist()

PythonRDD[18] at RDD at PythonRDD.scala:53

## 7.5.- Dataframes en PySpark

In [24]:
import pandas as pd
df = pd.read_csv("/Users/raqueldiaz/Documents/Linkedin Learning/Python para data science y big data esencial/base_datos_2008.csv", nrows = 100000)

In [25]:
df.head(2)

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2008,1,3,4,2003.0,1955,2211.0,2225,WN,335,...,4.0,8.0,0,,0,,,,,
1,2008,1,3,4,754.0,735,1002.0,1000,WN,3231,...,5.0,10.0,0,,0,,,,,


In [26]:
from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)

dfspark = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Users/raqueldiaz/Documents/Linkedin Learning/Python para data science y big data esencial/base_datos_2008.csv")

                                                                                

In [27]:
dfspark.show(2)

21/09/27 23:50:39 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335

In [28]:
dfspark.head(2)

[Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime='2003', CRSDepTime=1955, ArrTime='2211', CRSArrTime=2225, UniqueCarrier='WN', FlightNum=335, TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay='-14', DepDelay='8', Origin='IAD', Dest='TPA', Distance=810, TaxiIn='4', TaxiOut='8', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime='754', CRSDepTime=735, ArrTime='1002', CRSArrTime=1000, UniqueCarrier='WN', FlightNum=3231, TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay='2', DepDelay='19', Origin='IAD', Dest='TPA', Distance=810, TaxiIn='5', TaxiOut='10', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA')]

In [29]:
dfspark.count()

                                                                                

7009728

In [30]:
dfspark = dfspark.sample(fraction = 0.001, withReplacement = False)

In [31]:
dfspark.count()

                                                                                

6937

In [32]:
dfspark = dfspark.withColumn("ArrDelay", dfspark["ArrDelay"].cast("integer"))

In [33]:
df2 = dfspark.na.drop(subset = ["ArrDelay", "DepDelay", "Distance"])

In [35]:
df2 = df2.filter("ArrDelay is not NULL")

In [36]:
df2.count()

                                                                                

6783

In [37]:
df2.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Ca

In [38]:
import numpy as np
media = np.mean(df2.select("ArrDelay").collect())

                                                                                

In [39]:
media

8.314020344980097

In [40]:
df2.rdd.getNumPartitions()

6

## 7.6.- Transformaciones básicas en PySpark

In [41]:
df2 = df2.dropDuplicates()

In [42]:
df2.select("ArrDelay").filter("ArrDelay > 60").take(5)

                                                                                

[Row(ArrDelay=231),
 Row(ArrDelay=67),
 Row(ArrDelay=68),
 Row(ArrDelay=62),
 Row(ArrDelay=250)]

In [44]:
df2.select("ArrDelay").filter("ArrDelay > 60").take(1)[0]

                                                                                

Row(ArrDelay=231)

In [43]:
df2.filter("ArrDelay > 60").take(5)

                                                                                

[Row(Year=2008, Month=3, DayofMonth=19, DayOfWeek=3, DepTime='2112', CRSDepTime=1730, ArrTime='2233', CRSArrTime=1842, UniqueCarrier='DL', FlightNum=945, TailNum='N128DL', ActualElapsedTime='261', CRSElapsedTime='252', AirTime='223', ArrDelay=231, DepDelay='222', Origin='ATL', Dest='PHX', Distance=1587, TaxiIn='24', TaxiOut='14', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='0', NASDelay='9', SecurityDelay='0', LateAircraftDelay='222'),
 Row(Year=2008, Month=3, DayofMonth=16, DayOfWeek=7, DepTime='1910', CRSDepTime=1745, ArrTime='2035', CRSArrTime=1928, UniqueCarrier='OH', FlightNum=5521, TailNum='N721CA', ActualElapsedTime='85', CRSElapsedTime='103', AirTime='78', ArrDelay=67, DepDelay='85', Origin='RDU', Dest='MCO', Distance=534, TaxiIn='5', TaxiOut='2', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='67', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=7, DayofMonth=25, DayOfWeek=5, DepT

In [47]:
media = np.mean(df2.select("ArrDelay").collect())

                                                                                

In [48]:
df2.select("ArrDelay").rdd.map(lambda x: (x - media)**2).take(10)

21/09/28 00:28:02 ERROR Executor: Exception in task 0.0 in stage 45.0 (TID 716)]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, enco

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 45.0 failed 1 times, most recent failure: Lost task 0.0 in stage 45.0 (TID 716) (air-de-raquel.home executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'numpy'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'numpy'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [49]:
df2.groupBy("DayOfWeek").count().show()

                                                                                

+---------+-----+
|DayOfWeek|count|
+---------+-----+
|        1|  951|
|        6|  800|
|        3| 1020|
|        5| 1045|
|        4| 1018|
|        7|  963|
|        2|  986|
+---------+-----+



In [51]:
df2.groupBy("DayOfWeek").mean("ArrDelay").show()

                                                                                

+---------+------------------+
|DayOfWeek|     avg(ArrDelay)|
+---------+------------------+
|        1|7.7896950578338595|
|        6|            3.9425|
|        3| 8.428431372549019|
|        5|  9.08421052631579|
|        4| 7.575638506876228|
|        7|11.973001038421598|
|        2| 8.620689655172415|
+---------+------------------+



In [52]:
df2.select("Origin").rdd.distinct().take(5)

                                                                                

[Row(Origin='PIT'),
 Row(Origin='FSM'),
 Row(Origin='SMF'),
 Row(Origin='CWA'),
 Row(Origin='DLH')]

In [53]:
df2.select("Origin").rdd.distinct().count()

                                                                                

255

In [54]:
df2.orderBy(df2.ArrDelay.desc()).take(5)

                                                                                

[Row(Year=2008, Month=1, DayofMonth=27, DayOfWeek=7, DepTime='642', CRSDepTime=1202, ArrTime='832', CRSArrTime=1349, UniqueCarrier='NW', FlightNum=1640, TailNum='N610NW', ActualElapsedTime='110', CRSElapsedTime='107', AirTime='88', ArrDelay=1123, DepDelay='1120', Origin='SAT', Dest='MEM', Distance=625, TaxiIn='6', TaxiOut='16', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='1120', WeatherDelay='0', NASDelay='3', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=1, DayofMonth=8, DayOfWeek=2, DepTime='836', CRSDepTime=1520, ArrTime='1543', CRSArrTime=2206, UniqueCarrier='AA', FlightNum=254, TailNum='N390AA', ActualElapsedTime='307', CRSElapsedTime='286', AirTime='285', ArrDelay=1057, DepDelay='1036', Origin='OGG', Dest='LAX', Distance=2486, TaxiIn='12', TaxiOut='10', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='1036', WeatherDelay='0', NASDelay='21', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=1, DayofMonth=7, DayOfWeek=

## 7.7.- Acciones básicas en PySpark

In [55]:
df2.select("ArrDelay").describe().show()

                                                                                

+-------+-----------------+
|summary|         ArrDelay|
+-------+-----------------+
|  count|             6783|
|   mean|8.314020344980097|
| stddev|40.72088741513768|
|    min|              -65|
|    max|             1123|
+-------+-----------------+



In [56]:
df2.select("Origin").rdd.countByValue()

                                                                                

defaultdict(int,
            {Row(Origin='SLC'): 147,
             Row(Origin='BOS'): 119,
             Row(Origin='MCI'): 65,
             Row(Origin='PHX'): 197,
             Row(Origin='EWR'): 130,
             Row(Origin='ATL'): 424,
             Row(Origin='ORD'): 366,
             Row(Origin='BOI'): 22,
             Row(Origin='CRP'): 9,
             Row(Origin='SFO'): 146,
             Row(Origin='HOU'): 55,
             Row(Origin='OKC'): 22,
             Row(Origin='TPA'): 69,
             Row(Origin='RDU'): 66,
             Row(Origin='MDW'): 81,
             Row(Origin='OGG'): 21,
             Row(Origin='MSP'): 134,
             Row(Origin='DFW'): 251,
             Row(Origin='LGA'): 127,
             Row(Origin='FLL'): 75,
             Row(Origin='SAN'): 84,
             Row(Origin='GRB'): 8,
             Row(Origin='SNA'): 43,
             Row(Origin='CVG'): 77,
             Row(Origin='DSM'): 20,
             Row(Origin='BTV'): 8,
             Row(Origin='LAX'): 208,
   

In [57]:
df2.select("ArrDelay").rdd.max()

                                                                                

Row(ArrDelay=1123)

In [59]:
df2.select("ArrDelay").rdd.max()[0]

                                                                                

1123

In [58]:
df2.select("Origin").rdd.collect()

                                                                                

[Row(Origin='SLC'),
 Row(Origin='BOS'),
 Row(Origin='MCI'),
 Row(Origin='PHX'),
 Row(Origin='EWR'),
 Row(Origin='PHX'),
 Row(Origin='ATL'),
 Row(Origin='ORD'),
 Row(Origin='ATL'),
 Row(Origin='BOI'),
 Row(Origin='ATL'),
 Row(Origin='CRP'),
 Row(Origin='SFO'),
 Row(Origin='HOU'),
 Row(Origin='OKC'),
 Row(Origin='SFO'),
 Row(Origin='TPA'),
 Row(Origin='RDU'),
 Row(Origin='MDW'),
 Row(Origin='OGG'),
 Row(Origin='MSP'),
 Row(Origin='DFW'),
 Row(Origin='SFO'),
 Row(Origin='MSP'),
 Row(Origin='LGA'),
 Row(Origin='MSP'),
 Row(Origin='FLL'),
 Row(Origin='SAN'),
 Row(Origin='GRB'),
 Row(Origin='RDU'),
 Row(Origin='SNA'),
 Row(Origin='PHX'),
 Row(Origin='CVG'),
 Row(Origin='DSM'),
 Row(Origin='BTV'),
 Row(Origin='LAX'),
 Row(Origin='BNA'),
 Row(Origin='MSP'),
 Row(Origin='DTW'),
 Row(Origin='LAS'),
 Row(Origin='STL'),
 Row(Origin='CVG'),
 Row(Origin='XNA'),
 Row(Origin='SLC'),
 Row(Origin='DTW'),
 Row(Origin='DTW'),
 Row(Origin='EWR'),
 Row(Origin='SLC'),
 Row(Origin='FAT'),
 Row(Origin='SEA'),


In [64]:
df2.select("Origin").rdd.count()

                                                                                

6783

In [61]:
df2.select("Origin").rdd.distinct().collect()

                                                                                

[Row(Origin='PIT'),
 Row(Origin='FSM'),
 Row(Origin='SMF'),
 Row(Origin='CWA'),
 Row(Origin='DLH'),
 Row(Origin='SPI'),
 Row(Origin='AUS'),
 Row(Origin='TUL'),
 Row(Origin='YKM'),
 Row(Origin='DAB'),
 Row(Origin='CVG'),
 Row(Origin='FAT'),
 Row(Origin='BFL'),
 Row(Origin='SDF'),
 Row(Origin='DLG'),
 Row(Origin='ROA'),
 Row(Origin='EGE'),
 Row(Origin='JAN'),
 Row(Origin='WRG'),
 Row(Origin='MCN'),
 Row(Origin='PIA'),
 Row(Origin='MLB'),
 Row(Origin='RST'),
 Row(Origin='FAI'),
 Row(Origin='FWA'),
 Row(Origin='PNS'),
 Row(Origin='MTJ'),
 Row(Origin='BGR'),
 Row(Origin='STL'),
 Row(Origin='BRW'),
 Row(Origin='FLL'),
 Row(Origin='COS'),
 Row(Origin='PSC'),
 Row(Origin='ELM'),
 Row(Origin='CMH'),
 Row(Origin='IPL'),
 Row(Origin='DBQ'),
 Row(Origin='SBA'),
 Row(Origin='MDT'),
 Row(Origin='CLL'),
 Row(Origin='MGM'),
 Row(Origin='EUG'),
 Row(Origin='RDM'),
 Row(Origin='JNU'),
 Row(Origin='ABI'),
 Row(Origin='ILM'),
 Row(Origin='LIT'),
 Row(Origin='TYS'),
 Row(Origin='LEX'),
 Row(Origin='OTH'),


In [63]:
df2.select("Origin").rdd.distinct().count()

                                                                                

255

In [65]:
df2.crosstab("DayOfWeek", "Origin").take(2)

                                                                                

[Row(DayOfWeek_Origin='5', ABE=0, ABI=0, ABQ=3, ABY=0, ACK=0, ACT=1, ACV=1, AEX=0, AGS=0, ALB=0, AMA=0, ANC=1, ASE=0, ATL=71, ATW=0, AUS=5, AVL=1, AVP=0, AZO=1, BDL=6, BET=0, BFL=0, BGM=0, BGR=1, BHM=5, BIL=1, BIS=2, BLI=0, BMI=0, BNA=9, BOI=1, BOS=21, BPT=0, BQN=1, BRO=1, BRW=1, BTM=0, BTR=0, BTV=1, BUF=3, BUR=7, BWI=8, BZN=0, CAE=1, CAK=0, CDC=0, CDV=0, CHA=1, CHS=3, CIC=0, CID=2, CLD=1, CLE=10, CLL=0, CLT=18, CMH=5, CMI=0, COD=0, COS=1, CPR=0, CRP=1, CRW=0, CVG=14, CWA=0, DAB=1, DAL=8, DAY=1, DBQ=0, DCA=13, DEN=33, DFW=35, DHN=0, DLG=0, DLH=0, DRO=0, DSM=0, DTW=20, EGE=0, EKO=0, ELM=0, ELP=3, ERI=0, EUG=0, EVV=0, EWN=1, EWR=20, FAI=1, FAR=0, FAT=2, FAY=0, FLG=0, FLL=13, FLO=1, FNT=4, FSD=1, FSM=1, FWA=1, GEG=3, GFK=0, GGG=0, GJT=1, GPT=0, GRB=1, GRK=0, GRR=1, GSO=2, GSP=3, GTF=0, HDN=1, HHH=0, HLN=0, HNL=12, HOU=6, HPN=0, HRL=1, HSV=2, IAD=15, IAH=28, ICT=2, IDA=1, ILM=1, IND=6, IPL=0, ISP=3, ITO=1, JAC=0, JAN=1, JAX=4, JFK=24, JNU=2, KOA=3, KTN=0, LAN=0, LAS=31, LAW=0, LAX=39, LBB=

## 7.8.- Operaciones numéricas con RDD

In [66]:
lista = sc.parallelize(range(1, 1000000))
lista.reduce(lambda x,y: x+y)

499999500000

In [68]:
lista.sum()

499999500000

In [69]:
from pyspark.sql.functions import mean, stddev, col
media = df2.select(mean(col("ArrDelay"))).collect()
std = df2.select(stddev(col("ArrDelay"))).collect()

                                                                                

In [70]:
std

[Row(stddev_samp(ArrDelay)=40.72088741513768)]

In [71]:
std[0][0]

40.72088741513768

In [72]:
df2.withColumn("Diferencia", df2["ArrDelay"] - df2["DepDelay"]).collect()

                                                                                

[Row(Year=2008, Month=1, DayofMonth=27, DayOfWeek=7, DepTime='1022', CRSDepTime=1025, ArrTime='1615', CRSArrTime=1629, UniqueCarrier='DL', FlightNum=692, TailNum='N378DA', ActualElapsedTime='233', CRSElapsedTime='244', AirTime='208', ArrDelay=-14, DepDelay='-3', Origin='SLC', Dest='TPA', Distance=1887, TaxiIn='3', TaxiOut='22', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Diferencia=-11.0),
 Row(Year=2008, Month=1, DayofMonth=10, DayOfWeek=4, DepTime='1724', CRSDepTime=1725, ArrTime='1916', CRSArrTime=1915, UniqueCarrier='CO', FlightNum=1197, TailNum='N59630', ActualElapsedTime='112', CRSElapsedTime='110', AirTime='68', ArrDelay=1, DepDelay='-1', Origin='BOS', Dest='EWR', Distance=200, TaxiIn='8', TaxiOut='36', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Diferencia=2.0),
 Row(Year=2008, M

In [73]:
df2.withColumn("Standard", (df2["ArrDelay"] - media[0][0])/std[0][0]).collect()

                                                                                

[Row(Year=2008, Month=1, DayofMonth=27, DayOfWeek=7, DepTime='1022', CRSDepTime=1025, ArrTime='1615', CRSArrTime=1629, UniqueCarrier='DL', FlightNum=692, TailNum='N378DA', ActualElapsedTime='233', CRSElapsedTime='244', AirTime='208', ArrDelay=-14, DepDelay='-3', Origin='SLC', Dest='TPA', Distance=1887, TaxiIn='3', TaxiOut='22', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Standard=-0.5479748051041989),
 Row(Year=2008, Month=1, DayofMonth=10, DayOfWeek=4, DepTime='1724', CRSDepTime=1725, ArrTime='1916', CRSArrTime=1915, UniqueCarrier='CO', FlightNum=1197, TailNum='N59630', ActualElapsedTime='112', CRSElapsedTime='110', AirTime='68', ArrDelay=1, DepDelay='-1', Origin='BOS', Dest='EWR', Distance=200, TaxiIn='8', TaxiOut='36', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA', Standard=-0.1796134811