In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType,FloatType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext

In [3]:
sc = SparkContext.getOrCreate() 

In [4]:
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

### Juego DataFrame

In [5]:
path = "resources/files/"
juegoSchema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("anio", StringType(), False),
    StructField("Ciudad",StringType(), False)
])

juegoDF = sqlContext.read.schema(juegoSchema) \
    .option("header", "true") \
    . csv(path + "juegos.csv")

### Deportista Olimpico

In [59]:
deportistaSchema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), True),
    StructField("edad", IntegerType(),True), 
    StructField("altura",IntegerType(), True),
    StructField("peso", FloatType(),True),
    StructField("equipo_id",IntegerType(), False),
])

In [60]:
deportistaDF = sqlContext.read.schema(deportistaSchema) \
    .option("header", "true") \
    .csv(path + "deportista.csv")

### Equipos Olimpicos

In [20]:
# ESQUEMA
paisesSchema = StructType([
    StructField("id", IntegerType(), False),
    StructField("equipo", StringType(),True),
    StructField("sigla", StringType(),True)
])

In [24]:
# DATA FRAME
paisesDF = sqlContext.read.schema(paisesSchema) \
    .option("header", "true") \
    .csv(path + "paises.csv")

###  Resultados Olimpicos

In [25]:
!head -n 5 resources/files/resultados.csv

resultado_id,medalla,deportista_id,juego_id,evento_id
1,NA,1,39,1
2,NA,2,49,2
3,NA,3,7,3
4,Gold,4,2,4


In [29]:
# SCHEMA
resultadosSchema = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), True),
    StructField("deportista_id", IntegerType(), True),
    StructField("juego_id", IntegerType(), True),
    StructField("evento_id",IntegerType(),True)
])

In [30]:
# DATA FRAME
resultadosDF = sqlContext.read.schema(resultadosSchema) \
    .option("header", "true") \
    .csv(path + "resultados.csv")

In [32]:
!ls resources/files/

deporte.csv           deportistaError.csv   modelo_relacional.jpg
deportista.csv        evento.csv            paises.csv
deportista2.csv       juegos.csv            resultados.csv


#### Deporte

La función **printschema** nos ayuda a poder ver la representación de nuestro esquema de datos.

In [37]:
deporteSchema = StructType([
    StructField("deporte_id", IntegerType(),False),
    StructField("deporte", StringType(), True) 
    ])

In [44]:
deporteDF = sqlContext.read.schema(deporteSchema) \
    .option("header","true") \
    .csv(path + "deporte.csv")

In [46]:
deporteDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [47]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- genero: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



#### Juego Data Frame

In [65]:
!head -n 5 resources/files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [66]:
juegoSchema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("nombre_juego",StringType(), True ),
    StructField("anio", IntegerType(), True),
    StructField("temporada", StringType(), True),
    StructField("ciudad", StringType(), True)])

In [67]:
juegoDF  = sqlContext.read.schema(juegoSchema) \
    .option("header", "true") \
    .csv(path + "juegos.csv")

### Renombrado de columnas

Para renombrar una columna en spark lo que podemos hacer es ocupar la función **withColumnRenamed()** la cual va a recibir como primer parámetro la columna que queremos renombrar y como segundo su nombre. 

**Eliminar columnas:** Para eliminar columnas de nuestro data frame lo haremos mediante la función drop la cual va a recibir como parámetro .

### Dataframe deportista

In [48]:
deportistaDF = deportistaDF.withColumnRenamed("genero", "sexo").drop("altura")

In [50]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- sexo: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



#### Importante cuando realizamos una operación en un dataframe esta operación vuelve a cargar el data frame de nuevo es decir es una operación iterativa.

## Operador Select

el procesamiento de la funcion col es distinto ya que genera una lista envivo que va a tener todos los valores de las columnas y de esta forma hacer operaciones más pesadas.

In [52]:
from pyspark.sql.functions import *
deportistaDF = deportistaDF.select("deportista_id", "nombre",
                    col("edad").alias("edadAlJugar"),"equipo_id")

In [54]:
deportistaDF.show(3)

+-------------+-------------------+-----------+---------+
|deportista_id|             nombre|edadAlJugar|equipo_id|
+-------------+-------------------+-----------+---------+
|            1|          A Dijiang|         24|      199|
|            2|           A Lamusi|         23|      199|
|            3|Gunnar Nielsen Aaby|         24|      273|
+-------------+-------------------+-----------+---------+
only showing top 3 rows



In [55]:
deportistaDF.sort("edadAlJugar").show(10)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|           54|Mohamed Jamshid A...|          0|      496|
|           58|    Georgi Abadzhiev|          0|      154|
|           66|     Mohamed Abakkar|          0|     1003|
|          133|           Franz Abb|          0|      399|
|          102|   Sayed Fahmy Abaza|          0|      308|
|          139|George Ioannis Abbot|          0|     1043|
|          163|     Ismail Abdallah|          0|     1095|
|          167|Ould Lamine Abdallah|          0|      362|
|          173| Mohamed Abdel Fatah|          0|     1003|
|          176|Mahmoud Atter Abd...|          0|     1095|
+-------------+--------------------+-----------+---------+
only showing top 10 rows



### Filtrado de valores en SPARK
Para poder realizar una busqueda o un filtrado de valores lo haremos mediante una función  filter.
Si deseo hacer un filter con diferentes sentencias lo que tendre que hacer para anidarlas será ponerlas en parentesís de la siguiente manera:

dataFrame.filter((Sentencia1) & (sentencia2) & .... & (sentencian))

In [57]:
deportistaDF = deportistaDF.filter((deportistaDF.edadAlJugar != 0))

In [58]:
deportistaDF.show(10)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
|            6|     Per Knut Aaland|         31|     1096|
|            7|        John Aalberg|         31|     1096|
|            8|Cornelia Cor Aalt...|         18|      705|
|            9|    Antti Sami Aalto|         26|      350|
|           10|Einar Ferdinand E...|         26|      350|
+-------------+--------------------+-----------+---------+
only showing top 10 rows



In [98]:
paiseDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- equipo: string (nullable = true)
 |-- sigla: string (nullable = true)



## Esquema de valores donde se realizará un join

In [61]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- genero: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [62]:
resultadosDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [70]:
juegoDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- nombre_juego: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [73]:
!ls resources/files/

deporte.csv           deportistaError.csv   modelo_relacional.jpg
deportista.csv        evento.csv            paises.csv
deportista2.csv       juegos.csv            resultados.csv


In [76]:
!head -n 5 resources/files/evento.csv

evento_id,evento,deporte_id
1,Basketball Men's Basketball,1
2,Judo Men's Extra-Lightweight,2
3,Football Men's Football,3
4,Tug-Of-War Men's Tug-Of-War,4


In [79]:
deportesOlimpicosSchema = StructType([
    StructField("evento_id", IntegerType(), True),
    StructField("nombre", StringType(), False),
    StructField("deporte_id", IntegerType(),True)])

In [81]:
deportesOlimpicosDF = sqlContext.read.schema(deportesOlimpicosSchema)\
    .option("header", "true") \
    .csv(path + "evento.csv")


In [82]:
deportesOlimpicosDF.show(10)

+---------+--------------------+----------+
|evento_id|              nombre|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
|        5|Speed Skating Wom...|         5|
|        6|Speed Skating Wom...|         5|
|        7|Cross Country Ski...|         6|
|        8|Cross Country Ski...|         6|
|        9|Cross Country Ski...|         6|
|       10|Cross Country Ski...|         6|
+---------+--------------------+----------+
only showing top 10 rows



# Join  en Spark
Para realizar el Join lo haremos mediante la función join y a la cual le tendremos que pasar lo siguiente parámetros:
- La tabla con la cual se va a cruzar 
- el tipo de join que se va a realizar

In [97]:
deportistaDF.join(resultadosDF,deportistaDF.deportista_id == resultadosDF.deportista_id, "left") \
            .join(juegoDF,juegoDF.juego_id == resultadosDF.juego_id, "left") \
            .join(deportesOlimpicosDF, deportesOlimpicosDF.evento_id == resultadosDF.evento_id, "left") \
            .select(deportistaDF.nombre, col("edad").alias("edad_al_jugar"),
                    "medalla", col("anio").alias("anio_de_juego"),
                   deportesOlimpicosDF.nombre.alias("nombre_disciplina")).show(10)

+--------------------+-------------+-------+-------------+--------------------+
|              nombre|edad_al_jugar|medalla|anio_de_juego|   nombre_disciplina|
+--------------------+-------------+-------+-------------+--------------------+
|           A Dijiang|           24|     NA|         1992|Basketball Men's ...|
|            A Lamusi|           23|     NA|         2012|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|           24|     NA|         1920|Football Men's Fo...|
|Edgar Lindenau Aabye|           34|   Gold|         1900|Tug-Of-War Men's ...|
|Christine Jacoba ...|           21|     NA|         1994|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|         1994|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|         1992|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|         1992|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|         1988|Speed Skating Wom...|
|Christine Jacoba ...|           21|    

## Importante
Cuando nosotros escribimos un join lo que hacemos es ponerlo en una sola linea ya que de otro modo  no corre nuestro programa.


In [107]:
resultadosDF.filter(resultadosDF.medalla != "NA") \
    .join(deportistaDF, resultadosDF.deportista_id == deportistaDF.deportista_id, "left") \
    .join(paisesDF, paisesDF.id == deportistaDF.equipo_id, "left") \
    .select("Medalla","equipo","sigla") \
    .sort(col("sigla").desc()).show(10)

+-------+--------+-----+
|Medalla|  equipo|sigla|
+-------+--------+-----+
| Silver|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 10 rows



In [164]:
medallistaXAnio = deportistaDF \
    .join(resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left") \
    .join(juegoDF, juegoDF.juego_id == resultadosDF.juego_id,"left") \
    .join(paisesDF, paisesDF.id == deportistaDF.equipo_id, "left" ) \
    .join(deportesOlimpicosDF, deportesOlimpicosDF.evento_id == resultadosDF.evento_id, "left") \
    .join(deporteDF, deporteDF.deporte_id == deportesOlimpicosDF.deporte_id, "left") \
    .select( "sigla",
            "anio",
            "medalla",
           deportesOlimpicosDF.nombre.alias("nombre_subdisciplina"),
           deporteDF.deporte.alias("nombre_disciplina"),
           deportistaDF.nombre)

In [165]:
medallistaXAnio.show(10)

+-----+----+-------+--------------------+-----------------+--------------------+
|sigla|anio|medalla|nombre_subdisciplina|nombre_disciplina|              nombre|
+-----+----+-------+--------------------+-----------------+--------------------+
|  CHN|1992|     NA|Basketball Men's ...|       Basketball|           A Dijiang|
|  CHN|2012|     NA|Judo Men's Extra-...|             Judo|            A Lamusi|
|  DEN|1920|     NA|Football Men's Fo...|         Football| Gunnar Nielsen Aaby|
|  SWE|1900|   Gold|Tug-Of-War Men's ...|       Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1994|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1992|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1992|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1988|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1988|     NA|Speed Sk

##  Operador Group By

In [174]:
medallistaXanio2 = medallistaXAnio.filter(medallistaXAnio.medalla !=  "NA") \
    .sort("anio") \
    .groupBy("sigla", "anio", "nombre_subdisciplina") \
    .count()

In [175]:
medallistaXanio2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- nombre_subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



Segun la documentación de spark la mejor forma de realizar un agrupación es mediante el operador **agg** o también llamada  agregación la cual nos va a permit

In [176]:
medallistaXanio2.groupBy("sigla", "anio") \
    .agg(sum("count").alias("total_de_medallas"), avg("count").alias("medallas_promedio")) \
    .show()

+-----+----+-----------------+------------------+
|sigla|anio|total_de_medallas| medallas_promedio|
+-----+----+-----------------+------------------+
|  USA|2012|              121|1.9836065573770492|
|  FRA|2006|               12|1.3333333333333333|
|  BLR|2000|                9|               1.8|
|  FIN|1988|               10|               2.5|
|  KOR|2010|                3|               1.5|
|  FRA|1948|               52|              2.08|
|  GBR|2000|               30|1.5789473684210527|
|  QAT|2012|                2|               1.0|
|  JPN|1932|               11|               2.2|
|  FRG|1994|                2|               1.0|
|  NED|1972|                7|               1.4|
|  GER|1932|               35|1.8421052631578947|
|  NZL|1988|               11|             1.375|
|  AUS|1972|               13|1.1818181818181819|
|  BAH|2008|                2|               2.0|
|  SWE|1968|               11|             1.375|
|  KOR|1988|               43|2.0476190476190474|


## SQL en SPARK

Para poder realizar operacione de tipo **SQL** en SPARK lo que tendremos que hacer es crear resgistro temporales esto lo haremos mediante la función **registerTempTable()** la cual va a recibir como parámetro el nombre de nuestra tabla.

In [180]:
resultadosDF.registerTempTable("resultado")
deportistaDF.registerTempTable("deportista")
paisesDF.registerTempTable("paises")

In [181]:
sqlContext.sql("SELECT * FROM deportista").show(10)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
|            6|     Per Knut Aaland|     1|  31|   188|75.0|     1096|
|            7|        John Aalberg|     1|  31|   183|72.0|     1096|
|            8|Cornelia Cor Aalt...|     2|  18|   168| 0.0|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|96.0|      350|
|           10|Einar Ferdinand E...|     1|  26|     0| 0.0|      350|
+-------------+--------------------+------+----+------+----+---------+
only s

In [186]:
paisesDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- equipo: string (nullable = true)
 |-- sigla: string (nullable = true)



In [187]:
query = """
         SELECT medalla, equipo,sigla FROM resultado r
         JOIN deportista d
         ON r.deportista_id = d.deportista_id
         JOIN paises p 
         ON p.id = d.equipo_id 
         WHERE medalla <> "NA"
         ORDER BY sigla DESC
        """

sqlContext.sql(query).show(10)

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 10 rows



## SQL vs Data Frames 
Algo que no puede surgir como duda es , ¿Porque no manejamos como **SQL** todos nuestros dataframes? y la respuesta es que es más lento en comparación al manejo que se haría ocupando  Spark, por lo que es mejor manjearlo de esta manera.

### Las funciones definidas por el usuario o UDF, 
por sus siglas en inglés, son una funcionalidad agregada en Spark para definir funciones basadas en columnas las cuales permiten extender las capacidades de Spark al momento de transformar el set de datos.

- Este tipo de implementaciones son convenientes cuando tenemos un desarrollo extenso donde hemos identificado la periodicidad de tareas repetitivas como suele ser en pasos de limpieza de datos, transformación o renombrado dinámico de columnas.

- Por lo anterior es común encontrar en un proyecto de Spark una librería independiente donde existen todas estas funciones agregadas para que los desarrolladores involucrados en el proyecto puedan usarlas a conveniencia.

- El uso de UDF no implica que las funciones que podemos crear nativamente con Python, Scala, R o Java no sean útiles. Una UDF tiene el objetivo de ofrecer un estándar interno en el proyecto que nos encontremos realizando. Además, en caso de ser necesario, una UDF puede ser modificada con ayuda de decoradores para que sea más extensible en diversos escenarios a los cuales nos podemos enfrentar.

In [189]:
!head -n 5 resources/files/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [226]:
deportistaError = sc.textFile( path + "deportistaError.csv") \
    .map(lambda l : l.split(","))

In [227]:
deportistaError.take(2)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199']]

In [228]:
def eliminaEncabezado(index, iterator):
    return iter(list(iterator)[1:])

In [229]:
deportistaError = deportistaError.mapPartitionsWithIndex(eliminaEncabezado)

In [230]:

deportistaError.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '', '', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [233]:
### Mapeo del archivo
deportistaError = deportistaError.map(lambda l : (l[0],l[1],l[2],l[3],l[4],l[5],l[6]))

## Estructura que vamos a ocupar para el data frame
deportistaErrorSchema = StructType([
    StructField("deportista_id", StringType(), False),
    StructField("nombre", StringType(),False),
    StructField("genero",StringType(),False),
    StructField("edad",StringType(),False ),
    StructField("altura",StringType(), False),
    StructField("peso",StringType() , False),
    StructField("equipo_id", StringType(), False)
])


In [236]:
deportistaErrorDF = sqlContext.createDataFrame(deportistaError, deportistaErrorSchema)

## Definición de función UDF

Para hacer una función que se aplique a nuestros dataframes lo que haremos en spark será lo siguiente:
- Definir la función que vamos a ocupar.
- Pasar la función a formato UDF.
- Dar de alta la función en formato UDF, para hacer esto tendremos que pasar la función register dos parámetros :
    - Nombre de función como será identificada por spark
    - La función en formato UDF 
    
de esta forma declaramos nuestras funciones nativas en spark.

In [238]:
from pyspark.sql.functions import udf

def conversionEnteros(valor):
    return int(valor) if len(valor) > 0 else None

# definimos la funcion en un formato udf
conversionEnteros_udf = udf(lambda z: conversionEnteros(z), IntegerType())
# Damos de alta nuestra funcion como de tipo udf
sqlContext.udf.register("converisionEnteros_udf", conversionEnteros_udf)

<function __main__.<lambda>(z)>

In [239]:
deportistaErrorDF.select(conversionEnteros_udf("altura") \
                        .alias("alturaUDF")).show()

+---------+
|alturaUDF|
+---------+
|      180|
|      170|
|     null|
|     null|
|      185|
|      188|
|      183|
|      168|
|      186|
|     null|
|      182|
|      172|
|      159|
|      171|
|     null|
|      184|
|      175|
|      189|
|     null|
|      176|
+---------+
only showing top 20 rows



# Particionado de da datos

- Como se ha descrito en clases pasadas, los RDD son la capa de abstracción primaria para poder interactuar con los datos que viven en nuestro ambiente de Spark. Aunque estos puedan ser enmascarados con un esquema dotándolos de las facultades propias de los DataFrames, la información de fondo sigue operando como RDD.

- Por lo tanto, la información, como indica el nombre de los RDD, se maneja de forma distribuida a lo largo del clúster, facilitando las operaciones que se van a ejecutar, ya que segmentos de información pueden encontrarse en diferentes ejecutores reduciendo el tiempo necesario para acceder a la información y poder así realizar los cálculos necesarios.

- Cuando un RDD o Dataframe es creado, según las especificaciones que se indiquen a la aplicación de Spark, creará un esquema de particionado básico, el cual distribuirá los datos a lo largo del clúster. Siendo así que al momento de ejecutar una acción, esta se ejecutará entre los diversos fragmentos de información que existan para poder así realizar de la forma más rápida las operaciones. Es por eso que un correcto esquema de particionado es clave para poder tener aplicaciones rápidas y precisas que además consuman pocos recursos de red.

- Otra de las tareas fundamentales es la replicación de componentes y sus fragmentos, ya que al aumentar la disponibilidad de estos podremos asegurar una tolerancia a fallos, mientras más se replique un valor es más probable que no se pierda si existe un fallo de red o energía, además de permitir una disponibilidad casi inmediata del archivo buscado.

- La partición y replicación son elementos que deben ser analizados según el tipo de negocio o requerimientos que se tengan en el desarrollo que se encuentre en progreso, por lo cual la cantidad de datos replicados o granularidad de datos existentes en los fragmentos dependerá en función de las reglas de negocio.



## Persistencia
 Problemas al usar un RDD o DF varias veces:
 - Al ser spark de ejecución perezosa recomputa  su componente y sus dependencias cada vez que se ejecuta una acción.
 - Es costoso (Especialmente en problemas iterativos.)

Solución : 
Una solución para eso es lo siguiente:
- Conservar el componente en memorio y/o disco.
- Métodos **cache()** o **persist()** nos ayudan.
- En PySpark los datos son almacenados de forma serializada.

In [240]:
from pyspark.storagelevel import StorageLevel

In [241]:
medallistaXAnio.is_cached

False

1. El primer paso en la persistencia de los datos en spark será mandar nuestro RDD a memoria cache de la siguiente forma :


In [243]:
medallistaXAnio.rdd.cache()

MapPartitionsRDD[541] at javaToPython at NativeMethodAccessorImpl.java:0

2. Para poder ver en que nivel de cache se encuentra nuestro rdd lo haremos con la función **getStorageLevel()**

In [244]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

Si deseamos interpretar lo anterior lo podemos hacer mediante el la documentación [doc](https://spark.apache.org/docs/2.4.6/api/python/pyspark.html#pyspark.StorageLevel)
en resumen el primer true es  para el nivel de disco, el segundo true es para el nivel de cache y el tercer true es para el uso de un heap.

Lo ideal sería que lo tuvieramos en 3 fuentes diferentes para evitar perdida.

Para aplicar la persistencia en un nivel dado lo haremos mediante la función **persist()** la cual va a recibir el nivel de memoria que deseo almacenar.

In [245]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[541] at javaToPython at NativeMethodAccessorImpl.java:0

Lo anterior se realizo ya que al ingresar una persistencia nueva en los datos será necesrio que no tenga una agregada ya que si pasa esto lo que hará spark es mandarnos un error.

In [246]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[541] at javaToPython at NativeMethodAccessorImpl.java:0

Una vez hecho esto lo que sucederá es que nuestro RDD estará persistido en memoria y en disco además de replicado dos veces.
El replicado será sobre el  particionamiento que realiza SPARK.

Pero siguiendo la documentación de spark que nos dice que debe haber una persistencia de dos niveles y particionado de 3 lo que haremos será  aplicar de nuevo el método **unpersist()** y posteriormente a esto definir el nuevo particionado.


In [247]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[541] at javaToPython at NativeMethodAccessorImpl.java:0

### Definición de nuestro propio particionado:

In [248]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [249]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[541] at javaToPython at NativeMethodAccessorImpl.java:0

### Particionado de datos 
Básicamente cuantos lacayos van a ejecutar el trabajo.