In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField 
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.types import Row

from pyspark.sql import SQLContext

In [2]:
spark = SparkContext(master="local", appName="DataFrames")
sqlContext = SQLContext(spark)

In [3]:
!ls /home/sparky/Documentos/curso-apache-spark-platzi/files

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [4]:
!head -n 5 /home/sparky/Documentos/curso-apache-spark-platzi/files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [5]:
path = "/home/sparky/Documentos/curso-apache-spark-platzi/files/"

juegoSchema = StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("anio",StringType(),False),
    StructField("temporada",StringType(),False),
    StructField("ciudad",StringType(),False)
])
juegoDF = sqlContext.read.schema(juegoSchema) \
    .option("header","true").csv(path+"juegos.csv")

In [6]:
juegoDF.show(4)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
+--------+-----------+---------+------+
only showing top 4 rows



In [7]:
deportistaOlimpicoRDD = spark.textFile(path+"deportista.csv")\
    .map(lambda l : l.split(","))
deportistaOlimpicoRDD2 = spark.textFile(path+"deportista2.csv")\
    .map(lambda l : l.split(","))
deportistaOlimpicoRDD = deportistaOlimpicoRDD\
    .union(deportistaOlimpicoRDD2)

In [8]:
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [9]:
def eliminaEncabezado(indice, iterador):
    return iter(list(iterador)[1:])

In [10]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)

In [11]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [12]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l: (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4]),
float(l[5]),
int(l[6])
))

In [13]:
schema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

In [14]:
deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

In [15]:
deportistaDF.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [16]:
paises_schema = StructType([
    StructField("pais_id", IntegerType(),False),
    StructField("equipo", StringType(),False),
    StructField("sigla", StringType(),False)
])
paisesDF = sqlContext.read.schema(paises_schema) \
    .option("header","true").csv(path+"paises.csv")

In [17]:
paisesDF.show(5)

+-------+--------------------+-----+
|pais_id|              equipo|sigla|
+-------+--------------------+-----+
|      1|         30. Februar|  AUT|
|      2|A North American ...|  MEX|
|      3|           Acipactli|  MEX|
|      4|             Acturus|  ARG|
|      5|         Afghanistan|  AFG|
+-------+--------------------+-----+
only showing top 5 rows



In [18]:
resultados_schema = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False)
])
resultadosDF = sqlContext.read.schema(resultados_schema) \
    .option("header","true").csv(path+"resultados.csv")

In [19]:
resultadosDF.show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [20]:
evento_schema = StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("evento",StringType(),False),
    StructField("deporte_id",IntegerType(),False)
])
eventoDF = sqlContext.read.schema(evento_schema)\
    .option("header","true").csv(path+"evento.csv")

In [21]:
eventoDF.show(5)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
|        5|Speed Skating Wom...|         5|
+---------+--------------------+----------+
only showing top 5 rows



In [22]:
eventoDF.take(5)

[Row(evento_id=1, evento="Basketball Men's Basketball", deporte_id=1),
 Row(evento_id=2, evento="Judo Men's Extra-Lightweight", deporte_id=2),
 Row(evento_id=3, evento="Football Men's Football", deporte_id=3),
 Row(evento_id=4, evento="Tug-Of-War Men's Tug-Of-War", deporte_id=4),
 Row(evento_id=5, evento="Speed Skating Women's 500 metres", deporte_id=5)]

In [23]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [24]:
deporte_schema = StructType([
    StructField("deporte_id",IntegerType(),False),
    StructField("deporte",StringType(),False)
])
deportesDF = sqlContext.read.schema(deporte_schema)\
    .option("header","true").csv(path+"deporte.csv")

In [25]:
deportesDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [26]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [27]:
deportistaDF = deportistaDF.withColumnRenamed("genero","sexo").drop("altura")

In [28]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [29]:
from pyspark.sql.functions import *
deportistaDF = deportistaDF.select("deportista_id","nombre",
                 col("edad").alias("edadAlJugar"),"equipo_id")

In [30]:
deportistaDF.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [31]:
deportistaDF = deportistaDF.filter((deportistaDF.edadAlJugar != 0))

In [32]:
deportistaDF.sort("edadAlJugar").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        70616|          Liu Luyang|         11|      199|
|       118925|Megan Olwen Deven...|         11|      413|
|        52070|        Etsuko Inada|         11|      514|
|        22411|Magdalena Cecilia...|         11|      413|
|        40129|    Luigina Giavotti|         11|      507|
|        47618|Sonja Henie Toppi...|         11|      742|
|        76675|   Marcelle Matthews|         11|      967|
|        37333|Carlos Bienvenido...|         11|      982|
|        51268|      Beatrice Hutiu|         11|      861|
|       126307|        Liana Vicens|         11|      825|
|        48939|             Ho Gang|         12|      738|
|        49142|        Jan Hoffmann|         12|      302|
|        42835|   Werner Grieshofer|         12|       7

In [33]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edadAlJugar: integer (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [34]:
resultadosDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [35]:
juegoDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- anio: string (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [36]:
deportesDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [37]:
deportistaDF.join(resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id,"left")\
    .join(juegoDF,juegoDF.juego_id == resultadosDF.juego_id,"left")\
    .join(deportesDF,deportesDF.deporte_id == resultadosDF.evento_id,"left")\
    .select(deportistaDF.nombre,col("edadAlJugar").alias("Edad al jugar"),"medalla",col("anio").alias("Anyo del juego"),deportesDF.deporte.alias("Nombre de disciplina"))\
    .show()

+--------------------+-------------+-------+--------------+--------------------+
|              nombre|Edad al jugar|medalla|Anyo del juego|Nombre de disciplina|
+--------------------+-------------+-------+--------------+--------------------+
|           A Dijiang|           24|     NA|   1992 Verano|          Basketball|
|            A Lamusi|           23|     NA|   2012 Verano|                Judo|
| Gunnar Nielsen Aaby|           24|     NA|   1920 Verano|            Football|
|Edgar Lindenau Aabye|           34|   Gold|   1900 Verano|          Tug-Of-War|
|Christine Jacoba ...|           21|     NA| 1994 Invierno|Cross Country Skiing|
|Christine Jacoba ...|           21|     NA| 1994 Invierno|       Speed Skating|
|Christine Jacoba ...|           21|     NA| 1992 Invierno|Cross Country Skiing|
|Christine Jacoba ...|           21|     NA| 1992 Invierno|       Speed Skating|
|Christine Jacoba ...|           21|     NA| 1988 Invierno|Cross Country Skiing|
|Christine Jacoba ...|      

In [38]:
resultadoGanadorDF = resultadosDF.filter(resultadosDF.medalla != 'NA')

In [39]:
resultadoGanadorDF.show()

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           4|   Gold|            4|       2|        4|
|          38| Bronze|           15|       7|       19|
|          39| Bronze|           15|       7|       20|
|          41| Bronze|           16|      50|       14|
|          42| Bronze|           17|      17|       21|
|          43|   Gold|           17|      17|       22|
|          45|   Gold|           17|      17|       24|
|          49|   Gold|           17|      17|       28|
|          51| Bronze|           17|      19|       22|
|          61|   Gold|           20|      38|       32|
|          62| Bronze|           20|      38|       33|
|          64| Silver|           20|      40|       31|
|          65| Bronze|           20|      40|       32|
|          68| Silver|           20|      40|       35|
|          74|   Gold|           20|      44|   

In [40]:
resultadoGanadorDF\
    .join(deportistaDF, resultadoGanadorDF.deportista_id == deportistaDF.deportista_id,"left")\
    .join(paisesDF, paisesDF.pais_id == deportistaDF.equipo_id,"left")\
    .select(resultadoGanadorDF.medalla,"equipo","sigla")\
    .sort(col("sigla").desc()).show()

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



In [41]:
medallistaXAnio = deportistaDF\
    .join(resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left")\
    .join(juegoDF, juegoDF.juego_id == resultadosDF.juego_id, "left")\
    .join(paisesDF, deportistaDF.equipo_id == paisesDF.pais_id, "left")\
    .join(eventoDF, eventoDF.evento_id == resultadosDF.evento_id, "left")\
    .join(deportesDF, eventoDF.deporte_id == deportesDF.deporte_id)\
    .select("sigla",
            "anio",
            "medalla",
            eventoDF.evento.alias("Nombre subdisciplina"),
            deportesDF.deporte.alias("Nombre disciplina"),
            deportistaDF.nombre,
           )

In [42]:
medallistaXAnio.show()

+-----+-------------+-------+--------------------+--------------------+--------------------+
|sigla|         anio|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+-----+-------------+-------+--------------------+--------------------+--------------------+
|  CHN|  1992 Verano|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|  CHN|  2012 Verano|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|  DEN|  1920 Verano|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|  SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating

In [43]:
medallistaXAnio2 = medallistaXAnio.filter(medallistaXAnio.medalla != "NA")\
    .sort("anio")\
    .groupBy("sigla","anio","Nombre subdisciplina")\
    .count()

In [44]:
medallistaXAnio2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: string (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [45]:
medallistaXAnio2.groupBy("sigla","anio")\
    .agg(sum("count").alias("Total de medallas"),\
        avg("count").alias("Medallas promedio")).show()

+-----+-------------+-----------------+------------------+
|sigla|         anio|Total de medallas| Medallas promedio|
+-----+-------------+-----------------+------------------+
|  NED|1992 Invierno|                4|1.3333333333333333|
|  BEL|  2000 Verano|                7|               1.4|
|  MAS|  2012 Verano|                2|               1.0|
|  MGL|  2008 Verano|                5|              1.25|
|  SWE|  1976 Verano|               10|               2.0|
|  SUI|2014 Invierno|               29|3.2222222222222223|
|  ETH|  2004 Verano|                7|              1.75|
|  AUT|  1928 Verano|                5|              1.25|
|  SYR|  1984 Verano|                1|               1.0|
|  ITA|  1996 Verano|               69| 2.225806451612903|
|  THA|  2008 Verano|                4|               1.0|
|  URS|1984 Invierno|               56|               2.8|
|  DEN|  1896 Verano|                6|               1.0|
|  GRN|  2016 Verano|                1|               1.

In [46]:
resultadosDF.registerTempTable("resultados")
deportistaDF.registerTempTable("deportista")
paisesDF.registerTempTable("paises")

In [47]:
sqlContext.sql("SELECT * FROM deportista").show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [48]:
sqlContext.sql("""
                SELECT medalla,equipo,sigla FROM resultados r
                JOIN deportista d
                ON r.deportista_id = d.deportista_id
                JOIN paises p
                ON p.pais_id = d.equipo_id
                WHERE medalla <> "NA"
                ORDER BY sigla DESC
                """).show()

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



In [49]:
spark

In [50]:
!head -n 5 /home/sparky/Documentos/curso-apache-spark-platzi/files/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [51]:
deportistaError = spark.textFile(path+"deportistaError.csv")\
    .map(lambda l: l.split(","))

In [52]:
deportistaError = deportistaError.mapPartitionsWithIndex(eliminaEncabezado)

In [53]:
deportistaError.take(2)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199']]

In [54]:
deportistaError = deportistaError.map(lambda l: (
l[0],
l[1],
l[2],
l[3],
l[4],
l[5],
l[6]))
schema = StructType([
    StructField("deportista_id",StringType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",StringType(),False),
    StructField("edad",StringType(),False),
    StructField("altura",StringType(),False),
    StructField("peso",StringType(),False),
    StructField("equipo_id",StringType(),False)
])
deportistaErrorDF = sqlContext.createDataFrame(deportistaError,schema)

In [55]:
deportistaErrorDF.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [56]:
from pyspark.sql.functions import udf

def conversionEnteros(valor):
    return int(valor) if len(valor) > 0 else None

conversionEnteros_udf = udf(lambda z: conversionEnteros(z),IntegerType())
sqlContext.udf.register("conversionEnteros_udf",conversionEnteros_udf)

<function __main__.<lambda>(z)>

In [59]:
deportistaErrorDF.select(conversionEnteros_udf("altura")\
                        .alias("alturaUDF")).show()

+---------+
|alturaUDF|
+---------+
|      180|
|      170|
|     null|
|     null|
|      185|
|      188|
|      183|
|      168|
|      186|
|     null|
|      182|
|      172|
|      159|
|      171|
|     null|
|      184|
|      175|
|      189|
|     null|
|      176|
+---------+
only showing top 20 rows



In [60]:
from pyspark.storagelevel import StorageLevel

In [61]:
medallistaXAnio.is_cached

False

In [62]:
medallistaXAnio.rdd.cache()

MapPartitionsRDD[180] at javaToPython at NativeMethodAccessorImpl.java:0

In [65]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [66]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[180] at javaToPython at NativeMethodAccessorImpl.java:0

In [67]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[180] at javaToPython at NativeMethodAccessorImpl.java:0

In [68]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True,True,False,False,3)

In [69]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[180] at javaToPython at NativeMethodAccessorImpl.java:0

In [70]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[180] at javaToPython at NativeMethodAccessorImpl.java:0

In [71]:
spark.stop()