# Pyspark notebook with the basics

In [1]:
# Libraries used
from pyspark import SparkContext
from pyspark.sql import SparkSession
#  Necessary libraries to create a Dataframe
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext

#### creating an SQL spark context

In [2]:
spark = SparkContext(master="local", appName="Dataframes")
SQLCont = SQLContext(spark)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/30 16:25:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

In [6]:
# bash
!head -n 5 juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


## RDD and Dataframe creations with schemas

In [8]:
# Here we are creating a schema for our csv
games_schema = StructType([
    StructField("juego_id", IntegerType(),False),
    StructField("anio", StringType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)])

GamesDF = SQLCont.read.schema(games_schema)\
.option("header","true").csv("juegos.csv")
                            

In [9]:
GamesDF.show(5)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
|       5|1908 Verano|     1908|Verano|
+--------+-----------+---------+------+
only showing top 5 rows



21/12/30 16:26:27 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 4
CSV file: file:///src/Ficheros%20de%20trabajo/juegos.csv


In [10]:
# Creation of a Dataframe from an RDD

In [30]:

olympic_athlete_RDD = spark.textFile("deportista.csv").map(lambda l : l.split(","))

In [31]:
olympic_athlete_RDD .take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [32]:
# we define a function
def delete_header(index, iterator)-> iter:
    """
    this function deletes the header of our
    RDD csv
    """
    return iter(list(iterator)[1:])

In [33]:
olympic_athlete_RDD = olympic_athlete_RDD.mapPartitionsWithIndex(delete_header)

In [34]:
olympic_athlete_RDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [35]:
# Here we remap our RDD
olympic_athlete_RDD = olympic_athlete_RDD.map(lambda l: (int(l[0]),
                              l[1],
                              int(l[2]),
                              int(l[3]),
                              int(l[4]),
                              float(l[5]),
                              int(l[6])
                             ))
                                  
                                  

In [36]:

athlete_schema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

In [38]:
athlete_df = SQLCont.createDataFrame(olympic_athlete_RDD, athlete_schema)

In [39]:
athlete_df.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [40]:
athlete_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [41]:
athlete_df = athlete_df.withColumnRenamed("genero","sexo").drop("altura")

In [42]:
athlete_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [43]:
import  pyspark.sql.functions  as F

athlete_df = athlete_df.select("deportista_id","nombre",
                 F.col("edad").alias("edad_al_jugar"),"equipo_id")

In [44]:
athlete_df.show(5)

+-------------+--------------------+-------------+---------+
|deportista_id|              nombre|edad_al_jugar|equipo_id|
+-------------+--------------------+-------------+---------+
|            1|           A Dijiang|           24|      199|
|            2|            A Lamusi|           23|      199|
|            3| Gunnar Nielsen Aaby|           24|      273|
|            4|Edgar Lindenau Aabye|           34|      278|
|            5|Christine Jacoba ...|           21|      705|
+-------------+--------------------+-------------+---------+
only showing top 5 rows



In [45]:

athlete_df.sort("edad_al_jugar").show()

+-------------+--------------------+-------------+---------+
|deportista_id|              nombre|edad_al_jugar|equipo_id|
+-------------+--------------------+-------------+---------+
|          224|     Mohamed AbdelEl|            0|      308|
|          487|      Inni Aboubacar|            0|      721|
|          226|Sanad Bushara Abd...|            0|     1003|
|           58|    Georgi Abadzhiev|            0|      154|
|          230|    Moustafa Abdelal|            0|      308|
|          102|   Sayed Fahmy Abaza|            0|      308|
|          260|  Ahmed Abdo Mustafa|            0|     1003|
|          139|George Ioannis Abbot|            0|     1043|
|          281|      S. Abdul Hamid|            0|      487|
|          163|     Ismail Abdallah|            0|     1095|
|          285|Talal Hassoun Abd...|            0|      497|
|          173| Mohamed Abdel Fatah|            0|     1003|
|          179|Ibrahim Saad Abde...|            0|     1003|
|          378|     Ange

In [46]:
athlete_df = athlete_df.filter((athlete_df.edad_al_jugar !=0))

In [47]:
athlete_df.sort("edad_al_jugar").show()

+-------------+--------------------+-------------+---------+
|deportista_id|              nombre|edad_al_jugar|equipo_id|
+-------------+--------------------+-------------+---------+
|        52070|        Etsuko Inada|           11|      514|
|        22411|Magdalena Cecilia...|           11|      413|
|        40129|    Luigina Giavotti|           11|      507|
|        47618|Sonja Henie Toppi...|           11|      742|
|        37333|Carlos Bienvenido...|           11|      982|
|        51268|      Beatrice Hutiu|           11|      861|
|         5291|Marcia Arriaga La...|           12|      656|
|        24191| Philippe Cuelenaere|           12|       96|
|        42835|   Werner Grieshofer|           12|       71|
|        25877|Olga Lucia de Ang...|           12|      225|
|        31203|Patricia Anne Pat...|           12|      967|
|        43528|Antoinette Joyce ...|           12|      172|
|        46578|        Diana Hatler|           12|      825|
|        48728|      Mar

In [50]:

athlete_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edad_al_jugar: integer (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [51]:
resultadoDF = SQLCont.read.format(
    "com.databricks.spark.csv"
).option("header", "true").load("resultados.csv", inferschema = True)

In [52]:
deporteDF = SQLCont.read.format(
    "com.databricks.spark.csv"
).option("header", "true").load("deporte.csv", inferschema = True)

In [53]:
paisesDF = SQLCont.read.format(
    "com.databricks.spark.csv"
).option("header", "true").load("paises.csv", inferschema = True)

In [54]:
eventoDF = SQLCont.read.format(
    "com.databricks.spark.csv"
).option("header", "true").load("evento.csv", inferschema = True)

In [55]:

resultadoDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: string (nullable = true)



In [32]:
paisesDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- equipo: string (nullable = true)
 |-- sigla: string (nullable = true)



In [33]:
JuegosDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- anio: string (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [34]:
deporteDF.show()

+----------+--------------------+
|deporte_id|             deporte|
+----------+--------------------+
|         1|          Basketball|
|         2|                Judo|
|         3|            Football|
|         4|          Tug-Of-War|
|         5|       Speed Skating|
|         6|Cross Country Skiing|
|         7|           Athletics|
|         8|          Ice Hockey|
|         9|            Swimming|
|        10|           Badminton|
|        11|             Sailing|
|        12|            Biathlon|
|        13|          Gymnastics|
|        14|    Art Competitions|
|        15|       Alpine Skiing|
|        16|            Handball|
|        17|       Weightlifting|
|        18|           Wrestling|
|        19|                Luge|
|        20|          Water Polo|
+----------+--------------------+
only showing top 20 rows



## Joins exercises

In [35]:
depor_2 = athlete_df.join(resultadoDF, athlete_df.deportista_id == resultadoDF.deportista_id, "left").join(JuegosDF, JuegosDF.juego_id == resultadoDF.juego_id, "left")\
                  .join(deporteDF, deporteDF.deporte_id == resultadoDF.evento_id,"left").select("nombre",F.col("edad_al_jugar").alias("Edad_al_jugar"),
                                                                              "medalla",F.col("anio").alias("año_de_juego"),
                                                                              deporteDF.deporte.alias("Nombre_disciplina"))

In [36]:
depor_2.show()

21/12/20 22:51:46 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///src/Ficheros%20de%20trabajo/juegos.csv


+--------------------+-------------+-------+-------------+--------------------+
|              nombre|Edad_al_jugar|medalla| año_de_juego|   Nombre_disciplina|
+--------------------+-------------+-------+-------------+--------------------+
|           A Dijiang|           24|     NA|  1992 Verano|          Basketball|
|            A Lamusi|           23|     NA|  2012 Verano|                Judo|
| Gunnar Nielsen Aaby|           24|     NA|  1920 Verano|            Football|
|Edgar Lindenau Aabye|           34|   Gold|  1900 Verano|          Tug-Of-War|
|Christine Jacoba ...|           21|     NA|1994 Invierno|Cross Country Skiing|
|Christine Jacoba ...|           21|     NA|1994 Invierno|       Speed Skating|
|Christine Jacoba ...|           21|     NA|1992 Invierno|Cross Country Skiing|
|Christine Jacoba ...|           21|     NA|1992 Invierno|       Speed Skating|
|Christine Jacoba ...|           21|     NA|1988 Invierno|Cross Country Skiing|
|Christine Jacoba ...|           21|    

In [37]:
medal_organized =resultadoDF.filter(resultadoDF.medalla != "NA")\
.join(athlete_df,athlete_df.deportista_id == resultadoDF.deportista_id, "left").join(paisesDF, paisesDF.id == athlete_df.equipo_id, "left").select("medalla", "equipo", "sigla")

In [38]:
medal_organized.sort(F.col("sigla").desc()).show()

                                                                                

+-------+----------+-----+
|medalla|    equipo|sigla|
+-------+----------+-----+
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Silver|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Silver|  Zimbabwe|  ZIM|
| Silver|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Bronze|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Silver|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Silver|Yugoslavia|  YUG|
| Bronze|Yugoslavia|  YUG|
| Silver|Yugoslavia|  YUG|
| Silver|Yugoslavia|  YUG|
| Bronze|Yugoslavia|  YUG|
+-------+----------+-----+
only showing top 20 rows



In [39]:
medallasxanio = athlete_df \
    .join(resultadoDF, athlete_df.deportista_id == resultadoDF.deportista_id, "left")\
    .join(JuegosDF, JuegosDF.juego_id == resultadoDF.juego_id,"left")\
    .join(paisesDF, athlete_df.equipo_id == paisesDF.id, "left") \
    .join(eventoDF, eventoDF.evento_id == resultadoDF.evento_id, "left") \
    .join(deporteDF, deporteDF.deporte_id == eventoDF.deporte_id,"left")\
    .select("sigla", "anio","medalla", eventoDF.evento.alias("nombre Subdiciplina"),
            deporteDF.deporte.alias("nombre disciplica"),
            athlete_df.nombre,)

In [40]:
medallasxanio.sort(F.col("nombre disciplica").asc()).show()

21/12/20 22:51:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///src/Ficheros%20de%20trabajo/juegos.csv


+-----+-------------+-------+--------------------+-----------------+--------------------+
|sigla|         anio|medalla| nombre Subdiciplina|nombre disciplica|              nombre|
+-----+-------------+-------+--------------------+-----------------+--------------------+
|  GBR|  1908 Verano|   Gold|                null|             null|John Charles Fiel...|
|  GBR|  1900 Verano| Silver|                null|             null|John Selwin Calve...|
|  GBR|  1908 Verano|     NA|                null|             null|John Marshall Gorham|
|  FRA|  1900 Verano|     NA|                null|             null|Marie Calixte Ann...|
|  GBR|  1908 Verano|     NA|                null|             null|Sophia Hope Gorha...|
|  NOR|1992 Invierno|     NA|Alpine Skiing Men...|    Alpine Skiing|  Kjetil Andr Aamodt|
|  PAK|2010 Invierno|     NA|Alpine Skiing Men...|    Alpine Skiing|      Muhammad Abbas|
|  GEO|1998 Invierno|     NA|Alpine Skiing Men...|    Alpine Skiing|  Levan Abramishvili|
|  NOR|199

In [41]:
 medallista_anio_filtered =medallasxanio.filter(medallasxanio.medalla != "NA")\
    .sort("anio")\
    .groupBy("sigla","anio","nombre Subdiciplina") \
    .count()

In [42]:
medallista_anio_filtered.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: string (nullable = true)
 |-- nombre Subdiciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [43]:
medallista_anio_filtered.groupBy("sigla","anio").agg(F.sum("count").alias("Total de medallas"),F.avg("count").alias("Medallas promedio")).sort(F.col("sigla").asc()).show()

21/12/20 22:51:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///src/Ficheros%20de%20trabajo/juegos.csv
[Stage 38:>                                                         (0 + 1) / 1]

+-----+-----------+-----------------+------------------+
|sigla|       anio|Total de medallas| Medallas promedio|
+-----+-----------+-----------------+------------------+
|  AHO|1988 Verano|                1|               1.0|
|  ALB|2016 Verano|                1|               1.0|
|  ALG|1992 Verano|                1|               1.0|
|  ALG|2000 Verano|                2|               1.0|
|  ALG|2008 Verano|                2|               1.0|
|  ALG|1996 Verano|                1|               1.0|
|  ANZ|1912 Verano|                6|               1.5|
|  ANZ|1920 Verano|                3|               1.0|
|  ANZ|1924 Verano|                2|               1.0|
|  ANZ|1908 Verano|               10|               2.0|
|  ARG|2004 Verano|               27|               5.4|
|  ARG|1988 Verano|                7|               7.0|
|  ARG|1992 Verano|                1|               1.0|
|  ARG|1936 Verano|                7|              1.75|
|  ARG|1996 Verano|            

                                                                                

In [44]:
!head -n 5 deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [121]:
deportista_malo_RDD = spark.textFile("deportistaError.csv").map(lambda x: x.split(","))

In [122]:
def elimina_encabezado(indice, iterador):
    return iter(list(iterador)[1:])

In [123]:
deportista_malo_RDD = deportista_malo_RDD.mapPartitionsWithIndex(elimina_encabezado)

In [124]:
deportista_malo_RDD.take(10)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '', '', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705'],
 ['6', 'Per Knut Aaland', '1', '31', '188', '75', '1096'],
 ['7', 'John Aalberg', '1', '31', '183', '72', '1096'],
 ['8', '"Cornelia ""Cor"" Aalten (-Strannood)"', '2', '18', '168', '', '705'],
 ['9', 'Antti Sami Aalto', '1', '26', '186', '96', '350'],
 ['10', '"Einar Ferdinand ""Einari"" Aalto"', '1', '26', '', '', '350']]

In [125]:
deportista_malo_RDD = deportista_malo_RDD.map(lambda l:(
                        l[0],
                        l[1],
                        l[2],
                        l[3],
                        l[4],
                        l[5],
                        l[6]))

deportista_malo_schema = StructType([
    StructField("deportista_id", StringType(), False),
    StructField("nombre",StringType(), False),
    StructField("genero", StringType(), False),
    StructField("edad", StringType(), False),
    StructField("altura", StringType(), False),
    StructField("peso", StringType(), False),
    StructField("equipo_id", StringType(), False),
])
deportista_error_DF = SQLCont.createDataFrame(deportista_malo_RDD, deportista_malo_schema)  

In [126]:
deportista_error_DF.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [127]:
#creación de la Udf

def conversion_enteros(valor):
    return int(valor) if len(valor) > 0 else None


In [128]:
conversion_enteros_udf = F.udf(lambda z: conversion_enteros(z),IntegerType())
SQLCont.udf.register("conversion_enteros_udf", conversion_enteros_udf)

21/12/20 23:53:38 WARN SimpleFunctionRegistry: The function conversion_enteros_udf replaced a previously registered function.


<function __main__.<lambda>(z)>

In [133]:
deportista_error_DF.select(conversion_enteros_udf("altura").alias("alturaUDf")).show(10)

+---------+
|alturaUDf|
+---------+
|      180|
|      170|
|     null|
|     null|
|      185|
|      188|
|      183|
|      168|
|      186|
|     null|
+---------+
only showing top 10 rows



Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/lib/python3.8/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/lib/python3.8/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/lib/python3.8/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


## persistencia y replicación

In [134]:
from pyspark.storagelevel import StorageLevel

In [135]:
medallista_anio_filtered.is_cached

False

In [136]:
medallasxanio.is_cached

False

In [137]:
medallasxanio.rdd.cache()

21/12/21 00:06:53 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///src/Ficheros%20de%20trabajo/juegos.csv


MapPartitionsRDD[323] at javaToPython at NativeMethodAccessorImpl.java:0

In [141]:
medallasxanio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [144]:
medallasxanio.rdd.unpersist()

MapPartitionsRDD[323] at javaToPython at NativeMethodAccessorImpl.java:0

In [145]:
medallasxanio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[323] at javaToPython at NativeMethodAccessorImpl.java:0

In [146]:
medallasxanio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 2)

In [151]:
#creando nuestra propia persistencia
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [148]:
medallasxanio.rdd.unpersist()

MapPartitionsRDD[323] at javaToPython at NativeMethodAccessorImpl.java:0

In [149]:
medallasxanio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[323] at javaToPython at NativeMethodAccessorImpl.java:0

In [150]:
medallasxanio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 3)