<a href="https://colab.research.google.com/github/angelkp570/CursoSpark/blob/master/Clase3_InferenciaTipoDatos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Creación de un DataFrame a través de un RDD


In [1]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [2]:
import findspark
findspark.init()

In [14]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, Row

from pyspark.sql import SQLContext

In [4]:
# ConfigureSparkUI
conf = SparkConf().set('spark.ui.port', '4050')

In [5]:
sc = SparkContext(master="local", appName="DataFrames",conf=conf)

In [19]:
sqlContext = SQLContext(sc)

In [6]:
deportistaOlimpicoRDD = sc.textFile("deportista.csv").map(lambda l : l.split(","))
deportistaOlimpicoRDD2 = sc.textFile("deportista2.csv").map(lambda l : l.split(","))

deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)

In [7]:
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

Crearemos un DataFrame a través del RDD deportistaOlimpicoRDD pero como poodemos observar este RDD contiene un encabezado. Por esta razón crearemos una función para poder retirar dicho encabezado.

Esta función va a recibir dos parámetros, indice e iterador. Esta función va a retornar una lista limpia que ya posee los valores que nosotros queremos.

iter() nos devuelve valor a valor lo que nosotros procesamos, aseguramos que devuelva una lista y que sea desde el segundo valor del RDD con esto eliminamos el encabezado  

In [10]:
def eliminaEncabezado(indice, iterador):
  return iter(list(iterador)[1:])

#Ejemplo con función iter()

La función iter() en Python se utiliza para obtener un iterador a partir de un objeto iterable. Un iterador es un objeto que permite recorrer los elementos de un iterable, como una lista, tupla o cadena de caracteres, uno a la vez. Puedes usar la función iter() junto con la función next() para obtener y avanzar a través de los elementos del iterable.

Aquí hay un ejemplo sencillo que ilustra el uso de iter():

In [22]:
# Crear una lista
mi_lista = [1, 2, 3, 4, 5]

# Obtener un iterador a partir de la lista
mi_iterador = iter(mi_lista)

# Obtener y mostrar el primer elemento del iterador
primer_elemento = next(mi_iterador)
print("Primer elemento:", primer_elemento)

# Obtener y mostrar el segundo elemento del iterador
segundo_elemento = next(mi_iterador)
print("Segundo elemento:", segundo_elemento)

Primer elemento: 1
Segundo elemento: 2


En este ejemplo:

1. Se crea una lista llamada mi_lista.
2. Se obtiene un iterador a partir de la lista utilizando la función iter().
3. Se utiliza la función next() para obtener el primer elemento del iterador y se imprime.
4. Se utiliza la función next() nuevamente para obtener el segundo elemento del iterador y se imprime.

Este es un ejemplo básico, pero la función iter() se utiliza comúnmente en situaciones donde se necesita trabajar con iteradores y se desea controlar manualmente el avance a través de los elementos de un iterable.

#Ejemplo con mapPartitionsWithIndex

La función mapPartitionsWithIndex es una transformación en Spark que permite aplicar una función a cada partición de un RDD, mientras proporciona el índice de la partición como un parámetro a la función. Esto es útil cuando necesitas realizar operaciones que dependen del índice de la partición en el que se está trabajando.

La sintaxis básica de mapPartitionsWithIndex es la siguiente:

<code>RDD.mapPartitionsWithIndex(función)</code>

Donde función es una función que toma dos argumentos: el índice de la partición y un iterador que produce los elementos de esa partición. La función devuelve un nuevo iterador que genera los elementos transformados.

Un ejemplo para ilustrar cómo funciona mapPartitionsWithIndex:


In [32]:
#from pyspark import SparkContext

# Configuración del SparkContext
#sc = SparkContext("local", "Ejemplo Spark")

# Crear un RDD con datos de ejemplo
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)  # Crear un RDD con 3 particiones


# Definir una función para multiplicar cada elemento por el índice de la partición
def multiplicar_por_indice(partition_index, iterador):
    yield f"Partición {partition_index}: {list(iterador)}"

# Aplicar mapPartitionsWithIndex
resultado = rdd.mapPartitionsWithIndex(multiplicar_por_indice)

# Recoger y mostrar los resultados
resultados = resultado.collect()
for resultado_particion in resultados:
    print(resultado_particion)

Partición 0: [1, 2, 3]
Partición 1: [4, 5, 6]
Partición 2: [7, 8, 9, 10]


In [33]:
rdd.take(5)

[1, 2, 3, 4, 5]

En este ejemplo:

1. Se crea un RDD con datos de ejemplo y se especifica que tenga 3 particiones.
2. Se define una función multiplicar_por_indice que toma el índice de la partición y un iterador que produce los elementos de esa partición. La función multiplica cada elemento por el índice de la partición.
3. Se aplica mapPartitionsWithIndex al RDD utilizando la función definida.
4. Se recogen y muestran los resultados, donde cada partición y sus elementos transformados se imprimen.

#mapPartitionsWithIndex
Esta función le pasa dos parámetros a nuestra función creada uno sería el índice y el otro toda la columna

In [12]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)

El resultado se nos muestra sin encabezado

In [13]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

Antes de transformar el RDD primero tenemos que transformar los valores del RDD.

Por eso haremos un mapeo sobre deportistaOlimpicoRDD y tranformamos los valores

In [15]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l: (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
    ))

In [34]:
deportistaOlimpicoRDD.take(5)

[(1, 'A Dijiang', 1, 24, 180, 80.0, 199),
 (2, 'A Lamusi', 1, 23, 170, 60.0, 199),
 (3, 'Gunnar Nielsen Aaby', 1, 24, 0, 0.0, 273),
 (4, 'Edgar Lindenau Aabye', 1, 34, 0, 0.0, 278),
 (5, 'Christine Jacoba Aaftink', 2, 21, 185, 82.0, 705)]

Generamos el esquema que va a contener nuestro DataFrame

In [17]:
schema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False)
])

In [20]:
sqlContext.createDataFrame(deportistaOlimpicoRDD, schema).show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [21]:
deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD, schema)