# Dataframes y replicación

# Creamos un contexto de Spark y otro de SQL

Nota: Cargo desde el inicio todos los métodos/modulos que se usarán a lo largo del notebook.

In [5]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import pyspark.sql 
from pyspark.sql import SQLContext
from pyspark.sql.functions import * 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType
from pyspark.sql.types import Row


In [4]:
#!pip install py4j

Collecting py4j
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 845 kB/s eta 0:00:01
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.2


In [6]:
#spark.stop()
spark = SparkContext(master="local", appName="DF y replicación")
sqlContext = SQLContext(spark)

21/11/04 13:27:40 WARN Utils: Your hostname, GNULinux resolves to a loopback address: 127.0.1.1; using 192.168.1.69 instead (on interface wlp2s0)
21/11/04 13:27:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/04 13:27:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Función para eliminar encabezados

In [7]:
def dropFirstRow(index,iterator):
     return iter(list(iterator)[1:]) 

## Creación del primer DataFrame

Las tres cosas que debes recordar al crear un Dataframe desde un RDDs son:
1. En caso de tener encabezado, eliminarlo
2. Seleccionar y hacer explícita la seperación de las columnas. Si es necesario castear valores
3. Crear el esquema a usarse con los tipos de datos de Spark

Cambia el valor de la ruta para que apunte a la ruta donde tienes los datos

In [8]:
path = "files/"

deportistaOlimpicoRDD =  spark.textFile(path+"deportista.csv").map(lambda line : line.split(","))
deportistaOlimpico2RDD = spark.textFile(path+"deportista2.csv").map(lambda line : line.split(","))
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpico2RDD)

deportistaOlimpicoRDD=deportistaOlimpicoRDD.mapPartitionsWithIndex(dropFirstRow)

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4]),
float(l[5]),
int(l[6])
))

schema = StructType([
StructField("deportista_id",IntegerType(),False)     ,
StructField("nombre",StringType(),False)        ,
StructField("genero",IntegerType(),False)        ,
StructField("edad",IntegerType(),True)      ,
StructField("altura",IntegerType(),True)        ,
StructField("peso",FloatType(),True)      ,
StructField("equipo_id",IntegerType(),True)     
])

deportistaOlimpicoDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

## Creación de DF desde archivo

En el caso de la creación de un DF desde cero, solo debemos de indicar la estructura, nombre del archivo y opcionalmente si posee o no encabezado.

In [10]:
deportesOlimpicosRDDSchema = StructType(
    [StructField("deporte_id",IntegerType(),False),
     StructField("deporte",StringType(),False)
    ])

deportesDF = sqlContext.read.schema(deportesOlimpicosRDDSchema).option("header","true").csv(path+"deporte.csv")

### UDF

Nota: Este apartado en el curso se pone al final.

Para ejemplificar la función creada por el usuario, cargamos deportistaError el cual tiene ausencia de valores.

Con la UDF solucionamos el error. Esta no es una solución definitiva, solo es demostrativa para explicar como crear una UDF.

In [11]:
deportistaOlimpicoRDD =  spark.textFile(path+"deportistaError.csv").map(lambda line : line.split(","))
deportistaOlimpicoRDD=deportistaOlimpicoRDD.mapPartitionsWithIndex(dropFirstRow)

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
l[0],
l[1],
l[2],
l[3],
l[4],
l[5],
l[6]
))

schema = StructType([
StructField("deportista_id",StringType(),False)     ,
StructField("nombre",StringType(),False)        ,
StructField("genero",StringType(),False)        ,
StructField("edad",StringType(),True)      ,
StructField("altura",StringType(),True)        ,
StructField("peso",StringType(),True)      ,
StructField("equipo_id",StringType(),True)     
])

deportistaError = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

In [12]:
deportistaError.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

                                                                                

In [13]:
spark

### Creación de UDF

Los pasos para crear la udf son:

1. Crear la función base
2. Registrarla como udf
3. Indicar al sqlContext que la usaremos como función nativa en sqlContext (opcional)

In [14]:
def ci(value: str) -> int:
    return int(value) if len(value) > 0 else None

ci_udf = udf(lambda z : ci(z), IntegerType())

sqlContext.udf.register("ci_udf", ci_udf)

deportistaError.select(ci_udf("altura").alias("altura")).show()

[Stage 1:>                                                          (0 + 1) / 1]

+------+
|altura|
+------+
|   180|
|   170|
|  null|
|  null|
|   185|
|   188|
|   183|
|   168|
|   186|
|  null|
|   182|
|   172|
|   159|
|   171|
|  null|
|   184|
|   175|
|   189|
|  null|
|   176|
+------+
only showing top 20 rows



                                                                                

In [16]:
deportistaError.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

## Reto

Dar vida a todos los archivos como Dataframes.

Se anexa una solución probable.

In [18]:
paisesRDD = spark.textFile(path+"paises.csv").map(lambda line : line.split(","))
paisesRDD = paisesRDD.mapPartitionsWithIndex(dropFirstRow)

paisesRDD = paisesRDD.map(lambda l : (
int(l[0]),
l[1],
l[2]
))

schema = StructType([
StructField("id",IntegerType(),False),
StructField("equipo",StringType(),False),
StructField("sigla",StringType(),False)
])

paisesDF = sqlContext.createDataFrame(paisesRDD,schema)

In [29]:
!head files/evento.csv

evento_id,evento,deporte_id
1,Basketball Men's Basketball,1
2,Judo Men's Extra-Lightweight,2
3,Football Men's Football,3
4,Tug-Of-War Men's Tug-Of-War,4
5,Speed Skating Women's 500 metres,5
6,"Speed Skating Women's 1,000 metres",5
7,Cross Country Skiing Men's 10 kilometres,6
8,Cross Country Skiing Men's 50 kilometres,6
9,Cross Country Skiing Men's 10/15 kilometres Pursuit,6


In [30]:
eventoSchema= StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("evento",StringType(),False),
    StructField("deporte_id",IntegerType(),False)
])

deportesOlimpicosDF = sqlContext.read.schema(eventoSchema).option("header","true").csv(path+"evento.csv")

In [31]:

juegoSchema = StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("anio",StringType(),False),
    StructField("temporada",StringType(),False),
    StructField("ciudad",StringType(),False),
])
juegoDF = sqlContext.read.schema(juegoSchema).option("header","true").csv(path+"juegos.csv")

resultadoSchema = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False),
])
resultadoDF = sqlContext.read.schema(resultadoSchema).option("header","true").csv(path+"resultados.csv")

In [26]:
deportesDF.take(5)

[Row(deporte_id=1, deporte='Basketball'),
 Row(deporte_id=2, deporte='Judo'),
 Row(deporte_id=3, deporte='Football'),
 Row(deporte_id=4, deporte='Tug-Of-War'),
 Row(deporte_id=5, deporte='Speed Skating')]

In [32]:
deportesOlimpicosDF.take(5)

[Row(evento_id=1, evento="Basketball Men's Basketball", deporte_id=1),
 Row(evento_id=2, evento="Judo Men's Extra-Lightweight", deporte_id=2),
 Row(evento_id=3, evento="Football Men's Football", deporte_id=3),
 Row(evento_id=4, evento="Tug-Of-War Men's Tug-Of-War", deporte_id=4),
 Row(evento_id=5, evento="Speed Skating Women's 500 metres", deporte_id=5)]

In [40]:
paisesDF.show()

+---+--------------------+-----+
| id|              equipo|sigla|
+---+--------------------+-----+
|  1|         30. Februar|  AUT|
|  2|A North American ...|  MEX|
|  3|           Acipactli|  MEX|
|  4|             Acturus|  ARG|
|  5|         Afghanistan|  AFG|
|  6|            Akatonbo|  IRL|
|  7|            Alain IV|  SUI|
|  8|             Albania|  ALB|
|  9|              Alcaid|  POR|
| 10|            Alcyon-6|  FRA|
| 11|            Alcyon-7|  FRA|
| 12|           Aldebaran|  ITA|
| 13|        Aldebaran II|  ITA|
| 14|              Aletta|  IRL|
| 15|             Algeria|  ALG|
| 16|         Ali-Baba II|  SWE|
| 17|         Ali-Baba IV|  SUI|
| 18|         Ali-Baba IX|  SUI|
| 19|         Ali-Baba VI|  SUI|
| 20|             Allegro|  FRA|
+---+--------------------+-----+
only showing top 20 rows



In [34]:
juegoDF.take(5)

21/11/04 14:42:24 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 4
CSV file: file:///home/jazzzfm/PersonalProjects/JazzzDevelopmentWithSpark/curso-apache-spark-platzi/files/juegos.csv


[Row(juego_id=1, anio='1896 Verano', temporada='1896', ciudad='Verano'),
 Row(juego_id=2, anio='1900 Verano', temporada='1900', ciudad='Verano'),
 Row(juego_id=3, anio='1904 Verano', temporada='1904', ciudad='Verano'),
 Row(juego_id=4, anio='1906 Verano', temporada='1906', ciudad='Verano'),
 Row(juego_id=5, anio='1908 Verano', temporada='1908', ciudad='Verano')]

In [35]:
deportistaOlimpicoDF.take(5)

[Row(deportista_id=1, nombre='A Dijiang', genero=1, edad=24, altura=180, peso=80.0, equipo_id=199),
 Row(deportista_id=2, nombre='A Lamusi', genero=1, edad=23, altura=170, peso=60.0, equipo_id=199),
 Row(deportista_id=3, nombre='Gunnar Nielsen Aaby', genero=1, edad=24, altura=0, peso=0.0, equipo_id=273),
 Row(deportista_id=4, nombre='Edgar Lindenau Aabye', genero=1, edad=34, altura=0, peso=0.0, equipo_id=278),
 Row(deportista_id=5, nombre='Christine Jacoba Aaftink', genero=2, edad=21, altura=185, peso=82.0, equipo_id=705)]

In [36]:
resultadoDF.take(5)

[Row(resultado_id=1, medalla='NA', deportista_id=1, juego_id=39, evento_id=1),
 Row(resultado_id=2, medalla='NA', deportista_id=2, juego_id=49, evento_id=2),
 Row(resultado_id=3, medalla='NA', deportista_id=3, juego_id=7, evento_id=3),
 Row(resultado_id=4, medalla='Gold', deportista_id=4, juego_id=2, evento_id=4),
 Row(resultado_id=5, medalla='NA', deportista_id=5, juego_id=36, evento_id=5)]

## Revisión de esquema

En ocasiones nosotros no creamos los Dataframes y la estrucutra es desconocida para nosotros. con ayuda del método 'printSchema' podemos conocer el esquema del DataFrame

In [41]:
deportesDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [42]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



# Operaciones de renombrado y eliminación

Para renombrar una columna de un DF, podemos usar el método 'withColumnRenamed' o 'alias'.

Para eliminar columnas, podemos usar el método 'drop' o simplemente selecionar las columnas que deseamos y sobreesciribr el DF

In [44]:
deportistaOlimpicoDF = deportistaOlimpicoDF\
                        .withColumnRenamed("genero","sexo")\
                        .drop("altura")

In [45]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [46]:
from pyspark.sql.functions import *
deportistaOlimpicoDF = deportistaOlimpicoDF\
            .select("deportista_id","nombre", col("edad").alias("edadAlJugar"),"equipo_id")

In [47]:
deportistaOlimpicoDF.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



## Filtrado de valores

Como con el uso de RDDs, podemos usar el método 'filter' para selecionar subconjuntos.

filter permite usar operaciones lógicas y de comparación como <,>,>=,!= , &,| 

In [49]:
deportistaOlimpicoDF = deportistaOlimpicoDF.filter( (deportistaOlimpicoDF.edadAlJugar != 0))

In [50]:
deportistaOlimpicoDF.sort("edadAlJugar").show()



+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        70616|          Liu Luyang|         11|      199|
|       118925|Megan Olwen Deven...|         11|      413|
|        52070|        Etsuko Inada|         11|      514|
|        22411|Magdalena Cecilia...|         11|      413|
|        40129|    Luigina Giavotti|         11|      507|
|        47618|Sonja Henie Toppi...|         11|      742|
|        76675|   Marcelle Matthews|         11|      967|
|        37333|Carlos Bienvenido...|         11|      982|
|        51268|      Beatrice Hutiu|         11|      861|
|       126307|        Liana Vicens|         11|      825|
|        48939|             Ho Gang|         12|      738|
|        49142|        Jan Hoffmann|         12|      302|
|        42835|   Werner Grieshofer|         12|       7

                                                                                

In [63]:
deportistaOlimpicoDF.show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
|            6|     Per Knut Aaland|         31|     1096|
|            7|        John Aalberg|         31|     1096|
|            8|Cornelia Cor Aalt...|         18|      705|
|            9|    Antti Sami Aalto|         26|      350|
|           10|Einar Ferdinand E...|         26|      350|
|           11|  Jorma Ilmari Aalto|         22|      350|
|           12|   Jyri Tapani Aalto|         31|      350|
|           13|  Minna Maarit Aalto|         30|      350|
|           14|Pirjo Hannele Aal...|         32|      35

## Unión de DF

Las operaciones conocidas como Join en SQL, tienen una impementación similar, ya que  el método 'join' recibe tres componentes:

| Orden | Argumento | Descripción |
|-------|--------|-----|
|1|dataFrame|dataFrame con el que queremos realizar el cruce|
|2|Cruze|Operación lógica a realizar para poder unir los Dataframes|
|3|Tipo|El tipo de join a realizar: "Left", "Right",etc|

No olvides que un join es una operación binaria. Por lo que si deseas unir mas DF, deberás realizar multiples joins

Posterior a los joins realizados, debemos de realizar una operación select para indicar que valores queremos. 

En el caso de campos repetidos, podemos hacer explícito el dataframe de origen y para evitar confusón, utilizar alias.

In [65]:
deportistaOlimpicoDF.join(
    resultadoDF,
    deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,
    "left") \
    .join(
    juegoDF,
    juegoDF.juego_id == resultadoDF.juego_id,
    "left") \
    .join(
    deportesOlimpicosDF,
    deportesOlimpicosDF.evento_id == resultadoDF.evento_id,
    "left") \
    .select(
        deportistaOlimpicoDF.nombre,
        col("edadAlJugar").alias("Edad al jugar"),
        "medalla",
        col("anio").alias("Año de juego"),
        deportesOlimpicosDF.evento.alias("Nombre de disciplina")
    )\
    .show()

21/11/04 15:42:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/jazzzfm/PersonalProjects/JazzzDevelopmentWithSpark/curso-apache-spark-platzi/files/juegos.csv
                                                                                

+--------------------+-------------+-------+-------------+--------------------+
|              nombre|Edad al jugar|medalla| Año de juego|Nombre de disciplina|
+--------------------+-------------+-------+-------------+--------------------+
|           A Dijiang|           24|     NA|  1992 Verano|Basketball Men's ...|
|            A Lamusi|           23|     NA|  2012 Verano|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|           24|     NA|  1920 Verano|Football Men's Fo...|
|Edgar Lindenau Aabye|           34|   Gold|  1900 Verano|Tug-Of-War Men's ...|
|Christine Jacoba ...|           21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1988 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|    

De la misma forma que una instrucción SQL posee una jerarquía para poder funcionar y retornar correctamente los valores que deseamos. Los DF estan reguidos por las mismas reglas, es decir la misma jerarquía

## Funciones escalares

De la misma forma que SQL posee funciones para poder obtener estadísticas. DF hereda el mismo concepto apoyandose de los métodos 'groupBy', "agg" y los ya conocidos de sql "count","sum","avg" etc.

In [76]:
deportesOlimpicosDF.show()

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
|        5|Speed Skating Wom...|         5|
|        6|Speed Skating Wom...|         5|
|        7|Cross Country Ski...|         6|
|        8|Cross Country Ski...|         6|
|        9|Cross Country Ski...|         6|
|       10|Cross Country Ski...|         6|
|       11|Cross Country Ski...|         6|
|       12|Athletics Women's...|         7|
|       13|Athletics Women's...|         7|
|       14|Ice Hockey Men's ...|         8|
|       15|Swimming Men's 40...|         9|
|       16|Badminton Men's S...|        10|
|       17|Sailing Women's W...|        11|
|       18|Biathlon Women's ...|        12|
|       19|Swimming Men's 20...|         9|
|       20|Swimming Men's 40...|

Para el ejercicio, buscaremos conocer cuantas medallas ha ganado un pais en cada juego olimpico.

Primero realizamos la batería de joins que nos permitan identificar todos los valores que necesitamos.

In [77]:
medallistaXAnio = deportistaOlimpicoDF\
        .join(
            resultadoDF, 
            deportistaOlimpicoDF.deportista_id ==resultadoDF.deportista_id,
            "left"
            )\
        .join(
            juegoDF, 
            juegoDF.juego_id == resultadoDF.juego_id,
            "left"
            )\
        .join(
            paisesDF,
            deportistaOlimpicoDF.equipo_id == paisesDF.id,
            "left"
            ) \
        .join(
            deportesOlimpicosDF,
            deportesOlimpicosDF.evento_id == resultadoDF.evento_id,
            "left"
            ) \
        .join(
            deportesDF,
            deportesOlimpicosDF.deporte_id == deportesDF.deporte_id,
            "left"
            ) \
        .select("sigla",
                "anio",
                "medalla",
                deportesOlimpicosDF.evento.alias("Nombre subdisciplina"),
                deportesDF.deporte.alias("Nombre disciplina"),
                deportistaOlimpicoDF.nombre,   
                )

In [80]:
medallistaXAnio.show()

21/11/04 16:01:02 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/jazzzfm/PersonalProjects/JazzzDevelopmentWithSpark/curso-apache-spark-platzi/files/juegos.csv


+-----+-------------+-------+--------------------+--------------------+--------------------+
|sigla|         anio|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+-----+-------------+-------+--------------------+--------------------+--------------------+
|  CHN|  1992 Verano|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|  CHN|  2012 Verano|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|  DEN|  1920 Verano|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|  SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating

Previo, identificamos el uso del método "like".

El cual es util cuando no sabemos el nombre completo o correcto de una columna deseada. 

En este ejemplo, apartir de todos los juegos de Ski Aplino Femenino, obtenemos la competencias en las que participó el pais. Recuerda que la columna medalla aun posee valores NA.

In [87]:
medallistaXAnio.where(
    col("Nombre subdisciplina").like("Alpine Skiing Wo%")) \
    .sort("anio") \
    .groupBy("Sigla","anio")\
    .count() \
    .show()

21/11/04 16:12:27 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/jazzzfm/PersonalProjects/JazzzDevelopmentWithSpark/curso-apache-spark-platzi/files/juegos.csv

+-----+-------------+-----+
|Sigla|         anio|count|
+-----+-------------+-----+
|  SUI|2014 Invierno|   19|
|  URS|1984 Invierno|    2|
|  ROU|2014 Invierno|    8|
|  LIE|1998 Invierno|    7|
|  USA|1952 Invierno|   12|
|  CYP|1992 Invierno|    2|
|  POR|2014 Invierno|    2|
|  BRA|2002 Invierno|    1|
|  FRA|2010 Invierno|   17|
|  CZE|2002 Invierno|   12|
|  TCH|1984 Invierno|   10|
|  ROU|1988 Invierno|    5|
|  SVK|2006 Invierno|   16|
|  ISL|1992 Invierno|    2|
|  LIE|1988 Invierno|    9|
|  SUI|1992 Invierno|   19|
|  NOR|1960 Invierno|   11|
|  AND|2014 Invierno|    3|
|  ESP|2010 Invierno|    7|
|  ITA|2014 Invierno|   18|
+-----+-------------+-----+
only showing top 20 rows



                                                                                

En este paso nos quedamos solo con medallas

In [88]:
medallistaXAnio2 = medallistaXAnio\
    .filter(medallistaXAnio.medalla != "NA")\
    .sort("anio")\
    .groupBy("Sigla","anio","Nombre subdisciplina")\
    .count()

## Forma recomendada para agrupar

El método 'agg' es la forma recomendada para hacer agrupaciones ya que brinda la oportunidad de escalar la cantidad de operaciones escalares a realizar en un mismo DF.

Es claro que si solo realizamos una operación de agrupación, el uso de 'agg' es excesivo, esta es la recomendación oficial de uso.

In [96]:
medallistaXAnio2\
    .groupBy("Sigla","anio") \
    .agg(
        sum("count").alias("Total de medallas"),\
        avg("count").alias("Medallas promedio")
        )\
    .sort(
    col("Total de medallas").desc()
    )\
    .show()

21/11/04 16:29:42 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/jazzzfm/PersonalProjects/JazzzDevelopmentWithSpark/curso-apache-spark-platzi/files/juegos.csv

+-----+-----------+-----------------+------------------+
|Sigla|       anio|Total de medallas| Medallas promedio|
+-----+-----------+-----------------+------------------+
|  URS|1980 Verano|              442|2.8333333333333335|
|  USA|1904 Verano|              356|               4.0|
|  USA|1984 Verano|              352| 2.550724637681159|
|  USA|2008 Verano|              318|3.4565217391304346|
|  GBR|1908 Verano|              305| 4.066666666666666|
|  URS|1988 Verano|              300|2.6548672566371683|
|  URS|1976 Verano|              286| 2.623853211009174|
|  GDR|1980 Verano|              264|2.5142857142857142|
|  USA|2016 Verano|              263| 2.481132075471698|
|  USA|2004 Verano|              262| 2.977272727272727|
|  USA|1996 Verano|              259|2.9101123595505616|
|  USA|2012 Verano|              248|2.7252747252747254|
|  USA|2000 Verano|              242|             3.025|
|  GER|1936 Verano|              224| 2.574712643678161|
|  USA|1992 Verano|            

                                                                                

## Funciones escalares

El ejempo realizado para obtener las medallas ganadas por un pais se migará para poder visualizar como sería integrar SQL a un proceso de Spark

In [97]:
resultadoDF\
    .filter(
        resultadoDF.medalla != "NA"
    )\
    .join(
        deportistaOlimpicoDF,
        deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,
        "left"
    )\
    .join(
        paisesDF,
        paisesDF.id == deportistaOlimpicoDF.equipo_id, 
        "left"
    )\
    .select(
        "medalla",
        "equipo",
        "sigla"
    )\
    .sort(
        col("sigla").desc()
    )\
    .show()

                                                                                

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



El uso de DF como SQL, se usa registrando un DF como tabla temportal.

En el caso de realizar la conexión a una base de datos, este paso puede llegar a ser omitido. Ya que spark estará configurado para poder hacer las conexiones implicitamente

In [98]:
resultadoDF.registerTempTable("resultado")
deportistaOlimpicoDF.registerTempTable("deportista")
paisesDF.registerTempTable("paises")

El alias asignado, será la forma en la cual sqlContext conocerá el DF internamente, ahora podemos hacer operaciones de forma tradicional.

In [99]:
sqlContext.sql("SELECT * FROM deportista").show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [101]:
sqlContext.sql(
                """
                SELECT DISTINCT medalla, equipo, sigla FROM resultado r
                JOIN deportista d
                ON r.deportista_id = d.deportista_id
                JOIN paises p
                ON p.id = d.equipo_id
                WHERE medalla <> "NA"
                ORDER BY sigla DESC
                """
            )\
            .show()



+-------+--------------------+-----+
|medalla|              equipo|sigla|
+-------+--------------------+-----+
|   Gold|            Zimbabwe|  ZIM|
| Silver|            Zimbabwe|  ZIM|
| Bronze|            Zimbabwe|  ZIM|
| Silver|              Zambia|  ZAM|
|   Gold|          Yugoslavia|  YUG|
| Bronze|          Yugoslavia|  YUG|
| Silver|          Yugoslavia|  YUG|
| Bronze|West Indies Feder...|  WIF|
| Silver|             Vietnam|  VIE|
|   Gold|             Vietnam|  VIE|
|   Gold|           Venezuela|  VEN|
| Bronze|           Venezuela|  VEN|
| Silver|           Venezuela|  VEN|
| Silver|          Uzbekistan|  UZB|
|   Gold|          Uzbekistan|  UZB|
| Bronze|          Uzbekistan|  UZB|
| Bronze|     United States-1|  USA|
|   Gold|       United States|  USA|
|   Gold|Seawanhaka Boat C...|  USA|
|   Gold|New York Athletic...|  USA|
+-------+--------------------+-----+
only showing top 20 rows



                                                                                

In [103]:
from pyspark.storagelevel import StorageLevel

# Persistencia

La persistencia de datos no ocurre por defecto en un DF o RDD de Spark, por lo cual debemos de indicar con el método 'cache', por otro lado, para poder verificar si esta almacenado o no, con el método 'is_cached' verificamos su estatus

In [110]:
medallistaXAnio.is_cached

False

In [109]:
medallistaXAnio.rdd.cache()

MapPartitionsRDD[554] at javaToPython at NativeMethodAccessorImpl.java:0

Para poder verificar el tipo de almacenamiento asignado, debemos de conocer el valor de códigos que nos regresa getStorageLevel

Para esto, podemos verificar en la documentación de spark:
https://spark.apache.org/docs/2.4.6/api/python/_modules/pyspark/storagelevel.html

In [111]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

Para poder cambiar el tipo de persistencia debemos de primero retirarla y posterior a eso asignarle la que deseamos.

Con el método persist, asignaremos la persistencia que nosotros deseamos.

In [113]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[554] at javaToPython at NativeMethodAccessorImpl.java:0

In [114]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[554] at javaToPython at NativeMethodAccessorImpl.java:0

In [115]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 2)

In [116]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 2)

Finalmente, podemos crear nuestros propios esquemas de persistencia según las reglas y restricciones de negocio que tengamos en el proyecto.

In [117]:
#def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True,True,False,False,3)

In [118]:
medallistaXAnio.rdd.unpersist()
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[554] at javaToPython at NativeMethodAccessorImpl.java:0