In [1]:
import random


In [2]:
random.seed(23)
ids = range(12)
data = [ (id_,random.choice(['wizard','warrior','priest'])) for id_ in ids]
data

[(0, 'warrior'),
 (1, 'wizard'),
 (2, 'wizard'),
 (3, 'priest'),
 (4, 'warrior'),
 (5, 'warrior'),
 (6, 'warrior'),
 (7, 'priest'),
 (8, 'warrior'),
 (9, 'wizard'),
 (10, 'priest'),
 (11, 'wizard')]

This is how you would create an RDD:

In [3]:
rdd = sc.parallelize(data)

RDDs are slower in Python because Python is not statically typed: Spark knows nothing about the data and has to process each record in Python. This doesn't happen in Scala. Besides, we want to work with _tables_, not _lists_ of data. An RDD would make sense, for example, to work with a list of log lines, but for data with several properties we should use DataFrames, because they are much easier and more natural to use.

### Creating DataFrames

In [4]:
df = spark.createDataFrame(data)

In [5]:
df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)



Here Spark assigns the column names arbitrarily (`_1`, `_2`) and infers the column types, i.e. it is inferring the schema.

We can choose the names ourselves:

In [6]:
df = spark.createDataFrame(data, schema=['id', 'category'])
df

DataFrame[id: bigint, category: string]

Under the hood, DataFrames use RDDs:

In [7]:
df.rdd

MapPartitionsRDD[13] at javaToPython at NativeMethodAccessorImpl.java:0

In [8]:
df.take(5)

[Row(id=0, category='warrior'),
 Row(id=1, category='wizard'),
 Row(id=2, category='wizard'),
 Row(id=3, category='priest'),
 Row(id=4, category='warrior')]

In [9]:
onerow = df.first()
onerow

Row(id=0, category='warrior')

The following is not recommended:

In [10]:
onerow.category

'warrior'

The recommended alternative is bracket access, because some column names are complex (for example, they contain spaces) and can't be written as attributes. I'm not sure I agree:

In [11]:
onerow['category']

'warrior'

We can also specify the schema in more detail.

When we use PySpark, we end up calling Java under the hood (Scala runs on the JVM). Py4J lets Python code call Java code, and PySpark uses it internally.

In [12]:
from pyspark.sql import types

schema_type = types.StructType([
    types.StructField('id', types.IntegerType(), nullable = False),
    types.StructField('category', types.StringType(), nullable = True),
])

schema_type

StructType(List(StructField(id,IntegerType,false),StructField(category,StringType,true)))

In [13]:
df = spark.createDataFrame(data,schema=schema_type)

In [14]:
df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- category: string (nullable = true)



#### Reading from CSV

In [15]:
spark.read.csv('data/coupon150720.csv',inferSchema=True)

DataFrame[_c0: bigint, _c1: int, _c2: string, _c3: string, _c4: string, _c5: string, _c6: double, _c7: string, _c8: int, _c9: string, _c10: string, _c11: string, _c12: int, _c13: string, _c14: string]

Another way:

In [16]:
spark.sql('SELECT * FROM csv.`data/coupon150720.csv`')

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string]

This is useful for people who don't know much programming but do know SQL. It also lets you filter out unwanted data during the read itself. Note that this way no schema is inferred: every column comes out as a string.

In [17]:
!head -n 1 data/coupon150720.csv

79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0


In [18]:
spark.sql('''SELECT  
            _c0 AS tft_number,
            _c1 AS coupon_number,
            _c2 AS origin,
            _c3 AS amount
            FROM csv.`data/coupon150720.csv`''')

DataFrame[tft_number: string, coupon_number: string, origin: string, amount: string]

### DataFrame operations

In [19]:
df.show()

+---+--------+
| id|category|
+---+--------+
|  0| warrior|
|  1|  wizard|
|  2|  wizard|
|  3|  priest|
|  4| warrior|
|  5| warrior|
|  6| warrior|
|  7|  priest|
|  8| warrior|
|  9|  wizard|
| 10|  priest|
| 11|  wizard|
+---+--------+



In [20]:
df.select('category').show()

+--------+
|category|
+--------+
| warrior|
|  wizard|
|  wizard|
|  priest|
| warrior|
| warrior|
| warrior|
|  priest|
| warrior|
|  wizard|
|  priest|
|  wizard|
+--------+



In [21]:
df.select('category')

DataFrame[category: string]

In [22]:
df['category']

Column<b'category'>

These two operations are not equivalent. `df.select('category')` returns a DataFrame, while `df['category']` is just a reference to a column (a `Column`). That is why I can't, for example, call `.show()` on it:

In [23]:
# df['category'].show()  # would fail

Filtering:

In [24]:
df.filter(df['id'] > 5).show()

+---+--------+
| id|category|
+---+--------+
|  6| warrior|
|  7|  priest|
|  8| warrior|
|  9|  wizard|
| 10|  priest|
| 11|  wizard|
+---+--------+



`where` does the same thing. It is just an alias for `filter`, with a name taken from SQL. In fact, inspecting it shows that it is bound to `filter`:

In [25]:
df.where

<bound method filter of DataFrame[id: int, category: string]>

**Exercise: extract the ids that correspond to priests**

In [26]:
df \
.where(df['category'] == 'priest') \
.select('id') \
.show()

+---+
| id|
+---+
|  3|
|  7|
| 10|
+---+



Another way to select is with double brackets. Remember that single brackets return a column reference, which is only meant for use inside expressions such as filters:

In [27]:
df \
.where(df['category'] == 'priest') \
[['id']] \
.show()

+---+
| id|
+---+
|  3|
|  7|
| 10|
+---+



DataFrames are immutable, so I can't do things like this:

In [28]:
# df['new_column'] = df['id'] * 100  # fails: DataFrames don't support item assignment

Instead, build a new DataFrame that includes the computed column:

In [29]:
df.select('id', 'category', df['id'] * 100)

DataFrame[id: int, category: string, (id * 100): int]

Another option is `withColumn`:

In [30]:
df.withColumn('new_column',df['id'] * 100).show()

+---+--------+----------+
| id|category|new_column|
+---+--------+----------+
|  0| warrior|         0|
|  1|  wizard|       100|
|  2|  wizard|       200|
|  3|  priest|       300|
|  4| warrior|       400|
|  5| warrior|       500|
|  6| warrior|       600|
|  7|  priest|       700|
|  8| warrior|       800|
|  9|  wizard|       900|
| 10|  priest|      1000|
| 11|  wizard|      1100|
+---+--------+----------+



__NOTE__: Careful, none of this is taking up memory yet, because DataFrame transformations are lazy. For example, when I created the DataFrame from the CSV, all that was stored was a reference to the CSV path (apart from the pass over the file that `inferSchema=True` needs). The file is only actually read when I run an action such as `take`.
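The following is a rough analogy only: it uses plain Python generators, not Spark internals, to separate building a pipeline from running it. `read_source` is a hypothetical stand-in for reading the CSV. Nothing is read while the pipeline is defined; rows are only pulled when the final `islice` consumes it, much as nothing is read until an action such as `take` runs.

```python
from itertools import islice

reads = {"rows": 0}  # how many rows have actually been read from the "source"


def read_source(n_rows):
    """Hypothetical data source that yields rows one at a time."""
    for i in range(n_rows):
        reads["rows"] += 1
        yield i


# "Transformations": defining the pipeline reads nothing yet
pipeline = (x * 100 for x in read_source(1_000_000) if x > 5)
reads_before_action = reads["rows"]

# "Action": only now is data read, and only as much as is needed
first_five = list(islice(pipeline, 5))
print(reads_before_action, first_five, reads["rows"])  # 0 [600, 700, 800, 900, 1000] 11
```

As with `take(5)`, consuming five results only reads as many rows as it needs. Here that is 11 rows, because the first six are filtered out.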

### User defined functions

In [31]:
from pyspark.sql import functions

The `functions` module provides built-in transformations on columns.

`log` computes the natural logarithm (note that `log(0)` yields `null`):

In [32]:
df[[functions.log('id')]].show()

+------------------+
|           LOG(id)|
+------------------+
|              null|
|               0.0|
|0.6931471805599453|
|1.0986122886681098|
|1.3862943611198906|
|1.6094379124341003|
| 1.791759469228055|
|1.9459101490553132|
|2.0794415416798357|
|2.1972245773362196|
| 2.302585092994046|
|2.3978952727983707|
+------------------+



Equivalently, with `select`:

In [33]:
df.select(functions.log('id')).show()

+------------------+
|           LOG(id)|
+------------------+
|              null|
|               0.0|
|0.6931471805599453|
|1.0986122886681098|
|1.3862943611198906|
|1.6094379124341003|
| 1.791759469228055|
|1.9459101490553132|
|2.0794415416798357|
|2.1972245773362196|
| 2.302585092994046|
|2.3978952727983707|
+------------------+



But what if I want to write my own function?

In [34]:
mi_func = lambda x :  x * x
mi_udf = functions.udf(mi_func)
otro_df = df.select('id', mi_udf('id'))
otro_df.show()

+---+------------+
| id|<lambda>(id)|
+---+------------+
|  0|           0|
|  1|           1|
|  2|           4|
|  3|           9|
|  4|          16|
|  5|          25|
|  6|          36|
|  7|          49|
|  8|          64|
|  9|          81|
| 10|         100|
| 11|         121|
+---+------------+



In [35]:
otro_df

DataFrame[id: int, <lambda>(id): string]

The annoying part is that the new column comes out as a string. Spark cannot infer the return type of a Python function, since Python has no static typing, so `udf` defaults to `StringType`.

_This doesn't happen in Scala._

It's not a big deal, though. It's easy to fix by passing `returnType`:

In [36]:
mi_func = lambda x :  x * x
mi_udf = functions.udf(mi_func, returnType=types.IntegerType())
df.select('id', mi_udf('id'))

DataFrame[id: int, <lambda>(id): int]

Create a hit points (`puntos de vida`) column:

In [37]:
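def get_puntos_de_vida(category):
    # Base hit points per category plus a random deviation between 0 and 5
    random_deviation = round(random.random() * 5)

    if category == 'priest':
        return 10 + random_deviation
    if category == 'warrior':
        return 50 + random_deviation

    return 30 + random_deviation

# Declare the return type so the column is an integer rather than the default string
puntos_udf = functions.udf(get_puntos_de_vida, returnType=types.IntegerType())
df.select('id', 'category', puntos_udf('category')).show()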

+---+--------+----------------------------+
| id|category|get_puntos_de_vida(category)|
+---+--------+----------------------------+
|  0| warrior|                          50|
|  1|  wizard|                          32|
|  2|  wizard|                          33|
|  3|  priest|                          14|
|  4| warrior|                          54|
|  5| warrior|                          50|
|  6| warrior|                          54|
|  7|  priest|                          14|
|  8| warrior|                          54|
|  9|  wizard|                          30|
| 10|  priest|                          12|
| 11|  wizard|                          31|
+---+--------+----------------------------+



More elegant:

In [38]:
hp = functions.udf(lambda category: {'priest': 10, 'warrior': 50, 'wizard':30}[category])

df.select('id', 'category', hp('category').cast(types.IntegerType()).alias('hp')).show()

+---+--------+---+
| id|category| hp|
+---+--------+---+
|  0| warrior| 50|
|  1|  wizard| 30|
|  2|  wizard| 30|
|  3|  priest| 10|
|  4| warrior| 50|
|  5| warrior| 50|
|  6| warrior| 50|
|  7|  priest| 10|
|  8| warrior| 50|
|  9|  wizard| 30|
| 10|  priest| 10|
| 11|  wizard| 30|
+---+--------+---+



Or even nicer, with `withColumn`:

In [39]:
df2 = df.withColumn('puntos de vida', hp('category').cast(types.IntegerType()))
df2.show()

+---+--------+--------------+
| id|category|puntos de vida|
+---+--------+--------------+
|  0| warrior|            50|
|  1|  wizard|            30|
|  2|  wizard|            30|
|  3|  priest|            10|
|  4| warrior|            50|
|  5| warrior|            50|
|  6| warrior|            50|
|  7|  priest|            10|
|  8| warrior|            50|
|  9|  wizard|            30|
| 10|  priest|            10|
| 11|  wizard|            30|
+---+--------+--------------+



### Summary statistics

In [40]:
df2.stat

<pyspark.sql.dataframe.DataFrameStatFunctions at 0x1164ec590>

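__Concept__: Bloom filter: a probabilistic data structure for approximate set-membership tests, computed on the fly. It does not store the elements themselves, only a fixed-size bit array, so its memory stays small no matter how many items are added. It can give false positives but never false negatives.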
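The following minimal Bloom filter in plain Python illustrates the concept; it is not a Spark API. The class name, the sizes, and the use of salted SHA-256 as the k hash functions are all assumptions chosen for clarity. Real implementations use faster hashes and a packed bit array.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: a fixed-size bit array plus k hash functions."""

    def __init__(self, n_bits=1024, n_hashes=3):
        self.n_bits = n_bits
        self.n_hashes = n_hashes
        self.bits = bytearray(n_bits)  # one byte per bit, for simplicity

    def _positions(self, item):
        # Derive k bit positions by salting one hash function with the index i
        for i in range(self.n_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.n_bits

    def add(self, item):
        # Only bits are set; the item itself is never stored
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item):
        # False means "definitely never added"; True means "probably added"
        return all(self.bits[pos] for pos in self._positions(item))


bf = BloomFilter()
for land in ["gondor", "mordor"]:
    bf.add(land)

print(bf.might_contain("gondor"))  # True: added items are always found
print(bf.might_contain("rohan"))   # False unless all its positions collide (a false positive)
```

Items that were added always answer `True`, because their bits are set and are never cleared. That property is the "no false negatives" guarantee.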

Correlation and covariance:

In [41]:
df2.stat.corr('id', 'puntos de vida')

-0.24161209862606872

In [42]:
df2.cov('id','puntos de vida')

-14.545454545454545
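The following plain-Python check makes the two numbers above concrete. It uses the `id` and `puntos de vida` values from the `df2.show()` output. It assumes, as the PySpark documentation states, that `cov` is the sample covariance (dividing by n - 1). With that assumption it reproduces both Spark results up to floating-point rounding.

```python
import math

# id and 'puntos de vida' of df2, copied from the df2.show() output above
ids = list(range(12))
hp = [50, 30, 30, 10, 50, 50, 50, 10, 50, 30, 10, 30]


def sample_cov(xs, ys):
    # Sample covariance: sum of products of deviations divided by n - 1
    mx, my = sum(xs) / len(xs), sum(ys) / len(ys)
    return sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / (len(xs) - 1)


def pearson_corr(xs, ys):
    # Pearson correlation: covariance normalised by both standard deviations
    return sample_cov(xs, ys) / math.sqrt(sample_cov(xs, xs) * sample_cov(ys, ys))


print(sample_cov(ids, hp))    # about -14.5455 (exactly -160 / 11)
print(pearson_corr(ids, hp))  # about -0.2416
```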

In [43]:
location_udf = functions.udf(lambda: random.choice(['gondor','mordor']))

df3 = df2.withColumn('land', location_udf())
df3.show()

+---+--------+--------------+------+
| id|category|puntos de vida|  land|
+---+--------+--------------+------+
|  0| warrior|            50|mordor|
|  1|  wizard|            30|gondor|
|  2|  wizard|            30|gondor|
|  3|  priest|            10|mordor|
|  4| warrior|            50|mordor|
|  5| warrior|            50|gondor|
|  6| warrior|            50|mordor|
|  7|  priest|            10|mordor|
|  8| warrior|            50|gondor|
|  9|  wizard|            30|mordor|
| 10|  priest|            10|mordor|
| 11|  wizard|            30|gondor|
+---+--------+--------------+------+



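`crosstab` builds the contingency table of two columns, that is, the number of rows for each pair of values. The counts need not match the `df3.show()` above: `location_udf` is random, and Spark re-evaluates it on every action.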

In [44]:
df3.crosstab('category', 'land').show()

+-------------+------+------+
|category_land|gondor|mordor|
+-------------+------+------+
|       priest|     2|     1|
|      warrior|     1|     4|
|       wizard|     3|     1|
+-------------+------+------+
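A contingency table is just a count per pair of values. The plain-Python sketch below recomputes it from the `(category, land)` pairs printed by `df3.show()` in In [43]. Its result differs from Spark's table above because `location_udf` is random and was evaluated again when `crosstab` ran.

```python
from collections import Counter

# (category, land) pairs copied from the df3.show() output of In [43]
rows = [
    ("warrior", "mordor"), ("wizard", "gondor"), ("wizard", "gondor"),
    ("priest", "mordor"), ("warrior", "mordor"), ("warrior", "gondor"),
    ("warrior", "mordor"), ("priest", "mordor"), ("warrior", "gondor"),
    ("wizard", "mordor"), ("priest", "mordor"), ("wizard", "gondor"),
]

# Count each (row value, column value) pair; missing pairs count as 0
counts = Counter(rows)
lands = sorted({land for _, land in rows})
for category in sorted({c for c, _ in rows}):
    print(category, [counts[(category, land)] for land in lands])
```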



To open the Spark web UI (the console), get its URL:

In [45]:
spark.sparkContext.uiWebUrl

'http://juans-mbp:4040'

### Grouping

In [46]:
df3.show()

+---+--------+--------------+------+
| id|category|puntos de vida|  land|
+---+--------+--------------+------+
|  0| warrior|            50|gondor|
|  1|  wizard|            30|gondor|
|  2|  wizard|            30|mordor|
|  3|  priest|            10|gondor|
|  4| warrior|            50|gondor|
|  5| warrior|            50|mordor|
|  6| warrior|            50|mordor|
|  7|  priest|            10|mordor|
|  8| warrior|            50|gondor|
|  9|  wizard|            30|gondor|
| 10|  priest|            10|mordor|
| 11|  wizard|            30|gondor|
+---+--------+--------------+------+



In [47]:
groups = df3.groupby('category')

In [48]:
groups.max('id').show()

+--------+-------+
|category|max(id)|
+--------+-------+
|  priest|     10|
| warrior|      8|
|  wizard|     11|
+--------+-------+



To compute several aggregations at once, pass a dictionary mapping column names to aggregate functions:

In [49]:
groups.agg({'id': 'max', 'puntos de vida': 'mean'}).show()

+--------+-------------------+-------+
|category|avg(puntos de vida)|max(id)|
+--------+-------------------+-------+
|  priest|               10.0|     10|
| warrior|               50.0|      8|
|  wizard|               30.0|     11|
+--------+-------------------+-------+



More flexible and nicer, using the aggregate functions from `functions`:

In [50]:
groups.agg(functions.max('id'), functions.mean('puntos de vida'), functions.mean('id')).show()

+--------+-------+-------------------+-----------------+
|category|max(id)|avg(puntos de vida)|          avg(id)|
+--------+-------+-------------------+-----------------+
|  priest|     10|               10.0|6.666666666666667|
| warrior|      8|               50.0|              4.6|
|  wizard|     11|               30.0|             5.75|
+--------+-------+-------------------+-----------------+
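Conceptually, `groupby` followed by `agg` collects the rows of each group and reduces each group to one output row. Below is a plain-Python sketch of In [50], using the ids and categories from `df3.show()`. Hit points depend only on the category, so they are deterministic here. The helper `avg` is our own and is not part of Spark.

```python
# (id, category, puntos de vida) rows of df3; hit points depend only on category
rows = [
    (0, "warrior", 50), (1, "wizard", 30), (2, "wizard", 30), (3, "priest", 10),
    (4, "warrior", 50), (5, "warrior", 50), (6, "warrior", 50), (7, "priest", 10),
    (8, "warrior", 50), (9, "wizard", 30), (10, "priest", 10), (11, "wizard", 30),
]


def avg(values):
    return sum(values) / len(values)


# groupby('category'): collect the rows belonging to each group
groups = {}
for id_, category, hp in rows:
    groups.setdefault(category, []).append((id_, hp))

# agg(max('id'), mean('puntos de vida'), mean('id')): one output row per group
result = {}
for category, members in sorted(groups.items()):
    ids = [i for i, _ in members]
    hps = [h for _, h in members]
    result[category] = (max(ids), avg(hps), avg(ids))
    print(category, result[category])
```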



I can also group by expressions:

In [51]:
df3.groupBy(df3['id'] > 5).mean('puntos de vida').show()

+--------+-------------------+
|(id > 5)|avg(puntos de vida)|
+--------+-------------------+
|    true|               30.0|
|   false| 36.666666666666664|
+--------+-------------------+
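Grouping by an expression works the same way, except that the key is the value of the expression for each row. Below is a plain-Python sketch of In [51], with the same hit points per id as above.

```python
# puntos de vida of df3 indexed by id (ids 0..11)
hp_by_id = [50, 30, 30, 10, 50, 50, 50, 10, 50, 30, 10, 30]

# groupBy(df3['id'] > 5): the grouping key is the boolean value of the expression
buckets = {}
for id_, hp in enumerate(hp_by_id):
    buckets.setdefault(id_ > 5, []).append(hp)

# mean('puntos de vida') for each key
means = {key: sum(hps) / len(hps) for key, hps in buckets.items()}
print(means)  # True -> 30.0, False -> about 36.67
```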



### Joins

In [52]:
from random import seed
from random import choices
seed(42)

data = list(zip(
    choices(range(30),k=10),
    choices(['gondor', 'mordor'], k=10),
    choices(range(10000,50000, 1000), k=10)
))

data

[(19, 'gondor', 42000),
 (0, 'mordor', 37000),
 (8, 'gondor', 23000),
 (6, 'gondor', 16000),
 (22, 'mordor', 48000),
 (20, 'mordor', 23000),
 (26, 'gondor', 13000),
 (2, 'mordor', 13000),
 (12, 'mordor', 43000),
 (0, 'gondor', 34000)]

In [53]:
right = spark.createDataFrame(data, schema=['id', 'land', 'hp_bonus'])
right.show()

+---+------+--------+
| id|  land|hp_bonus|
+---+------+--------+
| 19|gondor|   42000|
|  0|mordor|   37000|
|  8|gondor|   23000|
|  6|gondor|   16000|
| 22|mordor|   48000|
| 20|mordor|   23000|
| 26|gondor|   13000|
|  2|mordor|   13000|
| 12|mordor|   43000|
|  0|gondor|   34000|
+---+------+--------+



The following fails. With no join condition, Spark detects an implicit Cartesian product (every row paired with every other row) and refuses to run it, because in big data a Cartesian product is a disaster.

In [54]:
# df3.join(right).show() 


In [55]:
joined = df3.join(right, on='id', how='outer')
joined.show()

+---+--------+--------------+------+------+--------+
| id|category|puntos de vida|  land|  land|hp_bonus|
+---+--------+--------------+------+------+--------+
| 26|    null|          null|  null|gondor|   13000|
| 19|    null|          null|  null|gondor|   42000|
|  0| warrior|            50|mordor|mordor|   37000|
|  0| warrior|            50|mordor|gondor|   34000|
| 22|    null|          null|  null|mordor|   48000|
|  7|  priest|            10|gondor|  null|    null|
|  6| warrior|            50|mordor|gondor|   16000|
|  9|  wizard|            30|mordor|  null|    null|
|  5| warrior|            50|mordor|  null|    null|
|  1|  wizard|            30|gondor|  null|    null|
| 10|  priest|            10|gondor|  null|    null|
|  3|  priest|            10|mordor|  null|    null|
| 12|    null|          null|  null|mordor|   43000|
|  8| warrior|            50|mordor|gondor|   23000|
| 11|  wizard|            30|gondor|  null|    null|
|  2|  wizard|            30|mordor|mordor|   

This gives us two different `land` columns, one from each table.

To say which one I mean, I have to take the `land` column from the DataFrame I want it from:

In [56]:
joined.select(df3['land']).show()

+------+
|  land|
+------+
|  null|
|  null|
|gondor|
|gondor|
|  null|
|gondor|
|gondor|
|gondor|
|gondor|
|gondor|
|mordor|
|gondor|
|  null|
|mordor|
|gondor|
|mordor|
|gondor|
|  null|
+------+



In [57]:
joined.select(right['land']).show()

+------+
|  land|
+------+
|gondor|
|gondor|
|mordor|
|gondor|
|mordor|
|  null|
|gondor|
|  null|
|  null|
|  null|
|  null|
|  null|
|mordor|
|gondor|
|  null|
|mordor|
|  null|
|mordor|
+------+



### Caching

To cache, for example, the `joined` DataFrame, we would use:

In [58]:
joined.cache()

DataFrame[id: bigint, category: string, puntos de vida: int, land: string, land: string, hp_bonus: bigint]

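However, if we then open the Storage tab of the Spark UI, the DataFrame does not appear there yet. `cache()` is lazy: the data is only stored the first time an action (such as `show()` or `count()`) runs on it.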

### Exercise

Compute the z-score of each player's hit points (`puntos de vida`) relative to their land.
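Before doing it in Spark, here is a plain-Python sketch of what the exercise asks, run on a few hypothetical rows. It computes each land's mean and standard deviation, attaches them to every row, and applies z = (x - mean) / stddev. It uses the sample standard deviation, which is what `functions.stddev` computes (it is an alias of `stddev_samp`). When the deviation is 0 it returns `None`, mirroring the `null` that Spark's column arithmetic produces further down.

```python
import statistics

# Hypothetical (land, hit points) rows, for illustration only
rows = [("gondor", 50), ("gondor", 10), ("gondor", 30),
        ("mordor", 10), ("mordor", 10)]

# Step 1: per-land mean and sample standard deviation (like avg / stddev per group)
by_land = {}
for land, hp in rows:
    by_land.setdefault(land, []).append(hp)
stats = {land: (statistics.mean(v), statistics.stdev(v)) for land, v in by_land.items()}


def z(hp, avg, std):
    # A zero standard deviation gives None instead of dividing by zero
    return None if std == 0 else (hp - avg) / std


# Step 2: "join" the stats back to each row and compute its z-score
zscores = [(land, hp, z(hp, *stats[land])) for land, hp in rows]
print(zscores)
```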

In [59]:
df3.show()

+---+--------+--------------+------+
| id|category|puntos de vida|  land|
+---+--------+--------------+------+
|  0| warrior|            50|mordor|
|  1|  wizard|            30|mordor|
|  2|  wizard|            30|mordor|
|  3|  priest|            10|mordor|
|  4| warrior|            50|mordor|
|  5| warrior|            50|mordor|
|  6| warrior|            50|gondor|
|  7|  priest|            10|gondor|
|  8| warrior|            50|mordor|
|  9|  wizard|            30|gondor|
| 10|  priest|            10|gondor|
| 11|  wizard|            30|gondor|
+---+--------+--------------+------+



In [60]:
medias = df3.groupby('land') \
.agg(functions.avg('puntos de vida').alias('avg'), functions.stddev('puntos de vida').alias('stddev')) 
medias.show()


+------+------------------+------------------+
|  land|               avg|            stddev|
+------+------------------+------------------+
|gondor|              42.0|10.954451150103322|
|mordor|27.142857142857142|17.994708216848746|
+------+------------------+------------------+



In [61]:
annotated = df3.join(medias,on='land')
annotated.cache()  # without caching, values change on every action: the plan is recomputed and location_udf is random
annotated.show()

+------+---+--------+--------------+------------------+------------------+
|  land| id|category|puntos de vida|               avg|            stddev|
+------+---+--------+--------------+------------------+------------------+
|gondor|  2|  wizard|            30|41.111111111111114|10.540925533894598|
|gondor|  4| warrior|            50|41.111111111111114|10.540925533894598|
|gondor|  6| warrior|            50|41.111111111111114|10.540925533894598|
|gondor|  8| warrior|            50|41.111111111111114|10.540925533894598|
|gondor| 11|  wizard|            30|41.111111111111114|10.540925533894598|
|mordor|  0| warrior|            50|              10.0|               0.0|
|mordor|  1|  wizard|            30|              10.0|               0.0|
|mordor|  3|  priest|            10|              10.0|               0.0|
|mordor|  5| warrior|            50|              10.0|               0.0|
|mordor|  7|  priest|            10|              10.0|               0.0|
|mordor|  9|  wizard|    

Now we compute the z-scores. We can use column arithmetic or a Python UDF (a lambda). Column arithmetic is more efficient, since it is evaluated inside the JVM instead of sending every row to a Python worker.

In [62]:
result = annotated.withColumn('z-score', (annotated['puntos de vida'] - annotated['avg']) / annotated['stddev'] )
result.show()

+------+---+--------+--------------+------------------+------------------+------------------+
|  land| id|category|puntos de vida|               avg|            stddev|           z-score|
+------+---+--------+--------------+------------------+------------------+------------------+
|gondor|  2|  wizard|            30|41.111111111111114|10.540925533894598| -1.05409255338946|
|gondor|  4| warrior|            50|41.111111111111114|10.540925533894598|0.8432740427115676|
|gondor|  6| warrior|            50|41.111111111111114|10.540925533894598|0.8432740427115676|
|gondor|  8| warrior|            50|41.111111111111114|10.540925533894598|0.8432740427115676|
|gondor| 11|  wizard|            30|41.111111111111114|10.540925533894598| -1.05409255338946|
|mordor|  0| warrior|            50|              10.0|               0.0|              null|
|mordor|  1|  wizard|            30|              10.0|               0.0|              null|
|mordor|  3|  priest|            10|              10.0|     

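Another way (remember, somewhat slower) is a UDF. Note that it fails here: for the group whose `stddev` is 0, Python raises `ZeroDivisionError`, whereas the column arithmetic above simply returned `null`: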

In [63]:
zscore = functions.udf(lambda xi, average, std: (xi - average) / std)

In [64]:
result2 = annotated.withColumn('z-score', zscore('puntos de vida','avg', 'stddev'))
result2.show()

Py4JJavaError: An error occurred while calling o381.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 208.0 failed 1 times, most recent failure: Lost task 44.0 in stage 208.0 (TID 2780, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-63-55ec9c8cbe08>", line 1, in <lambda>
ZeroDivisionError: float division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor135.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


For the null values, see the solution. Nothing special.
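The following is a minimal sketch that assumes we want the UDF to behave like the column-arithmetic version. It is a plain Python function that returns `None` (which Spark stores as `null`) instead of dividing by zero. The name `safe_zscore` is hypothetical.

```python
def safe_zscore(xi, average, std):
    """z-score that returns None (null in Spark) instead of raising.

    A zero or missing standard deviation, or a missing input, gives None
    rather than ZeroDivisionError / TypeError.
    """
    if xi is None or average is None or std is None or std == 0:
        return None
    return (xi - average) / std


print(safe_zscore(50, 30.0, 20.0))  # 1.0
print(safe_zscore(50, 10.0, 0.0))   # None
```

It could then replace the lambda above, for example as `functions.udf(safe_zscore, returnType=types.DoubleType())`. Declaring `returnType` also avoids getting the default string column.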

There is `dropna`, which drops every row that has at least one null column. It is configurable. For example, I can tell it to drop only the rows where all columns are null (`how='all'`). I can give it a threshold (`thresh`) so that a row is kept only if it has at least that many non-null values. I can also say: drop the rows that have nulls in the `land` field (`subset=['land']`).
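The following plain-Python emulation illustrates those rules; it is not Spark code. The rows are hypothetical dicts, loosely modelled on the outer join output above. The decision logic follows the documented behaviour of `DataFrame.dropna`: `how='any'` or `'all'`; `thresh` keeps rows with at least that many non-null values and overrides `how`; `subset` restricts which columns are checked.

```python
def dropna(rows, how="any", thresh=None, subset=None):
    """Plain-Python emulation of the DataFrame.dropna rules; rows are dicts."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            keep = non_null >= thresh      # thresh overrides how
        elif how == "all":
            keep = non_null > 0            # drop only rows where every column is null
        else:
            keep = non_null == len(cols)   # "any": drop rows with at least one null
        if keep:
            kept.append(row)
    return kept


# Hypothetical rows, loosely modelled on the outer join output above
rows = [
    {"id": 0, "category": "warrior", "land": "mordor", "hp_bonus": 37000},
    {"id": 26, "category": None, "land": None, "hp_bonus": 13000},
    {"id": 7, "category": "priest", "land": "gondor", "hp_bonus": None},
    {"id": None, "category": None, "land": None, "hp_bonus": None},
]

print(len(dropna(rows)))                   # 1
print(len(dropna(rows, how="all")))        # 3
print(len(dropna(rows, thresh=3)))         # 2
print(len(dropna(rows, subset=["land"])))  # 2
```

In PySpark the corresponding calls would be `df.dropna()`, `df.dropna(how='all')`, `df.dropna(thresh=3)` and `df.dropna(subset=['land'])`.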


### Querying DataFrames with SQL

In [None]:
# spark.sql('SELECT * FROM df3 where land = "mordor"')  # fails: Spark can't find a table called df3

In [None]:
df3.createOrReplaceTempView('tocoto')  # registerTempTable is deprecated since Spark 2.0
spark.sql("SELECT * FROM tocoto WHERE land = 'mordor'").show()

### Interoperating with pandas

In [None]:
pandas_df = annotated.toPandas()  # avoid shadowing the pandas module name
pandas_df

__Careful__: this brings all the data into the memory of the driver node.

The other way round:

In [None]:
spark.createDataFrame(pandas_df)

### Saving

In [None]:
annotated.write.csv('out.csv')

When running locally, this writes to the local filesystem. Also, `out.csv` is not a single file but a directory containing many small files, one per partition.

### Exercise

Repeat the exercise from the previous notebook, but this time with DataFrames.

Get stats for all tickets with destination MAD from `coupon150720.csv`.

You will need to extract ticket amounts with destination MAD, and then calculate:

- Total ticket amounts per origin
- Top 10 airlines by average amount

In [90]:
coupons = spark.sql('''SELECT  
            _c0 AS tft_number,
            _c1 AS coupon_number,
            _c2 AS origin,
            _c3 AS destination,
            _c4 AS carrier,
            CAST(_c6 AS double) AS amount
            FROM csv.`data/coupon150720.csv`''')

coupons.take(5)

[Row(tft_number='79062005698500', coupon_number='1', origin='MAA', destination='AUH', carrier='9W', amount=56.79),
 Row(tft_number='79062005698500', coupon_number='2', origin='AUH', destination='CDG', carrier='9W', amount=84.34),
 Row(tft_number='79062005924069', coupon_number='1', origin='CJB', destination='MAA', carrier='9W', amount=60.0),
 Row(tft_number='79065668570385', coupon_number='1', origin='DEL', destination='DXB', carrier='9W', amount=160.63),
 Row(tft_number='79065668737021', coupon_number='1', origin='AUH', destination='IXE', carrier='9W', amount=152.46)]

Total ticket amounts per origin

In [108]:
(
    coupons 
    .filter(coupons['destination'] == 'MAD') 
    .groupby('origin') 
    .sum('amount') 
    .show()
)

+------+------------------+
|origin|       sum(amount)|
+------+------------------+
|   PMI|40547.170000000035|
|   YUL|            284.44|
|   HEL|           8195.76|
|   SXB|            264.46|
|   UIO|            8547.6|
|   XRY| 9250.230000000001|
|   OLB|            1801.5|
|   CCS|          94528.68|
|   VRN|1020.5399999999998|
|   SPC| 7542.700000000002|
|   AUH|           4393.42|
|   JED|19116.159999999996|
|   CMN|           7494.28|
|   FRA| 38863.54000000001|
|   ALG|14558.570000000002|
|   IST|13363.370000000003|
|   SDR| 4219.360000000001|
|   TXL|15399.790000000003|
|   GRU| 87192.64000000001|
|   VIE|11666.559999999998|
+------+------------------+
only showing top 20 rows



Top 10 airlines by average amount

In [107]:
(
    coupons 
    .filter(coupons['destination'] == 'MAD') 
    .groupby('carrier') 
    .avg('amount') 
    .withColumnRenamed('avg(amount)','avg_amount') 
    .orderBy('avg_amount', ascending=False) 
    .take(10)
)

[Row(carrier='V0', avg_amount=5418.098666666667),
 Row(carrier='AC', avg_amount=740.6200000000001),
 Row(carrier='KE', avg_amount=688.5261538461539),
 Row(carrier='SV', avg_amount=553.1742553191489),
 Row(carrier='OB', avg_amount=535.5044444444444),
 Row(carrier='AR', avg_amount=513.5304761904762),
 Row(carrier='AV', avg_amount=450.19509554140177),
 Row(carrier='AM', avg_amount=440.73421052631585),
 Row(carrier='C2', avg_amount=397.87),
 Row(carrier='LA', avg_amount=379.9537078651686)]

If I am going to use the CSV data a lot (for example, once it is already filtered to Madrid), we could cache it. That stores it in the memory of the cluster.

We could also register it as a table so that it can be used from the SQL world. It doesn't have to be cached. If it is, it's like creating a table and querying it; if not, it behaves like a subquery.