In [55]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.sql.functions import sum, avg
from pyspark.storagelevel import StorageLevel

In [2]:
spark = SparkContext(master='local', appName='Dataframes')
sqlContext = SQLContext(spark)

In [3]:
#aca cargo la ruta del archivo
path = "files/"

In [28]:
equiposOlimpicosRDD = spark.textFile(path+'paises.csv') \
    .map(lambda line : line.split(','))

In [29]:
#spark.stop() #stop detiene el contexto

In [30]:
deportistaOlimpicoRDD = spark.textFile(path+'deportista.csv').map(lambda l: l.split(','))
deportistaOlimpicoRDD2 = spark.textFile(path+'deportista2.csv').map(lambda l: l.split(','))

In [31]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)

In [32]:
def eliminaEncabezado(indice, interador):
    return iter(list(interador)[1:])

In [33]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)

In [34]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l: 
    (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
    )
)

In [35]:
schema = StructType([
    StructField('deportista_id', IntegerType(), False),
    StructField('nombre', StringType(), False),
    StructField('genero', IntegerType(), False),
    StructField('edad', IntegerType(), False),
    StructField('altura', IntegerType(), False),
    StructField('peso', FloatType(), False),
    StructField('equipo_id', IntegerType(), False )
])

In [36]:
deportistaOlimpicoDF = sqlContext.createDataFrame(deportistaOlimpicoRDD, schema)

In [37]:
evento_schema = StructType([
    StructField('evento_id', IntegerType(), False),
    StructField('evento', StringType(), False),
    StructField('deporte_id', IntegerType(), False)
])

In [38]:
eventoDF = sqlContext.read.format("csv").\
        option("header", True).\
        schema(evento_schema).\
        load(path+"evento.csv")

In [39]:
paises_schema = StructType([
    StructField("equipo_id", IntegerType(), False),
    StructField("equipo", StringType(), False), 
    StructField("sigla", StringType(), False)
])


In [40]:
paisesDF = sqlContext.read.format("csv").\
        option("header", True).\
        schema(paises_schema).\
        load(path+"paises.csv")


In [41]:
resultados_schema = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False)
])


In [42]:
resultadosDF = sqlContext.read.format("csv").\
            option("header", True).\
            schema(resultados_schema).\
            load(path+"resultados.csv")


In [43]:
deportes_schema = StructType([
    StructField("deporte_id", IntegerType(), False),
    StructField("deporte", StringType(), False)
])

In [44]:
deportesDF = sqlContext.read.format("csv").\
            option("header", True).\
            schema(deportes_schema).\
            load(path+"deporte.csv")

In [45]:
#eliminanado y renombrando columnas
deportistaOlimpicoDF = deportistaOlimpicoDF.withColumnRenamed('genero','sexo').drop('altura')

In [46]:
deportistaOlimpicoDF = deportistaOlimpicoDF.select('deportista_id', 'nombre', col('edad').alias('edadAlJugar'), 'equipo_id')

In [47]:
deportistaOlimpicoDF = deportistaOlimpicoDF.filter(deportistaOlimpicoDF.edadAlJugar != 0)

In [48]:
juego_schema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("anio", StringType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)
])

In [49]:
juegoDF = sqlContext.read.format("csv").\
            option("header", True).\
            schema(juego_schema).\
            load(path+"juegos.csv")

In [50]:
medallistaXAnio = deportistaOlimpicoDF \
    .join(
        resultadosDF, 
        deportistaOlimpicoDF.deportista_id == resultadosDF.deportista_id, 
        "left"
    ) \
    .join(
        juegoDF, 
        juegoDF.juego_id == resultadosDF.juego_id, 
        "left"
    ) \
    .join(
        paisesDF, 
        deportistaOlimpicoDF.equipo_id == paisesDF.equipo_id, 
        "left"
    ) \
    .join(
        eventoDF, 
        eventoDF.evento_id == resultadosDF.evento_id, 
        "left"
    ) \
    .join(
        deportesDF, 
        eventoDF.deporte_id == deportesDF.deporte_id, 
        "left"
    ) \
    .select(
        "sigla",
        "anio",
        "medalla",
        eventoDF.evento.alias("nombre subdisciplina"),
        deportesDF.deporte.alias("nombre disciplina"),
        deportistaOlimpicoDF.nombre    
    )


In [51]:
medallistaXAnio2 = medallistaXAnio.filter(medallistaXAnio.medalla != 'NA') \
    .sort('anio') \
        .groupBy('sigla', 'anio','nombre subdisciplina') \
            .count()

In [59]:
medallistaXAnio.is_cached

False

In [60]:
medallistaXAnio.rdd.cache()

MapPartitionsRDD[49] at javaToPython at NativeMethodAccessorImpl.java:0

In [61]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [64]:
#si le damos persistir despues de guardarlo en cache, da error, xq no se puede modificar el nivel de persistencia, sino hay que darle unpersist y luego guardar
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[49] at javaToPython at NativeMethodAccessorImpl.java:0

In [65]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[49] at javaToPython at NativeMethodAccessorImpl.java:0

In [66]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [67]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[49] at javaToPython at NativeMethodAccessorImpl.java:0

In [68]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[49] at javaToPython at NativeMethodAccessorImpl.java:0