In [17]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, Row

In [18]:
spark = SparkContext(master="local", appName="DataFrames")
sql_context = SQLContext(spark)

In [19]:
!ls files/

deporte.csv           deportistaError.csv   modelo_relacional.jpg
deportista.csv        evento.csv            paises.csv
deportista2.csv       juegos.csv            resultados.csv


In [20]:
! head -n 5 files/juegos.csv
PATH = 'files/'

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [25]:
juego_schema = StructType([
    StructField('juego_id', IntegerType(), False),
    StructField('nombre_juego', StringType(), False),
    StructField('annio', StringType(), False),
    StructField('temporada', StringType(), False),
    StructField('ciudad', StringType(), False)
])

juego_df = sql_context.read.schema(juego_schema) \
    .option('header', 'true').csv(PATH+'juegos.csv')

In [26]:
juego_df.show(4)

+--------+------------+-----+---------+---------+
|juego_id|nombre_juego|annio|temporada|   ciudad|
+--------+------------+-----+---------+---------+
|       1| 1896 Verano| 1896|   Verano|   Athina|
|       2| 1900 Verano| 1900|   Verano|    Paris|
|       3| 1904 Verano| 1904|   Verano|St. Louis|
|       4| 1906 Verano| 1906|   Verano|   Athina|
+--------+------------+-----+---------+---------+
only showing top 4 rows



In [34]:
deportistaOlimpicoRDD = spark.textFile(PATH + 'deportista.csv') \
    .map(lambda l: l.split(','))

In [35]:
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [36]:
def elimina_encabezado(indice, iterador):
    return iter(list(iterador)[1:])

In [37]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(elimina_encabezado)

In [38]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [42]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l: (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [43]:
schema = StructType([
    StructField('deportista_id', IntegerType(), False),
    StructField('nombre', StringType(), False),
    StructField('genero', IntegerType(), False),
    StructField('edad', IntegerType(), False),
    StructField('altura', IntegerType(), False),
    StructField('peso', FloatType(), False),
    StructField('equipo_id', IntegerType(), False)
])

df = sql_context.createDataFrame(deportistaOlimpicoRDD, schema)

In [45]:
df.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [51]:
resultados = spark.textFile(PATH + 'resultados.csv') \
    .map(lambda l: l.split(','))
resultados = resultados.filter(lambda x: x[1] != 'NA')
resultados = resultados.mapPartitionsWithIndex(elimina_encabezado)
resultados = resultados.map(lambda l: (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4])
))
schema = StructType([
    StructField('resultado_id', IntegerType(), False),
    StructField('medalla', StringType(), False),
    StructField('deportista_id', IntegerType(), False),
    StructField('juego_id', IntegerType(), False),
    StructField('evento_id', IntegerType(), False)
])

resultados_df = sql_context.createDataFrame(resultados, schema)

In [52]:
resultados_df.show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           4|   Gold|            4|       2|        4|
|          38| Bronze|           15|       7|       19|
|          39| Bronze|           15|       7|       20|
|          41| Bronze|           16|      50|       14|
|          42| Bronze|           17|      17|       21|
+------------+-------+-------------+--------+---------+
only showing top 5 rows

