# Tarea: Uso de RDDs y DataFrames en PySpark
## Edgar Callejas Hernández

In [0]:
# Importamos las bibliotecas 
import pyspark
from pyspark.sql.functions import * 
from pyspark.sql import SparkSession
# Get the current Spark session, or create one named 'primeros_pasos'.
spark = (
    SparkSession.builder
    .appName('primeros_pasos')
    .getOrCreate()
)

## RDD

In [0]:
# Load the data into an RDD: the CSV file is read as plain text lines.
datos_RRD = spark.sparkContext.textFile(
    '/FileStore/tables/players_19.csv'
)

In [0]:
# Echo the RDD object itself; the notebook shows its description, not its rows.
datos_RRD

Out[59]: /FileStore/tables/players_19.csv MapPartitionsRDD[122] at textFile at NativeMethodAccessorImpl.java:0

In [0]:
# Peek at the first two raw lines of the file (the header plus one data row).
primeras_lineas = datos_RRD.take(2)
for renglon in primeras_lineas:
    print(renglon)

sofifa_id,player_url,short_name,long_name,age,dob,height_cm,weight_kg,nationality,club,overall,potential,value_eur,wage_eur,player_positions,preferred_foot,international_reputation,weak_foot,skill_moves,work_rate,body_type,real_face,release_clause_eur,player_tags,team_position,team_jersey_number,loaned_from,joined,contract_valid_until,nation_position,nation_jersey_number,pace,shooting,passing,dribbling,defending,physic,gk_diving,gk_handling,gk_kicking,gk_reflexes,gk_speed,gk_positioning,player_traits,attacking_crossing,attacking_finishing,attacking_heading_accuracy,attacking_short_passing,attacking_volleys,skill_dribbling,skill_curve,skill_fk_accuracy,skill_long_passing,skill_ball_control,movement_acceleration,movement_sprint_speed,movement_agility,movement_reactions,movement_balance,power_shot_power,power_jumping,power_stamina,power_strength,power_long_shots,mentality_aggression,mentality_interceptions,mentality_positioning,mentality_vision,mentality_penalties,mentality_composure,defe

In [0]:
# Keep the first line of the file: it holds the column names.
columnas = datos_RRD.take(1)[0]
# Drop the header from the data by discarding every line equal to it.
datos_RRD = datos_RRD.filter(lambda linea: columnas != linea)

In [0]:
# Print each column name together with its index.
# The loop variable is deliberately not called `col`: that name would
# overwrite the `col` function brought into scope by
# `from pyspark.sql.functions import *`.
for i, nombre_columna in enumerate(columnas.split(',')):
    print(i, nombre_columna)


0 sofifa_id
1 player_url
2 short_name
3 long_name
4 age
5 dob
6 height_cm
7 weight_kg
8 nationality
9 club
10 overall
11 potential
12 value_eur
13 wage_eur
14 player_positions
15 preferred_foot
16 international_reputation
17 weak_foot
18 skill_moves
19 work_rate
20 body_type
21 real_face
22 release_clause_eur
23 player_tags
24 team_position
25 team_jersey_number
26 loaned_from
27 joined
28 contract_valid_until
29 nation_position
30 nation_jersey_number
31 pace
32 shooting
33 passing
34 dribbling
35 defending
36 physic
37 gk_diving
38 gk_handling
39 gk_kicking
40 gk_reflexes
41 gk_speed
42 gk_positioning
43 player_traits
44 attacking_crossing
45 attacking_finishing
46 attacking_heading_accuracy
47 attacking_short_passing
48 attacking_volleys
49 skill_dribbling
50 skill_curve
51 skill_fk_accuracy
52 skill_long_passing
53 skill_ball_control
54 movement_acceleration
55 movement_sprint_speed
56 movement_agility
57 movement_reactions
58 movement_balance
59 power_shot_power
60 power_jumping
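61 power_stamina
62 power_strength
63 power_long_shots
64 mentality_aggression
65 mentality_interceptions
66 mentality_positioning
67 mentality_vision
68 mentality_penalties
69 mentality_composure
... (salida truncada)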

In [0]:
# Function that selects the columns we need from one CSV row.
def separar_campos(renglon):
    """Parse one CSV data row and return the player fields of interest.

    The row is parsed with the standard ``csv`` module instead of a plain
    ``str.split(',')`` so that quoted values containing commas (for example
    ``"ST, LW"``) do not shift the column positions.

    Args:
        renglon: A single data line of the CSV file (not the header).

    Returns:
        A tuple ``(nombre, edad, estatura, peso, nacionalidad, equipo,
        salario)`` read from columns 2, 4, 6, 7, 8, 9 and 13. The name,
        nationality and club are strings; the other values are integers.

    Raises:
        IndexError: If the row has fewer than 14 columns.
        ValueError: If a numeric column cannot be converted to ``int``.
    """
    # Local import keeps the function self-contained wherever it is executed.
    import csv

    # csv.reader expects an iterable of lines; wrap the single row in a list.
    renglon_separado = next(csv.reader([renglon]), [])
    # Extract the fields we want.
    nombre = renglon_separado[2]
    edad = int(renglon_separado[4])
    estatura = int(renglon_separado[6])
    peso = int(renglon_separado[7])
    nacionalidad = renglon_separado[8]
    equipo = renglon_separado[9]
    salario = int(renglon_separado[13])
    return (nombre, edad, estatura, peso, nacionalidad, equipo, salario)

In [0]:
# Apply the parsing function to every row to split it and keep the selected fields.
rddFifa19 = datos_RRD.map(separar_campos)

# Print a sample of the parsed data.
muestra = rddFifa19.take(5)
for renglon in muestra:
    print(renglon)

('Cristiano Ronaldo', 33, 187, 83, 'Portugal', 'Juventus', 405000)
('L. Messi', 31, 170, 72, 'Argentina', 'FC Barcelona', 565000)
('Neymar Jr', 26, 175, 68, 'Brazil', 'Paris Saint-Germain', 290000)
('De Gea', 27, 193, 76, 'Spain', 'Manchester United', 260000)
('K. De Bruyne', 27, 181, 70, 'Belgium', 'Manchester City', 355000)


## DataFrame

In [0]:
# Read the CSV into a Spark DataFrame, taking column names from the header
# row and letting Spark infer each column's type.
df_Fifa = spark.read.csv(
    '/FileStore/tables/players_19.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Echo the DataFrame object; the notebook shows its column names and types, not its rows.
df_Fifa

Out[66]: DataFrame[sofifa_id: int, player_url: string, short_name: string, long_name: string, age: int, dob: string, height_cm: int, weight_kg: int, nationality: string, club: string, overall: int, potential: int, value_eur: int, wage_eur: int, player_positions: string, preferred_foot: string, international_reputation: int, weak_foot: int, skill_moves: int, work_rate: string, body_type: string, real_face: string, release_clause_eur: int, player_tags: string, team_position: string, team_jersey_number: int, loaned_from: string, joined: string, contract_valid_until: int, nation_position: string, nation_jersey_number: int, pace: int, shooting: int, passing: int, dribbling: int, defending: int, physic: int, gk_diving: int, gk_handling: int, gk_kicking: int, gk_reflexes: int, gk_speed: int, gk_positioning: int, player_traits: string, attacking_crossing: string, attacking_finishing: string, attacking_heading_accuracy: string, attacking_short_passing: string, attacking_volleys: string, skill_d

In [0]:
# Keep only the columns used in the rest of the notebook.
df_Fifa = df_Fifa.select(
    'short_name',
    'age',
    'height_cm',
    'weight_kg',
    'nationality',
    'club',
    'wage_eur',
)

In [0]:
# Print the first 5 rows of df_Fifa as a text table.
df_Fifa.show(5)

+-----------------+---+---------+---------+-----------+-------------------+--------+
|       short_name|age|height_cm|weight_kg|nationality|               club|wage_eur|
+-----------------+---+---------+---------+-----------+-------------------+--------+
|Cristiano Ronaldo| 33|      187|       83|   Portugal|           Juventus|  405000|
|         L. Messi| 31|      170|       72|  Argentina|       FC Barcelona|  565000|
|        Neymar Jr| 26|      175|       68|     Brazil|Paris Saint-Germain|  290000|
|           De Gea| 27|      193|       76|      Spain|  Manchester United|  260000|
|     K. De Bruyne| 27|      181|       70|    Belgium|    Manchester City|  355000|
+-----------------+---+---------+---------+-----------+-------------------+--------+
only showing top 5 rows



In [0]:
# Rename a column for this display only: the result is not assigned, so
# df_Fifa keeps 'wage_eur' unless the renamed DataFrame is saved (to the
# same variable or to another one).
(
    df_Fifa
    .withColumnRenamed('wage_eur', 'salario')
    .show(5)
)

+-----------------+---+---------+---------+-----------+-------------------+-------+
|       short_name|age|height_cm|weight_kg|nationality|               club|salario|
+-----------------+---+---------+---------+-----------+-------------------+-------+
|Cristiano Ronaldo| 33|      187|       83|   Portugal|           Juventus| 405000|
|         L. Messi| 31|      170|       72|  Argentina|       FC Barcelona| 565000|
|        Neymar Jr| 26|      175|       68|     Brazil|Paris Saint-Germain| 290000|
|           De Gea| 27|      193|       76|      Spain|  Manchester United| 260000|
|     K. De Bruyne| 27|      181|       70|    Belgium|    Manchester City| 355000|
+-----------------+---+---------+---------+-----------+-------------------+-------+
only showing top 5 rows



In [0]:
# Juventus players younger than 25 years old.
df_Fifa.filter(
    (df_Fifa['age'] < 25) & (df_Fifa['club'] == 'Juventus')
).show(5)

+---------------+---+---------+---------+-----------+--------+--------+
|     short_name|age|height_cm|weight_kg|nationality|    club|wage_eur|
+---------------+---+---------+---------+-----------+--------+--------+
|      P. Dybala| 24|      177|       75|  Argentina|Juventus|  205000|
|F. Bernardeschi| 24|      185|       77|      Italy|Juventus|  110000|
|      D. Rugani| 23|      190|       84|      Italy|Juventus|   95000|
|   João Cancelo| 24|      182|       74|   Portugal|Juventus|   88000|
|         E. Can| 24|      184|       82|    Germany|Juventus|   91000|
+---------------+---+---------+---------+-----------+--------+--------+
only showing top 5 rows



## SQL

In [0]:
# Register this DataFrame as a temporary SQL view so it can be queried by name.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0.
df_Fifa.createOrReplaceTempView('FIFA2019')

In [0]:
# The 5 best-paid players: the query sorts by wage in descending order and
# show(n=5) prints only the first five rows.
consulta = '''
  SELECT  short_name, wage_eur
  FROM FIFA2019
  ORDER BY wage_eur DESC
  '''
spark.sql(consulta).show(n=5)

+-----------------+--------+
|       short_name|wage_eur|
+-----------------+--------+
|         L. Messi|  565000|
|        L. Suárez|  455000|
|        L. Modrić|  420000|
|Cristiano Ronaldo|  405000|
|     Sergio Ramos|  380000|
+-----------------+--------+
only showing top 5 rows



In [0]:
%sql
/* Total payroll of the players per club (five highest) */
SELECT
  club,
  SUM(wage_eur) AS nomina_total
FROM FIFA2019
GROUP BY club
ORDER BY nomina_total DESC
LIMIT 5

club,nomina_total
Real Madrid,4961000
FC Barcelona,4747000
Manchester City,3721000
Manchester United,3522000
Juventus,3271000
