In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType,StringType,FloatType
from pyspark.sql.types import Row

from pyspark.sql import SQLContext

In [None]:
#spark = SparkContext(master="local",appName="Dataframes")
#sqlContext = SQLContext(spark)
spark = SparkSession.builder\
                    .master("local") \
                    .appName("Dataframes") \
                    .getOrCreate()

In [None]:
! head -n 5 curso-apache-spark-platzi/files/juegos.csv

In [None]:
path = "/home/gerardo/proyectos_PLATZI/Curso_Spark/curso-apache-spark-platzi/files/"

In [None]:
juego_schema = StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("nombre_juego",StringType(),False),
    StructField("annio",IntegerType(),False),
    StructField("temporada",StringType(),False),
    StructField("ciudad",StringType(),False)
])

juego_df = spark.read.schema(juego_schema) \
        .option("header","true") \
        .csv(path + "juegos.csv")

In [None]:
juego_df.show(10)

In [None]:
#spark.stop()

### Inferencia de tipos de datos

In [None]:
!tail -n 5 curso-apache-spark-platzi/files/deportista2.csv

In [None]:
deportista_schema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

deportista_df = spark.read \
.csv([path + "deportista.csv",path + "deportista2.csv"],schema=deportista_schema)

In [None]:
deportista_df.show()

In [None]:
!head -n 5 curso-apache-spark-platzi/files/resultados.csv

In [None]:
resultados_schema = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False),
])
resultados_df = spark.read.schema(resultados_schema) \
                .option("header","true") \
                .csv(path + "resultados.csv")

In [None]:
resultados_df.show(5)

In [None]:
paises_schema = StructType([
    StructField("id",IntegerType(),False),
    StructField("equipo",StringType(),False),
    StructField("sigla",StringType(),False)
])
paises_df = spark.read.schema(paises_schema) \
                .option("header","true") \
                .csv(path + "paises.csv")

In [None]:
paises_df.show(5)

In [None]:
#spark.stop()

In [None]:
deportes_schema =  StructType([
    StructField("deporte_id",IntegerType(),False),
    StructField("deporte",StringType(),False)
])

deportes_df = spark.read.schema(deportes_schema) \
                .csv(path + "deporte.csv",header = True)

In [None]:
deportes_df.show(5)

In [None]:
deportes_df.printSchema()

In [None]:
evento_schema =  StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("evento",StringType(),False),
    StructField("deporte_id",IntegerType(),False)
])

evento_df = spark.read.schema(evento_schema) \
                .csv(path + "evento.csv",header = True)

In [None]:
from pyspark.sql.functions import trim
deportes_df = deportes_df.join(evento_df, trim(deportes_df.deporte_id) == trim(evento_df.deporte_id),"left") \
               .select(deportes_df.deporte_id,"deporte","evento_id")
deportes_df.show(5)

In [None]:
#cambiar nombre de cplumna
deportista_df = deportista_df.withColumnRenamed("genero","sexo").drop("altura")

In [None]:
deportista_df.printSchema()

In [None]:
import pyspark.sql.functions as f
deportista_df = deportista_df.select("deportista_id","nombre",
                     f.col("edad").alias("edadALJugar"),
                     "equipo_id"
                    )

In [None]:
deportista_df.show(5)

In [None]:
deportista_df = deportista_df.filter(deportista_df.edadALJugar != 0)
deportista_df.sort("edadALJugar").show(5)

In [None]:
deportista_df.join(resultados_df,deportista_df.deportista_id == resultados_df.deportista_id,"left") \
    .join(juego_df,juego_df.juego_id == resultados_df.juego_id,"left")\
    .join(deportes_df,deportes_df.evento_id == resultados_df.evento_id,"left") \
    .select(deportista_df.nombre,f.col("edadALJugar"),"medalla",f.col("annio").alias("Año de juego"),
           deportes_df.deporte.alias("Nombre de disciplina")
           ).show()

In [None]:
deportista_df.join(paises_df,deportista_df.equipo_id == paises_df.id,"left") \
            .join(resultados_df,deportista_df.deportista_id == resultados_df.deportista_id, "left") \
            .filter(resultados_df.medalla != "NA") \
            .select("medalla","sigla","equipo") \
            .show(20)

In [None]:
#spark.stop()

In [None]:
medallista_x_anio = deportista_df \
        .join(resultados_df,deportista_df.deportista_id == resultados_df.deportista_id,"left") \
        .join(juego_df,juego_df.juego_id == resultados_df.juego_id,"left") \
        .join(paises_df,deportista_df.equipo_id ==  paises_df.id,"left") \
        .join(evento_df,evento_df.evento_id == resultados_df.evento_id,"left") \
        .join(deportes_df,deportes_df.deporte_id == evento_df.deporte_id,"left") \
        .select("sigla",
               "annio",
               "medalla",
               evento_df.evento.alias("Nombre subdisciplina"),
               deportes_df.deporte.alias("Nombre disciplina"),
               deportista_df.nombre)

In [None]:
medallista_x_anio_2 = medallista_x_anio.filter(medallista_x_anio.medalla != "NA") \
    .sort("annio") \
    .groupBy("sigla","annio","Nombre subdisciplina") \
    .count()

In [None]:
medallista_x_anio_2.show()

In [None]:
from pyspark.sql.functions import sum,avg
medallista_x_anio_2.groupBy("sigla","annio") \
        .agg(sum("count").alias("Total de medallas"), \
        avg("count").alias("Medallas promedio")).show()

## SQL

In [None]:
#createOrReplaceTempView() reemplaza registerTempTable() en la versión 2 de pyspark 
resultados_df.createOrReplaceTempView("resultado")
deportista_df.createOrReplaceTempView("deportista")
paises_df.createOrReplaceTempView("paises")

In [None]:
spark.sql('''
            SELECT medalla, equipo, sigla
                    FROM resultado r
                    JOIN deportista d
                      ON r.deportista_id = d.deportista_id
                    JOIN paises p
                      ON p.id = d.equipo_id
                   WHERE medalla <> "NA"
                   ORDER BY sigla DESC
          ''').show()

In [1]:
#spark.stop()

NameError: name 'spark' is not defined