# Dataframes y replicación

# Creamos un contexto de Spark y otro de SQL

Nota: Cargo desde el inicio todos los métodos/modulos que se usarán a lo largo del notebook.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import pyspark.sql 
from pyspark.sql import SQLContext
from pyspark.sql.functions import * 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType
from pyspark.sql.types import Row

In [2]:
spark = SparkContext(master="local", appName="DF y replicación")
sqlContext = SQLContext(spark)

21/12/16 20:46:53 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 172.18.0.2 instead (on interface eth0)
21/12/16 20:46:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/16 20:46:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

## Función para eliminar encabezados

In [4]:
def dropFirstRow(index,iterator):
     return iter(list(iterator)[1:]) 

## Creación del primer DataFrame

Las tres cosas que debes recordar al crear un Dataframe desde un RDDs son:
1. En caso de tener encabezado, eliminarlo
2. Seleccionar y hacer explícita la seperación de las columnas. Si es necesario castear valores
3. Crear el esquema a usarse con los tipos de datos de Spark

Cambia el valor de la ruta para que apunte a la ruta donde tienes los datos

In [5]:
!head -n 5 data/deportista.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,0,0,273
4,Edgar Lindenau Aabye,1,34,0,0,278


In [6]:
path = "data/"

deportistaOlimpicoRDD =  spark.textFile(path+"deportista.csv").map(lambda line : line.split(","))
deportistaOlimpico2RDD = spark.textFile(path+"deportista2.csv").map(lambda line : line.split(","))
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpico2RDD)

deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(dropFirstRow)

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4]),
float(l[5]),
int(l[6])
))

schema = StructType([
StructField("deportista_id",IntegerType(),False)     ,
StructField("nombre",StringType(),False)        ,
StructField("genero",IntegerType(),False)        ,
StructField("edad",IntegerType(),True)      ,
StructField("altura",IntegerType(),True)        ,
StructField("peso",FloatType(),True)      ,
StructField("equipo_id",IntegerType(),True)     
])

deportistaOlimpicoDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

In [7]:
deportistaOlimpicoDF.show(4)

[Stage 0:>                                                          (0 + 1) / 1]

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
+-------------+--------------------+------+----+------+----+---------+
only showing top 4 rows



                                                                                

## Creación de DF desde archivo

En el caso de la creación de un DF desde cero, solo debemos de indicar la estructura, nombre del archivo y opcionalmente si posee o no encabezado.

In [8]:
!head -n 5 data/deporte.csv

deporte_id,deporte
1,Basketball
2,Judo
3,Football
4,Tug-Of-War


In [9]:
deportesOlimpicosRDDSchema = StructType(
    [StructField("deporte_id",IntegerType(),False),
     StructField("deporte",StringType(),False)
    ])

deportesDF = sqlContext.read.schema(deportesOlimpicosRDDSchema).option("header","true").csv(path+"deporte.csv")

In [10]:
deportesDF.show(5)

+----------+-------------+
|deporte_id|      deporte|
+----------+-------------+
|         1|   Basketball|
|         2|         Judo|
|         3|     Football|
|         4|   Tug-Of-War|
|         5|Speed Skating|
+----------+-------------+
only showing top 5 rows



## Reto

Dar vida a todos los archivos como Dataframes.

Se anexa una solución probable.

In [16]:
!ls data/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [17]:
!head -n 4 data/paises.csv

id,equipo,sigla
1,30. Februar,AUT
2,A North American Team,MEX
3,Acipactli,MEX


In [18]:
paisesRDD = spark.textFile(path+"paises.csv").map(lambda line : line.split(","))
paisesRDD = paisesRDD.mapPartitionsWithIndex(dropFirstRow)

paisesRDD = paisesRDD.map(lambda l : (
int(l[0]),
l[1],
l[2]
))

schema = StructType([
StructField("id",IntegerType(),False),
StructField("equipo",StringType(),False),
StructField("sigla",StringType(),False)
])

paisesDF = sqlContext.createDataFrame(paisesRDD,schema)

In [19]:
!head -n 4 data/evento.csv

evento_id,evento,deporte_id
1,Basketball Men's Basketball,1
2,Judo Men's Extra-Lightweight,2
3,Football Men's Football,3


In [20]:
eventoSchema= StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("evento",StringType(),False),
    StructField("deporte_id",IntegerType(),False)
])

deportesOlimpicosDF = sqlContext.read.schema(eventoSchema).option("header","true").csv(path+"evento.csv")

In [21]:
!head -n 4 data/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis


In [22]:
juegoSchema = StructType([
    StructField("juego_id", IntegerType(),False),
    StructField("nombre_juego",StringType(),False),
    StructField("annio",StringType(),False),
    StructField("temporada",StringType(),False),
    StructField("ciudad",StringType(),False),
])
juegoDF = sqlContext.read.schema(juegoSchema).option("header","true").csv(path+"juegos.csv")

In [23]:
!head -n 4 data/resultados.csv

resultado_id,medalla,deportista_id,juego_id,evento_id
1,NA,1,39,1
2,NA,2,49,2
3,NA,3,7,3


In [24]:
resultadoSchema = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False),
])
resultadoDF = sqlContext.read.schema(resultadoSchema).option("header","true").csv(path+"resultados.csv")

In [25]:
deportesDF.take(5)

[Row(deporte_id=1, deporte='Basketball'),
 Row(deporte_id=2, deporte='Judo'),
 Row(deporte_id=3, deporte='Football'),
 Row(deporte_id=4, deporte='Tug-Of-War'),
 Row(deporte_id=5, deporte='Speed Skating')]

In [26]:
deportesOlimpicosDF.take(5)

[Row(evento_id=1, evento="Basketball Men's Basketball", deporte_id=1),
 Row(evento_id=2, evento="Judo Men's Extra-Lightweight", deporte_id=2),
 Row(evento_id=3, evento="Football Men's Football", deporte_id=3),
 Row(evento_id=4, evento="Tug-Of-War Men's Tug-Of-War", deporte_id=4),
 Row(evento_id=5, evento="Speed Skating Women's 500 metres", deporte_id=5)]

In [27]:
paisesDF.take(5)

[Row(id=1, equipo='30. Februar', sigla='AUT'),
 Row(id=2, equipo='A North American Team', sigla='MEX'),
 Row(id=3, equipo='Acipactli', sigla='MEX'),
 Row(id=4, equipo='Acturus', sigla='ARG'),
 Row(id=5, equipo='Afghanistan', sigla='AFG')]

In [28]:
juegoDF.take(5)

21/12/16 20:47:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego, annio, temporada, ciudad
 Schema: juego_id, nombre_juego, annio, temporada, ciudad
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/data/juegos.csv


[Row(juego_id=1, nombre_juego='1896 Verano', annio='1896', temporada='Verano', ciudad='Athina'),
 Row(juego_id=2, nombre_juego='1900 Verano', annio='1900', temporada='Verano', ciudad='Paris'),
 Row(juego_id=3, nombre_juego='1904 Verano', annio='1904', temporada='Verano', ciudad='St. Louis'),
 Row(juego_id=4, nombre_juego='1906 Verano', annio='1906', temporada='Verano', ciudad='Athina'),
 Row(juego_id=5, nombre_juego='1908 Verano', annio='1908', temporada='Verano', ciudad='London')]

In [29]:
deportistaOlimpicoDF.take(5)

[Row(deportista_id=1, nombre='A Dijiang', genero=1, edad=24, altura=180, peso=80.0, equipo_id=199),
 Row(deportista_id=2, nombre='A Lamusi', genero=1, edad=23, altura=170, peso=60.0, equipo_id=199),
 Row(deportista_id=3, nombre='Gunnar Nielsen Aaby', genero=1, edad=24, altura=0, peso=0.0, equipo_id=273),
 Row(deportista_id=4, nombre='Edgar Lindenau Aabye', genero=1, edad=34, altura=0, peso=0.0, equipo_id=278),
 Row(deportista_id=5, nombre='Christine Jacoba Aaftink', genero=2, edad=21, altura=185, peso=82.0, equipo_id=705)]

In [30]:
resultadoDF.take(5)

[Row(resultado_id=1, medalla='NA', deportista_id=1, juego_id=39, evento_id=1),
 Row(resultado_id=2, medalla='NA', deportista_id=2, juego_id=49, evento_id=2),
 Row(resultado_id=3, medalla='NA', deportista_id=3, juego_id=7, evento_id=3),
 Row(resultado_id=4, medalla='Gold', deportista_id=4, juego_id=2, evento_id=4),
 Row(resultado_id=5, medalla='NA', deportista_id=5, juego_id=36, evento_id=5)]

## Revisión de esquema

En ocasiones nosotros no creamos los Dataframes y la estrucutra es desconocida para nosotros. con ayuda del método 'printSchema' podemos conocer el esquema del DataFrame

In [31]:
deportesDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [32]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



# Operaciones de renombrado y eliminación

Para renombrar una columna de un DF, podemos usar el método 'withColumnRenamed' o 'alias'.

Para eliminar columnas, podemos usar el método 'drop' o simplemente selecionar las columnas que deseamos y sobreesciribr el DF

In [33]:
deportistaOlimpicoDF_2 = deportistaOlimpicoDF
deportistaOlimpicoDF_2 = deportistaOlimpicoDF_2.withColumnRenamed("genero","sexo").drop("altura")

In [34]:
deportistaOlimpicoDF_2.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [35]:
from pyspark.sql.functions import *
deportistaOlimpicoDF_2 = deportistaOlimpicoDF_2.select("deportista_id","nombre",
                            col("edad").alias("edadAlJugar"),"equipo_id")

In [36]:
deportistaOlimpicoDF_2.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



## Filtrado de valores

Como con el uso de RDDs, podemos usar el método 'filter' para selecionar subconjuntos.

filter permite usar operaciones lógicas y de comparación como <,>,>=,!= , &,| 

In [38]:
deportistaOlimpicoDF_2 = deportistaOlimpicoDF_2.filter( (deportistaOlimpicoDF_2.edadAlJugar != 0))

In [39]:
deportistaOlimpicoDF_2.sort("edadAlJugar").show(5)



+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        52070|        Etsuko Inada|         11|      514|
|        40129|    Luigina Giavotti|         11|      507|
|        37333|Carlos Bienvenido...|         11|      982|
|        47618|Sonja Henie Toppi...|         11|      742|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



                                                                                

In [40]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [41]:
resultadoDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [42]:
juegoDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- nombre_juego: string (nullable = true)
 |-- annio: string (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [43]:
deportesOlimpicosDF.printSchema()

root
 |-- evento_id: integer (nullable = true)
 |-- evento: string (nullable = true)
 |-- deporte_id: integer (nullable = true)



## Unión de DF

Las operaciones conocidas como Join en SQL, tienen una impementación similar, ya que  el método 'join' recibe tres componentes:

| Orden | Argumento | Descripción |
|-------|--------|-----|
|1|dataFrame|dataFrame con el que queremos realizar el cruce|
|2|Cruze|Operación lógica a realizar para poder unir los Dataframes|
|3|Tipo|El tipo de join a realizar: "Left", "Right",etc|

No olvides que un join es una operación binaria. Por lo que si deseas unir mas DF, deberás realizar multiples joins

Posterior a los joins realizados, debemos de realizar una operación select para indicar que valores queremos. 

En el caso de campos repetidos, podemos hacer explícito el dataframe de origen y para evitar confusón, utilizar alias.

In [44]:
deportistaOlimpicoDF.join(resultadoDF, deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,"left") \
    .join(juegoDF,juegoDF.juego_id == resultadoDF.juego_id,"left") \
    .join(deportesOlimpicosDF, deportesOlimpicosDF.evento_id == resultadoDF.evento_id,"left") \
    .select(deportistaOlimpicoDF.nombre,col("edad").alias("Edad al jugar"),
           "medalla",col("annio").alias("Año de juego"), \
           deportesOlimpicosDF.evento.alias("Nombre de disciplina")).show()

21/12/16 20:47:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, annio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/data/juegos.csv


+--------------------+-------------+-------+------------+--------------------+
|              nombre|Edad al jugar|medalla|Año de juego|Nombre de disciplina|
+--------------------+-------------+-------+------------+--------------------+
|           A Dijiang|           24|     NA|        1992|Basketball Men's ...|
|            A Lamusi|           23|     NA|        2012|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|           24|     NA|        1920|Football Men's Fo...|
|Edgar Lindenau Aabye|           34|   Gold|        1900|Tug-Of-War Men's ...|
|Christine Jacoba ...|           21|     NA|        1994|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1994|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1992|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1992|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1988|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        

De la misma forma que una instrucción SQL posee una jerarquía para poder funcionar y retornar correctamente los valores que deseamos. Los DF estan reguidos por las mismas reglas, es decir la misma jerarquía

In [45]:
resultadoDF.filter(resultadoDF.medalla != "NA") \
    .join(deportistaOlimpicoDF,deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,"left") \
    .join(paisesDF,paisesDF.id == deportistaOlimpicoDF.equipo_id, "left") \
    .select("medalla", "equipo","sigla").sort( col("sigla").desc() ).show()

                                                                                

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



## Funciones escalares

De la misma forma que SQL posee funciones para poder obtener estadísticas. DF hereda el mismo concepto apoyandose de los métodos 'groupBy', "agg" y los ya conocidos de sql "count","sum","avg" etc.

Para el ejercicio, buscaremos conocer cuantas medallas ha ganado un pais en cada juego olimpico.

Primero realizamos la batería de joins que nos permitan identificar todos los valores que necesitamos.

In [49]:
medallistaXAnio = deportistaOlimpicoDF \
        .join(resultadoDF, deportistaOlimpicoDF.deportista_id ==resultadoDF.deportista_id,"left") \
        .join(juegoDF, juegoDF.juego_id == resultadoDF.juego_id,"left") \
        .join(paisesDF,deportistaOlimpicoDF.equipo_id == paisesDF.id,"left") \
        .join(deportesOlimpicosDF, deportesOlimpicosDF.evento_id == resultadoDF.evento_id,"left") \
        .join(deportesDF,deportesOlimpicosDF.deporte_id == deportesDF.deporte_id,"left") \
        .select("sigla",
                "annio",
                "medalla",
                deportesOlimpicosDF.evento.alias("Nombre subdisciplina"),
                deportesDF.deporte.alias("Nombre disciplina"),
                deportistaOlimpicoDF.nombre,   
                )

In [50]:
medallistaXAnio.show()

21/12/16 21:33:38 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, annio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/data/juegos.csv


+-----+-----+-------+--------------------+--------------------+--------------------+
|sigla|annio|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+-----+-----+-------+--------------------+--------------------+--------------------+
|  CHN| 1992|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|  CHN| 2012|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|  DEN| 1920|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|  SWE| 1900|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|  NED| 1994|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED| 1994|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED| 1992|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED| 1992|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED| 1988|     NA|Speed Skating Wom...|       Speed Skating|Ch

Previo, identificamos el uso del método "like".

El cual es util cuando no sabemos el nombre completo o correcto de una columna deseada. 

En este ejemplo, apartir de todos los juegos de Ski Aplino Femenino, obtenemos la competencias en las que participó el pais. Recuerda que la columna medalla aun posee valores NA.

In [65]:
medallistaXAnio.where( col("Nombre subdisciplina").like("Alpine Skiing Wo%")) \
    .groupBy("sigla","annio").count() \
    .sort(col("annio").asc()) \
    .show()

21/12/16 21:51:07 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, annio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/data/juegos.csv

+-----+-----+-----+
|sigla|annio|count|
+-----+-----+-----+
|  SUI| 1936|    2|
|  NOR| 1936|    3|
|  CAN| 1936|    4|
|  NED| 1936|    1|
|  LAT| 1936|    1|
|  EST| 1936|    1|
|  GBR| 1936|    4|
|  GER| 1936|    4|
|  TCH| 1936|    3|
|  ITA| 1936|    4|
|  AUT| 1936|    4|
|  ESP| 1936|    2|
|  USA| 1936|    4|
|  FRA| 1948|   14|
|  HUN| 1948|    3|
|  ITA| 1948|    6|
|  GBR| 1948|   12|
|  AUT| 1948|   14|
|  SUI| 1948|   14|
|  SWE| 1948|    3|
+-----+-----+-----+
only showing top 20 rows



                                                                                

En este paso nos quedamos solo con medallas

In [66]:
medallistaXAnio2 = medallistaXAnio.filter(medallistaXAnio.medalla != "NA") \
    .groupBy("Sigla","annio","Nombre subdisciplina") \
    .count() \
    .sort(col("annio").asc())
medallistaXAnio2.show()

21/12/16 21:51:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, annio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/data/juegos.csv

+-----+-----+--------------------+-----+
|Sigla|annio|Nombre subdisciplina|count|
+-----+-----+--------------------+-----+
|  USA| 1896|Athletics Men's D...|    1|
|  USA| 1896|Athletics Men's 1...|    1|
|  FRA| 1896|Cycling Men's 100...|    1|
|  USA| 1896|Athletics Men's 4...|    2|
|  GER| 1896|Gymnastics Men's ...|    2|
|  FRA| 1896|Cycling Men's Sprint|    2|
|  GER| 1896|Gymnastics Men's ...|   10|
|  FRA| 1896|Cycling Men's 10,...|    2|
|  AUS| 1896|Athletics Men's 1...|    1|
|  GER| 1896|Athletics Men's 1...|    1|
|  FRA| 1896|Fencing Men's Foi...|    2|
|  GBR| 1896|Athletics Men's 4...|    1|
|  GBR| 1896|Cycling Men's Roa...|    1|
|  GRE| 1896|Cycling Men's 100...|    1|
|  USA| 1896|Athletics Men's L...|    3|
|  GRE| 1896|Shooting Men's Fr...|    2|
|  GRE| 1896|Tennis Men's Singles|    2|
|  GBR| 1896|Tennis Men's Singles|    1|
|  USA| 1896|Athletics Men's H...|    3|
|  HUN| 1896|Swimming Men's 10...|    1|
+-----+-----+--------------------+-----+
only showing top

                                                                                

## Forma recomendada para agrupar

El método 'agg' es la forma recomendada para hacer agrupaciones ya que brinda la oportunidad de escalar la cantidad de operaciones escalares a realizar en un mismo DF.

Es claro que si solo realizamos una operación de agrupación, el uso de 'agg' es excesivo, esta es la recomendación oficial de uso.

In [72]:
medallistaXAnio2.groupBy("sigla","annio") \
    .agg(sum("count").alias("Total de medallas"), \
         avg("count").alias("Medallas promedio")) \
    .sort(col("annio").asc(), col("Total de medallas").desc()) \
    .show()

21/12/16 22:00:29 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, annio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/data/juegos.csv

+-----+-----+-----------------+------------------+
|sigla|annio|Total de medallas| Medallas promedio|
+-----+-----+-----------------+------------------+
|  GRE| 1896|               48|1.6551724137931034|
|  GER| 1896|               32|2.6666666666666665|
|  USA| 1896|               20|1.6666666666666667|
|  FRA| 1896|               11|             1.375|
|  GBR| 1896|                9|             1.125|
|  HUN| 1896|                6|               1.0|
|  DEN| 1896|                6|               1.0|
|  AUT| 1896|                5|               1.0|
|  SUI| 1896|                3|               1.0|
|  AUS| 1896|                3|               1.0|
|  FRA| 1900|              250| 4.310344827586207|
|  GBR| 1900|               90|3.4615384615384617|
|  USA| 1900|               67| 2.310344827586207|
|  GER| 1900|               45| 6.428571428571429|
|  BEL| 1900|               45|            2.8125|
|  NED| 1900|               26| 4.333333333333333|
|  SUI| 1900|               21|

                                                                                

## Funciones escalares

El ejempo realizado para obtener las medallas ganadas por un pais se migará para poder visualizar como sería integrar SQL a un proceso de Spark

In [73]:
resultadoDF.filter(resultadoDF.medalla != "NA") \
    .join(deportistaOlimpicoDF,deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,"left") \
    .join(paisesDF,paisesDF.id == deportistaOlimpicoDF.equipo_id, "left") \
    .select("medalla", "equipo","sigla").sort( col("sigla").desc() ).show()



+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



[Stage 196:>                                                        (0 + 1) / 1]                                                                                

El uso de DF como SQL, se usa registrando un DF como tabla temportal.

En el caso de realizar la conexión a una base de datos, este paso puede llegar a ser omitido. Ya que spark estará configurado para poder hacer las conexiones implicitamente

In [76]:
resultadoDF.createOrReplaceTempView("resultado")
deportistaOlimpicoDF.createOrReplaceTempView("deportista")
paisesDF.createOrReplaceTempView("paises")

El alias asignado, será la forma en la cual sqlContext conocerá el DF internamente, ahora podemos hacer operaciones de forma tradicional.

In [77]:
sqlContext.sql("SELECT * FROM deportista").show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [78]:
sqlContext.sql("""
                SELECT medalla, equipo, sigla FROM resultado r
                JOIN deportista d
                ON r.deportista_id = d.deportista_id
                JOIN paises p
                ON p.id = d.equipo_id
                WHERE medalla <> "NA"
                ORDER BY sigla DESC
                """).show()



+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



                                                                                

### UDF

Nota: Este apartado en el curso se pone al final.

Para ejemplificar la función creada por el usuario, cargamos deportistaError el cual tiene ausencia de valores.

Con la UDF solucionamos el error. Esta no es una solución definitiva, solo es demostrativa para explicar como crear una UDF.

Las **funciones definidas por el usuario o UDF**, por sus siglas en inglés, son una funcionalidad agregada en Spark para definir funciones basadas en columnas las cuales permiten extender las capacidades de Spark al momento de transformar el set de datos.

Este tipo de implementaciones son convenientes cuando tenemos un desarrollo extenso donde hemos identificado la periodicidad de tareas repetitivas como suele ser en pasos de limpieza de datos, transformación o renombrado dinámico de columnas.

Por lo anterior es común encontrar en un proyecto de Spark una librería independiente donde existen todas estas funciones agregadas para que los desarrolladores involucrados en el proyecto puedan usarlas a conveniencia.

El uso de UDF no implica que las funciones que podemos crear nativamente con Python, Scala, R o Java no sean útiles. Una UDF tiene el objetivo de ofrecer un estándar interno en el proyecto que nos encontremos realizando. Además, en caso de ser necesario, una UDF puede ser modificada con ayuda de decoradores para que sea más extensible en diversos escenarios a los cuales nos podemos enfrentar.

Otro motivo para usar UDF es que en el módulo de Spark MLlib, la librería nativa de Spark para operaciones de Machine Learning, las UDF juegan un papel vital al momento de hacer transformaciones. Por lo cual tener un uso familiar de estas ampliará considerablemente la curva de aprendizaje de Spark MLlib.

In [11]:
!head -n 5 data/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [12]:
deportistaOlimpicoRDDerr =  spark.textFile(path+"deportistaError.csv").map(lambda line : line.split(","))
deportistaOlimpicoRDDerr = deportistaOlimpicoRDDerr.mapPartitionsWithIndex(dropFirstRow)

deportistaOlimpicoRDDerr = deportistaOlimpicoRDDerr.map(lambda l : (
    int(l[0]),l[1],l[2],l[3],l[4],l[5],l[6]
))

schema = StructType([
StructField("deportista_id",IntegerType(),False),
StructField("nombre",StringType(),False),
StructField("genero",StringType(),False),
StructField("edad",StringType(),True),
StructField("altura",StringType(),True),
StructField("peso",StringType(),True),
StructField("equipo_id",StringType(),True)     
])

deportistaError = sqlContext.createDataFrame(deportistaOlimpicoRDDerr,schema)

In [13]:
deportistaError.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

### Creación de UDF

Los pasos para crear la udf son:

1. Crear la función base
2. Registrarla como udf
3. Indicar al sqlContext que la usaremos como función nativa en sqlContext (opcional)

In [79]:
def ci(value: str) -> int:
    return int(value) if len(value) > 0 else None


ci_udf = udf(lambda z : ci(z), IntegerType())

sqlContext.udf.register("ci_udf", ci_udf)

deportistaError.select("deportista_id",ci_udf("altura").alias("altura")).show()

21/12/16 23:24:06 WARN SimpleFunctionRegistry: The function ci_udf replaced a previously registered function.


+-------------+------+
|deportista_id|altura|
+-------------+------+
|            1|   180|
|            2|   170|
|            3|  null|
|            4|  null|
|            5|   185|
|            6|   188|
|            7|   183|
|            8|   168|
|            9|   186|
|           10|  null|
|           11|   182|
|           12|   172|
|           13|   159|
|           14|   171|
|           15|  null|
|           16|   184|
|           17|   175|
|           18|   189|
|           19|  null|
|           20|   176|
+-------------+------+
only showing top 20 rows



Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [15]:
deportistaError.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

# Persistencia

La persistencia de datos no ocurre por defecto en un DF o RDD de Spark, por lo cual debemos de indicar con el método 'cache', por otro lado, para poder verificar si esta almacenado o no, con el método 'is_cached' verificamos su estatus

Como se ha descrito en clases pasadas, los RDD son la capa de abstracción primaria para poder interactuar con los datos que viven en nuestro ambiente de Spark. Aunque estos puedan ser enmascarados con un esquema dotándolos de las facultades propias de los DataFrames, la información de fondo sigue operando como RDD.

Por lo tanto, la información, como indica el nombre de los RDD, se maneja de forma distribuida a lo largo del clúster, facilitando las operaciones que se van a ejecutar, ya que segmentos de información pueden encontrarse en diferentes ejecutores reduciendo el tiempo necesario para acceder a la información y poder así realizar los cálculos necesarios.

Cuando un RDD o Dataframe es creado, según las especificaciones que se indiquen a la aplicación de Spark, creará un **esquema de particionado básico**, el cual distribuirá los datos a lo largo del clúster. Siendo así que al momento de ejecutar una acción, esta se ejecutará entre los diversos fragmentos de información que existan para poder así realizar de la forma más rápida las operaciones. Es por eso que un correcto esquema de particionado es clave para poder tener aplicaciones rápidas y precisas que además consuman pocos recursos de red.

Otra de las tareas fundamentales es la **replicación de componentes y sus fragmentos**, ya que al aumentar la disponibilidad de estos podremos asegurar una tolerancia a fallos, mientras más se replique un valor es más probable que no se pierda si existe un fallo de red o energía, además de permitir una disponibilidad casi inmediata del archivo buscado.

La partición y replicación son elementos que deben ser analizados según el tipo de negocio o requerimientos que se tengan en el desarrollo que se encuentre en progreso, por lo cual la cantidad de datos replicados o granularidad de datos existentes en los fragmentos dependerá en función de las reglas de negocio

In [80]:
medallistaXAnio.is_cached

False

In [81]:
medallistaXAnio.rdd.cache()

21/12/17 00:27:26 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, annio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/data/juegos.csv


MapPartitionsRDD[558] at javaToPython at NativeMethodAccessorImpl.java:0

Para poder verificar el tipo de almacenamiento asignado, debemos de conocer el valor de códigos que nos regresa getStorageLevel

Para esto, podemos verificar en la documentación de spark:
https://spark.apache.org/docs/2.4.6/api/python/_modules/pyspark/storagelevel.html

In [82]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

Para poder cambiar el tipo de persistencia debemos de primero retirarla y posterior a eso asignarle la que deseamos.

Con el método persist, asignaremos la persistencia que nosotros deseamos.

In [83]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[558] at javaToPython at NativeMethodAccessorImpl.java:0

In [84]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[558] at javaToPython at NativeMethodAccessorImpl.java:0

In [85]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 2)

In [86]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 2)

Finalmente, podemos crear nuestros propios esquemas de persistencia según las reglas y restricciones de negocio que tengamos en el proyecto.

In [87]:
#def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True,True,False,False,3)

In [88]:
medallistaXAnio.rdd.unpersist()
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[558] at javaToPython at NativeMethodAccessorImpl.java:0