# RDD- Data frame


 # Conversions entre dataframe y RDD
 
 ## suponiendo que myDF es un dataframe, se usa la propiedad rdd para convertir de df a RDD
 
 myRDD=  myDF.rdd
 
 ## y desde Rdd podemos pasar a data frame mediante la funcion toDF()
 
 myDF2 = myRDD.toDF()
 
 

In [4]:
#en python las funciones lambda tienen solo una instruccion
#cuando queremos hacer varias instrucciones se puede usar una funcion

from datetime import datetime, date
#import pandas as pd
from pyspark.sql import Row

df = spark.read.options(header='True', inferSchema='True').csv("../2019-Dec.csv")


def myFunc(s):
    if s["brand"]=="riche" and s["event_type"]=="cart":
        return [(s["product_id"],1)]
    return[]

lines =df.rdd.flatMap(myFunc).reduceByKey(lambda a, b : a+b)
for l in lines.collect():
    print(l)

  


[Stage 4:>                                                          (0 + 4) / 4]

(5842204, 2)
(5842224, 2)
(5842217, 2)
(5842213, 2)
(5842205, 2)
(5842214, 2)
(5842202, 4)
(5846098, 10)
(5842258, 4)
(5842222, 2)
(5842235, 1)
(5842203, 1)
(5842223, 1)


                                                                                

In [6]:
#convertimos de RDD a DF
myDF2 = lines.toDF()
myDF2.show()

+-------+---+
|     _1| _2|
+-------+---+
|5842204|  2|
|5842224|  2|
|5842217|  2|
|5842213|  2|
|5842205|  2|
|5842214|  2|
|5842202|  4|
|5846098| 10|
|5842258|  4|
|5842222|  2|
|5842235|  1|
|5842203|  1|
|5842223|  1|
+-------+---+



# Persistencia de los datos

Una característica muy importante de Spark es que puede conservar el RDD/DF en la memoria. Cuando el RDD/DF se persiste, cada nodo conservará en memoria la partición del RDD en la que opera. Esto permite que operaciones posteriores sobre RDD no se tengan que calcular, ya que se hace uso de los datos en memoria

<b>Uso normal de Spark sin persistencia</b>

Dado el siguiente ejemplo, tras la construccion del RDD, se realizan varias acciones y transformaciones. Por cada accion se construye el RDD necesario para ejecutar la accion

datos = sc.parallelize ([1,2,3,4,5,6,7,8,9,10])
doble = datos.<b>map</b>(lambda x: x*2)

print ("la suma de los elementos x2 es %d" <b> %doble.sum()</b>)

menor6 = doble.<b>filter</b>(lambda x : x<6)

print ("Hax %d elementos menores que 6" <b>menor6.count()</b>)


Para grandes cantidades de datos, el recalcular varias veces una misma transformación conllevará un sobrecoste nada despreciable en terminos de tiempo. 
Spark posee métodos para guardar las tranformaciones y evitar recálculos

* cache() --> cache guarda los datos en el nivel de almacenamiento por defecto "memoria principal"
* persist() --> permite mediante un parámetro, elegir un nivel de almacenamiento entre diferenters opciones tanto en memoria como en disco:

    - MEMORY_ONLY ( COMPORTAMIENTO POR DEFECTO): Guarda los datos en memoria, los datos que no caben se recalculan cada vez.
    - MEMORY_AND_DISK: Guarda los datos en memoria, los datos que no caben se guardan en disco.
    - MEMORY_ONLY_2, MEMORY_AND_DISK_2: equivalente a las dos anteriores pero guarda los datos en 2 nodos.
    - MEMORY_AND_DISK_SER  (SERIALIZADO)
    - DISK_ONLY
    - MEMORY_ONLY_SER  //SIMILAR A MEMORY ONLY, PERO OCUPA MENOS MEM, PROQUE SERIALIZA LOS OBJETOS
    
    ref: https://medium.com/iwannabedatadriven/spark-performace-cache-persist-iii-db4fa1bcc2c0
    




In [16]:
#ejemplo sin persistencia
datos = sc.parallelize ([1,2,3,4,5,6,7,8,9,10])
doble = datos.map(lambda x: x*2)
print ("la suma de los elementos x2 es %d" %doble.sum())
menor6 = doble.filter(lambda x : x<6)
print ("Hax %d elementos menores que 6" %menor6.count())

la suma de los elementos x2 es 110
Hax 2 elementos menores que 6


In [2]:
#ejemplo con persistencia

from pyspark import SparkContext
import pyspark

datos = sc.parallelize ([1,2,3,4,5,6,7,8,9,10])
doble = datos.map(lambda x: x*2)
doble.persist(pyspark.StorageLevel.MEMORY_ONLY)

print ("la suma de los elementos x2 es %d" %doble.sum())
menor6 = doble.filter(lambda x : x<6)
print ("Hax %d elementos menores que 6" %menor6.count())

[Stage 4:>                                                          (0 + 4) / 4]

la suma de los elementos x2 es 110
Hax 2 elementos menores que 6


                                                                                

In [4]:
#ejemplo tienda virtual
import time
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark4").getOrCreate()
t1= time.time()


df = spark.read.options(header='True', inferSchema='True').csv("../2019-Dec.csv")
df.unpersist()
df.count()
df.persist(pyspark.StorageLevel.MEMORY_ONLY)
df2=df.filter("event_type='cart'").filter("brand='runail'").head(2)
df.select("event_type").distinct().show()


t2= time.time()

dif = t2-t1
print (dif)


                                                                                

+----------------+
|      event_type|
+----------------+
|        purchase|
|            view|
|            cart|
|remove_from_cart|
+----------------+

6.5876171588897705


  # broadcast y Acumuladores de spark
   
 http://sparkbyexamples.com/spark/spark-broadcast-variables
 
## Broadcast (variables globales de solo lectura)
las variables de tipo broadcast son variables de solo lectura que se pueden consultar en todos los nodos
 
## Acumuladores  (varialbles globales que se pueden modificas)
las variables globales tradicionales no funcionan en Spark, se tuenen que usar acumuladores

    
    

In [13]:
broadcast = spark.sparkContext.broadcast([1,2,3])
print (broadcast.value)



[1, 2, 3]


In [14]:
accum =  spark.sparkContext.accumulator (0)
sumatorioTradicional =0

def suma(x):
    global sumatorioTradicional
    
    accum.add(x)
    sumatorioTradicional+x
    
rdd = spark.sparkContext.parallelize([1,2,3,4,5,6])
rdd.foreach(suma)

print (accum)
print (sumatorioTradicional)


    
    


21
0
