# Teoría
Driver -> manejo central del programa

Ejecutores -> computadoras que realizan las tareas en paralelo y finalmente devolverán los valores al driver.

## **RDD**

RDD (Resilient Distributed Dataset), tolerantes a fallos y pueden ser distribuidos a lo largo de los nodos del cluster.

Los RDD pueden ser creados al cargar datos de manera distribuida (HDFS, Cassandra, o cualquier sistema soportado por Hadoop, también por colecciones de datos de Scala o Python, a demás que desde ficheros locales.

RDD puede ser visto como un set de datos que soportan dos tipos de operaciones:
- Transormaciones
- Acciones

**Transformaciones**

Permiten crear un nuevo RDD a partir de uno previamente existente 

**Acciones**

Retornan un valor al driver de la aplicación.

El núcleo de operación del paradigma de Spark es una ejecución Lazy (perezosa), las transformaciones solo serán calculadas posteriormente a una llamada acción.

RDD tiene familiaridad con el paradigma orientado a objetos, lo que permite realizar operacones de bajo nivel a modo. (operaciones comunes: Map, filter, reduce).

Una de las ventajas de RDD es la compilación segura; por su particularidad de ejecución perezosa, se calcula si se generará un error antes de ejecutarse, esto nos permite identificar problemas antes de lanzar la aplicación. 

Problema RDD es que no es correctamente tratado por el Garbage collectory cuando las lógicas de operación se hacen complejas, su uso puede resultar poco práctico.

## DataFrames

Agragado en la version 1.3, puenden ser invocados con el contexto espacial de Spark SQL. Está especialmente desarrollado para ser ejectudado con instrucciones parecidas al SQL estándar.

Pueden ser creados a partir de archivos, tablas Hive, bases de datos externas y RDD o DataFrames existentes.

El DataFrame posee columnas nombradas (como DataFrame de Pandas). Pero a nivel interno Spark trabaja con Scala, lo cual le asigna a cada columna el tipo de data Row, tipo especial de objeto sin tipo definido.

Catalyst -> motor de optimización de planes de ejecución, parecido al de una base de datos pero diseñado para la cantidad de datos propia de Spark.

Tungsten -> optimizador de memoria y consumo de CPU, determina como se ejecutarán los planes lógicos creados por Catalyst en un plan físico.



# Transformaciones y acciones 

In [2]:
!pip install pyspark
from pyspark import SparkContext

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 61.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=f6d9b7ad134fbcb19015174f91fc74745eba2944f1ac954e731db8e73e14370a
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [122]:
sc = SparkContext(master="local", appName="transformacionesYAcciones")

ValueError: ignored

In [18]:
rdd1 = sc.parallelize([1,2,3])
type(rdd1)

pyspark.rdd.RDD

In [19]:
rdd1.collect()

[1, 2, 3]

In [8]:
sc # en el UI se puede ver informacion del desarrollo de los procesos

In [20]:
equiposOlimpicosRDD = sc.textFile("paises.csv") \
  .map(lambda line : line.split(","))

In [21]:
equiposOlimpicosRDD.take(5)
#sc.stop()

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

numero equipos distintos

In [25]:
equiposOlimpicosRDD.map(lambda x: (x[2])).distinct().count()
# en vez de usar listas usar tuplas.

231

Cuantos equipos posee cada pais

In [26]:
equiposOlimpicosRDD.map(lambda x : (x[2], x[1])).groupByKey().mapValues(len).take(5)

[('sigla', 1), ('AUT', 11), ('MEX', 9), ('ARG', 18), ('AFG', 1)]

In [28]:
equiposOlimpicosRDD.map(lambda x : (x[2], x[1])).groupByKey().mapValues(list).take(5)

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise']),
 ('MEX',
  ['A North American Team',
   'Acipactli',
   'Chamukina',
   'Mexico',
   'Mexico-1',
   'Mexico-2',
   'Nausikaa 4',
   'Tlaloc',
   'Xolotl']),
 ('ARG',
  ['Acturus',
   'Antares',
   'Arcturus',
   'Ardilla',
   'Argentina',
   'Argentina-1',
   'Argentina-2',
   'Blue Red',
   'Covunco III',
   'Cupidon III',
   'Djinn',
   'Gullvinge',
   'Matrero II',
   'Mizar',
   'Pampero',
   'Rampage',
   'Tango',
   'Wiking']),
 ('AFG', ['Afghanistan'])]

In [31]:
equiposArgentinos = equiposOlimpicosRDD.filter(lambda l : 'ARG' in l)

equiposArgentinos.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

In [32]:
equiposOlimpicosRDD.count()

1185

In [35]:
equiposOlimpicosRDD.countApprox(20) # Donde 20 es el tiempo que le das en miliseg
# para ejecutar la operacion

1185

## Acciones de conteo sobre RDDs

In [37]:
deportistaOlimpicoRDD = sc.textFile("deportista.csv") \
  .map(lambda l : l.split(","))

deportistaOlimpicoRDD2 = sc.textFile("deportista2.csv") \
  .map(lambda l : l.split(","))



In [41]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD \
  .union(deportistaOlimpicoRDD2)

In [42]:
deportistaOlimpicoRDD.count()

152973

In [43]:
equiposOlimpicosRDD.top(2)

[['id', 'equipo', 'sigla'], ['999', 'Stella-2', 'NOR']]

In [44]:
deportistaOlimpicoRDD.top(2)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['99999', 'Alexander Grant Alick Rennie', '1', '32', '182', '71', '967']]

In [46]:
deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x : [x[0], x[2]])) \
    .take(6)

[('278', (['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0'], 'SWE')),
 ('278', (['86368', 'August Nilsson', '1', '27', '0', '0'], 'SWE')),
 ('278', (['112720', 'Gustaf Fredrik Sderstrm', '1', '34', '0', '0'], 'SWE')),
 ('278',
  (['114107', 'Karl Gustaf Vilhelm Staaf Johansson ', '1', '19', '0', '0'],
   'SWE')),
 ('278', (['86368', 'August Nilsson', '1', '27', '0', '0'], 'SWE')),
 ('278', (['112720', 'Gustaf Fredrik Sderstrm', '1', '34', '0', '0'], 'SWE'))]

In [47]:
deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x : [x[0], x[2]])) \
    .takeSample(False, 6, 25) # Valores respetidos, valores a representar, semilla aleatoria

[('1019', (['103004', 'Philippe Roux', '1', '23', '176', '75'], 'SUI')),
 ('564', (['46569', 'Hath', '1', '16', '160', '50'], 'LAO')),
 ('66', (['30556', 'Scott Dumbrell', '1', '19', '182', '66'], 'AUS')),
 ('1012', (['86432', 'Tor Folke Ren Nilsson', '1', '29', '0', '0'], 'SWE')),
 ('55', (['84616', 'Anna Nasilyan', '2', '20', '177', '57'], 'ARM')),
 ('308',
  (['73381', 'Raghd Magdy Muhammad Mustafa', '2', '29', '169', '67'], 'EGY'))]

In [48]:
resultado = sc.textFile("resultados.csv") \
  .map(lambda l : l.split(","))

In [49]:
resultadoGanador = resultado.filter(lambda l : 'NA' not in l[1])


In [50]:
resultadoGanador.take(2)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4']]

## Joins

In [59]:
deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x : [x[0], x[2]])) \
    .map(lambda l : (l[1][0][0], (l[1][0][1:], l[1][1]))) \
    .join(resultadoGanador.map(lambda l: [l[2], l[:]])) \
    .takeSample(False, 6, 25) # Valores respetidos, valores a representar, semilla aleatoria

[('107789',
  ((['Birgit Schtz', '2', '21', '179', '76'], 'GDR'),
   ['214693', 'Gold', '107789', '33', '245'])),
 ('20401',
  ((['Cheong Jun Hoong', '2', '22', '150', '48'], 'MAS'),
   ['39665', 'Silver', '20401', '51', '470'])),
 ('79635',
  ((['Boris Petrovich Mikhaylov', '1', '27', '175', '79'], 'URS'),
   ['158639', 'Silver', '79635', '32', '14'])),
 ('77390',
  ((['Francena Lynette McCorory', '2', '23', '171', '68'], 'USA'),
   ['154136', 'Gold', '77390', '49', '246'])),
 ('5700',
  ((['Polina Hryhorivna Astakhova', '2', '20', '166', '56'], 'URS'),
   ['10407', 'Gold', '5700', '21', '214'])),
 ('79627',
  ((['Isidoros Mikhas', '1', '0', '0', '0'], 'GRE'),
   ['158622', 'Gold', '79627', '4', '396']))]

In [60]:
deportistaPais = deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x : [x[0], x[2]])) \

In [61]:
#resultadoGanador = 
deportistaPais.take(6)

[('278', (['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0'], 'SWE')),
 ('278', (['86368', 'August Nilsson', '1', '27', '0', '0'], 'SWE')),
 ('278', (['112720', 'Gustaf Fredrik Sderstrm', '1', '34', '0', '0'], 'SWE')),
 ('278',
  (['114107', 'Karl Gustaf Vilhelm Staaf Johansson ', '1', '19', '0', '0'],
   'SWE')),
 ('278', (['86368', 'August Nilsson', '1', '27', '0', '0'], 'SWE')),
 ('278', (['112720', 'Gustaf Fredrik Sderstrm', '1', '34', '0', '0'], 'SWE'))]

## Operaciones numéricas

In [62]:
valoresMedallas = { 'Gold' : 7,
                   'Silver' : 5,
                   'Bronze' : 4}

In [68]:
paisesMedallas = deportistaPais.join(resultadoGanador)

In [69]:
deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x : [x[0], x[2]])) \
    .map(lambda l : (l[1][0][0], (l[1][0][1:], l[1][1]))) \
    .join(resultadoGanador.map(lambda l: [l[2], l[:]])) \
    .takeSample(False, 6, 25) # Valores respetidos, valores a representar, semilla aleatoria

[('107789',
  ((['Birgit Schtz', '2', '21', '179', '76'], 'GDR'),
   ['214693', 'Gold', '107789', '33', '245'])),
 ('20401',
  ((['Cheong Jun Hoong', '2', '22', '150', '48'], 'MAS'),
   ['39665', 'Silver', '20401', '51', '470'])),
 ('79635',
  ((['Boris Petrovich Mikhaylov', '1', '27', '175', '79'], 'URS'),
   ['158639', 'Silver', '79635', '32', '14'])),
 ('77390',
  ((['Francena Lynette McCorory', '2', '23', '171', '68'], 'USA'),
   ['154136', 'Gold', '77390', '49', '246'])),
 ('5700',
  ((['Polina Hryhorivna Astakhova', '2', '20', '166', '56'], 'URS'),
   ['10407', 'Gold', '5700', '21', '214'])),
 ('79627',
  ((['Isidoros Mikhas', '1', '0', '0', '0'], 'GRE'),
   ['158622', 'Gold', '79627', '4', '396']))]

In [77]:
paisesMedallas = deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x : [x[0], x[2]])) \
    .map(lambda l : (l[1][0][0], (l[1][0][1:], l[1][1]))) \
    .join(resultadoGanador.map(lambda l: [l[2], l[1]])) \
    .takeSample(False, 6, 25)

In [79]:
paisesMedallas = deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x : [x[0], x[2]])) \
    .map(lambda l : (l[1][0][0], (l[1][0][1:], l[1][1]))) \
    .join(resultadoGanador.map(lambda l: [l[2], l[1]])) \
    .map(lambda l : [l[1][0][-1], l[1][1]]) \
  
paisesMedallas.takeSample(False, 6, 25)

[['GDR', 'Gold'],
 ['MAS', 'Silver'],
 ['URS', 'Silver'],
 ['USA', 'Gold'],
 ['URS', 'Gold'],
 ['GRE', 'Gold']]

In [88]:
PaisesMedallas = paisesMedallas \
  .map(lambda l : (l[0], valoresMedallas[l[1]])) \


PaisesMedallas.take(4)

[('NED', 5), ('NED', 5), ('NED', 5), ('NED', 5)]

In [89]:
from operator import add
conclusion = PaisesMedallas.reduceByKey((add)).sortBy(lambda x : x[1], ascending=False)

conclusion.take(10)

[('USA', 36684),
 ('URS', 15542),
 ('GBR', 12644),
 ('GER', 11263),
 ('ITA', 11142),
 ('FRA', 10854),
 ('SWE', 9484),
 ('CAN', 8146),
 ('AUS', 7914),
 ('GDR', 6590)]

Correccion

In [97]:
paisesMedallas = equiposOlimpicosRDD.map(lambda x : [x[0], x[2]]) \
    .join(deportistaOlimpicoRDD.map(lambda l: [l[-1], l[:-1]])) \
    .map(lambda l : (l[1][1][0], (l[1][1][1:], l[1][0]))) \
    .join(resultadoGanador.map(lambda l: [l[2], l[1]])) \
    .map(lambda l : (l[1][0][-1], l[1][1]))
paisesMedallas.take(5)

[('CAN', 'Bronze'),
 ('CAN', 'Bronze'),
 ('AZE', 'Bronze'),
 ('AZE', 'Bronze'),
 ('AZE', 'Gold')]

In [98]:
paisesMedallas = paisesMedallas \
  .map(lambda l : (l[0], valoresMedallas[l[1]])) \


paisesMedallas.take(4)

[('CAN', 4), ('CAN', 4), ('AZE', 4), ('AZE', 4)]

In [99]:
conclusion = paisesMedallas.reduceByKey((add)).sortBy(lambda x : x[1], ascending=False)
conclusion.take(10)

[('USA', 36684),
 ('URS', 15542),
 ('GBR', 12644),
 ('GER', 11263),
 ('ITA', 11142),
 ('FRA', 10854),
 ('SWE', 9484),
 ('CAN', 8146),
 ('AUS', 7914),
 ('GDR', 6590)]

In [109]:
sc.stop()

## DataFrame

- Permite procesar como una tabla de base de datos los DF.
- Poseen estructura y pueden ser creados como los DF.
- Una optimización superior debido al optimizador de consultas Catalyst y el motor de ejecución Tungsten.

In [107]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.types import  StringType, FloatType
from pyspark.sql.types import Row

from pyspark.sql import SQLContext

In [112]:
spark = SparkContext(master='local', appName="DataFrames")
sqlContext = SQLContext(spark)



In [113]:
juegoSchema = StructType([
                          StructField("juego_id", IntegerType(), False),
                          StructField("anio", StringType(), False),
                          StructField("temporada", StringType(), False),
                          StructField("ciudad", StringType(), False)
])

juegoDF = sqlContext.read.schema(juegoSchema)\
    .option("header", "true").csv("juegos.csv")

In [114]:
juegoDF.take(5)

[Row(juego_id=1, anio='1896 Verano', temporada='1896', ciudad='Verano'),
 Row(juego_id=2, anio='1900 Verano', temporada='1900', ciudad='Verano'),
 Row(juego_id=3, anio='1904 Verano', temporada='1904', ciudad='Verano'),
 Row(juego_id=4, anio='1906 Verano', temporada='1906', ciudad='Verano'),
 Row(juego_id=5, anio='1908 Verano', temporada='1908', ciudad='Verano')]

In [115]:
juegoDF.show(5)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
|       5|1908 Verano|     1908|Verano|
+--------+-----------+---------+------+
only showing top 5 rows



## Crear DF desde un RDD

In [116]:
def eliminaEncabezado(indice, iterador):
  return iter(list(iterador)[1:])

In [118]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)

In [None]:
deportistaOlimpicoRDD.take(5)

In [None]:
deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [None]:
schema  = StructType(
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo", IntegerType(), False),
)

In [None]:
sqlContext.createDataFrame(deportistaOlimpicoRDD, schema).show(5)

In [123]:
resultadoDF.printSchema()

NameError: ignored