# Using PySpark to manipulate data about the Olympic Games

In [109]:
from pyspark import SparkContext

In [110]:
from pyspark.sql import SparkSession

In [111]:
spark = SparkSession.builder \
    .master('local') \
    .appName('myFirstSession') \
    .getOrCreate()

In [112]:
spark.stop()

### Transformations and actions

In [113]:
from pyspark import SparkContext

sc = SparkContext(master='local', appName='TransAndActions')

rdd1 = sc.parallelize([1,2,3])
type(rdd1)

pyspark.rdd.RDD

In [114]:
rdd1.collect()

[1, 2, 3]

In [115]:
path = './files'

teamsRDD = sc.textFile(path+'/paises.csv') \
    .map(lambda line : line.split(','))

In [116]:
teamsRDD.take(5)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

In [117]:
# It's better to do operations with tuples.
teamsRDD.map(lambda x: (x[2])).distinct().count() # We have 231 countries competing

231

In [118]:
# In this case since it's a groupBy, we put first the col, we want to group by,
# Here we take first the third col since we want to group by 'sigla'
teamsRDD.map(lambda x: (x[2], x[1])).groupByKey().mapValues(list).take(5)

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise']),
 ('MEX',
  ['A North American Team',
   'Acipactli',
   'Chamukina',
   'Mexico',
   'Mexico-1',
   'Mexico-2',
   'Nausikaa 4',
   'Tlaloc',
   'Xolotl']),
 ('ARG',
  ['Acturus',
   'Antares',
   'Arcturus',
   'Ardilla',
   'Argentina',
   'Argentina-1',
   'Argentina-2',
   'Blue Red',
   'Covunco III',
   'Cupidon III',
   'Djinn',
   'Gullvinge',
   'Matrero II',
   'Mizar',
   'Pampero',
   'Rampage',
   'Tango',
   'Wiking']),
 ('AFG', ['Afghanistan'])]

In [119]:
argentinaTeams = teamsRDD.filter(lambda l: 'ARG' in l)
argentinaTeams.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

### Checking participants in Olympic Games

In [120]:
sportmenRDD = sc.textFile(path+'/deportista.csv') \
    .map(lambda line : line.split(','))

sportmenRDD2 = sc.textFile(path+'/deportista2.csv') \
    .map(lambda line : line.split(','))

In [121]:
sportmenRDD = sportmenRDD.union(sportmenRDD2)
sportmenRDD.count()

135572

In [122]:
teamsRDD.top(1)

[['id', 'equipo', 'sigla']]

In [123]:
sportmenRDD.top(1)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id']]

In [124]:
# In order to do the join, I have to select the position of the col where 'equipo_id' exists.
sportmenRDD.map(lambda l: [l[-1], l[:-1]]) \
    .join(teamsRDD.map(lambda x : [x[0], x[2]] )) \
    .takeSample(False, 6, 25)

                                                                                

[('362', (['131505', 'Steven Woodburn', '1', '24', '185', '90'], 'FRA')),
 ('967', (['13626', 'Jill Brresen', '2', '22', '170', '57'], 'RSA')),
 ('482', (['44299', 'Gumundur Gumundsson', '1', '23', '174', '77'], 'ISL')),
 ('970', (['68062', 'Lee MinHui', '2', '28', '174', '65'], 'KOR')),
 ('794', (['92442', 'Luis Paz Zoldan', '1', '19', '187', '82'], 'PER')),
 ('413', (['26822', 'Jared Mark Deacon', '1', '24', '185', '77'], 'GBR'))]

In [125]:
sportmenRDD.filter(lambda l: '131505' in l).collect()

[['131505', 'Steven Woodburn', '1', '24', '185', '90', '362']]

### Let's see the number of medals

In [126]:
results = sc.textFile(path+'/resultados.csv') \
    .map(lambda line : line.split(','))

# We only want results that we're certain of    
winnerResults = results.filter(lambda l : 'NA' not in l[1])
winnerResults.take(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4'],
 ['38', 'Bronze', '15', '7', '19'],
 ['39', 'Bronze', '15', '7', '20'],
 ['41', 'Bronze', '16', '50', '14']]

### Obtaining participants by countries

In [127]:
participantCountries = sportmenRDD \
    .map(lambda l : [l[-1], l[:-1]]) \
    .join(teamsRDD.map(lambda x: [x[0], x[2]]))

participantCountries.top(1)

[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR'))]

In [128]:
winnerResults.top(1)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id']]

In [129]:
countriesMedals = participantCountries.join(winnerResults)
countriesMedals.top(1)

                                                                                

[('975',
  ((['94388', 'Phan Hu Dong', '1', '21', '167', '58'], 'VNM'), 'Bronze'))]

### Calculating medal values

In [130]:
medalValues = {'Gold': 7, 'Silver': 5, 'Bronze': 4}

In [131]:
countriesMedals = countriesMedals \
    .map(lambda x: [x[1][0][-1], medalValues[x[1][1]]]) # Here I am assigning the values of medalValues to the countriesMedals values


countriesMedals.take(1)

[['AZE', 7]]

In [132]:
from operator import add

conclusion = countriesMedals.reduceByKey((add)) \
    .sortBy(lambda x :x[1], ascending=False)

conclusion.take(10)

[('CAN', 32538),
 ('ARG', 12520),
 ('HUN', 10860),
 ('MEX', 6124),
 ('RSA', 3788),
 ('BLR', 3580),
 ('LTU', 1535),
 ('MGL', 1460),
 ('USA', 1342),
 ('AZE', 1218)]

In [133]:
sc.stop()