In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType , StringType ,FloatType, Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import col

In [2]:
sc = SparkContext(master='local',appName='Spark_app')
# Creo un sql Context 
sql_context = SQLContext(sc)

In [3]:
%ls /home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/

deporte.csv      deportistaError.csv  [0m[01;35mmodelo_relacional.jpg[0m
deportista2.csv  evento.csv           paises.csv
deportista.csv   juegos.csv           resultados.csv


In [4]:
!head -n 5 /home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


**Creeo la estructura del schema** , para luego leer el archivo

In [5]:
path = '/home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/'

#Creo un schema para poder cargar el archivo  
juegoSchema = (StructType
               ([
    StructField('id',IntegerType(),False), #El tercer atributo hace referencia a si es obligatorio que exista el dato
    StructField('name_game',StringType(),False),
    StructField('year',IntegerType(),False),
    StructField('season',StringType(),False),
    StructField('city',StringType(),False)
])
              )

Cargo el archivo como una base de datos 

In [6]:
juegos_df = sql_context.read.schema(juegoSchema).option('header','true').csv(path+'juegos.csv')

In [7]:
juegos_df.show(5)

+---+-----------+----+------+---------+
| id|  name_game|year|season|     city|
+---+-----------+----+------+---------+
|  1|1896 Verano|1896|Verano|   Athina|
|  2|1900 Verano|1900|Verano|    Paris|
|  3|1904 Verano|1904|Verano|St. Louis|
|  4|1906 Verano|1906|Verano|   Athina|
|  5|1908 Verano|1908|Verano|   London|
+---+-----------+----+------+---------+
only showing top 5 rows



In [8]:
!head -n 10 /home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/deportista.csv 

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,0,0,273
4,Edgar Lindenau Aabye,1,34,0,0,278
5,Christine Jacoba Aaftink,2,21,185,82,705
6,Per Knut Aaland,1,31,188,75,1096
7,John Aalberg,1,31,183,72,1096
8,Cornelia Cor Aalten Strannood ,2,18,168,0,705
9,Antti Sami Aalto,1,26,186,96,350


In [9]:
path = '/home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/'

#Creo un schema para poder cargar el archivo  
Deportista_schema = (StructType
               ([
    StructField('id',IntegerType(),False), #El tercer atributo hace referencia a si es obligatorio que exista el dato
    StructField('name',StringType(),False),
    StructField('gender',IntegerType(),False),
    StructField('age',IntegerType(),False),
    StructField('altura',IntegerType(),False),
    StructField('peso',FloatType(),False),
    StructField('equipo_id',StringType(),False)
])
              )

In [10]:
deportista_df = sql_context.read.schema(Deportista_schema).option('header','true').csv(path+'deportista.csv')

In [11]:
deportista_df2= sql_context.read.schema(Deportista_schema).option('header','true').csv(path+'deportista2.csv')

In [12]:
deportista_df.show(2)

+---+---------+------+---+------+----+---------+
| id|     name|gender|age|altura|peso|equipo_id|
+---+---------+------+---+------+----+---------+
|  1|A Dijiang|     1| 24|   180|80.0|      199|
|  2| A Lamusi|     1| 23|   170|60.0|      199|
+---+---------+------+---+------+----+---------+
only showing top 2 rows



In [13]:
!head -n 10 /home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/paises.csv

id,equipo,sigla
1,30. Februar,AUT
2,A North American Team,MEX
3,Acipactli,MEX
4,Acturus,ARG
5,Afghanistan,AFG
6,Akatonbo,IRL
7,Alain IV,SUI
8,Albania,ALB
9,Alcaid,POR


In [14]:
#Creo un schema para poder cargar el archivo  
paises_schema = (StructType
               ([
    StructField('id',IntegerType(),False), #El tercer atributo hace referencia a si es obligatorio que exista el dato
    StructField('team',StringType(),False),
    StructField('sigla',StringType(),False)
])
              )

In [15]:
paises_df = sql_context.read.schema(paises_schema).option('header','true').csv(path+'paises.csv')

In [16]:
evento_schema = StructType([
    StructField('evento_id',IntegerType(),False),
    StructField('name:evento',StringType(),False),
    StructField('deporte_id',IntegerType(),False)
])

evento_df = sql_context.read.option('header','true').csv(path+'evento.csv')

deporte_schema = StructType([
    StructField('deporte_id',IntegerType(),False),
    StructField('sport_name',StringType(),False),
])
deporte_df = sql_context.read.option('header','true').csv(path+'deporte.csv')

In [17]:
#Creo un schema para poder cargar el archivo  
resultado_schema = (StructType
                    ([
    StructField('resultado_id',IntegerType(),False), #El tercer atributo hace referencia a si es obligatorio que exista el dato
    StructField('medal',StringType(),False),
    StructField('deportista_id',IntegerType(),False),
    StructField('juego_id',IntegerType(),False),
    StructField('evento_id',StringType(),False)
]))

In [18]:
resultado_df = sql_context.read.option('header','true').csv(path+'resultados.csv')

In [19]:
deportista_df = deportista_df.union(deportista_df2)

In [20]:
print(*'-'+'deportista')
deportista_df.printSchema()
print(*'-'+'resultado')
resultado_df.printSchema()
print(*'-'+'equipo')
paises_df.printSchema()
print(*'-'+'juegos')
juegos_df.printSchema()
print(*'-'+'evento')
evento_df.printSchema()
print(*'-'+'deporte')
deporte_df.printSchema()

- d e p o r t i s t a
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: string (nullable = true)

- r e s u l t a d o
root
 |-- resultado_id: string (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: string (nullable = true)
 |-- juego_id: string (nullable = true)
 |-- evento_id: string (nullable = true)

- e q u i p o
root
 |-- id: integer (nullable = true)
 |-- team: string (nullable = true)
 |-- sigla: string (nullable = true)

- j u e g o s
root
 |-- id: integer (nullable = true)
 |-- name_game: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- city: string (nullable = true)

- e v e n t o
root
 |-- evento_id: string (nullable = true)
 |-- evento: string (nullable = true)
 |-- deporte_id: string (nullable = true)



In [21]:
full_table = (deportista_df.join(
    resultado_df,
    deportista_df.id == resultado_df.deportista_id,
    'right').join(
    paises_df,
    paises_df.id == deportista_df.equipo_id,
    'left').join(
    juegos_df,
    juegos_df.id == resultado_df.juego_id,
    'left').join(
    evento_df,
    evento_df.evento_id == resultado_df.evento_id,
    'left').join(
    deporte_df,
    deporte_df.deporte_id == evento_df.deporte_id,
    'left')
 )

In [22]:
(full_table.select('name','medalla','sigla','team')
 .filter(col('medalla') != 'NA')
 .show())

+--------------------+-------+-----+--------------+
|                name|medalla|sigla|          team|
+--------------------+-------+-----+--------------+
|Edgar Lindenau Aabye|   Gold|  SWE|Denmark/Sweden|
|Arvo Ossian Aaltonen| Bronze|  FIN|       Finland|
|Arvo Ossian Aaltonen| Bronze|  FIN|       Finland|
|Juhamatti Tapio A...| Bronze|  FIN|       Finland|
|Paavo Johannes Aa...| Bronze|  FIN|       Finland|
|Paavo Johannes Aa...|   Gold|  FIN|       Finland|
|Paavo Johannes Aa...|   Gold|  FIN|       Finland|
|Paavo Johannes Aa...|   Gold|  FIN|       Finland|
|Paavo Johannes Aa...| Bronze|  FIN|       Finland|
|  Kjetil Andr Aamodt|   Gold|  NOR|        Norway|
|  Kjetil Andr Aamodt| Bronze|  NOR|        Norway|
|  Kjetil Andr Aamodt| Silver|  NOR|        Norway|
|  Kjetil Andr Aamodt| Bronze|  NOR|        Norway|
|  Kjetil Andr Aamodt| Silver|  NOR|        Norway|
|  Kjetil Andr Aamodt|   Gold|  NOR|        Norway|
|  Kjetil Andr Aamodt|   Gold|  NOR|        Norway|
|  Kjetil An

# Agrupar 

In [23]:
medaliista_porAño = (full_table
  .select('sigla','name_game','medalla',col('deporte').alias('Disciplina'),col('evento').alias('Sub Disciplina'),'name')
 .filter(col('medalla') != 'NA')
 .sort(col('year'))
)

In [24]:
medaliista_porAño.show()

+-----+-----------+-------+----------+--------------------+--------------------+
|sigla|  name_game|medalla|Disciplina|      Sub Disciplina|                name|
+-----+-----------+-------+----------+--------------------+--------------------+
|  GBR|1896 Verano| Bronze|   Cycling|Cycling Men's Roa...|       Edward Battel|
|  GRE|1896 Verano| Bronze|  Swimming|Swimming Men's 10...|    Dimitrios Drivas|
|  USA|1896 Verano| Silver| Athletics|Athletics Men's 1...|Arthur Charles Blake|
|  GRE|1896 Verano|   Gold|Gymnastics|Gymnastics Men's ...|Nikolaos Andriako...|
|  GER|1896 Verano|   Gold|Gymnastics|Gymnastics Men's ...|Conrad Helmut Fri...|
|  USA|1896 Verano|   Gold| Athletics|Athletics Men's H...|Ellery Harding Clark|
|  GRE|1896 Verano| Bronze|  Shooting|Shooting Men's Mi...|    Nikolaos Dorakis|
|  USA|1896 Verano|   Gold| Athletics|Athletics Men's L...|Ellery Harding Clark|
|  GER|1896 Verano|   Gold|Gymnastics|Gymnastics Men's ...|Conrad Helmut Fri...|
|  USA|1896 Verano| Silver| 

Agrupo por la sigla , año y sub disciplina. Y la operación que aplico es de contar 

In [25]:
medallista_xyear = (medaliista_porAño
 .groupBy('sigla',col('name_game').alias('year'),'Sub Disciplina')
 .count()
 .sort(['name_game','sigla'],ascending=True)
)

In [26]:
medallista_xyear.show(5)

+-----+-----------+--------------------+-----+
|sigla|       year|      Sub Disciplina|count|
+-----+-----------+--------------------+-----+
|  AUS|1896 Verano|Athletics Men's 8...|    1|
|  AUS|1896 Verano|Tennis Men's Doubles|    1|
|  AUS|1896 Verano|Athletics Men's 1...|    1|
|  AUT|1896 Verano|Cycling Men's 333...|    1|
|  AUT|1896 Verano|Cycling Men's 10,...|    1|
+-----+-----------+--------------------+-----+
only showing top 5 rows



In [27]:
medallista_xyear.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- year: string (nullable = true)
 |-- Sub Disciplina: string (nullable = true)
 |-- count: long (nullable = false)



Todo la columna de count y la uso para hacer operaciones, la menera correcta de hacer varias opeación cuando estoy agrupando, es hacerlo mediante el agg

In [28]:
from pyspark.sql.functions import sum , avg, round

Para poder usar las operaciones las tengo que importar, de lo contrario me saldrá un error 

In [29]:
(medallista_xyear.groupBy('sigla')
 .agg(sum('count').alias('Total de medallas ganadas'),round(avg('count'),2).alias('Primedio por año'))
 .show()
)


+-----+-------------------------+----------------+
|sigla|Total de medallas ganadas|Primedio por año|
+-----+-------------------------+----------------+
|  POL|                      564|            1.85|
|  JAM|                      164|            2.34|
|  BRA|                      475|            3.83|
|  ARM|                       19|             1.0|
|  MOZ|                        2|             1.0|
|  CUB|                      410|            1.92|
|  JOR|                        1|             1.0|
|  FRA|                     1785|            2.34|
|  ALG|                       17|             1.0|
|  BRN|                        3|             1.0|
|  NEP|                        1|             1.0|
|  VIE|                        4|             1.0|
|  BOT|                        1|             1.0|
|  ETH|                       53|            1.26|
|  RSA|                      134|            1.52|
|  UKR|                      200|            1.43|
|  ITA|                     162

# Operaciones tipo SQL

Registro los df con alias para poder usar en el query

In [30]:
resultado_df.registerTempTable('resultado')
paises_df.registerTempTable('paises')
deportista_df.registerTempTable('deportista')

In [31]:
sql_context.sql('''
SELECT sigla,  medalla , count(medalla) 
FROM resultado as r 
JOIN deportista as d ON r.deportista_id == d.id
JOIN paises as p ON p.id == d.equipo_id
WHERE medalla != 'NA' 
group by sigla , medalla
order by sigla desc
''').show()

+-----+-------+--------------+
|sigla|medalla|count(medalla)|
+-----+-------+--------------+
|  ZIM| Bronze|             1|
|  ZIM|   Gold|            17|
|  ZIM| Silver|             4|
|  ZAM| Bronze|             1|
|  ZAM| Silver|             1|
|  YUG| Silver|           180|
|  YUG|   Gold|           136|
|  YUG| Bronze|           101|
|  WIF| Bronze|             1|
|  VIE|   Gold|             1|
|  VIE| Silver|             3|
|  VEN| Silver|             3|
|  VEN|   Gold|             2|
|  VEN| Bronze|            10|
|  UZB|   Gold|            10|
|  UZB| Silver|             7|
|  UZB| Bronze|            17|
|  USA| Bronze|          1356|
|  USA| Silver|          1648|
|  USA|   Gold|          2639|
+-----+-------+--------------+
only showing top 20 rows



In [36]:
%ls /home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/deportistaError.csv

deporte.csv      deportistaError.csv  [0m[01;35mmodelo_relacional.jpg[0m
deportista2.csv  evento.csv           paises.csv
deportista.csv   juegos.csv           resultados.csv


In [37]:
!head -n 5 /home/cesar/Documents/prac_sp/curso-apache-spark-platzi/files/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


Ya que hay columnas faltantes, tengo que hacer un tratado de datos con una UDF

# UDF

In [51]:
deportista_error = sc.textFile(path+'deportistaError.csv').map(lambda x : x.split(','))
deportista_error.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '', '', '278']]

In [43]:
def delete_header(iterator):
    return iter(list(iterator)[1:])

In [52]:
# Elimino el emcabezado 
deportista_error = deportista_error.mapPartitions(delete_header)
deportista_error.take(3)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273']]

**Tengo que separar los datos , para que el UDF me funcione**

In [94]:
deportista_error = deportista_error.map(lambda x : 
 (x[0],
  x[1],
  x[2],
  x[3],
  x[4],
  x[5],
  x[6]))

In [96]:
deportista_error.take(2)

[('1', 'A Dijiang', '1', '24', '180', '80', '199'),
 ('2', 'A Lamusi', '1', '23', '170', '60', '199')]

In [97]:

deportista_error_schema = StructType([
    StructField('id',StringType(),False), 
    StructField('name',StringType(),False),
    StructField('gender',StringType(),False),
    StructField('age',StringType(),False),
    StructField('height',StringType(),False),
    StructField('weight',StringType(),False),
    StructField('equipo_id',StringType(),False)
])
deportista_errorDF = sql_context.createDataFrame(deportista_error,deportista_error_schema)
deportista_errorDF.show()

+---+--------------------+------+---+------+------+---------+
| id|                name|gender|age|height|weight|equipo_id|
+---+--------------------+------+---+------+------+---------+
|  1|           A Dijiang|     1| 24|   180|    80|      199|
|  2|            A Lamusi|     1| 23|   170|    60|      199|
|  3| Gunnar Nielsen Aaby|     1| 24|      |      |      273|
|  4|Edgar Lindenau Aabye|     1| 34|      |      |      278|
|  5|Christine Jacoba ...|     2| 21|   185|    82|      705|
|  6|     Per Knut Aaland|     1| 31|   188|    75|     1096|
|  7|        John Aalberg|     1| 31|   183|    72|     1096|
|  8|"Cornelia ""Cor""...|     2| 18|   168|      |      705|
|  9|    Antti Sami Aalto|     1| 26|   186|    96|      350|
| 10|"Einar Ferdinand ...|     1| 26|      |      |      350|
| 11|  Jorma Ilmari Aalto|     1| 22|   182|  76.5|      350|
| 12|   Jyri Tapani Aalto|     1| 31|   172|    70|      350|
| 13|  Minna Maarit Aalto|     2| 30|   159|  55.5|      350|
| 14|Pir

In [54]:
from pyspark.sql.functions import udf

Creo la función para cambiar de str a int las columnas, esta será mi función de UDF

In [98]:
def conver_int(column):
    return int(column) if len(column) > 0 else None

In [99]:
# udf(lambda z : función_creada(z),formato al que voy a convertir)
convert_int_udf = udf(lambda z : conver_int(z),IntegerType())

#Ahora tengo que reguistrar la UDF

In [122]:
sql_context.udf.register('convert',convert_int_udf)

#sql_context.udf.register('El nombre con que se va a registar la función',La función de donde aue va a tomar)

<function __main__.<lambda>(z)>

In [123]:
deportista_errorDF.select(convert_int_udf('height').alias('height')).show(3)

+------+
|height|
+------+
|   180|
|   170|
|  null|
+------+
only showing top 3 rows



In [107]:
deportista_errorDF.registerTempTable('error_deportista')

In [135]:
sql_context.sql('''
SELECT age , avg(convert(height)) OVER(PARTITION BY (convert(equipo_id))) as promedio
FROM error_deportista
''').show()

+---+------------------+
|age|          promedio|
+---+------------------+
| 17|             163.0|
| 23|             163.0|
| 22|             163.0|
| 26|             163.0|
| 25|             163.0|
| 24|             163.0|
|   |174.73706896551724|
| 20|174.73706896551724|
| 28|174.73706896551724|
| 22|174.73706896551724|
| 24|174.73706896551724|
| 34|174.73706896551724|
| 27|174.73706896551724|
| 25|174.73706896551724|
| 25|174.73706896551724|
| 23|174.73706896551724|
| 25|174.73706896551724|
| 18|174.73706896551724|
| 27|174.73706896551724|
| 37|174.73706896551724|
+---+------------------+
only showing top 20 rows



In [37]:
sc

In [136]:
sc.stop()