In [10]:
from pyspark import SparkContext, SparkConf
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf

from pyspark.sql.types import (
                                StructType,
                                StructField,
                                IntegerType,
                                StringType,
                                FloatType,
                                Row
                            )

In [2]:
conf = SparkConf().set("spark.ui.port", "4051")

sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

sqlContext = SQLContext(spark)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/15 19:33:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




In [3]:
! head -n 5 ./data/deportistasError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [4]:
player_errorRDD = sc.textFile('./data/deportistasError.csv').map(lambda x: x.split(','))
player_errorRDD.take(3)

                                                                                

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199']]

In [5]:
def remove_header(index, iterator):
    return iter(list(iterator)[1:])

In [6]:
player_errorRDDwoHead = player_errorRDD.mapPartitionsWithIndex(remove_header)
player_errorRDDwoHead.take(3)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273']]

In [9]:
player_errorRDDTrans = player_errorRDDwoHead.map(
    lambda x:(
        x[0],
        x[1],
        x[2],
        x[3],
        x[4],
        x[5],
        x[6]
    )
)

player_e_schema = StructType(
    [
        StructField("deportista_id", StringType(), False),
        StructField("nombre", StringType(), False),
        StructField("genero", StringType(), False),
        StructField("edad", StringType(), False),
        StructField("altura", StringType(), False),
        StructField("peso", StringType(), False),
        StructField("equipo_id", StringType(), False),
    ]
)

player_errorDF = sqlContext.createDataFrame(player_errorRDDTrans, player_e_schema)
player_errorDF.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [11]:
def convertIntegers(x):
    return int(x) if len(x) > 0 else None

In [16]:
convert_integers_udf = udf(
    lambda x: convertIntegers(x),
    IntegerType()
    )
sqlContext.udf.register('convert_integersUDF', convert_integers_udf)

<function __main__.<lambda>(x)>

In [18]:
player_errorDF.select(
    convert_integers_udf('altura').alias('altura_udf')
).show()

+----------+
|altura_udf|
+----------+
|       180|
|       170|
|      null|
|      null|
|       185|
|       188|
|       183|
|       168|
|       186|
|      null|
|       182|
|       172|
|       159|
|       171|
|      null|
|       184|
|       175|
|       189|
|      null|
|       176|
+----------+
only showing top 20 rows



## Persistencia y Particionado

In [19]:
from pyspark.storagelevel import StorageLevel

In [21]:
player_errorDF.is_cached

False

In [22]:
player_errorDF.rdd.cache()

MapPartitionsRDD[37] at javaToPython at NativeMethodAccessorImpl.java:0

In [23]:
player_errorDF.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

*class* **pyspark.StorageLevel**(useDisk, useMemory, useOffHeap, deserialized, replication=1)[source]
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory in a JAVA-specific serialized format, and whether to replicate the RDD partitions on multiple nodes. Also contains static constants for some commonly used storage levels, MEMORY_ONLY. Since the data is always serialized on the Python side, all the constants use the serialized formats.

- DISK_ONLY = StorageLevel(True, False, False, False, 1)

- DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)

- MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)

- MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

- MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)

- MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)

- MEMORY_ONLY = StorageLevel(False, True, False, False, 1)

- MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

- MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)

- MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)

- OFF_HEAP = StorageLevel(True, True, True, False, 1)

In [24]:
player_errorDF.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

Py4JJavaError: An error occurred while calling o175.persist.
: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
	at org.apache.spark.errors.SparkCoreErrors$.cannotChangeStorageLevelError(SparkCoreErrors.scala:111)
	at org.apache.spark.rdd.RDD.persist(RDD.scala:168)
	at org.apache.spark.rdd.RDD.persist(RDD.scala:192)
	at org.apache.spark.api.java.JavaRDD.persist(JavaRDD.scala:51)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [25]:
player_errorDF.rdd.unpersist()

MapPartitionsRDD[37] at javaToPython at NativeMethodAccessorImpl.java:0

In [26]:
player_errorDF.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[37] at javaToPython at NativeMethodAccessorImpl.java:0

### Crear replicación y persistencias personalizadas

In [27]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [28]:
player_errorDF.rdd.unpersist()

MapPartitionsRDD[37] at javaToPython at NativeMethodAccessorImpl.java:0

In [29]:
player_errorDF.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[37] at javaToPython at NativeMethodAccessorImpl.java:0

In [30]:
spark.stop()