In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row

from pyspark.sql import SQLContext

In [2]:
spark = SparkContext(master='local', appName='dataframes')
sqlContext = SQLContext(spark)

# Dataframes

In [3]:
path = '/home/robert/Project/spark-2/transformations_actions/files/'

In [4]:
!head -n 5 /home/robert/Project/spark-2/transformations_actions/files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [5]:
game_schema = StructType([
    StructField('juego_id', IntegerType(), False),
    StructField('anio', StringType(), False),
    StructField('temporada', StringType(), False),
    StructField('ciudad', StringType(), False)
])

In [6]:
juego_df = sqlContext.read.schema(game_schema) \
    .option('header', 'true').csv(path + 'juegos.csv')

In [7]:
juego_df.show(5)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
|       5|1908 Verano|     1908|Verano|
+--------+-----------+---------+------+
only showing top 5 rows



In [8]:
spark

In [9]:
deportista_rdd = spark.textFile(path + 'deportista.csv') \
    .map(lambda l: l.split(','))

In [10]:
deportista_rdd.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [11]:
def remove_head(index, iterator):
    return iter(list(iterator)[1:])

In [12]:
deportista_rdd = deportista_rdd.mapPartitionsWithIndex(remove_head)

In [13]:
deportista_rdd.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [14]:
deportista_rdd = deportista_rdd.map(lambda l: (
    int(l[0]), l[1], int(l[2]), int(l[3]), int(l[4]), float(l[5]), int(l[6])
))

In [15]:
schema = StructType([
    StructField('deportista_id', IntegerType(), False),
    StructField('nombre', StringType(), False),
    StructField('genero', IntegerType(), False),
    StructField('edad', IntegerType(), False),
    StructField('altura', IntegerType(), False),
    StructField('peso', FloatType(), False),
    StructField('equipo_id', IntegerType(), False),
])

In [16]:
deportista_df = sqlContext.createDataFrame(deportista_rdd, schema)

In [17]:
deportista_df.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [18]:
paises_rdd = spark.textFile(path + 'paises.csv').map(lambda line: line.split(','))
paises_rdd = paises_rdd.mapPartitionsWithIndex(remove_head)
paises_rdd = paises_rdd.map(lambda l: (
int(l[0]),
    l[1],
    l[2]
))

schema = StructType([
    StructField('id', IntegerType(), False),
    StructField('equipo', StringType(), False),
    StructField('sigla', StringType(), False)
])

paises_df = sqlContext.createDataFrame(paises_rdd, schema)

In [19]:
schema_deportes = StructType([
    StructField('deporte_id', IntegerType(), False),
    StructField('deporte', StringType(), False)
])

deportes_df = sqlContext.read.schema(schema_deportes).option('head', 'true').csv(path + 'deporte.csv')

In [20]:
evento_schema = StructType([
    StructField('evento_id', IntegerType(), False),
    StructField('nombre', StringType(), False),
    StructField('deporte_id', IntegerType(), False)
])

deportes_olimpicos_df = sqlContext.read.schema(evento_schema).option('head', 'true').csv(path + 'evento.csv')

In [21]:
juegoSchema = StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("anio",StringType(),False),
    StructField("temporada",StringType(),False),
    StructField("ciudad",StringType(),False),
])
juego_df = sqlContext.read.schema(juegoSchema).option("header","true").csv(path+"juegos.csv")

resultadoSchema = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False),
])
resultado_df = sqlContext.read.schema(resultadoSchema).option("header","true").csv(path+"resultados.csv")

In [22]:
deportes_df.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [23]:
deportistaOlimpicoRDD =  spark.textFile(path+"deportista.csv").map(lambda line : line.split(","))
deportistaOlimpico2RDD = spark.textFile(path+"deportista2.csv").map(lambda line : line.split(","))
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpico2RDD)

deportistaOlimpicoRDD=deportistaOlimpicoRDD.mapPartitionsWithIndex(remove_head)

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4]),
float(l[5]),
int(l[6])
))

schema = StructType([
StructField("deportista_id",IntegerType(),False)     ,
StructField("nombre",StringType(),False)        ,
StructField("genero",IntegerType(),False)        ,
StructField("edad",IntegerType(),True)      ,
StructField("altura",IntegerType(),True)        ,
StructField("peso",FloatType(),True)      ,
StructField("equipo_id",IntegerType(),True)     
])

deportista_olimpico_df = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

In [24]:
deportista_olimpico_df = deportista_olimpico_df.withColumnRenamed('genero', 'sexo').drop('altura')

In [25]:
deportista_olimpico_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [26]:
from pyspark.sql.functions import *

deportista_olimpico_df = deportista_olimpico_df.select('deportista_id', 'nombre', col('edad').alias('edadAlJugar'), 'equipo_id')

In [27]:
deportista_olimpico_df.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [28]:
deportista_olimpico_df.filter((col('edadAlJugar') != 0)).sort('edadAlJugar').show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        52070|        Etsuko Inada|         11|      514|
|        40129|    Luigina Giavotti|         11|      507|
|        37333|Carlos Bienvenido...|         11|      982|
|        47618|Sonja Henie Toppi...|         11|      742|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [29]:
deportista_olimpico_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edadAlJugar: integer (nullable = true)
 |-- equipo_id: integer (nullable = true)



In [30]:
resultado_df.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [31]:
juego_df.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- anio: string (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [32]:
deportes_olimpicos_df.printSchema()

root
 |-- evento_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- deporte_id: integer (nullable = true)



In [33]:
join_df = deportista_olimpico_df.join(
    resultado_df, on='deportista_id', how='left'
) \
.join(
    juego_df, on='juego_id', how='left'
) \
.join(
    deportes_olimpicos_df, on='evento_id', how='left'
) \
.select(deportista_olimpico_df.nombre, 'edadAlJugar', 'medalla',
        col('anio').alias('Año de juego'), deportes_olimpicos_df.nombre.alias('nombre de disciplina'))

In [34]:
join_df.show(5, False)

+------------------------+-----------+-------+-------------+----------------------------------+
|nombre                  |edadAlJugar|medalla|Año de juego |nombre de disciplina              |
+------------------------+-----------+-------+-------------+----------------------------------+
|A Dijiang               |24         |NA     |1992 Verano  |Basketball Men's Basketball       |
|A Lamusi                |23         |NA     |2012 Verano  |Judo Men's Extra-Lightweight      |
|Gunnar Nielsen Aaby     |24         |NA     |1920 Verano  |Football Men's Football           |
|Edgar Lindenau Aabye    |34         |Gold   |1900 Verano  |Tug-Of-War Men's Tug-Of-War       |
|Christine Jacoba Aaftink|21         |NA     |1994 Invierno|Speed Skating Women's 1,000 metres|
+------------------------+-----------+-------+-------------+----------------------------------+
only showing top 5 rows



In [35]:
resultado_df.filter(col('medalla') != 'NA') \
.join(deportista_olimpico_df, on='deportista_id', how='left') \
.join(paises_df, col('id') == col('equipo_id'), how='left') \
.select('equipo', 'sigla', 'medalla') \
.dropDuplicates() \
.sort(col('sigla').desc()) \
.show()

+--------------------+-----+-------+
|              equipo|sigla|medalla|
+--------------------+-----+-------+
|            Zimbabwe|  ZIM| Silver|
|            Zimbabwe|  ZIM|   Gold|
|            Zimbabwe|  ZIM| Bronze|
|              Zambia|  ZAM| Silver|
|              Zambia|  ZAM| Bronze|
|          Yugoslavia|  YUG| Silver|
|          Yugoslavia|  YUG|   Gold|
|          Yugoslavia|  YUG| Bronze|
|West Indies Feder...|  WIF| Bronze|
|             Vietnam|  VIE|   Gold|
|             Vietnam|  VIE| Silver|
|           Venezuela|  VEN|   Gold|
|           Venezuela|  VEN| Bronze|
|           Venezuela|  VEN| Silver|
|          Uzbekistan|  UZB|   Gold|
|          Uzbekistan|  UZB| Bronze|
|          Uzbekistan|  UZB| Silver|
|       United States|  USA| Silver|
|     United States-2|  USA| Silver|
|New York Athletic...|  USA| Bronze|
+--------------------+-----+-------+
only showing top 20 rows



In [36]:
medallistaXAnio = deportista_olimpico_df.join(resultado_df, deportista_olimpico_df.deportista_id ==resultado_df.deportista_id,"left") \
        .join(juego_df, juego_df.juego_id == resultado_df.juego_id,"left") \
        .join(paises_df,deportista_olimpico_df.equipo_id == paises_df.id,"left") \
        .join(deportes_olimpicos_df, deportes_olimpicos_df.evento_id == resultado_df.evento_id,"left") \
        .join(deportes_df,deportes_olimpicos_df.deporte_id == deportes_df.deporte_id,"left") \
        .select("sigla",
                "anio",
                "medalla",
                deportes_olimpicos_df.nombre.alias("Nombre subdisciplina"),
                deportes_df.deporte.alias("Nombre disciplina"),
                deportista_olimpico_df.nombre,   
                )

In [37]:
medallistaXAnio2 = medallistaXAnio.filter(col('medalla') != 'NA') \
    .sort('anio') \
    .groupBy('sigla', 'anio', 'Nombre subdisciplina') \
    .count()

In [38]:
medallistaXAnio2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: string (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [39]:
medallistaXAnio2.groupBy('sigla', 'anio') \
    .agg(sum('count').alias('total_medallas'), avg('count').alias('medallas_promedio')) \
    .show()

+-----+-----------+--------------+------------------+
|sigla|       anio|total_medallas| medallas_promedio|
+-----+-----------+--------------+------------------+
|  FRA|1896 Verano|            11|             1.375|
|  SUI|1896 Verano|             3|               1.0|
|  GRE|1896 Verano|            48|1.6551724137931034|
|  USA|1896 Verano|            20|1.6666666666666667|
|  DEN|1896 Verano|             6|               1.0|
|  HUN|1896 Verano|             6|               1.0|
|  GBR|1896 Verano|             9|             1.125|
|  AUT|1896 Verano|             5|               1.0|
|  AUS|1896 Verano|             3|               1.0|
|  GER|1896 Verano|            32|2.6666666666666665|
|  FRA|1900 Verano|           250| 4.310344827586207|
|  GER|1900 Verano|            45| 6.428571428571429|
|  SUI|1900 Verano|            21|               3.5|
|  LUX|1900 Verano|             1|               1.0|
|  NED|1900 Verano|            26| 4.333333333333333|
|  GBR|1900 Verano|         

In [40]:
resultado_df.registerTempTable('resultado')
deportista_olimpico_df.registerTempTable('deportista')
paises_df.registerTempTable('paises')

In [41]:
sqlContext.sql('SELECT * FROM deportista').show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [42]:
sqlContext.sql('''
                SELECT medalla, equipo, sigla FROM resultado r
                JOIN deportista d
                ON r.deportista_id = d.deportista_id
                JOIN paises p
                ON p.id = d.equipo_id
                WHERE medalla <> "NA"
                ORDER BY sigla DESC
                ''').show()

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



# Cached

In [52]:
from pyspark.storagelevel import StorageLevel

In [43]:
medallistaXAnio.is_cached

False

In [44]:
medallistaXAnio.rdd.cache()

MapPartitionsRDD[161] at javaToPython at NativeMethodAccessorImpl.java:0

In [48]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [49]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[161] at javaToPython at NativeMethodAccessorImpl.java:0

In [53]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[161] at javaToPython at NativeMethodAccessorImpl.java:0

In [56]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [57]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[161] at javaToPython at NativeMethodAccessorImpl.java:0

In [59]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[161] at javaToPython at NativeMethodAccessorImpl.java:0

In [67]:
medallistaXAnio.rdd.countApproxDistinct()

265570

In [68]:
medallistaXAnio.rdd.count()

271109