# Instalar pyspark

In [149]:
from pyspark.sql import SparkSession
import findspark
findspark.init()

In [150]:
spark = SparkSession.builder.getOrCreate()

In [151]:
rdd = spark.sparkContext.textFile("files/paises.csv")

In [152]:
rdd1 = rdd.map(lambda line:line.split(","))

In [153]:
rdd1

PythonRDD[245] at RDD at PythonRDD.scala:53

In [154]:
rdd1.take(15)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR'],
 ['10', 'Alcyon-6', 'FRA'],
 ['11', 'Alcyon-7', 'FRA'],
 ['12', 'Aldebaran', 'ITA'],
 ['13', 'Aldebaran II', 'ITA'],
 ['14', 'Aletta', 'IRL']]

In [155]:
rdd1.map(lambda x: (x[2])).distinct().count()

231

# Cuantos equipos hay

In [156]:
rdd1.map(lambda x: (x[2],x[1])).groupByKey().mapValues(len).take(5)

[('sigla', 1), ('AFG', 1), ('SUI', 17), ('ALB', 1), ('POR', 21)]

## Mostrar todos los valores en lista

In [157]:
rdd1.map(lambda x: (x[2],x[1])).groupByKey().mapValues(list).take(5)

[('sigla', ['equipo']),
 ('AFG', ['Afghanistan']),
 ('SUI',
  ['Alain IV',
   'Ali-Baba IV',
   'Ali-Baba IX',
   'Ali-Baba VI',
   'Baccara',
   'Ballerina IV',
   'Fantasio III',
   'Kln',
   'Lerina',
   'Pousse-Moi Pas VII',
   'Switzerland',
   'Switzerland-1',
   'Switzerland-2',
   'Tim-Tam III',
   'Ylliam II',
   'Ylliam VII',
   'Ylliam VIII']),
 ('ALB', ['Albania']),
 ('POR',
  ['Alcaid',
   'Argus',
   'Calcinhas',
   'Camelia',
   'Ciocca III',
   'Espadarte',
   'Espardate',
   'Faneca',
   'Galopin De La Font',
   'Grifo III',
   'Grifo IV',
   'Hannover',
   "Ma'Lindo",
   'Notavel',
   'Oxalis',
   'Portugal',
   'Portugal-1',
   'Portugal-2',
   'Sjhxa',
   'Symphony',
   'Vicking'])]

## Ubicar datos que coincidad

In [158]:
equipos_argentinos = rdd1.filter(lambda l : "ARG" in l)
equipos_argentinos.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

## Conteo total de todo el contenido

In [159]:
rdd1.count()

1185

## Saber cuantos valores tiene el RDD

In [160]:
rdd1.countApprox(20)

1185

## Crear nnuevo RDD

In [161]:
deportistaOlimpico = spark.sparkContext.textFile("files/deportista.csv").map(lambda line:line.split(","))

In [162]:
deportistaOlimpico2 = spark.sparkContext.textFile("files/deportista.csv").map(lambda line:line.split(","))

## Unimos RDD

In [163]:
deportistaOlimimpicoRDD = deportistaOlimpico.union(deportistaOlimpico2)

# Conteo de datos

In [164]:
deportistaOlimimpicoRDD.count()

135574

## Ver la ultima y primera linea de un archivo

In [165]:
rdd1.top(2)

[['id', 'equipo', 'sigla'], ['999', 'Stella-2', 'NOR']]

In [166]:
deportistaOlimimpicoRDD.top(2)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id']]

## Union de datos

In [167]:
deportistaOlimimpicoRDD.map(lambda l: [l[-1],l[:-1]]).join(rdd1.map(lambda x: [x[0],x[2]])).take(6)

[('742', (['20', 'Kjetil Andr Aamodt', '1', '20', '176', '85'], 'NOR')),
 ('742', (['21', 'Ragnhild Margrethe Aamodt', '2', '27', '163', '0'], 'NOR')),
 ('742', (['23', 'Fritz Aanes', '1', '22', '187', '89'], 'NOR')),
 ('742', (['24', 'Nils Egil Aaness', '1', '24', '0', '0'], 'NOR')),
 ('742', (['25', 'Alf Lied Aanning', '1', '24', '0', '0'], 'NOR')),
 ('742',
  (['26', 'Agnes Erika Aanonsen Eyde ', '2', '17', '169', '65'], 'NOR'))]

## Elegir muestras aleatorias

In [168]:
deportistaOlimimpicoRDD.map(lambda l: [l[-1],l[:-1]]).join(rdd1.map(lambda x: [x[0],x[2]])).takeSample(False,6,25)

[('138', (['27269', 'Cyro Marques Delgado', '1', '19', '182', '76'], 'BRA')),
 ('256', (['46574', 'Jaroslav Hatla', '1', '31', '178', '65'], 'CZE')),
 ('1065', (['5531', 'Fana Ashby', '2', '19', '168', '67'], 'TTO')),
 ('213', (['5917', 'Giorgio Audizio', '1', '37', '182', '80'], 'ITA')),
 ('35',
  (['24634', 'JeanJacques Nzadi da Conceio', '1', '28', '198', '100'], 'ANG')),
 ('15', (['38242', 'Zahra Gamir', '2', '34', '168', '55'], 'ALG'))]

## CARGAR NUEVO RDD

In [169]:
resultado = spark.sparkContext.textFile("files/resultados.csv").map(lambda line:line.split(","))

In [170]:
resultado = resultado.filter(lambda lines : "NA" not in lines[1])

In [171]:
resultado.take(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4'],
 ['38', 'Bronze', '15', '7', '19'],
 ['39', 'Bronze', '15', '7', '20'],
 ['41', 'Bronze', '16', '50', '14']]

In [172]:
deportistaPais = deportistaOlimimpicoRDD.map(lambda l : [l[-1],l[: -1]]).join(rdd1.map(lambda x: [ x[0],x[2]]))

In [173]:
deportistaPais.join(resultado).take(6)

[('1090',
  ((['9150', 'Yelena Yuryevna Bechke Petrova Ellis ', '2', '26', '158', '48'],
    'EUN'),
   'Bronze')),
 ('1090',
  ((['9819', 'Gennady Vladimirovich Belyakov', '1', '23', '171', '84'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['41896', 'Andrey Vladimirovich Gorokhov', '1', '23', '185', '92'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['64345', 'Sergey Valeryevich Kruglov', '1', '31', '0', '0'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['9150', 'Yelena Yuryevna Bechke Petrova Ellis ', '2', '26', '158', '48'],
    'EUN'),
   'Bronze')),
 ('1090',
  ((['9819', 'Gennady Vladimirovich Belyakov', '1', '23', '171', '84'], 'EUN'),
   'Bronze'))]

## Operaciones Matematicas dentro de los RDD

In [174]:
valoresMedallas = {
  'Gold':7,
  'Silver':5,
  "Bronze":4
}

In [175]:
paisesMedallas = deportistaPais.join(resultado)

In [176]:
paisesMedallas.map(lambda x: [x[1][0][-1],x[1][1]]).take(4)

[['EUN', 'Bronze'], ['EUN', 'Bronze'], ['EUN', 'Bronze'], ['EUN', 'Bronze']]

In [177]:
paisesMedallas = paisesMedallas.map(lambda x: [x[1][0][-1], valoresMedallas[x[1][1]]])

In [178]:
from operator import add

In [179]:
conclucion =  paisesMedallas.reduceByKey(add).sortBy(lambda x: x[1], ascending=False)

In [180]:
conclucion.take(4)

[('CAN', 33142), ('ARG', 12908), ('HUN', 11272), ('MEX', 6056)]

## Crear Df en base a RDD

### Eliminar encabezado

In [181]:
def removeHeader(indice, interador):
  return iter(list(interador)[1:])

In [182]:
deportistaOlimpicoRDD = deportistaOlimimpicoRDD.mapPartitionsWithIndex(removeHeader)

In [183]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

### Transformar Valores

In [184]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [185]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    StringType,
    FloatType,
)
from pyspark.sql.types import Row
from pyspark.sql import SQLContext

In [186]:
sqlContext = SQLContext(spark)



### Esquema del nuevo DataFrame

In [187]:
schema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

In [188]:
deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

In [189]:
deportistaDF.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



## Convirtiendo el resto de rdds

In [190]:
paisesRDD = spark.sparkContext.textFile("files/paises.csv").map(
    lambda line: line.split(",")
)

In [191]:
paisesRDD.take(2)

[['id', 'equipo', 'sigla'], ['1', '30. Februar', 'AUT']]

In [192]:
paisesRDD = paisesRDD.mapPartitionsWithIndex(removeHeader)

In [193]:
paisesRDD = paisesRDD.map(lambda l : (
    int(l[0]),
    l[1],
    l[2]
))

In [194]:
paisesRDD.take(2)

[(1, '30. Februar', 'AUT'), (2, 'A North American Team', 'MEX')]

In [195]:
paisesSchema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("equipo", StringType(), False),
        StructField("sigla",StringType(),False)
    ]
)

In [196]:
paisesDF = sqlContext.createDataFrame(paisesRDD,paisesSchema)

In [197]:
paisesDF.show(2)

+---+--------------------+-----+
| id|              equipo|sigla|
+---+--------------------+-----+
|  1|         30. Februar|  AUT|
|  2|A North American ...|  MEX|
+---+--------------------+-----+
only showing top 2 rows



In [198]:
deportesSchema = StructType(
    [
        StructField("deporte_id",IntegerType(),False),
        StructField("deporte",StringType(),False)
    ]
)

In [199]:
deportesOlimpicosDF = (
    sqlContext.read.schema(deportesSchema)
    .option("header", "True")
    .csv("files/deporte.csv")
)

In [200]:
deportesOlimpicosDF.take(5)

[Row(deporte_id=1, deporte='Basketball'),
 Row(deporte_id=2, deporte='Judo'),
 Row(deporte_id=3, deporte='Football'),
 Row(deporte_id=4, deporte='Tug-Of-War'),
 Row(deporte_id=5, deporte='Speed Skating')]

In [201]:
eventoSchema = StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("evento",StringType(),False),
    StructField("deporte_id",IntegerType(),False)
])

In [202]:
eventoDF = sqlContext.read.schema(eventoSchema).option("header","True").csv("files/evento.csv")
eventoDF.show(5)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
|        5|Speed Skating Wom...|         5|
+---------+--------------------+----------+
only showing top 5 rows



In [203]:
eventoDF.take(5)

[Row(evento_id=1, evento="Basketball Men's Basketball", deporte_id=1),
 Row(evento_id=2, evento="Judo Men's Extra-Lightweight", deporte_id=2),
 Row(evento_id=3, evento="Football Men's Football", deporte_id=3),
 Row(evento_id=4, evento="Tug-Of-War Men's Tug-Of-War", deporte_id=4),
 Row(evento_id=5, evento="Speed Skating Women's 500 metres", deporte_id=5)]

In [204]:
resultadosSchema = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("meadalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False)
])

In [205]:
resultadosDF = (
    sqlContext.read.schema(resultadosSchema)
    .option("header", "True")
    .csv("files/resultados.csv")
)

In [206]:
resultadosDF.take(2)

[Row(resultado_id=1, meadalla='NA', deportista_id=1, juego_id=39, evento_id=1),
 Row(resultado_id=2, meadalla='NA', deportista_id=2, juego_id=49, evento_id=2)]

## Ver la estrucutra de un dataframe

In [207]:
deportesOlimpicosDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [208]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



## Renombrar columnas

In [209]:
deportistaDF = deportistaDF.withColumnRenamed("genero", "sexo").drop("altura")

In [210]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [211]:
from pyspark.sql.functions import *

In [212]:
deportistaDF = deportistaDF.select(
    "deportista_id", "nombre", col("edad").alias("edadAlJugar"), "equipo_id"
)

In [213]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edadAlJugar: integer (nullable = false)
 |-- equipo_id: integer (nullable = false)



## Filtrar datos excluyendo los valores en 0

In [214]:
deportistaDF = deportistaDF.filter(deportistaDF.edadAlJugar != 0)

In [215]:
deportistaDF.sort("edadAlJugar").show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        22411|Magdalena Cecilia...|         11|      413|
|        22411|Magdalena Cecilia...|         11|      413|
|        40129|    Luigina Giavotti|         11|      507|
|        40129|    Luigina Giavotti|         11|      507|
|        37333|Carlos Bienvenido...|         11|      982|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



## Agrupaciones

In [216]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edadAlJugar: integer (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [217]:
resultadosDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- meadalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [218]:
juegoSchema = StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("temporada",StringType(),False),
    StructField("anio",IntegerType(),False),
    StructField("estacion",StringType(),False),
    StructField("ciudad",StringType(),False)
])

In [219]:
juegosDF = (
    sqlContext.read.schema(juegoSchema)
    .option("header", "True")
    .csv("files/juegos.csv")
)

In [220]:
juegosDF.show(1)

+--------+-----------+----+--------+------+
|juego_id|  temporada|anio|estacion|ciudad|
+--------+-----------+----+--------+------+
|       1|1896 Verano|1896|  Verano|Athina|
+--------+-----------+----+--------+------+
only showing top 1 row



In [221]:
juegosDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- temporada: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- estacion: string (nullable = true)
 |-- ciudad: string (nullable = true)



## Realizar Join a DF

In [223]:
deportistaDF.join(
    resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left").show()

+-------------+--------------------+-----------+---------+------------+--------+-------------+--------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|resultado_id|meadalla|deportista_id|juego_id|evento_id|
+-------------+--------------------+-----------+---------+------------+--------+-------------+--------+---------+
|            1|           A Dijiang|         24|      199|           1|      NA|            1|      39|        1|
|            2|            A Lamusi|         23|      199|           2|      NA|            2|      49|        2|
|            3| Gunnar Nielsen Aaby|         24|      273|           3|      NA|            3|       7|        3|
|            4|Edgar Lindenau Aabye|         34|      278|           4|    Gold|            4|       2|        4|
|            5|Christine Jacoba ...|         21|      705|          10|      NA|            5|      40|        6|
|            5|Christine Jacoba ...|         21|      705|           9|      NA|        

In [226]:
agrupacionDF = deportistaDF.join(
    resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left"
)

In [228]:
agrupacionDF = agrupacionDF.join(juegosDF, resultadosDF.juego_id == juegosDF.juego_id, "left")

In [229]:
agrupacionDF.show(5)

+-------------+--------------------+-----------+---------+------------+--------+-------------+--------+---------+--------+-------------+----+--------+-----------+
|deportista_id|              nombre|edadAlJugar|equipo_id|resultado_id|meadalla|deportista_id|juego_id|evento_id|juego_id|    temporada|anio|estacion|     ciudad|
+-------------+--------------------+-----------+---------+------------+--------+-------------+--------+---------+--------+-------------+----+--------+-----------+
|            1|           A Dijiang|         24|      199|           1|      NA|            1|      39|        1|      39|  1992 Verano|1992|  Verano|  Barcelona|
|            2|            A Lamusi|         23|      199|           2|      NA|            2|      49|        2|      49|  2012 Verano|2012|  Verano|     London|
|            3| Gunnar Nielsen Aaby|         24|      273|           3|      NA|            3|       7|        3|       7|  1920 Verano|1920|  Verano|  Antwerpen|
|            4|Edgar L

In [235]:
agrupacionDF.filter(agrupacionDF.meadalla != "NA").sort("edadAlJugar").groupBy(
    "nombre", "temporada", "ciudad"
).count().show()

+--------------------+-------------+---------+-----+
|              nombre|    temporada|   ciudad|count|
+--------------------+-------------+---------+-----+
| Osmay Acosta Duarte|  2008 Verano|  Beijing|    2|
|          Riaz Ahmed|  1972 Verano|   Munich|    2|
|  Salvatore Amitrano|  2004 Verano|   Athina|    2|
|    Borislav Ananiev|  1980 Verano|   Moskva|    2|
|Yaroslav Viktorov...|  1988 Verano|    Seoul|    2|
|Yelena Petrovna A...|  1976 Verano| Montreal|    2|
| Susan Margaret Auch|1998 Invierno|   Nagano|    2|
|Yelena Yuryevna A...|  2000 Verano|   Sydney|    2|
|Roman Vladimirovi...|  2004 Verano|   Athina|    2|
|Colin Kenneth Bea...|  1996 Verano|  Atlanta|    2|
|Serhiy Vasylovych...|  1992 Verano|Barcelona|    2|
|       Stefan Blcher|  1988 Verano|    Seoul|    2|
|       Yvonne Bnisch|  2004 Verano|   Athina|    2|
|     Katrin Borchert|  2000 Verano|   Sydney|    2|
|Jennifer Lori Bot...|1998 Invierno|   Nagano|    2|
|Giovanni Battista...|  1964 Verano|    Tokyo|

## Analizando la estructura anterior

In [236]:
medallas = agrupacionDF.filter(agrupacionDF.meadalla != "NA").sort("edadAlJugar").groupBy(
    "nombre", "temporada", "ciudad"
).count()

In [237]:
medallas.printSchema()

root
 |-- nombre: string (nullable = false)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- count: long (nullable = false)



In [241]:
medallas.groupBy("temporada","ciudad").agg(
    sum("count").alias("Total de medallas"), avg("count").alias("medallas promedio")
).show(5)

+-------------+-----------+-----------------+------------------+
|    temporada|     ciudad|Total de medallas| medallas promedio|
+-------------+-----------+-----------------+------------------+
|1988 Invierno|    Calgary|              214|2.3516483516483517|
|1994 Invierno|Lillehammer|              366|2.4078947368421053|
|2006 Invierno|     Torino|              572|2.4761904761904763|
|1932 Invierno|Lake Placid|               90|2.0930232558139537|
|1980 Invierno|Lake Placid|              192|2.2857142857142856|
+-------------+-----------+-----------------+------------------+
only showing top 5 rows



## SQL

### Tabla Temporal

In [242]:
resultadosDF.registerTempTable("resultados")



In [243]:
deportistaDF.registerTempTable("deportistas")
paisesDF.registerTempTable("paises")



In [245]:
sqlContext.sql("SELECT * FROM deportistas").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
|            6|     Per Knut Aaland|         31|     1096|
|            7|        John Aalberg|         31|     1096|
|            8|Cornelia Cor Aalt...|         18|      705|
|            9|    Antti Sami Aalto|         26|      350|
|           10|Einar Ferdinand E...|         26|      350|
|           11|  Jorma Ilmari Aalto|         22|      350|
|           12|   Jyri Tapani Aalto|         31|      350|
|           13|  Minna Maarit Aalto|         30|      350|
|           14|Pirjo Hannele Aal...|         32|      35

In [251]:
sqlContext.sql("SELECT * FROM resultados").show()

+------------+--------+-------------+--------+---------+
|resultado_id|meadalla|deportista_id|juego_id|evento_id|
+------------+--------+-------------+--------+---------+
|           1|      NA|            1|      39|        1|
|           2|      NA|            2|      49|        2|
|           3|      NA|            3|       7|        3|
|           4|    Gold|            4|       2|        4|
|           5|      NA|            5|      36|        5|
|           6|      NA|            5|      36|        6|
|           7|      NA|            5|      38|        5|
|           8|      NA|            5|      38|        6|
|           9|      NA|            5|      40|        5|
|          10|      NA|            5|      40|        6|
|          11|      NA|            6|      38|        7|
|          12|      NA|            6|      38|        8|
|          13|      NA|            6|      38|        9|
|          14|      NA|            6|      38|       10|
|          15|      NA|        

In [257]:
sqlContext.sql("SELECT * FROM paises").show()

+---+--------------------+-----+
| id|              equipo|sigla|
+---+--------------------+-----+
|  1|         30. Februar|  AUT|
|  2|A North American ...|  MEX|
|  3|           Acipactli|  MEX|
|  4|             Acturus|  ARG|
|  5|         Afghanistan|  AFG|
|  6|            Akatonbo|  IRL|
|  7|            Alain IV|  SUI|
|  8|             Albania|  ALB|
|  9|              Alcaid|  POR|
| 10|            Alcyon-6|  FRA|
| 11|            Alcyon-7|  FRA|
| 12|           Aldebaran|  ITA|
| 13|        Aldebaran II|  ITA|
| 14|              Aletta|  IRL|
| 15|             Algeria|  ALG|
| 16|         Ali-Baba II|  SWE|
| 17|         Ali-Baba IV|  SUI|
| 18|         Ali-Baba IX|  SUI|
| 19|         Ali-Baba VI|  SUI|
| 20|             Allegro|  FRA|
+---+--------------------+-----+
only showing top 20 rows



In [258]:
sqlContext.sql(
    """SELECT meadalla,equipo,sigla FROM resultados r JOIN deportista d ON r.deportista_id = d.deportista_id JOIN paises p ON p.id = d.equipo_id WHERE meadalla <> 'NA' ORDER BY sigla DESC"""
).show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `deportista` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 52;
'Sort ['sigla DESC NULLS LAST], true
+- 'Project ['meadalla, 'equipo, 'sigla]
   +- 'Filter NOT ('meadalla = NA)
      +- 'Join Inner, ('p.id = 'd.equipo_id)
         :- 'Join Inner, ('r.deportista_id = 'd.deportista_id)
         :  :- SubqueryAlias r
         :  :  +- SubqueryAlias resultados
         :  :     +- View (`resultados`, [resultado_id#1041,meadalla#1042,deportista_id#1043,juego_id#1044,evento_id#1045])
         :  :        +- Relation [resultado_id#1041,meadalla#1042,deportista_id#1043,juego_id#1044,evento_id#1045] csv
         :  +- 'SubqueryAlias d
         :     +- 'UnresolvedRelation [deportista], [], false
         +- SubqueryAlias p
            +- SubqueryAlias paises
               +- View (`paises`, [id#990,equipo#991,sigla#992])
                  +- LogicalRDD [id#990, equipo#991, sigla#992], false


In [259]:
spark