In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, when, lit

In [None]:
spark.stop()

In [None]:
spark = SparkContext(master="local", appName="DataFrame")
sqlContext = SQLContext(spark)

In [None]:
!head -n 5 ./files/deportista.csv

In [None]:
deportistaSchema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero", IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",IntegerType(),False),
    StructField("equipo_id",IntegerType(),False)
])

In [None]:
!head -n 5 ./files/resultados.csv

In [None]:
resultadoSchema = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False)
])

In [None]:
!head -n 5 ./files/juegos.csv

In [None]:
juegoSchema = StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("anio",StringType(),False),
    StructField("temporada", StringType(),False),
    StructField("ciudad",StringType(),False)
])

In [None]:
!head -n 5 ./files/evento.csv

In [None]:
deportesOlimpicosSchema = StructType([
    StructField("evento_id", IntegerType(),False),
    StructField("nombre", StringType(),False),
    StructField("deporte_id", IntegerType(),False)
])

In [None]:
deportistaDF = sqlContext.read.schema(deportistaSchema)\
    .option("header","true").csv("./files/deportista.csv") \
    .union (sqlContext.read.schema(deportistaSchema)\
    .option("header","false").csv("./files/deportista2.csv"))

resultadosDF = sqlContext.read.schema(resultadoSchema)\
    .option("header","true").csv("./files/resultados.csv")

juegoDF = sqlContext.read.schema(juegoSchema)\
    .option("header","true").csv("./files/juegos.csv")

deportesOlimpicosDF = sqlContext.read.schema(deportesOlimpicosSchema)\
    .option("header","true").csv("./files/evento.csv")

In [None]:
print(f'Tamaño de deportistaDF: {deportistaDF.count()}')
print(f'Tamaño de resultadosDF: {resultadosDF.count()}')
print(f'Tamaño de juegoDF: {juegoDF.count()}')
print(f'Tamaño de deportesOlimpicosDF: {deportesOlimpicosDF.count()}')

In [None]:
deportistaDF.printSchema()

In [None]:
resultadosDF.printSchema()

In [None]:
juegoDF.printSchema()

In [None]:
deportesOlimpicosDF.printSchema()

In [None]:
df = deportistaDF.join(
    resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left")\
    .join(juegoDF, juegoDF.juego_id == resultadosDF.juego_id,"left")\
    .join(deportesOlimpicosDF, deportesOlimpicosDF.evento_id == resultadosDF.evento_id,"left")\
    .select(
        deportistaDF.nombre,col("edad").alias("edad_al_jugar"),
        "medalla",col("anio").alias("anio_de_juego"),
        deportesOlimpicosDF.nombre.alias("Nombre_Diciplina"))

In [None]:
df.show()

In [None]:
df2 = df.filter(df.medalla != "NA")\
    .sort(df.anio_de_juego)\
    .groupBy(df.nombre,df.anio_de_juego)\
    .count()

In [None]:
df2.show()

In [None]:
deportistaDF.registerTempTable("deportista")
resultadosDF.registerTempTable("resultados")
juegoDF.registerTempTable("juego")
deportesOlimpicosDF.registerTempTable("deportesOlimpicos")

In [None]:
sqlContext.sql("""
               SELECT *
               FROM deportista
               """).show()

In [None]:
spark

In [None]:
spark.stop()