In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
#Objetos que dan formato al dataframe
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext

#Mala práctica hacer Import *
import pyspark.sql.functions
from pyspark.sql.functions import count
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext(master="local", appName="proyecto")

In [3]:
# Create SparkSession
spark = SparkSession.builder \
          .getOrCreate()

In [4]:
#Se llama
sqlContext = SQLContext(sc)



In [5]:
var = "C:/Users/Caroline/Videos/curso-apache-spark-platzi/files/"

depteRDD = sc.textFile(var+"deporte.csv").map(lambda line : line.split(","))
deptaRDD = sc.textFile(var+"deportista.csv").map(lambda line : line.split(","))
evenRDD  = sc.textFile(var+"evento.csv").map(lambda line : line.split(";"))
juegRDD  = sc.textFile(var+"juegos.csv").map(lambda line : line.split(","))
equiRDD  = sc.textFile(var+"paises.csv").map(lambda line : line.split(","))
resulRDD = sc.textFile(var+"resultados.csv").map(lambda line : line.split(";"))

In [6]:
#Función para validar el encabezado
#Iter: Devuelve valor a valor lo que se está procesando
def deletHeader(indice, interador):
    return iter(list(interador)[1:])

In [7]:
#Eliminar el encabezado
depteRDD = depteRDD.mapPartitionsWithIndex(deletHeader) #Deporte
deptaRDD = deptaRDD.mapPartitionsWithIndex(deletHeader) #Deportista
evenRDD = evenRDD.mapPartitionsWithIndex(deletHeader) #Evento
juegRDD = juegRDD.mapPartitionsWithIndex(deletHeader) #Juegos
equiRDD = equiRDD.mapPartitionsWithIndex(deletHeader) #Equipo
resulRDD = resulRDD.mapPartitionsWithIndex(deletHeader) #Resultados

In [8]:
#Antes de Tramsformar los datos del RDD, se debe transformar el tipo de dato
#Deporte
depteRDD = depteRDD.map(lambda l: (
int(l[0]),
l[1]
))


#Deportista
deptaRDD = deptaRDD.map(lambda l: (
int(l[0]),
l[1],    
int(l[2]),
int(l[3]),
float(l[4]),
float(l[5]),
int(l[6])
))


#Evento
evenRDD = evenRDD.map(lambda l: (
int(l[0]),
l[1],    
int(l[2])
))

#Juego
juegRDD = juegRDD.map(lambda l: (
int(l[0]),
l[1],    
int(l[2]),
l[3],
l[4]
))


#Equipos
equiRDD = equiRDD.map(lambda l: (
int(l[0]),
l[1],
l[2]
))


#Resultados
resulRDD = resulRDD.map(lambda l: (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4])
))



In [9]:
#Se crear el Schema

#Deporte
depte = StructType([
    StructField("deporte_id", IntegerType(), False),
    StructField("nombre_deporte", StringType(), False)
])


#Deportista
depta = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre_deportista", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", FloatType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False)
])


#Evento
even = StructType([
    StructField("evento_id", IntegerType(), False),
    StructField("nombre_evento", StringType(), False),
    StructField("deporte_id", IntegerType(), False)
])


#Juego
jueg = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("nombre_juego", StringType(), False),
    StructField("annio", IntegerType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)    
])


#Equipos
equi = StructType([
    StructField("equipo_id", IntegerType(), False),
    StructField("nombre_equipo", StringType(), False),
    StructField("sigla", StringType(), False)
])



#Resultados
resul = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False)
])


In [10]:
#Aplicar el esquema al RDD

#Deporte
depteDF = sqlContext.createDataFrame(depteRDD, depte)

#Deportista
deptaDF = sqlContext.createDataFrame(deptaRDD, depta)

#Evento
evenDF = sqlContext.createDataFrame(evenRDD, even)

#Juego
juegDF = sqlContext.createDataFrame(juegRDD, jueg)

#Equipo
equiDF = sqlContext.createDataFrame(equiRDD, equi)

#Resultados
resulDF = sqlContext.createDataFrame(resulRDD, resul)

In [11]:
#Mostrar esquema
evenDF.printSchema()

root
 |-- evento_id: integer (nullable = false)
 |-- nombre_evento: string (nullable = false)
 |-- deporte_id: integer (nullable = false)



In [None]:
#Mostrar esquema
resulDF.printSchema()

In [None]:
#Función para eliminar nulos
#prueb = resulDF.fillna({'resultado_id':-1, 'evento_id': 1})
#resulDF.filter((resulDF.resultado_id == 34007))
#df.fillna({'Id': -1, 'Nombre': 'Desconocido'}).show()

In [12]:
#Prueba de JOINS
prueb = deptaDF.join(equiDF, deptaDF.equipo_id == equiDF.equipo_id, "inner")\
               .join(resulDF, deptaDF.deportista_id == resulDF.deportista_id, "inner")\
               .join(juegDF, resulDF.juego_id == juegDF.juego_id, "left")\
               .join(evenDF, resulDF.evento_id == evenDF.evento_id, "left")\
               .join(depteDF, evenDF.deporte_id == depteDF.deporte_id, "left")\
               .filter(resulDF.medalla != "NA")\
               .select(equiDF.nombre_equipo, resulDF.medalla)

In [13]:
prueb.createOrReplaceTempView("EMP")

In [18]:
#top 10 de los equipos con más medallas
spark.sql("SELECT nombre_equipo, COUNT(*) as cuenta FROM EMP GROUP BY nombre_equipo ORDER BY 2 DESC").show(10)

+-------------+------+
|nombre_equipo|cuenta|
+-------------+------+
|United States|  2753|
| Soviet Union|  1210|
|      Germany|   969|
|       France|   923|
|Great Britain|   885|
|        Italy|   754|
|       Sweden|   734|
|    Australia|   661|
|       Canada|   644|
|      Hungary|   609|
+-------------+------+
only showing top 10 rows



In [19]:
#top 10 de los equipos con menos medallas
spark.sql("SELECT nombre_equipo, COUNT(*) as cuenta FROM EMP GROUP BY nombre_equipo ORDER BY 2").show(10)

+--------------------+------+
|       nombre_equipo|cuenta|
+--------------------+------+
|     United States-4|     1|
|              Mutafo|     1|
|               Bonzo|     1|
|        Hurlingham-2|     1|
|                 Tip|     1|
|           Gyrinus-1|     1|
|United States Vir...|     1|
|          Complex II|     1|
|               Tango|     1|
|              Guyana|     1|
+--------------------+------+
only showing top 10 rows

