<a href="https://colab.research.google.com/github/Saulrega/Spark/blob/main/Complete.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
!wget -q https://dlcdn.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.1.3-bin-hadoop3.2.tgz
# install findspark 
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

## Sample

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark

In [None]:
spark.stop()

## Transformations

In [None]:
from pyspark import SparkContext

In [None]:
# Create a initialator
sc = SparkContext(master='local', appName="transformacionesYAcciones")

In [None]:
# run drive in the colab
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
# Read dataset
%cd '/content/drive/My Drive/Colab Notebooks/Spark/files'

/content/drive/My Drive/Colab Notebooks/Spark/files


In [None]:
# Create a read distribuited data 
rdd1 = sc.parallelize([1,2,3])
type(rdd1)

pyspark.rdd.RDD

In [None]:
!ls

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [None]:
# load rdd
path = '/content/drive/My Drive/Colab Notebooks/Spark/files/'
equiposOlimpicosRDD = sc.textFile(path+'paises.csv').map(lambda line: line.split(','))

In [None]:
# Visaulize rdd
rdd1.collect()

[1, 2, 3]

In [None]:
# Show context of Spark
sc

In [None]:
# Read first 5 rows
equiposOlimpicosRDD.take(5)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

In [None]:
# Stop the memory
# sc.stop()

## Actions

In [None]:
# Read first 15 rows
equiposOlimpicosRDD.take(15)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR'],
 ['10', 'Alcyon-6', 'FRA'],
 ['11', 'Alcyon-7', 'FRA'],
 ['12', 'Aldebaran', 'ITA'],
 ['13', 'Aldebaran II', 'ITA'],
 ['14', 'Aletta', 'IRL']]

In [None]:
# count number of rows
equiposOlimpicosRDD.map(lambda x: x[2]).distinct().count()

231

In [None]:
# group by teams for country
equiposOlimpicosRDD.map(lambda x: (x[2],x[1])).groupByKey().mapValues(len).take(5)

[('sigla', 1), ('AUT', 11), ('MEX', 9), ('ARG', 18), ('AFG', 1)]

In [None]:
# diference with 'list' in stead of 'len'
equiposOlimpicosRDD.map(lambda x: (x[2],x[1])).groupByKey().mapValues(list).take(5)

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise']),
 ('MEX',
  ['A North American Team',
   'Acipactli',
   'Chamukina',
   'Mexico',
   'Mexico-1',
   'Mexico-2',
   'Nausikaa 4',
   'Tlaloc',
   'Xolotl']),
 ('ARG',
  ['Acturus',
   'Antares',
   'Arcturus',
   'Ardilla',
   'Argentina',
   'Argentina-1',
   'Argentina-2',
   'Blue Red',
   'Covunco III',
   'Cupidon III',
   'Djinn',
   'Gullvinge',
   'Matrero II',
   'Mizar',
   'Pampero',
   'Rampage',
   'Tango',
   'Wiking']),
 ('AFG', ['Afghanistan'])]

In [None]:
# createa filter to Argentina values
equiposArgentinos = equiposOlimpicosRDD.filter(lambda x: 'ARG' in x)
equiposArgentinos.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

In [None]:
# count Aprox is a count of data, rows or columns in a dataframe wit a time limit counting in miliseconds
equiposOlimpicosRDD.countApprox(10)

1185

In [None]:
equiposOlimpicosRDD.count()

1185

In [None]:
# read and join
deportistaOlimpicoRDD = sc.textFile(path+'deportista.csv').map(lambda line: line.split(','))
deportistaOlimpicoRDD2 = sc.textFile(path+'deportista2.csv').map(lambda line: line.split(','))
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)

In [None]:
# count athletes
deportistaOlimpicoRDD.count()

135572

In [None]:
# show top rows
equiposOlimpicosRDD.top(5)

[['id', 'equipo', 'sigla'],
 ['999', 'Stella-2', 'NOR'],
 ['998', 'State VI', 'CAN'],
 ['997', 'Starlight III', 'GBR'],
 ['996', 'Starita', 'NED']]

In [None]:
deportistaOlimpicoRDD.top(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['99999', 'Alexander Grant Alick Rennie', '1', '32', '182', '71', '967'],
 ['99998', 'Robert John Bob Renney', '1', '21', '178', '90', '66'],
 ['99997', 'Thomas Renner', '1', '24', '183', '86', '71'],
 ['99996', 'Sara Renner', '2', '21', '168', '63', '174']]

In [None]:
# read 'resultados.csv'
resultado = sc.textFile(path+'resultados.csv').map(lambda x: x.split(','))
resultado.top(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['99999', 'NA', '50604', '51', '568'],
 ['99998', 'NA', '50603', '47', '36'],
 ['99997', 'NA', '50602', '49', '262'],
 ['99996', 'NA', '50601', '47', '614']]

In [None]:
# drop NA values that are people without medals
resultadoGanador = resultado.filter(lambda l: 'NA' not in l[1])
resultadoGanador.take(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4'],
 ['38', 'Bronze', '15', '7', '19'],
 ['39', 'Bronze', '15', '7', '20'],
 ['41', 'Bronze', '16', '50', '14']]

In [None]:
# join 3 RDDs "equipos", "deportistas" and "resultados"

equiposYDeportistas = deportistaOlimpicoRDD.map(lambda l: [l[-1],l[:-1]]).join(equiposOlimpicosRDD.map(lambda x: [x[0], x[2]])).join(resultadoGanador.map(lambda y: [y[2], y[1]]))
equiposYDeportistas.takeSample(False,6,25)

[('507',
  ((['18909', 'Enrico Castelli', '1', '27', '0', '0'], 'ITA'), 'Silver')),
 ('705',
  ((['99600', 'John Philip Ernest Marie Flip Regout', '1', '21', '0', '0'],
    'NED'),
   'Silver')),
 ('399',
  ((['15937', 'Magdalena Brzeska Peschel Sabolocka ', '2', '18', '173', '48'],
    'GER'),
   'Silver')),
 ('705',
  ((['13408',
     'Anna Johanna Geertruida Maria Annie Borckink',
     '2',
     '24',
     '0',
     '0'],
    'NED'),
   'Bronze')),
 ('705',
  ((['48310', 'Johannes Hendricus Heuckelbach', '1', '27', '0', '0'], 'NED'),
   'Gold')),
 ('619',
  ((['59062', 'Noraseela Mohd Khalid', '2', '32', '166', '55'], 'MAS'),
   'Gold'))]

In [None]:
deportistaPaises = deportistaOlimpicoRDD.map(lambda x: [x[-1], x[:-1]]).join(equiposOlimpicosRDD.map(lambda x: [x[0], x[2]]))

In [None]:
deportistaPaises.join(resultadoGanador).take(5)

[('74',
  ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold')),
 ('74', ((['129', 'Ruslan Abbasov', '1', '22', '181', '74'], 'AZE'), 'Gold')),
 ('74', ((['130', 'Tural Abbasov', '1', '18', '182', '76'], 'AZE'), 'Gold')),
 ('74', ((['131', 'Tran Abbasova', '2', '33', '159', '53'], 'AZE'), 'Gold')),
 ('74',
  ((['335', 'Abdulqdir Abdullayev', '1', '28', '188', '91'], 'AZE'), 'Gold'))]

## Number Operations

In [None]:
# save medal values according to international commission of olimpic games
valoresMedallas = {'Gold': 7,
                   'Silver': 5,
                   'Bronze': 4}

In [None]:
paisesMedallas = deportistaPaises.join(resultadoGanador)

In [None]:
# asing medal values  to RDD
paisesMedallas = paisesMedallas.map(lambda x: (x[1][0][-1], valoresMedallas[x[1][1]]))

In [None]:
# import operator library
from operator import add

In [None]:
# sum country by medal
conclusion = paisesMedallas.reduceByKey((add)).sortBy(lambda x: x[1], ascending=False)

In [None]:
conclusion.take(10)

[('CAN', 32538),
 ('ARG', 12520),
 ('HUN', 10860),
 ('MEX', 6124),
 ('RSA', 3788),
 ('BLR', 3580),
 ('LTU', 1535),
 ('MGL', 1460),
 ('USA', 1342),
 ('AZE', 1218)]

## Dataframes 

In [None]:
# load libraries for dataframes
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, Row

In [None]:
# Create a initialator
sqlContext = SQLContext(sc)

In [None]:
juegoSchema = StructType([
                          StructField("juego_id", IntegerType(), False),
                          StructField("annio", StringType(), False),
                          StructField("temporada", StringType(), False),
                          StructField("ciudad", StringType(), False),
])


juegoDF = sqlContext.read.schema(juegoSchema).option("header","true").csv(path+'juegos.csv')

In [None]:
juegoDF.show(5)

+--------+-----------+---------+------+
|juego_id|      annio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
|       5|1908 Verano|     1908|Verano|
+--------+-----------+---------+------+
only showing top 5 rows



### Inferencia types

In [None]:
def eliminarEncabezado(indice, iterador):
  return iter(list(iterador)[1:])

In [None]:
# remove heads of RDDs
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminarEncabezado)

In [None]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [None]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda x: (
    int(x[0]),
    x[1],
    int(x[2]),
    int(x[3]),
    int(x[4]),
    float(x[5]),
    int(x[6]),

))

In [None]:
# create Schemas
deportista_schema = StructType([
          StructField("deportista_id",IntegerType(),False),
          StructField("nombre",StringType(),False),
          StructField("genero",IntegerType(),False),
          StructField("edad",IntegerType(),False),
          StructField("altura",IntegerType(),False),
          StructField("peso",FloatType(),False),
          StructField("equipo_id",IntegerType(),False)
])


deporte_schema = StructType([
          StructField("deporte_id",IntegerType(),False),
          StructField("nombre", StringType(),False)
])

juego_schema = StructType([
          StructField("juego_id",IntegerType(),False),
          StructField("annio",StringType(),False),
          StructField("temporada",StringType(),False),
          StructField("ciudad",StringType(),False)
])

evento_schema = StructType([
          StructField("evento_id",IntegerType(),False),
          StructField("evento",StringType(),False),
          StructField("deporte_id",IntegerType(),False)
])

paises_schema = StructType([
          StructField("id",IntegerType(),False),
          StructField("equipo",StringType(),False),
          StructField("sigla",StringType(),False)
])

resultados_schema = StructType([
          StructField("resultado_id",IntegerType(),False),
          StructField("medalla",StringType(),False),
          StructField("deportista_id",IntegerType(),False),
          StructField("juego_id",IntegerType(),False),
          StructField("evento_id",IntegerType(),False)
])

In [None]:
# creation of dataframes
deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,deportista_schema)
deporteDF = sqlContext.read.schema(deporte_schema).option("header","true").csv(path+'deporte.csv')
# juegoDF = sqlContext.read.schema(juego_schema).option("header","true").csv(path+'juego.csv')
eventoDF = sqlContext.read.schema(evento_schema).option("header","true").csv(path+'evento.csv')
paisesDF = sqlContext.read.schema(paises_schema).option("header","true").csv(path+'paises.csv')
resultadosDF = sqlContext.read.schema(resultados_schema).option("header","true").csv(path+'resultados.csv')

## Operations on DataFrames

In [None]:
# conocer el schema
deporteDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- nombre: string (nullable = true)



In [None]:
# rename column
deportistaDF = deportistaDF.withColumnRenamed("genero", "sexo")

In [None]:
# delete column
deportistaDF = deportistaDF.drop("altura")

In [None]:
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import avg as _avg
from pyspark.sql.functions import *
deportistaDF=deportistaDF.select("deportista_id", "nombre", col("edad").alias("edadAlJugar"), "equipo_id")

In [None]:
deportistaDF.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [None]:
deportistaDF = deportistaDF.filter((deportistaDF.edadAlJugar != 0))

In [None]:
deportistaDF.sort("edadAlJugar").show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        52070|        Etsuko Inada|         11|      514|
|        40129|    Luigina Giavotti|         11|      507|
|        37333|Carlos Bienvenido...|         11|      982|
|        47618|Sonja Henie Toppi...|         11|      742|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



## Joins

In [None]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edadAlJugar: integer (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [None]:
resultadosDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [None]:
deporteDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- nombre: string (nullable = true)



In [None]:
deportistaDF.join(
    resultadosDF,
    deportistaDF.deportista_id == resultadosDF.deportista_id,
    'left'
    ).join(
    juegoDF,
    juegoDF.juego_id == resultadosDF.juego_id,
    "left"
).join(
    deporteDF,
    deporteDF.deporte_id == resultadosDF.evento_id,
    "left"        
).select(
    deportistaDF.nombre, col("edadAlJugar").alias("Edad al jugar"),
    "medalla", col("annio").alias("Año de juego"),
    deporteDF.nombre.alias("Nombre de disciplina")
)

DataFrame[nombre: string, Edad al jugar: int, medalla: string, Año de juego: string, Nombre de disciplina: string]

In [None]:
paisesDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- equipo: string (nullable = true)
 |-- sigla: string (nullable = true)



In [None]:
resultadosDF.join(deportistaDF,
                 deportistaDF.deportista_id == resultadosDF.deportista_id,
                 "left").join(
                 paisesDF, paisesDF.id == deportistaDF.equipo_id
                 ).select("medalla", "equipo","sigla").where(resultadosDF.medalla != "NA").show(20)

+-------+--------------+-----+
|medalla|        equipo|sigla|
+-------+--------------+-----+
|   Gold|Denmark/Sweden|  SWE|
| Bronze|       Finland|  FIN|
| Bronze|       Finland|  FIN|
| Bronze|       Finland|  FIN|
| Bronze|       Finland|  FIN|
|   Gold|       Finland|  FIN|
|   Gold|       Finland|  FIN|
|   Gold|       Finland|  FIN|
| Bronze|       Finland|  FIN|
|   Gold|        Norway|  NOR|
|   Gold|        Norway|  NOR|
|   Gold|        Norway|  NOR|
| Silver|        Norway|  NOR|
| Bronze|        Norway|  NOR|
| Silver|        Norway|  NOR|
| Bronze|        Norway|  NOR|
|   Gold|        Norway|  NOR|
|   Gold|        Norway|  NOR|
| Silver|        Norway|  NOR|
| Bronze|   Netherlands|  NED|
+-------+--------------+-----+
only showing top 20 rows



In [None]:
resultadosDF.filter(resultadosDF.medalla != "NA").join(
    deportistaDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left"
).join(
    paisesDF, paisesDF.id == deportistaDF.equipo_id, "left"
).select("medalla", "equipo", "sigla").sort(col("sigla").desc())

DataFrame[medalla: string, equipo: string, sigla: string]

## Agrupaciones

In [None]:
medallistaXAnnio = deportistaDF.join(
        resultadosDF, 
        deportistaDF.deportista_id == resultadosDF.deportista_id, 
        "left"
    ).join(
        juegoDF, 
        juegoDF.juego_id == resultadosDF.juego_id, 
        "left"
    ).join(
        paisesDF, 
        deportistaDF.equipo_id == paisesDF.id, 
        "left"
    ).join(
        eventoDF, 
        eventoDF.evento_id == resultadosDF.evento_id, 
        "left"
    ).join(
        deporteDF, 
        eventoDF.deporte_id == deporteDF.deporte_id, 
        "left"
    ).select(
        "sigla",
        "annio",
        "medalla",
        eventoDF.evento.alias("Nombre subdisciplina"),
        deporteDF.nombre.alias("Nombre disciplina"),
        deportistaDF.nombre    
    )

In [None]:
medallistaXAnnio.show(5)

+-----+-------------+-------+--------------------+-----------------+--------------------+
|sigla|        annio|medalla|Nombre subdisciplina|Nombre disciplina|              nombre|
+-----+-------------+-------+--------------------+-----------------+--------------------+
|  CHN|  1992 Verano|     NA|Basketball Men's ...|       Basketball|           A Dijiang|
|  CHN|  2012 Verano|     NA|Judo Men's Extra-...|             Judo|            A Lamusi|
|  DEN|  1920 Verano|     NA|Football Men's Fo...|         Football| Gunnar Nielsen Aaby|
|  SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|       Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
+-----+-------------+-------+--------------------+-----------------+--------------------+
only showing top 5 rows



In [None]:
medallistaXAnnio2 = medallistaXAnnio.filter(medallistaXAnnio.medalla != "NA").sort(
    "annio"
).groupBy("sigla", "annio", "Nombre subdisciplina").count()

In [None]:
medallistaXAnnio2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- annio: string (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [None]:
medallistaXAnnio2.groupBy("sigla", "annio").agg(_sum("count").alias("Total de medallas"),
                                                _avg("count").alias("Medallas promedio")).show()

+-----+-------------+-----------------+------------------+
|sigla|        annio|Total de medallas| Medallas promedio|
+-----+-------------+-----------------+------------------+
|  NED|1992 Invierno|                4|1.3333333333333333|
|  BEL|  2000 Verano|                7|               1.4|
|  MAS|  2012 Verano|                2|               1.0|
|  MGL|  2008 Verano|                5|              1.25|
|  SWE|  1976 Verano|               10|               2.0|
|  SUI|2014 Invierno|               29|3.2222222222222223|
|  ETH|  2004 Verano|                7|              1.75|
|  AUT|  1928 Verano|                5|              1.25|
|  SYR|  1984 Verano|                1|               1.0|
|  ITA|  1996 Verano|               69| 2.225806451612903|
|  THA|  2008 Verano|                4|               1.0|
|  URS|1984 Invierno|               56|               2.8|
|  DEN|  1896 Verano|                6|               1.0|
|  GRN|  2016 Verano|                1|               1.

## SQL

In [None]:
resultadosDF.createOrReplaceTempView("resultado")
deportistaDF.createOrReplaceTempView("deportista")
paisesDF.createOrReplaceTempView("paises")

In [None]:
sqlContext.sql("""SELECT medalla, equipo, sigla
                    FROM resultado r
                    JOIN deportista d
                      ON r.deportista_id = d.deportista_id
                    JOIN paises p
                     ON p.id = d.equipo_id
                  WHERE medalla <> "NA"
                  ORDER BY sigla DESC
""").show()

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



In [None]:
sqlContext.sql("""
                SELECT DISTINCT medalla, equipo, sigla
                FROM resultado r
                JOIN deportista d
                ON r.deportista_id = d.deportista_id
                JOIN paises p
                ON p.id = d.equipo_id
                WHERE medalla <> 'NA'
                ORDER BY sigla DESC
                """).show()

+-------+--------------------+-----+
|medalla|              equipo|sigla|
+-------+--------------------+-----+
| Silver|            Zimbabwe|  ZIM|
| Bronze|            Zimbabwe|  ZIM|
|   Gold|            Zimbabwe|  ZIM|
| Silver|              Zambia|  ZAM|
|   Gold|          Yugoslavia|  YUG|
| Silver|          Yugoslavia|  YUG|
| Bronze|          Yugoslavia|  YUG|
| Bronze|West Indies Feder...|  WIF|
| Silver|             Vietnam|  VIE|
|   Gold|             Vietnam|  VIE|
|   Gold|           Venezuela|  VEN|
| Bronze|           Venezuela|  VEN|
| Silver|           Venezuela|  VEN|
| Silver|          Uzbekistan|  UZB|
|   Gold|          Uzbekistan|  UZB|
| Bronze|          Uzbekistan|  UZB|
|   Gold|New York Athletic...|  USA|
| Bronze|New York Athletic...|  USA|
|   Gold|            Minotaur|  USA|
|   Gold|            Kathleen|  USA|
+-------+--------------------+-----+
only showing top 20 rows



In [None]:
sqlContext.sql("""
    SELECT 
        medalla,
        equipo,
        sigla,
        COUNT(medalla) AS total_medalla
    FROM resultado r
    JOIN deportista d
    ON r.deportista_id = d.deportista_id
    JOIN paises p
    ON p.id = d.equipo_id
    WHERE medalla <> 'NA'
    GROUP BY medalla, equipo, sigla
    ORDER BY sigla DESC
""").show(10)


+-------+--------------------+-----+-------------+
|medalla|              equipo|sigla|total_medalla|
+-------+--------------------+-----+-------------+
| Bronze|            Zimbabwe|  ZIM|            1|
| Silver|            Zimbabwe|  ZIM|            4|
|   Gold|            Zimbabwe|  ZIM|           17|
| Silver|              Zambia|  ZAM|            1|
| Bronze|          Yugoslavia|  YUG|          100|
| Silver|          Yugoslavia|  YUG|          180|
|   Gold|          Yugoslavia|  YUG|          136|
| Bronze|West Indies Feder...|  WIF|            1|
| Silver|             Vietnam|  VIE|            3|
|   Gold|             Vietnam|  VIE|            1|
+-------+--------------------+-----+-------------+
only showing top 10 rows



In [None]:
sqlContext.sql("""
    SELECT 
        'Paises con Melladas'AS estatus,
        COUNT(DISTINCT sigla) AS total_medalla
    FROM resultado r
    JOIN deportista d
    ON r.deportista_id = d.deportista_id
    JOIN paises p
    ON p.equipo_id = d.equipo_id
    WHERE medalla <> 'NA'
    
    UNION ALL
    
    SELECT 
        'Paises Participantes'AS estatus,
        COUNT(DISTINCT sigla) AS total_medalla
    FROM resultado r
    JOIN deportista d
    ON r.deportista_id = d.deportista_id
    JOIN paises p
    ON p.id = d.equipo_id
""").show(10)

## UDF

In [None]:
deportistaError = sc.textFile(path+"deportistaError.csv").map(
    lambda x: x.split(",")
)

In [None]:
deportistaError=deportistaError.mapPartitionsWithIndex(eliminarEncabezado)

In [None]:
deportistaError = deportistaError.map(lambda x:(
    x[0],
    x[1],
    x[2],
    x[3],
    x[4],
    x[5],
    x[6]
))

In [None]:
deportistaError_schema = StructType([
          StructField("deportista_id",StringType(),False),
          StructField("nombre",StringType(),False),
          StructField("genero",StringType(),False),
          StructField("edad",StringType(),False),
          StructField("altura",StringType(),False),
          StructField("peso",StringType(),False),
          StructField("equipo_id",StringType(),False)
])

In [None]:
deportistaErrorDF = sqlContext.createDataFrame(deportistaError, deportistaError_schema)

In [None]:
deportistaErrorDF.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [None]:
from pyspark.sql.functions import udf

def conversionEnteros(valor):
  return int(valor) if len(valor) > 0 else None


conversionEnteros_udf = udf(lambda z : conversionEnteros(z), IntegerType())
sqlContext.udf.register("conversionEnteros_udf", conversionEnteros_udf)

<function __main__.<lambda>>

In [None]:
deportistaErrorDF.select(conversionEnteros_udf("altura").alias("alturaUDF")).show()

+---------+
|alturaUDF|
+---------+
|      180|
|      170|
|     null|
|     null|
|      185|
|      188|
|      183|
|      168|
|      186|
|     null|
|      182|
|      172|
|      159|
|      171|
|     null|
|      184|
|      175|
|      189|
|     null|
|      176|
+---------+
only showing top 20 rows



## Perisistencia y replicacion

In [None]:
from pyspark.storagelevel import StorageLevel

In [None]:
medallistaXAnnio.is_cached

False

In [None]:
medallistaXAnnio.rdd.cache()

MapPartitionsRDD[311] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
medallistaXAnnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [None]:
medallistaXAnnio.rdd.unpersist()

MapPartitionsRDD[311] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
medallistaXAnnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[311] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
medallistaXAnnio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 2)

In [None]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [None]:
medallistaXAnnio.rdd.unpersist()

MapPartitionsRDD[311] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
medallistaXAnnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)


MapPartitionsRDD[311] at javaToPython at NativeMethodAccessorImpl.java:0

## Particionado

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Particionado").master("local[5]").getOrCreate()

In [None]:
df = spark.range(0, 20)
df.rdd.getNumPartitions()

1

In [None]:
rdd2 = spark.sparkContext.parallelize((0,20), 10)
rdd2.getNumPartitions

<bound method RDD.getNumPartitions of ParallelCollectionRDD[318] at readRDDFromFile at PythonRDD.scala:274>

In [None]:
rddDesdeArchivo = spark.sparkContext.textFile(path+"deporte.csv", 10)

In [None]:
rddDesdeArchivo.getNumPartitions()

10

In [None]:
rddDesdeArchivo.saveAsTextFile("/content/drive/My Drive/Colab Notebooks/Spark/output/")

In [None]:
!ls /content/drive/My Drive/Colab Notebooks/Spark/output/

ls: cannot access '/content/drive/My': No such file or directory
ls: cannot access 'Drive/Colab': No such file or directory
ls: cannot access 'Notebooks/Spark/output/': No such file or directory


In [None]:
rdd3 = spark.sparkContext.wholeTextFiles("/content/drive/My Drive/Colab Notebooks/Spark/output/*")

In [None]:
rdd3.take(3)

[('file:/content/drive/My Drive/Colab Notebooks/Spark/output/part-00000',
  'deporte_id,deporte\n1,Basketball\n2,Judo\n3,Football\n4,Tug-Of-War\n5,Speed Skating\n6,Cross Country Skiing\n'),
 ('file:/content/drive/My Drive/Colab Notebooks/Spark/output/part-00001',
  '7,Athletics\n8,Ice Hockey\n9,Swimming\n10,Badminton\n11,Sailing\n12,Biathlon\n13,Gymnastics\n14,Art Competitions\n'),
 ('file:/content/drive/My Drive/Colab Notebooks/Spark/output/part-00002',
  '15,Alpine Skiing\n16,Handball\n17,Weightlifting\n18,Wrestling\n19,Luge\n20,Water Polo\n')]

In [None]:
lista = rdd3.mapValues(lambda x:x.split()).collect()

In [None]:
lista = [l[0] for l in lista]
lista.sort()

In [None]:
rdd4 = spark.sparkContext.textFile(",".join(lista), 10).map(lambda x:x.split(","))

In [None]:
rdd4.take(4)

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football']]