# 4. Data Frames y SQL.

### Lección 13: Creación de DataFrames.

Primero debemos importar las librerias necesarias para la creación de las estructuras (DataFrames)

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.types import Row

In [2]:
from pyspark.sql import SQLContext

Iniciamos dos contextos de spark, SparkContex, que requiere dos parametros (master es la dirección ip de la máquina donde se ejecuta la aplicación, appName es el nobre de la aplicación)

In [3]:
spark = SparkContext(master="local", appName="DataFrames")
sqlContext = SQLContext(spark)

Podemos crear DataFrames de distintas maneras, a continuación crearemos un DF a partir de un archivo csv ya existente.

**Nota:** Es altamente recomendable que en la medida de lo posible, **todos** los DF se creen directamente desde los datos y no desde RDDs.

Las siguientes celdas son para deducir la ruta del archivo que se convertirá en un DataFrame

In [4]:
!ls

codeExample.py
data.csv
files
Images
Leccion_6_Jupyter_vs_CLI.ipynb
modelo_relacional_4ce1ab04-d36f-4c9e-afd0-03e4a7d874b9.jpeg
Seccion_3_Operaciones_sobre_RDD.ipynb
Seccion_4_Data_Frames_y_SQL.ipynb
Seccion_5_Persistencia_y_particionado.ipynb
venv


In [5]:
!ls ./files/
!head -n 5 ./files/juegos.csv

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv
,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [6]:
!pwd

/home/cleto/Desktop/Intro_to_Spark


In [7]:
path = './files/'

La siguiente celda muestra la definición del esquema del dataframe (juegoSchema), muy similar a la definición de una tabla de SQL

In [8]:
juegoSchema = StructType([
    StructField("juego_id", IntegerType(),False),
    StructField("year", StringType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)
])

Cada campo requiere de tres parámetros; el nombre del campo, el tipo de dato en el campo y si es posible que sea nulo o no (True si puede ser nulo, False si no puede ser nulo)

A continuación, se asigna a la variable juegoDF el contexto sql, que posee una definición llamada juegoSchema, con la opción de que la primera fila sea considerada como el encabezado de la tabla. Finalmente, toda esa estructura funciona como contenedor del contenido del archivo tipo csv que habita en una ruta predeterminada y un nombre definido.

In [9]:
juegoDF = sqlContext.read.schema(juegoSchema)\
    .option("header","true").csv(path + "juegos.csv")

El metodo ```show(4)``` muestra las primeras 4 filas de la estructura. 

In [10]:
#juegoDF.show(4)

In [11]:
spark

Nota: la dirección de la Interfaz del usuario de la aplicación, para este caso, es la siguiente: http://192.168.1.69:4040/jobs/job/?id=0, donde la ip del master es la local (192.168.1.69). La aplicación de Spark siempre se abre en el puerto 4040, si y solo si las sesiones previas de Spark ya fueron cerradas previamente, de lo contrario se utilizara otro puerto.

### Fin de la lección 13

### Lección 14: Inferencia de tipos de datos.

En esta lección vamos a crear un Dataframe a partir de un RDD existente, para esto vamos a recrear un RDD (ver la lección 9)

**Nota** : Es recomendable que el origen de los DF sean los archivos y no RDDs.

In [12]:
deportistaOlimpicoRDD = spark.textFile(path + "deportista.csv")\
    .map(lambda l : l.split(","))
deportistaOlimpicoRDD2 = spark.textFile(path + "deportista2.csv")\
    .map(lambda l : l.split(","))

In [13]:
DF_From_RDD = deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)
DF_From_RDD.count()

135572

Primero vamos a eliminar el encabezado del RDD con una función especializada:

In [14]:
def eliminaEncabezado(indice, interador):
    return iter(list(interador)[1:])

In [15]:
DF_From_RDD = DF_From_RDD.mapPartitionsWithIndex(eliminaEncabezado)

In [16]:
DF_From_RDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

Procederemos a convertir los tipos de cada campo del RDD

In [17]:
DF_From_RDD = DF_From_RDD.map(lambda l :(
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

Luego crearemos el esquema, que es la definición del DataFrame

In [18]:
schemaDeportistaOlimpico = StructType([
    StructField("deportista_id", IntegerType(),False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False),
])

Por último, pasaremos los argumentos necesarios para crear el DF a partir del RDD

In [19]:
deportistaOlimpicoDF_demo = sqlContext\
                        .createDataFrame(DF_From_RDD, schemaDeportistaOlimpico)

In [20]:
deportistaOlimpicoDF_demo.count()

135570

Recuerda guardar las estructuras omitiendo los metodos de visualización, como ```show``` y ```take```

Ahora debemos transformar los RDDs pasados en DF (incluyendo Deportistas olimpicos desde archivo)

In [21]:
deportistaOlimpicoDF_pt1 = sqlContext.read.schema(schemaDeportistaOlimpico)\
    .option("header","true").csv(path + "deportista.csv")

In [22]:
deportistaOlimpicoDF_pt2 = sqlContext.read.schema(schemaDeportistaOlimpico)\
    .option("header","true").csv(path + "deportista2.csv")

In [23]:
deportistaOlimpicoDF=deportistaOlimpicoDF_pt1.union(deportistaOlimpicoDF_pt2)
deportistaOlimpicoDF.count()

135570

In [24]:
schemaequiposOlimpicos = StructType([
    StructField("equipo_id", IntegerType(),False),
    StructField("nombre_equipo", StringType(), False),
    StructField("pais", StringType(), False)
])

In [25]:
paisesDF = sqlContext.read.schema(schemaequiposOlimpicos)\
    .option("header","true").csv(path + "paises.csv")

In [26]:
schemaResultado = StructType([
    StructField("resultado_id", IntegerType(),False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False)
])

In [27]:
resultadoDF = sqlContext.read.schema(schemaResultado)\
    .option("header","true").csv(path + "resultados.csv")

### Fin de la lección 14

### Lección 15: Operaciones sobre DF.

Faltaron cargar algunos DF, cargaremos estas estructuras desde los archivos. 

In [28]:
#!ls ./files/
!head -n 5 ./files/deporte.csv

deporte_id,deporte
1,Basketball
2,Judo
3,Football
4,Tug-Of-War


In [29]:
deporteSchema = StructType([
    StructField("deporte_id", IntegerType(),False),
    StructField("deporte", StringType(), False)
])

In [30]:
deportesDF = sqlContext.read.schema(deporteSchema)\
    .option("header","true").csv(path + "deporte.csv")

In [31]:
deportesDF.show(4)

+----------+----------+
|deporte_id|   deporte|
+----------+----------+
|         1|Basketball|
|         2|      Judo|
|         3|  Football|
|         4|Tug-Of-War|
+----------+----------+
only showing top 4 rows



In [32]:
#!ls ./files/
!head -n 5 ./files/evento.csv

evento_id,evento,deporte_id
1,Basketball Men's Basketball,1
2,Judo Men's Extra-Lightweight,2
3,Football Men's Football,3
4,Tug-Of-War Men's Tug-Of-War,4


In [33]:
eventoSchema = StructType([
    StructField("evento_id", IntegerType(),False),
    StructField("evento", StringType(), False),
    StructField("deporte_id", IntegerType(), False)
])

In [34]:
deportesOlimpicosDF = sqlContext.read.schema(eventoSchema)\
    .option("header","true").csv(path + "evento.csv")

In [35]:
deportesOlimpicosDF.show(4)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
+---------+--------------------+----------+
only showing top 4 rows



Para mostrar el esquema de un DF, necesitamos hacer uso del método ```printSchema```

In [36]:
deportesDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [37]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- genero: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



Para renmbrar columnas, necesitamos seguir los siguientes pasos

In [38]:
deportistaOlimpicoDF = deportistaOlimpicoDF.withColumnRenamed("genero","sexo").drop("altura")

Con el método ```drop``` eliminas un campo, y cone método ```withColumnRenamed``` cambias el encabezado de alguna de las columnas.

### Recuerda:
Todos los elementos de la estructura (DF) son inmutables, por lo que estas acciones deben guardarse en otra variable, o en la misma variable si se desean efectuar los cambios.

In [39]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- sexo: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



Ahora vamos a importar un método desde pyspark.sql llamado col, que nos permitirá, entre otras cosas, renombrar  una columna (Como en el ejemplo anterior)

In [40]:
from pyspark.sql.functions import *

In [41]:
deportistaOlimpicoDF=deportistaOlimpicoDF.select("deportista_id","nombre",
                            col("edad").alias("edad_al_jugar"),"equipo_id")

In [42]:
deportistaOlimpicoDF.show(5)

+-------------+--------------------+-------------+---------+
|deportista_id|              nombre|edad_al_jugar|equipo_id|
+-------------+--------------------+-------------+---------+
|            1|           A Dijiang|           24|      199|
|            2|            A Lamusi|           23|      199|
|            3| Gunnar Nielsen Aaby|           24|      273|
|            4|Edgar Lindenau Aabye|           34|      278|
|            5|Christine Jacoba ...|           21|      705|
+-------------+--------------------+-------------+---------+
only showing top 5 rows



In [43]:
deportistaOlimpicoDF.sort("edad_al_jugar").show()

+-------------+--------------------+-------------+---------+
|deportista_id|              nombre|edad_al_jugar|equipo_id|
+-------------+--------------------+-------------+---------+
|          224|     Mohamed AbdelEl|            0|      308|
|          487|      Inni Aboubacar|            0|      721|
|          226|Sanad Bushara Abd...|            0|     1003|
|           58|    Georgi Abadzhiev|            0|      154|
|          230|    Moustafa Abdelal|            0|      308|
|          102|   Sayed Fahmy Abaza|            0|      308|
|          260|  Ahmed Abdo Mustafa|            0|     1003|
|          139|George Ioannis Abbot|            0|     1043|
|          281|      S. Abdul Hamid|            0|      487|
|          163|     Ismail Abdallah|            0|     1095|
|          285|Talal Hassoun Abd...|            0|      497|
|          173| Mohamed Abdel Fatah|            0|     1003|
|          179|Ibrahim Saad Abde...|            0|     1003|
|          378|     Ange

En la tabla anterior podemos observar que hay jugadores con edad 0. Esto es absurdo, ya que la edad 0 es un método para llenar valores no disponibles. 

A continuación vamos a limpiar nuesto DF con un filtro.

In [44]:
deportistaOlimpicoDF= deportistaOlimpicoDF.filter((deportistaOlimpicoDF.edad_al_jugar != 0))

In [45]:
deportistaOlimpicoDF.sort("edad_al_jugar").show()

+-------------+--------------------+-------------+---------+
|deportista_id|              nombre|edad_al_jugar|equipo_id|
+-------------+--------------------+-------------+---------+
|        71691|  Dimitrios Loundras|           10|      333|
|        70616|          Liu Luyang|           11|      199|
|       118925|Megan Olwen Deven...|           11|      413|
|        52070|        Etsuko Inada|           11|      514|
|        22411|Magdalena Cecilia...|           11|      413|
|        40129|    Luigina Giavotti|           11|      507|
|        47618|Sonja Henie Toppi...|           11|      742|
|        76675|   Marcelle Matthews|           11|      967|
|        37333|Carlos Bienvenido...|           11|      982|
|        51268|      Beatrice Hutiu|           11|      861|
|       126307|        Liana Vicens|           11|      825|
|        48939|             Ho Gang|           12|      738|
|        49142|        Jan Hoffmann|           12|      302|
|        42835|   Werner

### Fin de la lección 15

### Lección 16: Agrupaciones y operaciones join sobre DF

Ahora estamos validando las definiciones de algunos DF

In [46]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad_al_jugar: integer (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [47]:
resultadoDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [48]:
juegoDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [49]:
#juegoDF=juegoDF.select("juego_id",
#                            col("year").alias("anio"),"temporada","ciudad")

In [50]:
#juegoDF.printSchema()

In [51]:
deportesOlimpicosDF=deportesOlimpicosDF.select("evento_id",
                            col("evento").alias("Nombre_de_disciplina"),"deporte_id")

In [52]:
deportesOlimpicosDF.printSchema()

root
 |-- evento_id: integer (nullable = true)
 |-- Nombre_de_disciplina: string (nullable = true)
 |-- deporte_id: integer (nullable = true)



In [53]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad_al_jugar: integer (nullable = true)
 |-- equipo_id: integer (nullable = true)



Una vez que expusimos los esquemas, ahora procederemos a hacer un join entre dos DF

In [54]:
demoDF = deportistaOlimpicoDF.\
            join(resultadoDF,\
                 deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,"left").\
            join(juegoDF,juegoDF.juego_id == resultadoDF.juego_id,"left").\
            join(deportesOlimpicosDF,\
            deportesOlimpicosDF.evento_id == resultadoDF.evento_id,"left").\
            select(deportistaOlimpicoDF.nombre,deportistaOlimpicoDF.edad_al_jugar,\
                   resultadoDF.medalla,juegoDF.year,deportesOlimpicosDF.Nombre_de_disciplina)

In [55]:
demoDF.show()

+--------------------+-------------+-------+-------------+--------------------+
|              nombre|edad_al_jugar|medalla|         year|Nombre_de_disciplina|
+--------------------+-------------+-------+-------------+--------------------+
|           A Dijiang|           24|     NA|  1992 Verano|Basketball Men's ...|
|            A Lamusi|           23|     NA|  2012 Verano|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|           24|     NA|  1920 Verano|Football Men's Fo...|
|Edgar Lindenau Aabye|           34|   Gold|  1900 Verano|Tug-Of-War Men's ...|
|Christine Jacoba ...|           21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1988 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|    

**Nota:** Hay un error cuando los origenes de los DF no son los archivos. El error ocurre cuando el origen de los DF no es el mismo, la solución es crear todos los DF desde una misma fuente (archivos CSV).

### Fin la lección 16

### Lección 17: Solución al reto de joins.

In [56]:
resultadoDF.filter(resultadoDF.medalla != "NA")\
    .join(deportistaOlimpicoDF,deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,"left")\
    .join(paisesDF,paisesDF.equipo_id == deportistaOlimpicoDF.equipo_id,"left")\
    .select("medalla","nombre_equipo","pais").sort(col("pais").desc()).show()

+-------+-------------+----+
|medalla|nombre_equipo|pais|
+-------+-------------+----+
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
| Silver|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
| Bronze|     Zimbabwe| ZIM|
| Silver|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
| Silver|     Zimbabwe| ZIM|
| Silver|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
+-------+-------------+----+
only showing top 20 rows



### Fin de la lección 17.

### Lección 18: Funciones de Agrupación.

In [57]:
medallistaXAnio = deportistaOlimpicoDF \
    .join(
        resultadoDF, 
        deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id, 
        "left"
    ) \
    .join(
        juegoDF, 
        juegoDF.juego_id == resultadoDF.juego_id, 
        "left"
    ) \
    .join(
        paisesDF, 
        deportistaOlimpicoDF.equipo_id == paisesDF.equipo_id, 
        "left"
    ) \
    .join(
        deportesOlimpicosDF, 
        deportesOlimpicosDF.evento_id == resultadoDF.evento_id, 
        "left"
    ) \
    .join(
        deportesDF, 
        deportesOlimpicosDF.deporte_id == deportesDF.deporte_id, 
        "left"
    ) \
    .select(
        "pais",
        "year",
        "medalla",
        deportesOlimpicosDF.Nombre_de_disciplina.alias("Nombre_Subdisciplina"),
        deportesDF.deporte.alias("Nombre disciplina"),
        deportistaOlimpicoDF.nombre    
    )

In [58]:
medallistaXAnio.show()

+----+-------------+-------+--------------------+--------------------+--------------------+
|pais|         year|medalla|Nombre_Subdisciplina|   Nombre disciplina|              nombre|
+----+-------------+-------+--------------------+--------------------+--------------------+
| CHN|  1992 Verano|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
| CHN|  2012 Verano|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
| DEN|  1920 Verano|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
| SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
| NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
| NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
| NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
| NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine 

In [59]:
medallistaXAnio2 = medallistaXAnio.filter(medallistaXAnio.medalla != "NA")\
    .sort("year")\
    .groupby("pais", "year", "Nombre_Subdisciplina")\
    .count()

In [60]:
medallistaXAnio2.printSchema()

root
 |-- pais: string (nullable = true)
 |-- year: string (nullable = true)
 |-- Nombre_Subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



Esta manera de agrupar no es muy recomendadad, debido a que Python asigna un tipo "long" al resultado de conteo. A continuación se muestra la manera màs recomendadaa de hacer una agrupación.

In [61]:
medallistaXAnio2.groupBy("pais", "year")\
    .agg(sum("count").alias("Total de medallas"),\
        avg("count").alias("Medallas promedio")).show()

+----+-----------+-----------------+------------------+
|pais|       year|Total de medallas| Medallas promedio|
+----+-----------+-----------------+------------------+
| USA|1896 Verano|               20|1.6666666666666667|
| GER|1896 Verano|               30|               2.5|
| GBR|1896 Verano|                8|1.1428571428571428|
| FRA|1896 Verano|               11|             1.375|
| GRE|1896 Verano|                9|1.2857142857142858|
| HUN|1896 Verano|                6|               1.0|
| AUS|1896 Verano|                3|               1.0|
| AUT|1896 Verano|                5|               1.0|
| DEN|1896 Verano|                6|               1.0|
| SUI|1896 Verano|                3|               1.0|
| SWE|1900 Verano|                6|               3.0|
| USA|1900 Verano|               65|2.4074074074074074|
| FRA|1900 Verano|              179| 3.314814814814815|
| GER|1900 Verano|               27|               4.5|
| NOR|1900 Verano|                9|            

### Fin de la lección 18.

### Lección 19: SQL.

En esta lección vamos a registar algunos DFs como si fueran tablas tradicionales de SQL.

In [62]:
resultadoDF.registerTempTable("resultado")
deportistaOlimpicoDF.registerTempTable("deportista")
paisesDF.registerTempTable("paises")

In [63]:
sqlContext.sql("SELECT * FROM resultado").show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [64]:
paisesDF.printSchema()

root
 |-- equipo_id: integer (nullable = true)
 |-- nombre_equipo: string (nullable = true)
 |-- pais: string (nullable = true)



In [65]:
resultadoDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [66]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad_al_jugar: integer (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [67]:
sqlContext.sql("""
                SELECT medalla, nombre_equipo, pais FROM resultado AS r
                JOIN deportista AS d
                ON r.deportista_id = d.deportista_id
                JOIN paises AS p
                ON p.equipo_id = d.equipo_id
                WHERE medalla <> "NA"
                ORDER BY pais DESC
                """).show(5)

+-------+-------------+----+
|medalla|nombre_equipo|pais|
+-------+-------------+----+
|   Gold|     Zimbabwe| ZIM|
| Silver|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
| Silver|     Zimbabwe| ZIM|
|   Gold|     Zimbabwe| ZIM|
+-------+-------------+----+
only showing top 5 rows



In [68]:
spark

Recomendación: Cuando estes cargando archivos o conexiones con bases de datos y vayas a darle vida a un DF, no hay problema en utilizar el sqlContext (Es lo más normal). Sin embargo, cuando empiezas a hacer cruces y demás operaciones join y de acrupación, opera con los DF directamente.

El verdadero poder de Spark está en sus funcions nattivas.

### Fin de la lección 19.

### Lección 20: ¿Qué es un UDF?.

Las funciones definidas por el usuario o UDF, por sus siglas en inglés, son una funcionalidad agregada en Spark para definir funciones basadas en columnas las cuales permiten extender las capacidades de Spark al momento de transformar el set de datos.

Este tipo de implementaciones son convenientes cuando tenemos un desarrollo extenso donde hemos identificado la periodicidad de tareas repetitivas como suele ser en pasos de limpieza de datos, transformación o renombrado dinámico de columnas.

Por lo anterior es común encontrar en un proyecto de Spark una librería independiente donde existen todas estas funciones agregadas para que los desarrolladores involucrados en el proyecto puedan usarlas a conveniencia.

El uso de UDF no implica que las funciones que podemos crear nativamente con Python, Scala, R o Java no sean útiles. Una UDF tiene el objetivo de ofrecer un estándar interno en el proyecto que nos encontremos realizando. Además, en caso de ser necesario, una UDF puede ser modificada con ayuda de decoradores para que sea más extensible en diversos escenarios a los cuales nos podemos enfrentar.

Otro motivo para usar UDF es que en el módulo de Spark MLlib, la librería nativa de Spark para operaciones de Machine Learning, las UDF juegan un papel vital al momento de hacer transformaciones. Por lo cual tener un uso familiar de estas ampliará considerablemente la curva de aprendizaje de Spark MLlib.

### Fin de la lección 20.

### Lección 21: UDF

In [69]:
!head -n 5 ./files/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [70]:
deportistaOlimpicoRDD =  spark.textFile(path+"deportistaError.csv").map(lambda line : line.split(","))
deportistaOlimpicoRDD=deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
l[0],
l[1],
l[2],
l[3],
l[4],
l[5],
l[6]
))

schema = StructType([
StructField("deportista_id",StringType(),False)     ,
StructField("nombre",StringType(),False)        ,
StructField("genero",StringType(),False)        ,
StructField("edad",StringType(),True)      ,
StructField("altura",StringType(),True)        ,
StructField("peso",StringType(),True)      ,
StructField("equipo_id",StringType(),True)     
])

deportistaErrorDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)
deportistaErrorDF.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

En la celda anterior podemos ver que carga el DF, pero en la ausencia de datos no hay una palabra NULL. Esto lo resolveremos con la siguiente UDF.

Los pasos para crear la udf son:

    1. Crear la función base
    2. Registrarla como udf
    3. Indicar al sqlContext que la usaremos como función nativa en sqlContext (opcional)


In [71]:
from pyspark.sql.functions import udf

def conversionEnteros(valor):
    return int(valor) if len(valor) > 0 else None

conversionEnteros_udf = udf(lambda z: conversionEnteros(z), IntegerType())
sqlContext.udf.register("conversionEnteros_udf", conversionEnteros_udf)

<function __main__.<lambda>(z)>

In [72]:
deportistaErrorDF.select(conversionEnteros_udf("altura")).show()

+----------------+
|<lambda>(altura)|
+----------------+
|             180|
|             170|
|            null|
|            null|
|             185|
|             188|
|             183|
|             168|
|             186|
|            null|
|             182|
|             172|
|             159|
|             171|
|            null|
|             184|
|             175|
|             189|
|            null|
|             176|
+----------------+
only showing top 20 rows



### Fin de la lección 21.

# Fin del módulo 4.