In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

Un RDD (Resilient Distributed Dataset) es la estructura básica de datos en Spark.
Es una colección de datos distribuida en varios nodos de un cluster y permite realizar operaciones en paralelo.

In [None]:
# Crea un contexto de Spark , para hacer la conexion entre mi codigo y el motor de Spark
#master = 'local' -> significa que spark se ejecutara en la maquina local usando solo nucleo de CPU 'local[*]' para usar todos los nucleos disponibles ,, En un entorno distribuido, podrías usar algo como 'yarn' o 'spark://<host>:<port>'.
# El parámetro appName define el nombre de la aplicación en Spark, útil para monitorear el trabajo en la interfaz web de Spark UI.
sc = SparkContext(master = 'local',appName =  'TransformacionesAcciones')

In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

In [None]:
data = [1,2,3,4]
rdd = sc.parallelize(data)

In [None]:
type(rdd)

In [None]:
rdd.collect()

[1, 2, 3, 4]

In [None]:
sc

In [3]:
path = '/content/drive/MyDrive/Spark/files/'

# Obtencion csv con spark Cntext

In [None]:
# Obtyencion de archivo csv con spark Context
equiposOlimpicosRDD = sc.textFile(path + 'paises.csv').map(lambda line : line.split(","))

In [None]:
equiposOlimpicosRDD.take(10)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR']]

In [None]:
 # ver cuantos paises participaron
equiposOlimpicosRDD.map(lambda x : (x[2])).distinct().count()# x[2] el elemnto 2 de cada lista


231

In [None]:
#Cuenta los rdd dentro de los primeros 20 milisegundos
equiposOlimpicosRDD.countApprox(20)

1185

In [None]:
# ver los primeros elementos para ver el campo en comun entre equiposRDD y deportistaRDD
equiposOlimpicosRDD.show(2)

In [None]:
#Importamos los csv deportista y deportista2
deportistaRDD = sc.textFile(path + 'deportista.csv').map(lambda line : line.split(","))
deportista2RDD = sc.textFile(path + 'deportista2.csv').map(lambda line : line.split(","))

AttributeError: 'SparkSession' object has no attribute 'textFile'

In [None]:
# Hacemos la union de los dos rdd
deportistaRDD = deportistaRDD.union(deportista2RDD)

In [None]:
#contamos la cantidad de rdds
deportistaRDD.count()

In [None]:
deportistaRDD.show()


## Obtener equipo y deportista

In [None]:
# es id de equiposRDD y equipo_id de deportistsRDD lo que los une
# Se selecciona la última columna del RDD (equipo_id) que es valor eje y el resto de contenidos
 # Se selecciona solo el id que es el valor eje y la sigla del país


deportistasEquipos = deportistaRDD.map(lambda line : [line[-1],line[:-1]]).join(equiposOlimpicosRDD.map(lambda x : [x[0],x[2]]))#.takeSample(False,6,25)  #False si quiero que no se repitan , 6 cuantos quiero en la salida  y 25 la semilla de aleatoriedad

In [None]:
lista_deportisata_equipo = deportistaRDD.map(lambda line : [line[-1],line[:-1]]).join(equiposOlimpicosRDD.map(lambda x : [x[0],x[2]])).top(10)


In [None]:
lista_deportisata_equipo[:]

[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR')),
 ('999', (['1144', 'Henrik Agersborg', '1', '47', '0', '0'], 'NOR')),
 ('999', (['10765', 'Einar Berntsen', '1', '28', '0', '0'], 'NOR')),
 ('998',
  (['111659', 'G. Bernard Bernie Skinner', '1', '34', '182', '82'], 'CAN')),
 ('996', (['116030', 'Edward Eddy Stutterheim', '1', '39', '0', '0'], 'NED')),
 ('995', (['71728', 'Gordon Frederick Love', '1', '26', '0', '0'], 'FRA')),
 ('995', (['101553', 'Louis Roche', '1', '0', '0', '0'], 'FRA')),
 ('994', (['87856', 'Leo Anthony O Connell', '1', '20', '0', '0'], 'USA')),
 ('994', (['55319', 'Johnson', '1', '0', '0', '0'], 'USA')),
 ('994', (['53520', 'Henry Wood Jameson', '1', '21', '0', '0'], 'USA'))]

In [None]:
lista_deportisata_equipo[1][0]

'999'

In [None]:
lista_deportisata_equipo[1][0][0]

'9'

In [None]:
deportistasEquipos.map(lambda x : (x[1][0][0],x[1][0][1:],x[1][1])).top(2)

[('99999', ['Alexander Grant Alick Rennie', '1', '32', '182', '71'], 'RSA'),
 ('99998', ['Robert John Bob Renney', '1', '21', '178', '90'], 'AUS')]

In [None]:
# obtenemos info de resultado
resultado = sc.textFile(path + 'resultados.csv').map(lambda line : line.split(","))

In [None]:
resultado.take(7)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['1', 'NA', '1', '39', '1'],
 ['2', 'NA', '2', '49', '2'],
 ['3', 'NA', '3', '7', '3'],
 ['4', 'Gold', '4', '2', '4'],
 ['5', 'NA', '5', '36', '5'],
 ['6', 'NA', '5', '36', '6']]

In [None]:
#Filtrar solo a los que ganaron medalla
resultado = resultado.filter(lambda l : 'NA' not in l[1])

In [None]:
resultado.take(3)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4'],
 ['38', 'Bronze', '15', '7', '19']]

In [None]:
deportistaRDD.top(2)



[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['99999', 'Alexander Grant Alick Rennie', '1', '32', '182', '71', '967']]

In [None]:
equiposOlimpicosRDD.top(2)

[['id', 'equipo', 'sigla'], ['999', 'Stella-2', 'NOR']]

In [None]:
# Juntamos el RDD de deportistas ,equiposcon el de resultado

# 1er parte juntamos deportistas con y sus equipos
deportistas_Equipos = deportistaRDD.map(lambda line : [line[-1],line[:-1]]).join(equiposOlimpicosRDD.map(lambda x : [x[0],x[2]]))


In [None]:
deportistas_Equipos.takeSample(False,6,25)

[('308', (['117713', 'Ahmed Amin Tabouzada', '1', '0', '0', '0'], 'EGY')),
 ('96', (['95174', 'Dsir Piryns', '1', '0', '0', '0'], 'BEL')),
 ('1178', (['99228', 'Viktor Ravnik', '1', '22', '179', '80'], 'YUG')),
 ('888', (['52248', 'Vaipava Nevo Ioane', '1', '28', '152', '62'], 'SAM')),
 ('716', (['108017', 'Neville Ian Scott', '1', '21', '187', '74'], 'NZL')),
 ('1019', (['39331', 'Werner Geeser', '1', '23', '182', '68'], 'SUI'))]

In [None]:
deportistas_Equipos.map(lambda x  : (x[1][0][0],(x[1][0][1:],x[1][1]))).take(2)

[('1', (['A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('2', (['A Lamusi', '1', '23', '170', '60'], 'CHN'))]

In [None]:
# 2da parte Juntamos con los resultados obteniendo las medallas de cada deportista haciendo la union con el deportista_id
deportistasGanadores = deportistas_Equipos.map(lambda x  : (x[1][0][0],(x[1][0][1:],x[1][1]))).join(resultado.map(lambda y : (y[2],y[1])))

In [None]:
deportistasGanadores.take(2)

[('17996', ((['Cao Mianying', '2', '21', '176', '71'], 'CHN'), 'Silver')),
 ('18005', ((['Cao Yuan', '1', '17', '160', '42'], 'CHN'), 'Gold'))]

Obtenemos los puntajes de cada pais de acuerdo al valor de las medallas

In [None]:
# Diccionario valores de las medallas
valoresMedallas = {'Gold':7,'Silver':5,'Bronze':4}

In [None]:
# primer parte se reduce la tupla
# x[1][0][1] -> Iniciales pais
# x[1][1] -> tipo de medalla
deportistasGanadores.map(lambda x : (x[1][0][1],x[1][1])).take(2)

[('CHN', 'Silver'), ('CHN', 'Gold')]

In [None]:
# Segunda parte obtenemos los valores de cada medalla
deportistasGanadores.map(lambda x : (x[1][0][1],valoresMedallas[x[1][1]])).take(2)

[('CHN', 5), ('CHN', 7)]

In [None]:
# Guardamos en una  variable el rdd donde esta pais , puntos por medalla
paisesPuntos = deportistasGanadores.map(lambda x : (x[1][0][1],valoresMedallas[x[1][1]]))

In [None]:
paisesPuntos.take(2)

[('CHN', 5), ('CHN', 7)]

In [None]:
# hacemos la sumatoria por pais y ordenamos los resultados
from operator import add
conclusion = paisesPuntos.reduceByKey((add)).sortBy(lambda x : x[1],ascending = False)


In [None]:
conclusion.take(10)

[('USA', 32137),
 ('URS', 14834),
 ('GBR', 10925),
 ('GER', 10896),
 ('FRA', 9265),
 ('ITA', 8755),
 ('SWE', 8110),
 ('CAN', 7209),
 ('AUS', 6755),
 ('HUN', 6142)]

# Obtencion de archivos csv con Spark Session

In [4]:
#Obtencion de archivo csv con spark session , cuando se hace la lectura con spak session se transforma en dataframe y con context en rdd
deportistaOlimpicoDF= spark.read.csv(path + 'deportista.csv',header = True)
deportistaOlimpico2DF= spark.read.csv(path + 'deportista2.csv',header = True)

In [5]:
# Union de los 2 Data Frame
DeportistaDF = deportistaOlimpicoDF.union(deportistaOlimpico2DF)

# Dataframes

In [6]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import pyspark.sql
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType
from pyspark.sql.types import Row

In [None]:
path = '/content/drive/MyDrive/Spark/files'

Creacion de schema

In [8]:
juegosSchema = StructType([StructField('juego_id',IntegerType(),False),# False si es obligatorio
                           StructField('nombre_juego',StringType(),False),
                           StructField("año",StringType(),False),
                           StructField("temporada",StringType(),False),
                           StructField("ciudad",StringType(),False)])

Lectura de schema

In [None]:
# Create a SparkSession
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()


Transformacion de RDD -> Df

In [9]:
# "header", "true" → Le dice a Spark que la primera fila del CSV contiene los nombres de las columnas.
juegoDF = spark.read.schema(juegosSchema).option ("header","true").csv(path + '/juegos.csv')

In [10]:
juegoDF.show(5)

+--------+------------+----+---------+---------+
|juego_id|nombre_juego| año|temporada|   ciudad|
+--------+------------+----+---------+---------+
|       1| 1896 Verano|1896|   Verano|   Athina|
|       2| 1900 Verano|1900|   Verano|    Paris|
|       3| 1904 Verano|1904|   Verano|St. Louis|
|       4| 1906 Verano|1906|   Verano|   Athina|
|       5| 1908 Verano|1908|   Verano|   London|
+--------+------------+----+---------+---------+
only showing top 5 rows



In [None]:
spark

In [None]:
deportistaRDD.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0|   0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0|   0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|Cornelia Cor Aalt...|     2|  18|   168|   0|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|Einar Ferdinand E...|     1|  26|     0|   0|      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [None]:
type(deportistaRDD)

In [None]:
# Cambiar los tipos de datos del dataframe DeportistaDF
dict_tipos = {'deportista_id':IntegerType(),'genero':IntegerType(),'edad':IntegerType(),'altura':IntegerType(),'peso':FloatType(),'equipo_id':IntegerType()}

for col,tipo in dict_tipos.items():
  DeportistaDF = DeportistaDF.withColumn(col,DeportistaDF[col].cast(tipo))


In [None]:
DeportistaDF.dtypes

[('deportista_id', 'int'),
 ('nombre', 'string'),
 ('genero', 'int'),
 ('edad', 'int'),
 ('altura', 'int'),
 ('peso', 'float'),
 ('equipo_id', 'int')]

In [None]:
#Eliminamos columna Altura
DeportistaDF = deportistaRDD.drop('altura')

In [None]:
# Reasignamos nombre de columnas
DeportistaDF = DeportistaDF.withColumnRenamed('genero','sexo')

In [None]:
DeportistaDF.columns

['deportista_id', 'nombre', 'sexo', 'edad', 'altura', 'peso', 'equipo_id']

In [None]:
# Uso de select
DeportistaDF.select('nombre','edad').show(5)

+--------------------+----+
|              nombre|edad|
+--------------------+----+
|           A Dijiang|  24|
|            A Lamusi|  23|
| Gunnar Nielsen Aaby|  24|
|Edgar Lindenau Aabye|  34|
|Christine Jacoba ...|  21|
+--------------------+----+
only showing top 5 rows



In [None]:
import pyspark.sql.functions as f

#Si usas col(), puedes encadenar métodos y hacer transformaciones más complejas,

DeportistaDF = DeportistaDF.select('deportista_id','nombre','sexo',f.col('edad').alias('edadAlJugar'),'equipo_id')

In [None]:
DeportistaDF

DataFrame[deportista_id: int, nombre: string, edadAlJugar: int, equipo_id: int]

In [None]:
#Ordenar el Dataframe segun una columna
DeportistaDF.sort('edadAlJugar').show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        68312|      Beyene Legesse|          0|      332|
|          224|     Mohamed AbdelEl|          0|      308|
|        68652|    Adjutant Lemoine|          0|      362|
|          487|      Inni Aboubacar|          0|      721|
|        68315|     Douglas C. Legg|          0|      413|
|          226|Sanad Bushara Abd...|          0|     1003|
|        68302|         Viktor Legt|          0|      259|
|           58|    Georgi Abadzhiev|          0|      154|
|        68317|      Ronald H. Legg|          0|      413|
|          230|    Moustafa Abdelal|          0|      308|
|        67849|          Edward Lee|          0|      199|
|          102|   Sayed Fahmy Abaza|          0|      308|
|        68329|        Paul Legrain|          0|      362|
|          260|  Ahmed Abdo Mustafa|          0|     100

In [None]:
#Filtramos los deportistas que sean diferentes de cero
DeportistaDF = DeportistaDF.filter(DeportistaDF.edadAlJugar != 0)

In [None]:
DeportistaDF.sort('edadAlJugar').show()

+-------------+--------------------+----+-----------+---------+
|deportista_id|              nombre|sexo|edadAlJugar|equipo_id|
+-------------+--------------------+----+-----------+---------+
|        71691|  Dimitrios Loundras|   1|         10|      333|
|        22411|Magdalena Cecilia...|   2|         11|      413|
|        70616|          Liu Luyang|   2|         11|      199|
|        37333|Carlos Bienvenido...|   1|         11|      982|
|        76675|   Marcelle Matthews|   2|         11|      967|
|        40129|    Luigina Giavotti|   2|         11|      507|
|       118925|Megan Olwen Deven...|   2|         11|      413|
|        47618|Sonja Henie Toppi...|   2|         11|      742|
|       126307|        Liana Vicens|   2|         11|      825|
|        51268|      Beatrice Hutiu|   2|         11|      861|
|        52070|        Etsuko Inada|   2|         11|      514|
|        72854|      Licia Macchini|   2|         12|      507|
|         5291|Marcia Arriaga La...|   2

In [None]:
#Obtencion de paises
paises = spark.read.csv(path + 'paises.csv',header=True)

In [None]:
paises.dtypes

[('id', 'string'), ('equipo', 'string'), ('sigla', 'string')]

In [None]:
paises = paises.withColumn('id',paises['id'].cast(IntegerType()))

In [None]:
paises.show(5)

+---+--------------------+-----+
| id|              equipo|sigla|
+---+--------------------+-----+
|  1|         30. Februar|  AUT|
|  2|A North American ...|  MEX|
|  3|           Acipactli|  MEX|
|  4|             Acturus|  ARG|
|  5|         Afghanistan|  AFG|
+---+--------------------+-----+
only showing top 5 rows



In [None]:
# Obtencion de evento
eventoDF = spark.read.csv(path + 'evento.csv',header=True)

In [None]:
eventoDF.show(3)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
+---------+--------------------+----------+
only showing top 3 rows



In [None]:
eventoDF.dtypes

[('evento_id', 'string'), ('evento', 'string'), ('deporte_id', 'string')]

In [None]:
eventoDF = eventoDF.withColumn('evento_id',eventoDF['evento_id'].cast(IntegerType()))


In [None]:
#Obtewncion de resultados
resultados = spark.read.csv(path + 'resultados.csv',header=True)

In [None]:
resultados.dtypes

[('resultado_id', 'string'),
 ('medalla', 'string'),
 ('deportista_id', 'string'),
 ('juego_id', 'string'),
 ('evento_id', 'string')]

In [None]:
dict = {'resultado_id':IntegerType(),'medalla':StringType(),'deportista_id':IntegerType(),'juego_id':IntegerType(),'evento_id':IntegerType()}

for clave,valor in dict.items():
  resultados = resultados.withColumn(clave,resultados[clave].cast(valor))

In [None]:
resultados.dtypes

[('resultado_id', 'int'),
 ('medalla', 'string'),
 ('deportista_id', 'int'),
 ('juego_id', 'int'),
 ('evento_id', 'int')]

In [None]:
resultados.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [None]:
#Obtencion de juegos
juegosDF = spark.read.csv(path+'/juegos.csv',header=True)

In [None]:
#Renombramos y cambiamos tipo de datos
juegosDF = juegosDF.select(f.col('_c0').alias('juego_id').cast(IntegerType()),'nombre_juego','annio','temporada','ciudad')

In [None]:
juegosDF.show(5)

+--------+------------+-----+---------+---------+
|juego_id|nombre_juego|annio|temporada|   ciudad|
+--------+------------+-----+---------+---------+
|       1| 1896 Verano| 1896|   Verano|   Athina|
|       2| 1900 Verano| 1900|   Verano|    Paris|
|       3| 1904 Verano| 1904|   Verano|St. Louis|
|       4| 1906 Verano| 1906|   Verano|   Athina|
|       5| 1908 Verano| 1908|   Verano|   London|
+--------+------------+-----+---------+---------+
only showing top 5 rows



In [None]:
juegosDF = juegosDF.withColumn('annio',juegosDF['annio'].cast(IntegerType()))

In [None]:
juegosDF.dtypes

[('juego_id', 'int'),
 ('nombre_juego', 'string'),
 ('annio', 'int'),
 ('temporada', 'string'),
 ('ciudad', 'string')]

In [None]:
# Uso de joins para unir data frames
DeportistaDF.join( resultados,DeportistaDF.deportista_id == resultados.deportista_id , 'left')\
.join(juegosDF, juegosDF.juego_id == resultados.juego_id,'left')\
.join(eventoDF,eventoDF.evento_id == resultados.evento_id,'left')\
.select('nombre','edadAlJugar','medalla',f.col('annio').alias('año'),'evento').filter(resultados.medalla!= 'NA').show(5)

+--------------------+-----------+-------+----+--------------------+
|              nombre|edadAlJugar|medalla| año|              evento|
+--------------------+-----------+-------+----+--------------------+
|Edgar Lindenau Aabye|         34|   Gold|1900|Tug-Of-War Men's ...|
|Arvo Ossian Aaltonen|         22| Bronze|1920|Swimming Men's 20...|
|Arvo Ossian Aaltonen|         22| Bronze|1920|Swimming Men's 40...|
|Juhamatti Tapio A...|         28| Bronze|2014|Ice Hockey Men's ...|
|Paavo Johannes Aa...|         28| Bronze|1948|Gymnastics Men's ...|
+--------------------+-----------+-------+----+--------------------+
only showing top 5 rows



In [None]:
# union de medalla , pais y equipo que pertenecen
DeportistaDF.join(resultados,DeportistaDF.deportista_id == resultados.deportista_id,'left')\
.join(paises,paises.id == DeportistaDF.equipo_id,'left')\
.select('medalla','equipo','sigla').filter(resultados.medalla !='NA').show(5)

+-------+--------------+-----+
|medalla|        equipo|sigla|
+-------+--------------+-----+
|   Gold|Denmark/Sweden|  SWE|
| Bronze|       Finland|  FIN|
| Bronze|       Finland|  FIN|
| Bronze|       Finland|  FIN|
| Bronze|       Finland|  FIN|
+-------+--------------+-----+
only showing top 5 rows



In [None]:
medallistaXAnio = DeportistaDF.join(resultados,DeportistaDF.deportista_id == resultados.deportista_id,'left')\
.join(juegosDF,juegosDF.juego_id == resultados.juego_id,'left')\
.join(paises,paises.id == DeportistaDF.equipo_id,'left')\
.join(eventoDF,eventoDF.evento_id == resultados.evento_id,'left')\
.select('sigla',f.col('annio').alias('año'),'medalla','equipo','nombre','evento')\
.filter(resultados.medalla !='NA')

In [None]:
medallista2XAnio = medallistaXAnio.groupBy('sigla','año').count()

In [None]:
medallista2XAnio.show()

+-----+----+-----+
|sigla| año|count|
+-----+----+-----+
|  USA|2012|  248|
|  BLR|2000|   15|
|  FRA|2006|   15|
|  FIN|1988|   38|
|  KOR|2010|   18|
|  VEN|2012|    1|
|  FRA|1948|   77|
|  GBR|2000|   55|
|  QAT|2012|    2|
|  FRG|1994|    6|
|  JPN|1932|   28|
|  KOR|1972|    1|
|  GER|1932|   57|
|  AUS|1972|   20|
|  NED|1972|   15|
|  BAH|2008|    7|
|  NZL|1988|   24|
|  THA|1988|    1|
|  POR|1988|    1|
|  POR|2012|    2|
+-----+----+-----+
only showing top 20 rows



In [None]:
medallista2XAnio.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- año: integer (nullable = true)
 |-- count: long (nullable = false)



In [None]:
# Agrupamps por medallas totales y medallas promedio
medallista2XAnio.groupBy('sigla','año').agg(f.sum('count').alias('Total medallas'),f.avg('count').alias('Medallas promedio')).sort('año').show()

+-----+----+--------------+-----------------+
|sigla| año|Total medallas|Medallas promedio|
+-----+----+--------------+-----------------+
|  DEN|1896|             6|              6.0|
|  FRA|1896|            11|             11.0|
|  AUT|1896|             5|              5.0|
|  HUN|1896|             6|              6.0|
|  GER|1896|            30|             30.0|
|  AUS|1896|             3|              3.0|
|  GBR|1896|             8|              8.0|
|  USA|1896|            20|             20.0|
|  GRE|1896|             9|              9.0|
|  SUI|1896|             3|              3.0|
|  IND|1900|             2|              2.0|
|  AUS|1900|             6|              6.0|
|  CAN|1900|             2|              2.0|
|  ITA|1900|             5|              5.0|
|  ESP|1900|             2|              2.0|
|  AUT|1900|             6|              6.0|
|  SUI|1900|            21|             21.0|
|  DEN|1900|             7|              7.0|
|  CUB|1900|             2|       

#SQL

In [None]:
resultados.registerTempTable('resultado')
DeportistaDF.registerTempTable('deportista')
paises.registerTempTable('paises')
juegosDF.registerTempTable('juegos')
eventoDF.registerTempTable('evento')

In [None]:

spark.sql("SELECT * FROM resultado").show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [None]:
#Requiere mas procesamiento al usar sql
spark.sql(""" SELECT medalla,equipo,sigla,j.annio FROM resultado r
              JOIN deportista d
              ON r.deportista_id = d.deportista_id
              JOIN paises p
              ON p.id = d.equipo_id
              JOIN juegos j
              ON j.juego_id = r.juego_id
              JOIN evento e
              ON e.evento_id = r.evento_id
              WHERE medalla <> "NA"
              GROUP BY medalla,equipo,sigla,j.annio
              ORDER BY annio ASC""").show()

+-------+--------------------+-----+-----+
|medalla|              equipo|sigla|annio|
+-------+--------------------+-----+-----+
| Silver|              France|  FRA| 1896|
|   Gold|       United States|  USA| 1896|
|   Gold|              Greece|  GRE| 1896|
|   Gold|           Australia|  AUS| 1896|
| Bronze|             Germany|  GER| 1896|
|   Gold|             Denmark|  DEN| 1896|
| Silver|       Great Britain|  GBR| 1896|
| Silver|       United States|  USA| 1896|
| Silver|             Denmark|  DEN| 1896|
| Silver|             Austria|  AUT| 1896|
| Bronze|Ethnikos Gymnasti...|  GRE| 1896|
| Bronze|       United States|  USA| 1896|
|   Gold|             Austria|  AUT| 1896|
| Bronze|             Denmark|  DEN| 1896|
| Bronze|       Great Britain|  GBR| 1896|
| Bronze|              France|  FRA| 1896|
|   Gold|             Hungary|  HUN| 1896|
|   Gold|       Great Britain|  GBR| 1896|
|   Gold|              France|  FRA| 1896|
| Bronze|             Hungary|  HUN| 1896|
+-------+--

# UDF
Las funciones definidas por el usuario o UDF, por sus siglas en inglés, son una funcionalidad agregada en Spark para definir funciones basadas en columnas las cuales permiten extender las capacidades de Spark al momento de transformar el set de datos.

Este tipo de implementaciones son convenientes cuando tenemos un desarrollo extenso donde hemos identificado la periodicidad de tareas repetitivas como suele ser en pasos de limpieza de datos, transformación o renombrado dinámico de columnas.

Por lo anterior es común encontrar en un proyecto de Spark una librería independiente donde existen todas estas funciones agregadas para que los desarrolladores involucrados en el proyecto puedan usarlas a conveniencia.

El uso de UDF no implica que las funciones que podemos crear nativamente con Python, Scala, R o Java no sean útiles. Una UDF tiene el objetivo de ofrecer un estándar interno en el proyecto que nos encontremos realizando. Además, en caso de ser necesario, una UDF puede ser modificada con ayuda de decoradores para que sea más extensible en diversos escenarios a los cuales nos podemos enfrentar.

Otro motivo para usar UDF es que en el módulo de Spark MLlib, la librería nativa de Spark para operaciones de Machine Learning, las UDF juegan un papel vital al momento de hacer transformaciones. Por lo cual tener un uso familiar de estas ampliará considerablemente la curva de aprendizaje de Spark MLlib.

In [None]:
deportistaError = spark.read.csv(path + '/deportistaError.csv',header = True)

In [None]:
deportistaError.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|  NULL|NULL|      273|
|            4|Edgar Lindenau Aabye|     1|  34|  NULL|NULL|      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|NULL|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|  NULL|NULL|      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [None]:
deportistaError.dtypes

[('deportista_id', 'string'),
 ('nombre', 'string'),
 ('genero', 'string'),
 ('edad', 'string'),
 ('altura', 'string'),
 ('peso', 'string'),
 ('equipo_id', 'string')]

# Creacion de funcion UDF para transformar tipo de dato

In [None]:
from pyspark.sql.functions import udf

def conversionEnteros(valor):
  return int(valor) if valor !=None else None

conversionEnteros_udf = udf(lambda z : conversionEnteros(z),IntegerType())
#lambda z: conversionEnteros(z): Aplica la función conversionEnteros a cada valor de la columna.
spark.udf.register('conversionEnteros_udf', conversionEnteros_udf)
#Registra la UDF en el contexto SQL de Spark (SQLContext), lo que permite usarla en consultas SQL en Spark.

<pyspark.sql.udf.UserDefinedFunction at 0x7e2b9eb11250>

In [None]:
deportistaError.select(conversionEnteros_udf('altura').alias('alturaUDF')).show()

+---------+
|alturaUDF|
+---------+
|      180|
|      170|
|     NULL|
|     NULL|
|      185|
|      188|
|      183|
|      168|
|      186|
|     NULL|
|      182|
|      172|
|      159|
|      171|
|     NULL|
|      184|
|      175|
|      189|
|     NULL|
|      176|
+---------+
only showing top 20 rows



# Persistencia

Cómo funciona la persistencia de datos en Spark?

Aprender a manejar el particionado y la persistencia de datos en Spark es crucial para optimizar el rendimiento y eficacia de los procesos. Sin un adecuado manejo de estos aspectos, los sistemas pueden recomputar innecesariamente los datos, afectando el rendimiento y aumentando los costos. En esta clase, aprenderás cómo utilizar y configurar distintas técnicas de persistencia en Spark, permitiendo a tus datos estar disponibles de manera eficaz a lo largo del procesamiento.

¿Por qué es importante mantener los datos en memoria?

Cuando se trabaja con grandes conjuntos de datos, la recomputación constante puede ser ineficiente y costosa. Utilizar herramientas como cache y persist permite mantener los datos en memoria o en disco, evitando reprocesamientos y mejorando tiempos de respuesta. Aquí algunos métodos comunes:

Cache: Guarda los datos en la memoria para poder acceder a ellos rápidamente.
Persist: Ofrece la opción de almacenar datos tanto en memoria como en disco, según se requiera.
Ambos métodos ayudan a conservar los datos en forma serializada, facilitando su acceso y manejo dentro de PySpark.

Cómo funciona en Spark:
Sin persistencia: Cada vez que ejecutas una acción (por ejemplo, count), Spark vuelve a ejecutar todas las transformaciones necesarias.

Con persistencia: Puedes usar métodos como persist() o cache() para guardar los resultados intermedios en memoria o disco. Esto evita la recomputación y acelera el proceso.

In [11]:
from pyspark.storagelevel import StorageLevel

In [12]:
DeportistaDF.is_cached

False

¿Qué hace?

Esta propiedad (is_cached) devuelve True si el DataFrame está almacenado en caché (persistente en memoria o disco), y False si no lo está.

¿Por qué es útil?

Cuando trabajas con grandes volúmenes de datos en PySpark, las transformaciones se ejecutan de forma perezosa (lazy evaluation). Si usas .cache() o .persist(), PySpark guarda el DataFrame en memoria para evitar recomputaciones y mejorar el rendimiento.

In [13]:
# Almacenar el DataFrame en caché
DeportistaDF.cache()

DataFrame[deportista_id: string, nombre: string, genero: string, edad: string, altura: string, peso: string, equipo_id: string]

In [17]:
#storageLevel para saber que tipo de persistencia tiene el archivo
DeportistaDF.storageLevel

StorageLevel(True, True, False, True, 1)

StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)


useDisk	True/False ->	Si el DataFrame se guarda en disco cuando no cabe en memoria.

useMemory	True/False	-> Si el DataFrame se almacena en memoria RAM.

useOffHeap	True/False	->Si el almacenamiento usa memoria fuera del heap de la JVM (requiere configuración extra).

deserialized	True/False	->Si los datos se guardan en memoria en formato deserializado para acceso rápido. Si es False, se almacenan en un formato comprimido.

replication->	Número entero	Número de veces que los datos se replican en diferentes nodos para tolerancia a fallos.

In [21]:
#Para cambiar de persistencia debemos desligarla de la persisntencia que ya tenia
DeportistaDF.unpersist()

DataFrame[deportista_id: string, nombre: string, genero: string, edad: string, altura: string, peso: string, equipo_id: string]

In [22]:
#Para que es te en memoria y en disco y repicado 2 veces
DeportistaDF.persist(StorageLevel.MEMORY_AND_DISK_2)

DataFrame[deportista_id: string, nombre: string, genero: string, edad: string, altura: string, peso: string, equipo_id: string]

In [23]:
DeportistaDF.storageLevel

StorageLevel(True, True, False, False, 2)

In [20]:
#Creacion de persistencia personalizada
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True,True,False,False,3)

In [24]:
DeportistaDF.unpersist()

DataFrame[deportista_id: string, nombre: string, genero: string, edad: string, altura: string, peso: string, equipo_id: string]

In [25]:
#Aplicando la persistencia creada
DeportistaDF.persist(StorageLevel.MEMORY_AND_DISK_3)#lo mas recomendable es que se localice en 3 nodos pero depende del negocio

DataFrame[deportista_id: string, nombre: string, genero: string, edad: string, altura: string, peso: string, equipo_id: string]

In [26]:
DeportistaDF.storageLevel

StorageLevel(True, True, False, False, 3)

# Particionado

Como se ha descrito en clases pasadas, los RDD son la capa de abstracción primaria para poder interactuar con los datos que viven en nuestro ambiente de Spark. Aunque estos puedan ser enmascarados con un esquema dotándolos de las facultades propias de los DataFrames, la información de fondo sigue operando como RDD.

Por lo tanto, la información, como indica el nombre de los RDD, se maneja de forma distribuida a lo largo del clúster, facilitando las operaciones que se van a ejecutar, ya que segmentos de información pueden encontrarse en diferentes ejecutores reduciendo el tiempo necesario para acceder a la información y poder así realizar los cálculos necesarios.

Cuando un RDD o Dataframe es creado, según las especificaciones que se indiquen a la aplicación de Spark, creará un esquema de particionado básico, el cual distribuirá los datos a lo largo del clúster. Siendo así que al momento de ejecutar una acción, esta se ejecutará entre los diversos fragmentos de información que existan para poder así realizar de la forma más rápida las operaciones. Es por eso que un correcto esquema de particionado es clave para poder tener aplicaciones rápidas y precisas que además consuman pocos recursos de red.

Otra de las tareas fundamentales es la replicación de componentes y sus fragmentos, ya que al aumentar la disponibilidad de estos podremos asegurar una tolerancia a fallos, mientras más se replique un valor es más probable que no se pierda si existe un fallo de red o energía, además de permitir una disponibilidad casi inmediata del archivo buscado.

La partición y replicación son elementos que deben ser analizados según el tipo de negocio o requerimientos que se tengan en el desarrollo que se encuentre en progreso, por lo cual la cantidad de datos replicados o granularidad de datos existentes en los fragmentos dependerá en función de las reglas de negocio.


In [28]:

spark = SparkSession.builder.appName("Particionado").master("local[5]").getOrCreate()

.master("local[5]")

Especifica el modo de ejecución y el número de hilos (threads) que Spark usará en la máquina local.
"local[5]" significa que Spark usará 5 núcleos del procesador para ejecutar tareas en paralelo.
"local[*]" usa todos los núcleos disponibles.

In [32]:
df = spark.range(0,10)

In [33]:
#pARA OBTENER EL NUMERO DE PARTICIONES
df.rdd.getNumPartitions()

5

In [36]:
#Otra forma de particionar rdd's
rdd1 = spark.sparkContext.parallelize((0,20),10)
rdd1.getNumPartitions()

10

In [38]:
#rdd desde archivo
rddArchivo = spark.sparkContext.textFile(path + '/deporte.csv',10)

In [55]:
type(rddArchivo)

In [39]:
rddArchivo.getNumPartitions()

10

In [40]:
#Guardar el archivo en una ruta donde encontraremos las particiones
rddArchivo.saveAsTextFile(path+'/salidatexto')

In [41]:
!ls /content/drive/MyDrive/Spark/files/salidatexto

part-00000  part-00002	part-00004  part-00006	part-00008  _SUCCESS
part-00001  part-00003	part-00005  part-00007	part-00009


In [42]:
#Cargar todos los archivos guardados
rdd = spark.sparkContext.wholeTextFiles(path + '/salidatexto/*')

In [43]:
rdd.take(5)

[('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00000',
  'deporte_id,deporte\n1,Basketball\n2,Judo\n3,Football\n4,Tug-Of-War\n5,Speed Skating\n6,Cross Country Skiing\n'),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00001',
  '7,Athletics\n8,Ice Hockey\n9,Swimming\n10,Badminton\n11,Sailing\n12,Biathlon\n13,Gymnastics\n14,Art Competitions\n'),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00002',
  '15,Alpine Skiing\n16,Handball\n17,Weightlifting\n18,Wrestling\n19,Luge\n20,Water Polo\n'),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00003',
  '21,Hockey\n22,Rowing\n23,Bobsleigh\n24,Fencing\n25,Equestrianism\n26,Shooting\n27,Boxing\n28,Taekwondo\n'),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00004',
  '29,Cycling\n30,Diving\n31,Canoeing\n32,Tennis\n33,Modern Pentathlon\n34,Figure Skating\n35,Golf\n')]

In [44]:
# mapear los valores en una lista
lista = rdd.mapValues(lambda x : x.split()).collect()

In [46]:
lista

[('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00000',
  ['deporte_id,deporte',
   '1,Basketball',
   '2,Judo',
   '3,Football',
   '4,Tug-Of-War',
   '5,Speed',
   'Skating',
   '6,Cross',
   'Country',
   'Skiing']),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00001',
  ['7,Athletics',
   '8,Ice',
   'Hockey',
   '9,Swimming',
   '10,Badminton',
   '11,Sailing',
   '12,Biathlon',
   '13,Gymnastics',
   '14,Art',
   'Competitions']),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00002',
  ['15,Alpine',
   'Skiing',
   '16,Handball',
   '17,Weightlifting',
   '18,Wrestling',
   '19,Luge',
   '20,Water',
   'Polo']),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00003',
  ['21,Hockey',
   '22,Rowing',
   '23,Bobsleigh',
   '24,Fencing',
   '25,Equestrianism',
   '26,Shooting',
   '27,Boxing',
   '28,Taekwondo']),
 ('file:/content/drive/MyDrive/Spark/files/salidatexto/part-00004',
  ['29,Cycling',
   '30,Diving',
   '31,Canoeing',


In [48]:
lista = [l[0] for l in lista ]

In [49]:
lista

['file:/content/drive/MyDrive/Spark/files/salidatexto/part-00000',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00001',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00002',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00003',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00004',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00005',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00006',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00007',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00008',
 'file:/content/drive/MyDrive/Spark/files/salidatexto/part-00009']

In [52]:
rdd2 = spark.sparkContext.textFile(','.join(lista),10).map(lambda l : l.split(","))

In [53]:
rdd2.take(4)

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football']]

In [56]:
#Obtencion de paises
paises = spark.read.csv(path + 'paises.csv',header=True)

In [57]:
# Reparticionarlo en 5 particiones (para que se distribuya en 5 nodos)
paises = paises.repartition(5)

In [61]:
paises.rdd.getNumPartitions()

5

In [62]:
paises.storageLevel

StorageLevel(False, False, False, False, 1)

In [68]:
paises.unpersist()

DataFrame[id: string, equipo: string, sigla: string]

In [69]:
paises.rdd.persist(StorageLevel(True, True, False, True, 5))  # 5 copias de cada partición

MapPartitionsRDD[70] at javaToPython at NativeMethodAccessorImpl.java:0

In [70]:
paises.storageLevel

StorageLevel(False, False, False, False, 1)

In [71]:
spark.stop()

In [None]:
sc.stop()