In [2]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("miApp")
sc = SparkContext(conf=conf)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 13:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/02 13:06:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/12/02 13:06:08 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [8]:
path = '/home/jovyan/data/'

deportistaRDD = sc.textFile(path + "deportista.csv") \
    .filter(lambda x: 'deportista_id' not in x) \
    .map(lambda l: l.split(','))

paisesRDD = sc.textFile(path + "paises.csv") \
    .filter(lambda x: 'id,equipo,sigla' not in x) \
    .map(lambda l: l.split(','))

resultadosRDD = sc.textFile(path + "resultados.csv") \
    .filter(lambda x: 'resultado_id' not in x) \
    .map(lambda l: l.split(','))


In [7]:
deportistaRDD.first()
paisesRDD.first()
resultadosRDD.first()

['1', 'NA', '1', '39', '1']

<h1>EJERCICIO 1 — Filtrado Básico</h1>
<h2>Deportistas mayores de 50 años</h2>

In [9]:
ej1 = deportistaRDD \
    .map(lambda x: (x[1], int(x[3]))) \
    .filter(lambda x: x[1] > 50)

ej1.take(5)


[('Win Valdemar Aaltonen', 54),
 ('Georges AchilleFould', 55),
 ('Olof Ahlberg', 71),
 ('Ernesto Arturo Alas', 54),
 ('Emilio lava Sautu', 63)]


<h1>EJERCICIO 2 — Países únicos</h1>
Usé distinct() para obtener equipos únicos.

In [10]:
equipos_unicos = paisesRDD.map(lambda x: x[1]).distinct()
equipos_unicos.count(), equipos_unicos.take(5)


                                                                                

(1184, ['30. Februar', 'Acturus', 'Akatonbo', 'Alain IV', 'Albania'])


<h1>EJERCICIO 3 — Unión</h1>
union() une ambos RDDs en uno solo.

In [11]:
deportista2RDD = sc.textFile(path + "deportista2.csv") \
    .filter(lambda x: 'deportista_id' not in x) \
    .map(lambda l: l.split(','))

union_total = deportistaRDD.union(deportista2RDD)
union_total.count()


                                                                                

135571

<h1>EJERCICIO 4 — Join Deportista + País</h1>
Usé join para unir por equipo_id y proyecté nombre + sigla.

In [12]:
# (equipo_id, nombre)
depor_kv = deportistaRDD.map(lambda x: (x[6], x[1]))

# (id pais, sigla)
pais_kv = paisesRDD.map(lambda x: (x[0], x[2]))

join_res = depor_kv.join(pais_kv)
join_res.take(5)

                                                                                

[('278', ('Edgar Lindenau Aabye', 'SWE')),
 ('705', ('Christine Jacoba Aaftink', 'NED')),
 ('705', ('Cornelia Cor Aalten Strannood ', 'NED')),
 ('705', ('Johan Aantjes', 'NED')),
 ('705', ('Willemien Aardenburg', 'NED'))]

<h1>EJERCICIO 5 — Deportistas por Género</h1>
reduceByKey sumó la cantidad por género.

In [13]:
genero_count = deportistaRDD \
    .map(lambda x: (x[2], 1)) \
    .reduceByKey(lambda a,b: a+b)

genero_count.collect()


[('1', 51380), ('2', 16406)]

<h1>EJERCICIO 6 — Puntaje por Deportista</h1>
Convertí medallas a puntaje y sumé con reduceByKey.

In [16]:
def score(m):
    m = m.lower()
    if m == "gold": return 10
    if m == "silver": return 5
    if m == "bronze": return 2
    return 0

# (deportista_id, puntaje)
puntajes = resultadosRDD \
    .map(lambda x: (x[2], score(x[1]))) \
    .reduceByKey(lambda a,b: a+b)

# unir con nombre
id_nombre = deportistaRDD.map(lambda x: (x[0], x[1]))

puntaje_nombre = puntajes.join(id_nombre) \
                         .map(lambda x: (x[1][1], x[1][0]))  # (nombre, puntaje)

puntaje_nombre.takeOrdered(5, key=lambda x: -x[1])


                                                                                

[('Larysa Semenivna Latynina Diriy ', 123),
 ('Ole Einar Bjrndalen', 102),
 ('Nikolay Yefimovich Andrianov', 101),
 ('Birgit FischerSchmidt', 100),
 ('Raymond Clarence Ray Ewry', 100)]

<h1>EJERCICIO 7 — Deportistas USA con medalla Silver</h1>
Filtré medalla Silver y luego usé join para ver si son de USA.

In [23]:
# atletas con medalla Silver
silver = resultadosRDD \
    .filter(lambda x: x[1].lower() == "silver") \
    .map(lambda x: x[2])  # solo deportista_id
silver_kv = silver.map(lambda x: (x, 1))
# deportista_id -> equipo_id
depor_equipo = deportistaRDD.map(lambda x: (x[0], x[6]))
# equipo_id -> sigla
pais_sigla = paisesRDD.map(lambda x: (x[0], x[2]))
# deportista_id -> sigla
depor_sigla = depor_equipo.join(pais_sigla)
# filtrar USA
silver_usa = depor_sigla \
    .filter(lambda x: x[1][1] == "USA")
silver_usa.take(10)

                                                                                

[('60', ('154', 'USA')),
 ('185', ('308', 'USA')),
 ('207', ('308', 'USA')),
 ('226', ('1003', 'USA')),
 ('230', ('308', 'USA')),
 ('356', ('874', 'USA')),
 ('410', ('172', 'USA')),
 ('464', ('622', 'USA')),
 ('629', ('362', 'USA')),
 ('820', ('1096', 'USA'))]

<h1>EJERCICIO 8 — Edad Promedio</h1>
Limpié edades inválidas y calculé promedio manualmente.

In [18]:
edades = deportistaRDD.map(lambda x: int(x[3])) \
                      .filter(lambda x: x > 0)

total = edades.count()
suma = edades.reduce(lambda a,b: a+b)

promedio = suma / total
promedio


24.478489876221904

<h1>EJERCICIO 9 — Deportista más altoo</h1>
takeOrdered permitió obtener el máximo.

In [19]:
alturas = deportistaRDD.map(lambda x: (x[1], int(x[4])))

max_altura = alturas.takeOrdered(1, key=lambda x: -x[1])
max_altura


[('Tommy Loren Burleson', 223)]

<h1>EJERCICIO 10 — Top 3 Países con más Medallas Gold</h1>
Filtré medallas Gold y sumé por equipo, uní con países y tomé top tres.

In [26]:
# filtrar GOLD
gold = resultadosRDD.filter(lambda x: x[1].lower() == "gold")

# (deportista_id,1)
gold_depor = gold.map(lambda x: (x[2],1))

# deportista_id -> equipo_id
dep_id_equipo = deportistaRDD.map(lambda x: (x[0], x[6]))

# unir para obtener equipo_id de cada medalla gold
gold_equipo = gold_depor.join(dep_id_equipo).map(lambda x: (x[1][1], 1))

# (equipo_id, total gold)
gold_por_equipo = gold_equipo.reduceByKey(lambda a,b: a+b)

# equipo_id -> sigla
equipo_sigla = paisesRDD.map(lambda x: (x[0], x[2]))

# unir para obtener sigla
gold_sigla = gold_por_equipo.join(equipo_sigla).map(lambda x: (x[1][1], x[1][0]))

# top 3
gold_sigla.takeOrdered(3, key=lambda x: -x[1])


                                                                                

[('USA', 1291), ('URS', 502), ('GER', 314)]