In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType,StructField,IntegerType,StringType,FloatType
from pyspark.sql.types import Row

from pyspark.sql import SQLContext

In [2]:
spark = SparkContext(master="local", appName="DataFrame")
sqlContext = SQLContext(spark)

In [3]:
path="D:\\Proyectos Personales\\Curso Spark\\files"


In [4]:
''' Estructuras Schemas '''

deportistaSchema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)  
])

juegoSchema = StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("nombre_juego",StringType(),False),
    StructField("anio",IntegerType(),False),
    StructField("temporada",StringType(),False),  
    StructField("ciudad",StringType(),False) 
])

eventoSchema = StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("evento",StringType(),False),
    StructField("deporte_id",IntegerType(),False)

])

paisSchema = StructType ([
    StructField("id",IntegerType(),False),
    StructField("equipo",StringType(),False),
    StructField("sigla",StringType(),False)

])

resultadoSchema = StructType ([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False),

])

deporteSchema= StructType([
    StructField("deporte_id",IntegerType(),False),
    StructField("deporte",StringType(),False)
])


In [5]:

deportistaDF = sqlContext.read.schema(deportistaSchema)\
.option("header","true")\
.csv(path+"\\deportista.csv")

juegoDF = sqlContext.read.schema(juegoSchema)\
.option("header","true")\
.csv(path+"\\juegos.csv")

eventoDF = sqlContext.read.schema(eventoSchema)\
.option("header","true")\
.csv(path+"\\evento.csv")

paisDF = sqlContext.read.schema(paisSchema)\
.option("header","true")\
.csv(path+"\\paises.csv")

resultadoDF = sqlContext.read.schema(resultadoSchema)\
.option("header","true")\
.csv(path+"\\resultados.csv")


deporteDF = sqlContext.read.schema(deporteSchema)\
.option("header","true")\
.csv(path+"\\deporte.csv")

In [6]:
deporteDF.show()

+----------+--------------------+
|deporte_id|             deporte|
+----------+--------------------+
|         1|          Basketball|
|         2|                Judo|
|         3|            Football|
|         4|          Tug-Of-War|
|         5|       Speed Skating|
|         6|Cross Country Skiing|
|         7|           Athletics|
|         8|          Ice Hockey|
|         9|            Swimming|
|        10|           Badminton|
|        11|             Sailing|
|        12|            Biathlon|
|        13|          Gymnastics|
|        14|    Art Competitions|
|        15|       Alpine Skiing|
|        16|            Handball|
|        17|       Weightlifting|
|        18|           Wrestling|
|        19|                Luge|
|        20|          Water Polo|
+----------+--------------------+
only showing top 20 rows



In [7]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- genero: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [8]:
deportistaDF =  deportistaDF.withColumnRenamed("genero","sexo").drop("altura")

In [9]:
from pyspark.sql.functions import *

In [10]:
deportistaDF=deportistaDF.select("deportista_id","nombre",col("edad").alias("edadAlJugar"),"equipo_id")

In [11]:
deportistaDF.show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
|            6|     Per Knut Aaland|         31|     1096|
|            7|        John Aalberg|         31|     1096|
|            8|Cornelia Cor Aalt...|         18|      705|
|            9|    Antti Sami Aalto|         26|      350|
|           10|Einar Ferdinand E...|         26|      350|
|           11|  Jorma Ilmari Aalto|         22|      350|
|           12|   Jyri Tapani Aalto|         31|      350|
|           13|  Minna Maarit Aalto|         30|      350|
|           14|Pirjo Hannele Aal...|         32|      35

In [12]:
deportistaDF.sort("edadAlJugar").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|          224|     Mohamed AbdelEl|          0|      308|
|          487|      Inni Aboubacar|          0|      721|
|          226|Sanad Bushara Abd...|          0|     1003|
|           58|    Georgi Abadzhiev|          0|      154|
|          230|    Moustafa Abdelal|          0|      308|
|          102|   Sayed Fahmy Abaza|          0|      308|
|          260|  Ahmed Abdo Mustafa|          0|     1003|
|          139|George Ioannis Abbot|          0|     1043|
|          281|      S. Abdul Hamid|          0|      487|
|          163|     Ismail Abdallah|          0|     1095|
|          285|Talal Hassoun Abd...|          0|      497|
|          173| Mohamed Abdel Fatah|          0|     1003|
|          179|Ibrahim Saad Abde...|          0|     1003|
|          378|     Angelik Abebame|          0|        

In [13]:
deportistaDF = deportistaDF.filter(deportistaDF.edadAlJugar!=0 )

In [14]:
deportistaDF.sort("edadAlJugar").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        52070|        Etsuko Inada|         11|      514|
|        22411|Magdalena Cecilia...|         11|      413|
|        40129|    Luigina Giavotti|         11|      507|
|        47618|Sonja Henie Toppi...|         11|      742|
|        37333|Carlos Bienvenido...|         11|      982|
|        51268|      Beatrice Hutiu|         11|      861|
|         5291|Marcia Arriaga La...|         12|      656|
|        24191| Philippe Cuelenaere|         12|       96|
|        42835|   Werner Grieshofer|         12|       71|
|        25877|Olga Lucia de Ang...|         12|      225|
|        31203|Patricia Anne Pat...|         12|      967|
|        43528|Antoinette Joyce ...|         12|      172|
|        46578|        Diana Hatler|         12|      825|
|        48728|      Margery Hinton|         12|      41

In [15]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edadAlJugar: integer (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [16]:
''' Joins '''

deportistaDF.join(resultadoDF,"deportista_id","left").join(juegoDF,"juego_id","left").join(eventoDF,"evento_id","left").join(deporteDF,"deporte_id","left")\
    .select("nombre",col("edadAlJugar").alias("Edad Al Jugar"),"medalla","nombre_juego","evento").show(30)




+--------------------+-------------+-------+-------------+--------------------+
|              nombre|Edad Al Jugar|medalla| nombre_juego|              evento|
+--------------------+-------------+-------+-------------+--------------------+
|           A Dijiang|           24|     NA|  1992 Verano|Basketball Men's ...|
|            A Lamusi|           23|     NA|  2012 Verano|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|           24|     NA|  1920 Verano|Football Men's Fo...|
|Edgar Lindenau Aabye|           34|   Gold|  1900 Verano|Tug-Of-War Men's ...|
|Christine Jacoba ...|           21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|1988 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|           21|    

In [19]:
"""Debes crear un join que tenga, todas las medallas ganadoras🎖(solo las ganadoras), unidas con el país y el equipo
al cual pertenecen estas medallas"""

ResultadosGanadoresDF=resultadoDF.filter(resultadoDF.medalla != 'NA')
ResultadosGanadoresDF.show(10)

EquiposGanadoresDF = deportistaDF.join(ResultadosGanadoresDF,"deportista_id").select("equipo_id","medalla")
EquiposGanadoresDF.show(10)

PaisesGanadoresDF = EquiposGanadoresDF.join(paisDF,EquiposGanadoresDF.equipo_id==paisDF.id).select("medalla","equipo","sigla")

PaisesGanadoresDF.sort(col("sigla").desc()).show(20)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           4|   Gold|            4|       2|        4|
|          38| Bronze|           15|       7|       19|
|          39| Bronze|           15|       7|       20|
|          41| Bronze|           16|      50|       14|
|          42| Bronze|           17|      17|       21|
|          43|   Gold|           17|      17|       22|
|          45|   Gold|           17|      17|       24|
|          49|   Gold|           17|      17|       28|
|          51| Bronze|           17|      19|       22|
|          61|   Gold|           20|      38|       32|
+------------+-------+-------------+--------+---------+
only showing top 10 rows

+---------+-------+
|equipo_id|medalla|
+---------+-------+
|      278|   Gold|
|      350| Bronze|
|      350| Bronze|
|      350| Bronze|
|      350| Bronze|
|      350|   Gold|
|      350

In [39]:
medallistaXAnio = deportistaDF \
    .join(
        resultadoDF, 
        deportistaDF.deportista_id == resultadoDF.deportista_id, 
        "left"
    ) \
    .join(
        juegoDF, 
        juegoDF.juego_id == resultadoDF.juego_id, 
        "left"
    ) \
    .join(
        paisDF, 
        deportistaDF.equipo_id == paisDF.id, 
        "left"
    ) \
    .join(
        eventoDF, 
        eventoDF.evento_id == resultadoDF.evento_id, 
        "left"
    ) \
    .join(
        deporteDF, 
        deporteDF.deporte_id == eventoDF.deporte_id, 
        "left"
    ) \
    .select(
        "sigla",
        "nombre_juego",
        "medalla",
        eventoDF.evento.alias("Nombre subdisciplina"),
        deporteDF.deporte.alias("Nombre disciplina"),
        deportistaDF.nombre    
    )

medallistaXAnio.show()

+-----+-------------+-------+--------------------+--------------------+--------------------+
|sigla| nombre_juego|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+-----+-------------+-------+--------------------+--------------------+--------------------+
|  CHN|  1992 Verano|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|  CHN|  2012 Verano|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|  DEN|  1920 Verano|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|  SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating

In [64]:
medallistaXAnio2=medallistaXAnio.filter(medallistaXAnio.medalla!='NA')\
                                 .sort("nombre_juego")\
                                 .groupBy("sigla","nombre_juego","Nombre subdisciplina")\
                                 .count()

In [65]:
medallistaXAnio2.show()

+-----+------------+--------------------+-----+
|sigla|nombre_juego|Nombre subdisciplina|count|
+-----+------------+--------------------+-----+
|  USA| 1896 Verano|Athletics Men's 1...|    1|
|  GER| 1896 Verano|Gymnastics Men's ...|    4|
|  GER| 1896 Verano|Gymnastics Men's ...|    4|
|  GBR| 1896 Verano|Tennis Men's Singles|    1|
|  GBR| 1896 Verano|Tennis Men's Doubles|    1|
|  USA| 1896 Verano|Athletics Men's 1...|    2|
|  USA| 1896 Verano|Athletics Men's 4...|    2|
|  FRA| 1896 Verano|Fencing Men's Foi...|    2|
|  GRE| 1896 Verano|Tennis Men's Singles|    1|
|  GRE| 1896 Verano|Tennis Men's Doubles|    1|
|  USA| 1896 Verano|Athletics Men's H...|    3|
|  USA| 1896 Verano|Athletics Men's L...|    3|
|  USA| 1896 Verano|Athletics Men's T...|    1|
|  USA| 1896 Verano|Athletics Men's 1...|    1|
|  HUN| 1896 Verano|Athletics Men's 8...|    1|
|  GBR| 1896 Verano|Weightlifting Men...|    1|
|  GBR| 1896 Verano|Weightlifting Men...|    1|
|  AUS| 1896 Verano|Tennis Men's Doubles

In [66]:
medallistaXAnioResumen = medallistaXAnio2.groupBy("sigla","nombre_juego")\
    .agg(sum("count").alias("TotalMedallas"))
medallistaXAnioResumen.show()

+-----+------------+-------------+
|sigla|nombre_juego|TotalMedallas|
+-----+------------+-------------+
|  USA| 1896 Verano|           16|
|  GER| 1896 Verano|           13|
|  GBR| 1896 Verano|            7|
|  FRA| 1896 Verano|            5|
|  GRE| 1896 Verano|            4|
|  HUN| 1896 Verano|            4|
|  AUS| 1896 Verano|            3|
|  AUT| 1896 Verano|            1|
|  DEN| 1896 Verano|            3|
|  SWE| 1900 Verano|            2|
|  USA| 1900 Verano|           41|
|  FRA| 1900 Verano|           96|
|  GER| 1900 Verano|           17|
|  NOR| 1900 Verano|            3|
|  GBR| 1900 Verano|           42|
|  HUN| 1900 Verano|            2|
|  SUI| 1900 Verano|           11|
|  NED| 1900 Verano|           12|
|  BEL| 1900 Verano|            8|
|  ITA| 1900 Verano|            2|
+-----+------------+-------------+
only showing top 20 rows



In [69]:
resultadoDF.registerTempTable("resultado")
deportistaDF.registerTempTable("deportista")
paisDF.registerTempTable("paises")

In [70]:
sqlContext.sql("SELECT * FROM deportista").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
|            6|     Per Knut Aaland|         31|     1096|
|            7|        John Aalberg|         31|     1096|
|            8|Cornelia Cor Aalt...|         18|      705|
|            9|    Antti Sami Aalto|         26|      350|
|           10|Einar Ferdinand E...|         26|      350|
|           11|  Jorma Ilmari Aalto|         22|      350|
|           12|   Jyri Tapani Aalto|         31|      350|
|           13|  Minna Maarit Aalto|         30|      350|
|           14|Pirjo Hannele Aal...|         32|      35

In [77]:
sqlContext.sql("""
               SELECT medalla,equipo,sigla
               FROM resultado  r
               JOIN deportista d ON r.deportista_id = d.deportista_id
               JOIN paises  p ON  p.id = d.equipo_id
               WHERE medalla <> 'NA'
               ORDER BY sigla DESC
""").show()

+-------+----------+-----+
|medalla|    equipo|sigla|
+-------+----------+-----+
| Silver|  Zimbabwe|  ZIM|
| Silver|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Silver|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Silver|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
|   Gold|  Zimbabwe|  ZIM|
| Bronze|  Zimbabwe|  ZIM|
| Bronze|Yugoslavia|  YUG|
|   Gold|Yugoslavia|  YUG|
| Silver|Yugoslavia|  YUG|
| Silver|Yugoslavia|  YUG|
| Silver|Yugoslavia|  YUG|
+-------+----------+-----+
only showing top 20 rows



In [78]:
spark