<a href="https://colab.research.google.com/github/Kaiziferr/projects-tools-spark/blob/main/workshop/01_basic_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
!pip install pyspark



# **Session**

---



In [13]:
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession.builder.master("local[*]").appName('A').getOrCreate()

In [15]:
spark

# **RDD**
---

In [16]:
# Define Spark Session
sc = spark.sparkContext
sc

In [17]:
# RDD Empty
rdd_empty = sc.emptyRDD()
rdd_empty

EmptyRDD[8] at emptyRDD at NativeMethodAccessorImpl.java:0

In [18]:
# Create to a RDD Paralelize with 3 partitions
rdd_empty = sc.parallelize([], 3)
rdd_empty.getNumPartitions()

3

In [19]:
# Create to a RDD Paralelize with 2 partitions
rdd = sc.parallelize([1,2,3,4,5], 2)
rdd.collect(), rdd.getNumPartitions()

([1, 2, 3, 4, 5], 2)

In [20]:
rdd.take(2)

[1, 2]

In [21]:
# Create a RDD from text
rdd_text = sc.textFile('./data/rdd_source.txt')
rdd_text.collect(), rdd_text.getNumPartitions()

(['Hola se me', 'Olvido guardar', 'La información', 'de el Archivo'], 2)

In [22]:
# Crear a RDD text full file
rdd_text_full = sc.wholeTextFiles('./data/rdd_source.txt')
rdd_text_full.collect(), rdd_text_full.getNumPartitions()

([('file:/content/data/rdd_source.txt',
   'Hola se me\r\nOlvido guardar\r\nLa información\r\nde el Archivo')],
 1)

In [23]:
# Any Operation
rdd_sum = rdd.map(lambda x: x+1)
rdd_sum.collect()

[2, 3, 4, 5, 6]

In [24]:
rdd = sc.parallelize([
    'azul',
    'azul',
    'verde',
    'rojo',
    'rojo',
    'verde',
    'negro'
])

out_blue_rdd = rdd.filter(lambda x: x!='azul')
print(out_blue_rdd.collect())
out_red_rdd = out_blue_rdd.filter(lambda x: x!='rojo')
print(out_red_rdd.collect())
out_black_rdd = out_red_rdd.filter(lambda x: x!='negro')
print(out_black_rdd.collect())

['verde', 'rojo', 'rojo', 'verde', 'negro']
['verde', 'verde', 'negro']
['verde', 'verde']


In [25]:
rdd_le = sc.parallelize(['Justin', 'Violeta', 'Lucas', 'Mo'])
rdd_le = rdd_le.map(lambda x: x.upper())
rdd_le.collect()

['JUSTIN', 'VIOLETA', 'LUCAS', 'MO']

In [26]:
rdd_le = rdd_le.map(lambda x: 'Hello '+x)
rdd_le.collect()

['Hello JUSTIN', 'Hello VIOLETA', 'Hello LUCAS', 'Hello MO']

In [27]:
rdd_q = sc.parallelize([1,2,3,4,5])
rdd_q = rdd_q.map(lambda x: (x, x**2))
rdd_q.collect()

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]

In [28]:
rdd_q = sc.parallelize([1,2,3,4,5])
rdd_q = rdd_q.flatMap(lambda x: (x, x**2))
rdd_q.collect()

[1, 1, 2, 4, 3, 9, 4, 16, 5, 25]

In [29]:
rdd_le = sc.parallelize(['Justin', 'Violeta', 'Lucas', 'Mo'])
rdd_le = rdd_le.flatMap(lambda x: (x, x.upper()))
rdd_le.collect()

['Justin', 'JUSTIN', 'Violeta', 'VIOLETA', 'Lucas', 'LUCAS', 'Mo', 'MO']

In [30]:
# Coalesce
rdd = sc.parallelize([1,2,3,4,5], 10)
rdd.getNumPartitions()

10

In [31]:
rdd = rdd.coalesce(5)
rdd.getNumPartitions(), rdd.collect()

(5, [1, 2, 3, 4, 5])

In [32]:
# repartion
rdd = rdd.repartition(7)
rdd.getNumPartitions()

7

In [33]:
rdd = sc.parallelize(
    [
        ('casa', 2),
        ('parque', 1),
        ('que', 5),
        ('casa', 1),
        ('escuela', 2),
        ('casa', 1),
        ('que', 1),

    ]
)
rdd.collect()

[('casa', 2),
 ('parque', 1),
 ('que', 5),
 ('casa', 1),
 ('escuela', 2),
 ('casa', 1),
 ('que', 1)]

In [34]:
rdd_reduce = rdd.reduceByKey(lambda x, y: x+y)
rdd_reduce.collect()

[('parque', 1), ('que', 6), ('casa', 4), ('escuela', 2)]

In [35]:
rdd = sc.parallelize(
      [
        ('casa', 2, 1),
        ('parque', 1, -5),
        ('que', 5, -1),
        ('casa', 1, 8),
        ('escuela', 2, -7),
        ('casa', 1, 5),
        ('que', 1, 9),
    ]
)

rdd.collect()

[('casa', 2, 1),
 ('parque', 1, -5),
 ('que', 5, -1),
 ('casa', 1, 8),
 ('escuela', 2, -7),
 ('casa', 1, 5),
 ('que', 1, 9)]

In [36]:
rdd_reducido = rdd.map(lambda x: (x[0], x[1]+x[2])).reduceByKey(lambda x,y: x+y)
rdd_reducido.collect()

[('parque', -4), ('que', 14), ('casa', 18), ('escuela', -5)]

In [37]:
# Reduce
rdd = sc.parallelize([1,2,3,4,5])
rdd.reduce(lambda x, y: x+y), rdd.reduce(lambda x, y: x * y)

(15, 120)

In [38]:
rdd = sc.parallelize(['j', 'u', 's', 't', 'i', 'n'])
rdd.count()

6

In [39]:
# saveAsTextFile
rdd.saveAsTextFile('./data/a')

In [40]:
rdd.getNumPartitions()

2

In [41]:
rdd8 = rdd.repartition(8)
rdd8.saveAsTextFile('./data/b')

# **Almacenamiento Cahche**
---

In [42]:
from pyspark.storagelevel import StorageLevel

In [43]:
rdd = sc.parallelize([e for e in range(100)])
rdd.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [44]:
rdd.persist(StorageLevel.MEMORY_ONLY)

ParallelCollectionRDD[65] at readRDDFromFile at PythonRDD.scala:289

In [45]:
rdd.unpersist()

ParallelCollectionRDD[65] at readRDDFromFile at PythonRDD.scala:289

In [46]:
rdd.persist(StorageLevel.DISK_ONLY)

ParallelCollectionRDD[65] at readRDDFromFile at PythonRDD.scala:289

# **HashPartitioner**

In [47]:
rdd = sc.parallelize(['x', 'y', 'z'])
rdd.collect()

['x', 'y', 'z']

In [48]:
hola = 'holas'
hash(hola)

3667732821894762131

In [49]:
num_partitions = 2

In [50]:
# indice = has(item) / num_partitions

In [51]:
hash(hola)/num_partitions

1.833866410947381e+18

In [52]:
rdd.getNumPartitions()

2

In [53]:
hash('x') % num_partitions, hash('y') % num_partitions, hash('z') % num_partitions

(1, 0, 1)

In [54]:
rdd = sc.parallelize(['j', 'u', 's', 't', 'i', 'n'], 2)
[(hash(p) % num_partitions, p) for p in rdd.collect() ]

[(0, 'j'), (0, 'u'), (0, 's'), (0, 't'), (0, 'i'), (0, 'n')]

# **Variables Broadcast**
---

In [55]:
br_uno = sc.broadcast("a")

In [56]:
br_uno.value

'a'

In [57]:
rdd1 = rdd.map(lambda x: x + br_uno.value)
rdd1.collect()

['ja', 'ua', 'sa', 'ta', 'ia', 'na']

In [58]:
# Eliminara las variables de la memoria cache en todos los ejecutores
br_uno.unpersist()

In [59]:
br_uno.value

'a'

In [60]:
rdd1 = rdd.map(lambda x: x + br_uno.value+br_uno.value)
rdd1.collect()

['jaa', 'uaa', 'saa', 'taa', 'iaa', 'naa']

In [61]:
# Destruir la variables broadcast
br_uno.destroy()

In [62]:
try:
  rdd1 = rdd.map(lambda x: x + br_uno.value+br_uno.value)
  rdd1.collect()
except Exception as e:
  print(e)

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2669)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2432)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at o

In [63]:
# Acumuladores
spark = SparkSession.builder.master("local[*]").appName('A').getOrCreate()
spark

In [64]:
acumuladores = sc.accumulator(0)
acumuladores

Accumulator<id=0, value=0>

In [65]:
rdd = sc.parallelize([2,4,6,8,10])
rdd

ParallelCollectionRDD[72] at readRDDFromFile at PythonRDD.scala:289

In [66]:
rdd.foreach(lambda x: acumuladores.add(x))

In [67]:
acumuladores.value

30

In [68]:
rdd1 = sc.parallelize('De que color es el caballo blanco de simon bolivar'.split(' '))

In [69]:
acumalador = sc.accumulator(0)

In [70]:
rdd1.foreach(lambda x: acumalador.add(3))

In [71]:
acumalador.value

30

# Crar un Dataframe a partir de un RDD


In [72]:
spark = SparkSession.builder.master("local[*]").appName('A').getOrCreate()
spark

In [73]:
rdd = sc.parallelize(e for e in range(15)).map(lambda x: (x,x**2))
rdd.collect()

[(0, 0),
 (1, 1),
 (2, 4),
 (3, 9),
 (4, 16),
 (5, 25),
 (6, 36),
 (7, 49),
 (8, 64),
 (9, 81),
 (10, 100),
 (11, 121),
 (12, 144),
 (13, 169),
 (14, 196)]

In [74]:
df = rdd.toDF(['nA', 'nB'])
df.show(5)

+---+---+
| nA| nB|
+---+---+
|  0|  0|
|  1|  1|
|  2|  4|
|  3|  9|
|  4| 16|
+---+---+
only showing top 5 rows

