In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.types import Row, StringType, FloatType

from pyspark.sql import SQLContext

In [3]:
# Iniciar Sesion de Spark

spark = SparkContext(master='local', appName='DataFrames')
sqlContext = SQLContext(spark)

In [4]:
!ls files/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [5]:
!head -n 5 files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [6]:
# Crear el schema en el cual se alojaran los datos

games_schema = StructType([
    StructField('game_id', IntegerType(),False),
    StructField('year', StringType(), False),
    StructField('season', StringType(), False),
    StructField('city',StringType(),False),
])

In [7]:
# Generar DataFrame apartir del archivo y el schema creado

games_DF = sqlContext.read.schema(games_schema)\
    .option("header", "true").csv("files/juegos.csv")

In [8]:
games_DF.show(4)

+-------+-----------+------+------+
|game_id|       year|season|  city|
+-------+-----------+------+------+
|      1|1896 Verano|  1896|Verano|
|      2|1900 Verano|  1900|Verano|
|      3|1904 Verano|  1904|Verano|
|      4|1906 Verano|  1906|Verano|
+-------+-----------+------+------+
only showing top 4 rows



In [9]:
spark

# Crear DF apartir de un RDD

In [10]:
olympic_athlete_RDD = spark.textFile("files/deportista.csv") \
    .map(lambda l: l.split(','))
olympic_athlete2_RDD = spark.textFile("files/deportista2.csv") \
    .map(lambda l : l.split(','))

In [11]:
olympic_athlete_RDD = olympic_athlete_RDD \
    .union(olympic_athlete2_RDD)

In [12]:
olympic_athlete_RDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [13]:
def remove_header(index, iterator):
    return iter(list(iterator)[1:])

In [14]:
olympic_athlete_RDD = olympic_athlete_RDD.mapPartitionsWithIndex(remove_header)

In [15]:
olympic_athlete_RDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [16]:
olympic_athlete_RDD = olympic_athlete_RDD.map(lambda l: (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [17]:
schema = StructType([
    StructField("athlete_id", IntegerType(),False),
    StructField("name", StringType(), False),
    StructField("gender", IntegerType(), False),
    StructField("age", IntegerType(), False),
    StructField("height", IntegerType(), False),
    StructField("weight", FloatType(), False),
    StructField("team_id", IntegerType(), False)
])

athlete_DF = sqlContext.createDataFrame(olympic_athlete_RDD, schema)
athlete_DF.show(5)

+----------+--------------------+------+---+------+------+-------+
|athlete_id|                name|gender|age|height|weight|team_id|
+----------+--------------------+------+---+------+------+-------+
|         1|           A Dijiang|     1| 24|   180|  80.0|    199|
|         2|            A Lamusi|     1| 23|   170|  60.0|    199|
|         3| Gunnar Nielsen Aaby|     1| 24|     0|   0.0|    273|
|         4|Edgar Lindenau Aabye|     1| 34|     0|   0.0|    278|
|         5|Christine Jacoba ...|     2| 21|   185|  82.0|    705|
+----------+--------------------+------+---+------+------+-------+
only showing top 5 rows



# Creando DataFrames apartir de archivos CSV

In [18]:
schema_teams = StructType([
    StructField("team_id", IntegerType(),False),
    StructField("team_name", StringType(), False),
    StructField("country", StringType(), False)
])

teams_DF = sqlContext.read.schema(schema_teams)\
    .option("header", "true").csv("files/paises.csv")

teams_DF.show(5)

+-------+--------------------+-------+
|team_id|           team_name|country|
+-------+--------------------+-------+
|      1|         30. Februar|    AUT|
|      2|A North American ...|    MEX|
|      3|           Acipactli|    MEX|
|      4|             Acturus|    ARG|
|      5|         Afghanistan|    AFG|
+-------+--------------------+-------+
only showing top 5 rows



In [19]:
schema_medals = StructType([
    StructField("result_id", IntegerType(),False),
    StructField("medal", StringType(), False),
    StructField("athlete_id", IntegerType(), False),
    StructField("game_id", IntegerType(), False),
    StructField("event_id", IntegerType(), False),
])

medals_DF = sqlContext.read.schema(schema_medals)\
    .option("header", "true").csv("files/resultados.csv")

medals_DF.show(5)

+---------+-----+----------+-------+--------+
|result_id|medal|athlete_id|game_id|event_id|
+---------+-----+----------+-------+--------+
|        1|   NA|         1|     39|       1|
|        2|   NA|         2|     49|       2|
|        3|   NA|         3|      7|       3|
|        4| Gold|         4|      2|       4|
|        5|   NA|         5|     36|       5|
+---------+-----+----------+-------+--------+
only showing top 5 rows



In [20]:
schema_sports = StructType([
    StructField("sport_id", IntegerType(),False),
    StructField("sport", StringType(), False),
])

sports_DF = sqlContext.read.schema(schema_sports)\
    .option("header", "true").csv("files/deporte.csv")

sports_DF.show(5)

+--------+-------------+
|sport_id|        sport|
+--------+-------------+
|       1|   Basketball|
|       2|         Judo|
|       3|     Football|
|       4|   Tug-Of-War|
|       5|Speed Skating|
+--------+-------------+
only showing top 5 rows



In [21]:
schema_events = StructType([
    StructField("event_id", IntegerType(),False),
    StructField("event_name", StringType(), False),
    StructField("sport_id", IntegerType(), False)
])

events_DF = sqlContext.read.schema(schema_events)\
    .option("header", "true").csv("files/evento.csv")

events_DF.show(5)

+--------+--------------------+--------+
|event_id|          event_name|sport_id|
+--------+--------------------+--------+
|       1|Basketball Men's ...|       1|
|       2|Judo Men's Extra-...|       2|
|       3|Football Men's Fo...|       3|
|       4|Tug-Of-War Men's ...|       4|
|       5|Speed Skating Wom...|       5|
+--------+--------------------+--------+
only showing top 5 rows



In [22]:
schema_games = StructType([
    StructField("game_id", IntegerType(),False),
    StructField("year-season", StringType(), False),
    StructField("year", StringType(), False),
    StructField("season", StringType(), False),
    StructField("city", StringType(), False),
])

games_DF = sqlContext.read.schema(schema_games)\
    .option("header", "true").csv("files/juegos.csv")

games_DF.show(5)
games_DF.take(3)

+-------+-----------+----+------+---------+
|game_id|year-season|year|season|     city|
+-------+-----------+----+------+---------+
|      1|1896 Verano|1896|Verano|   Athina|
|      2|1900 Verano|1900|Verano|    Paris|
|      3|1904 Verano|1904|Verano|St. Louis|
|      4|1906 Verano|1906|Verano|   Athina|
|      5|1908 Verano|1908|Verano|   London|
+-------+-----------+----+------+---------+
only showing top 5 rows



[Row(game_id=1, year-season='1896 Verano', year='1896', season='Verano', city='Athina'),
 Row(game_id=2, year-season='1900 Verano', year='1900', season='Verano', city='Paris'),
 Row(game_id=3, year-season='1904 Verano', year='1904', season='Verano', city='St. Louis')]

# Imprimir el Schema nos permite visualizar de forma general la estructura del DataFrame

In [23]:
sports_DF.printSchema()

root
 |-- sport_id: integer (nullable = true)
 |-- sport: string (nullable = true)



In [24]:
athlete_DF.printSchema()

root
 |-- athlete_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- gender: integer (nullable = false)
 |-- age: integer (nullable = false)
 |-- height: integer (nullable = false)
 |-- weight: float (nullable = false)
 |-- team_id: integer (nullable = false)



In [25]:
athlete_DF = athlete_DF.withColumnRenamed('gender','sex').drop('height')

athlete_DF.printSchema()

root
 |-- athlete_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- sex: integer (nullable = false)
 |-- age: integer (nullable = false)
 |-- weight: float (nullable = false)
 |-- team_id: integer (nullable = false)



# Existen diferentes funciones de pyspark que nos permiten realizar operaciones con los DataFrames

In [26]:
from pyspark.sql.functions import *

In [27]:
athlete_DF = athlete_DF\
    .select("athlete_id", "name", col("age").alias("age_in_competition"), "team_id")

In [28]:
athlete_DF.show(5)

+----------+--------------------+------------------+-------+
|athlete_id|                name|age_in_competition|team_id|
+----------+--------------------+------------------+-------+
|         1|           A Dijiang|                24|    199|
|         2|            A Lamusi|                23|    199|
|         3| Gunnar Nielsen Aaby|                24|    273|
|         4|Edgar Lindenau Aabye|                34|    278|
|         5|Christine Jacoba ...|                21|    705|
+----------+--------------------+------------------+-------+
only showing top 5 rows



### Filtrar DF con las condiciones que pongamos

In [29]:
athlete_DF = athlete_DF.filter((athlete_DF.age_in_competition != 0))

In [30]:
athlete_DF.sort("age_in_competition").show(5)

+----------+--------------------+------------------+-------+
|athlete_id|                name|age_in_competition|team_id|
+----------+--------------------+------------------+-------+
|     71691|  Dimitrios Loundras|                10|    333|
|     52070|        Etsuko Inada|                11|    514|
|     40129|    Luigina Giavotti|                11|    507|
|     37333|Carlos Bienvenido...|                11|    982|
|     47618|Sonja Henie Toppi...|                11|    742|
+----------+--------------------+------------------+-------+
only showing top 5 rows



# Agrupaciones y operaciones Joins sobre DF

In [31]:
teams_DF.printSchema()

root
 |-- team_id: integer (nullable = true)
 |-- team_name: string (nullable = true)
 |-- country: string (nullable = true)



In [32]:
athlete_DF.printSchema()  

root
 |-- athlete_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- age_in_competition: integer (nullable = false)
 |-- team_id: integer (nullable = false)



In [33]:
medals_DF.printSchema()

root
 |-- result_id: integer (nullable = true)
 |-- medal: string (nullable = true)
 |-- athlete_id: integer (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- event_id: integer (nullable = true)



In [34]:
games_DF.printSchema()

root
 |-- game_id: integer (nullable = true)
 |-- year-season: string (nullable = true)
 |-- year: string (nullable = true)
 |-- season: string (nullable = true)
 |-- city: string (nullable = true)



In [35]:
sports_DF.printSchema()

root
 |-- sport_id: integer (nullable = true)
 |-- sport: string (nullable = true)



In [36]:
events_DF.printSchema()

root
 |-- event_id: integer (nullable = true)
 |-- event_name: string (nullable = true)
 |-- sport_id: integer (nullable = true)



In [37]:
athlete_DF.join(medals_DF, athlete_DF.athlete_id == medals_DF.athlete_id,"left")\
    .join(games_DF, games_DF.game_id == medals_DF.game_id,"left")\
    .join(events_DF, events_DF.event_id == medals_DF.event_id,"left")\
    .select(athlete_DF.name, athlete_DF.age_in_competition, medals_DF.medal, 
            col('year').alias("competition_year"), events_DF.event_name).show()

+--------------------+------------------+-----+----------------+--------------------+
|                name|age_in_competition|medal|competition_year|          event_name|
+--------------------+------------------+-----+----------------+--------------------+
|           A Dijiang|                24|   NA|            1992|Basketball Men's ...|
|            A Lamusi|                23|   NA|            2012|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|                24|   NA|            1920|Football Men's Fo...|
|Edgar Lindenau Aabye|                34| Gold|            1900|Tug-Of-War Men's ...|
|Christine Jacoba ...|                21|   NA|            1994|Speed Skating Wom...|
|Christine Jacoba ...|                21|   NA|            1994|Speed Skating Wom...|
|Christine Jacoba ...|                21|   NA|            1992|Speed Skating Wom...|
|Christine Jacoba ...|                21|   NA|            1992|Speed Skating Wom...|
|Christine Jacoba ...|                21|   NA|       

# Ejercicio, DF con las medallas ganadoras, pais y equipo

### DataFrame de las filtrado de las medallas ganadoras

In [38]:
medals_winned = medals_DF.filter(medals_DF.medal != "NA")
medals_winned.show()

+---------+------+----------+-------+--------+
|result_id| medal|athlete_id|game_id|event_id|
+---------+------+----------+-------+--------+
|        4|  Gold|         4|      2|       4|
|       38|Bronze|        15|      7|      19|
|       39|Bronze|        15|      7|      20|
|       41|Bronze|        16|     50|      14|
|       42|Bronze|        17|     17|      21|
|       43|  Gold|        17|     17|      22|
|       45|  Gold|        17|     17|      24|
|       49|  Gold|        17|     17|      28|
|       51|Bronze|        17|     19|      22|
|       61|  Gold|        20|     38|      32|
|       62|Bronze|        20|     38|      33|
|       64|Silver|        20|     40|      31|
|       65|Bronze|        20|     40|      32|
|       68|Silver|        20|     40|      35|
|       74|  Gold|        20|     44|      32|
|       77|  Gold|        20|     44|      35|
|       79|  Gold|        20|     46|      32|
|       80|  Gold|        21|     47|      36|
|       87|Si

### Join Entre DF medallas ganadas, paises y equipos

In [39]:
medal_team_counrty = teams_DF.join(athlete_DF, teams_DF.team_id == athlete_DF.team_id, "left")\
    .join(medals_winned, athlete_DF.athlete_id == medals_winned.athlete_id, "left")\
    .select(medals_winned.medal, teams_DF.team_name, teams_DF.country)

In [40]:
medal_team_counrty.filter(medal_team_counrty.medal != "NA").show()

+------+--------------+-------+
| medal|     team_name|country|
+------+--------------+-------+
|  Gold|Denmark/Sweden|    SWE|
|Bronze|       Finland|    FIN|
|Bronze|       Finland|    FIN|
|Bronze|       Finland|    FIN|
|Bronze|       Finland|    FIN|
|  Gold|       Finland|    FIN|
|  Gold|       Finland|    FIN|
|  Gold|       Finland|    FIN|
|Bronze|       Finland|    FIN|
|  Gold|        Norway|    NOR|
|  Gold|        Norway|    NOR|
|  Gold|        Norway|    NOR|
|Silver|        Norway|    NOR|
|Bronze|        Norway|    NOR|
|Silver|        Norway|    NOR|
|Bronze|        Norway|    NOR|
|  Gold|        Norway|    NOR|
|  Gold|        Norway|    NOR|
|Silver|        Norway|    NOR|
|Bronze|   Netherlands|    NED|
+------+--------------+-------+
only showing top 20 rows



## Funciones de agregacion

In [41]:
winner_per_year = athlete_DF\
    .join(medals_DF, athlete_DF.athlete_id == medals_DF.athlete_id, "left")\
    .join(games_DF, games_DF.game_id == medals_DF.game_id,"left")\
    .join(teams_DF, athlete_DF.team_id == teams_DF.team_id, "left")\
    .join(events_DF, events_DF.event_id == medals_DF.event_id,"left")\
    .join(sports_DF, events_DF.sport_id == sports_DF.sport_id, "left")\
    .select(teams_DF.country, games_DF.year, medals_DF.medal,
           events_DF.event_name, sports_DF.sport, athlete_DF.name)

In [42]:
winner_per_year.show()

+-------+----+-----+--------------------+--------------------+--------------------+
|country|year|medal|          event_name|               sport|                name|
+-------+----+-----+--------------------+--------------------+--------------------+
|    CHN|1992|   NA|Basketball Men's ...|          Basketball|           A Dijiang|
|    CHN|2012|   NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|    DEN|1920|   NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|    SWE|1900| Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|    NED|1994|   NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|    NED|1994|   NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|    NED|1992|   NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|    NED|1992|   NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|    NED|1988|   NA|Speed Skating Wom...|       Speed Skating|Christine Jaco

In [43]:
medal_country_per_year = winner_per_year.filter(winner_per_year.medal != "NA")\
    .sort("year")\
    .groupBy("country","year","event_name")\
    .count()

In [44]:
medal_country_per_year.printSchema()

root
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- count: long (nullable = false)



In [45]:
medal_country_per_year.groupBy('country', 'year')\
    .agg(sum('count').alias("total_medals"),avg("count").alias("medals_mean")).show()

+-------+----+------------+------------------+
|country|year|total_medals|       medals_mean|
+-------+----+------------+------------------+
|    USA|1896|          20|1.6666666666666667|
|    GER|1896|          30|               2.5|
|    GBR|1896|           8|1.1428571428571428|
|    FRA|1896|          11|             1.375|
|    GRE|1896|           9|1.2857142857142858|
|    HUN|1896|           6|               1.0|
|    AUS|1896|           3|               1.0|
|    AUT|1896|           5|               1.0|
|    DEN|1896|           6|               1.0|
|    SUI|1896|           3|               1.0|
|    SWE|1900|           6|               3.0|
|    USA|1900|          65|2.4074074074074074|
|    FRA|1900|         179| 3.314814814814815|
|    GER|1900|          27|               4.5|
|    NOR|1900|           9|               1.8|
|    GBR|1900|          68|2.8333333333333335|
|    HUN|1900|           5|               1.0|
|    SUI|1900|          21|               3.5|
|    NED|1900

## Usar SQL de forma nativa

In [58]:
# Cargar DataFrames como tablas SQL

medals_DF.registerTempTable("medals")
athlete_DF.registerTempTable("athlete")
teams_DF.registerTempTable("teams")

In [49]:
sqlContext.sql("SELECT * FROM athlete").show(5)

+----------+--------------------+------------------+-------+
|athlete_id|                name|age_in_competition|team_id|
+----------+--------------------+------------------+-------+
|         1|           A Dijiang|                24|    199|
|         2|            A Lamusi|                23|    199|
|         3| Gunnar Nielsen Aaby|                24|    273|
|         4|Edgar Lindenau Aabye|                34|    278|
|         5|Christine Jacoba ...|                21|    705|
+----------+--------------------+------------------+-------+
only showing top 5 rows



In [50]:
sqlContext.sql("SELECT * FROM medals").show(5)

+---------+-----+----------+-------+--------+
|result_id|medal|athlete_id|game_id|event_id|
+---------+-----+----------+-------+--------+
|        1|   NA|         1|     39|       1|
|        2|   NA|         2|     49|       2|
|        3|   NA|         3|      7|       3|
|        4| Gold|         4|      2|       4|
|        5|   NA|         5|     36|       5|
+---------+-----+----------+-------+--------+
only showing top 5 rows



In [51]:
sqlContext.sql("SELECT * FROM teams").show(5)

+-------+--------------------+-------+
|team_id|           team_name|country|
+-------+--------------------+-------+
|      1|         30. Februar|    AUT|
|      2|A North American ...|    MEX|
|      3|           Acipactli|    MEX|
|      4|             Acturus|    ARG|
|      5|         Afghanistan|    AFG|
+-------+--------------------+-------+
only showing top 5 rows



In [56]:
sqlContext.sql("""
    SELECT medal,team_name, country FROM medals m
    JOIN athlete a
    ON m.athlete_id = a.athlete_id
    JOIN teams t
    ON t.team_id = a.team_id
    WHERE medal <> "NA"
    ORDER BY country DESC
    """).show(5)

+------+---------+-------+
| medal|team_name|country|
+------+---------+-------+
|  Gold| Zimbabwe|    ZIM|
|Silver| Zimbabwe|    ZIM|
|  Gold| Zimbabwe|    ZIM|
|Silver| Zimbabwe|    ZIM|
|Silver| Zimbabwe|    ZIM|
+------+---------+-------+
only showing top 5 rows



In [57]:
spark

## UDF (User Defined Functions)

In [60]:
!ls files/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [61]:
!head -n 10 files/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278
5,Christine Jacoba Aaftink,2,21,185,82,705
6,Per Knut Aaland,1,31,188,75,1096
7,John Aalberg,1,31,183,72,1096
8,"Cornelia ""Cor"" Aalten (-Strannood)",2,18,168,,705
9,Antti Sami Aalto,1,26,186,96,350


In [68]:
athlete_error_RDD = spark.textFile("files/deportistaError.csv")\
    .map(lambda l: l.split(","))

In [69]:
athlete_error_RDD.take(3)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199']]

In [70]:
def delete_header(index, iterator):
    return iter(list(iterator)[1:])

In [71]:
athlete_error_RDD = athlete_error_RDD.mapPartitionsWithIndex(delete_header)

In [73]:
athlete_error_RDD.take(3)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273']]

In [74]:
athlete_error_RDD = athlete_error_RDD.map(lambda l:
    (l[0], l[1], l[2], l[3], l[4], l[5], l[6]))

In [75]:
schema_athlete_error = StructType([
    StructField("id", StringType(), False),
    StructField("name", StringType(), False),
    StructField("gender", StringType(), False),
    StructField("age", StringType(), False),
    StructField("height", StringType(), False),
    StructField("weight", StringType(), False),
    StructField("team_id", StringType(), False),
])

athlete_error_DF = sqlContext.createDataFrame(athlete_error_RDD, schema_athlete_error)

In [76]:
athlete_error_DF.show()

+---+--------------------+------+---+------+------+-------+
| id|                name|gender|age|height|weight|team_id|
+---+--------------------+------+---+------+------+-------+
|  1|           A Dijiang|     1| 24|   180|    80|    199|
|  2|            A Lamusi|     1| 23|   170|    60|    199|
|  3| Gunnar Nielsen Aaby|     1| 24|      |      |    273|
|  4|Edgar Lindenau Aabye|     1| 34|      |      |    278|
|  5|Christine Jacoba ...|     2| 21|   185|    82|    705|
|  6|     Per Knut Aaland|     1| 31|   188|    75|   1096|
|  7|        John Aalberg|     1| 31|   183|    72|   1096|
|  8|"Cornelia ""Cor""...|     2| 18|   168|      |    705|
|  9|    Antti Sami Aalto|     1| 26|   186|    96|    350|
| 10|"Einar Ferdinand ...|     1| 26|      |      |    350|
| 11|  Jorma Ilmari Aalto|     1| 22|   182|  76.5|    350|
| 12|   Jyri Tapani Aalto|     1| 31|   172|    70|    350|
| 13|  Minna Maarit Aalto|     2| 30|   159|  55.5|    350|
| 14|Pirjo Hannele Aal...|     2| 32|   

In [77]:
from pyspark.sql.functions import udf

In [79]:
def to_int(value):
    return int(value) if len(value) > 0 else None

In [80]:
to_int_UDF = udf(lambda z: to_int(z), IntegerType())
sqlContext.udf.register("to_int_UDF", to_int_UDF)

<function __main__.<lambda>(z)>

In [83]:
athlete_error_DF.select(to_int_UDF("height").alias("height_UDF")).show(5)

+----------+
|height_UDF|
+----------+
|       180|
|       170|
|      null|
|      null|
|       185|
+----------+
only showing top 5 rows



In [84]:
#spark.stop()

## Particionado y Persistencia de datos
#### Documentacion
https://spark.apache.org/docs/2.4.6/api/python/pyspark.html#pyspark.StorageLevel

https://spark.apache.org/docs/2.4.6/api/python/pyspark.html

In [85]:
from pyspark.storagelevel import StorageLevel

In [86]:
medal_country_per_year.is_cached

False

In [87]:
medal_country_per_year.rdd.cache()

MapPartitionsRDD[222] at javaToPython at NativeMethodAccessorImpl.java:0

In [88]:
medal_country_per_year.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [89]:
medal_country_per_year.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

Py4JJavaError: An error occurred while calling o794.persist.
: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
	at org.apache.spark.rdd.RDD.persist(RDD.scala:170)
	at org.apache.spark.rdd.RDD.persist(RDD.scala:195)
	at org.apache.spark.api.java.JavaRDD.persist(JavaRDD.scala:47)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)


In [90]:
medal_country_per_year.rdd.unpersist()

MapPartitionsRDD[222] at javaToPython at NativeMethodAccessorImpl.java:0

In [91]:
medal_country_per_year.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[222] at javaToPython at NativeMethodAccessorImpl.java:0

In [93]:
medal_country_per_year.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 2)

## Crear Percistencias 

In [94]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True,True,False,False,3)

In [95]:
medal_country_per_year.rdd.unpersist()

MapPartitionsRDD[222] at javaToPython at NativeMethodAccessorImpl.java:0

In [96]:
medal_country_per_year.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[222] at javaToPython at NativeMethodAccessorImpl.java:0

In [99]:
medal_country_per_year.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 3)

In [100]:
spark.stop()

## Particionamiento

In [101]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Partition")\
    .master("local[5]").getOrCreate()

In [102]:
df = spark.range(0,20)
df.rdd.getNumPartitions()

5

In [105]:
rdd1 = spark.sparkContext.parallelize((0,20), 10)
rdd1.getNumPartitions()

10

## Cargar archivos con particiones especificas

In [106]:
rdd_from_file = spark.sparkContext.textFile("files/deporte.csv",10)

In [108]:
rdd_from_file.getNumPartitions()

10

In [110]:
rdd_from_file.saveAsTextFile("files/rdd_info")

In [113]:
!ls files/rdd_info

part-00000  part-00002	part-00004  part-00006	part-00008  _SUCCESS
part-00001  part-00003	part-00005  part-00007	part-00009


In [114]:
!head -n 5 files/rdd_info/part-00000

deporte_id,deporte
1,Basketball
2,Judo
3,Football
4,Tug-Of-War


In [115]:
!head -n 5 files/rdd_info/part-00001

7,Athletics
8,Ice Hockey
9,Swimming
10,Badminton
11,Sailing


### Cargar archivos desde particiones

In [116]:
rdd = spark.sparkContext.wholeTextFiles("files/rdd_info/*")

In [119]:
rdd.take(4)

[('file:/home/mau/Documents/spark_project/files/rdd_info/part-00007',
  '47,Beach Volleyball\n48,Triathlon\n49,Ski Jumping\n50,Curling\n51,Snowboarding\n52,Rugby\n53,Short Track Speed Skating\n'),
 ('file:/home/mau/Documents/spark_project/files/rdd_info/part-00000',
  'deporte_id,deporte\n1,Basketball\n2,Judo\n3,Football\n4,Tug-Of-War\n5,Speed Skating\n6,Cross Country Skiing\n'),
 ('file:/home/mau/Documents/spark_project/files/rdd_info/part-00004',
  '29,Cycling\n30,Diving\n31,Canoeing\n32,Tennis\n33,Modern Pentathlon\n34,Figure Skating\n35,Golf\n'),
 ('file:/home/mau/Documents/spark_project/files/rdd_info/part-00001',
  '7,Athletics\n8,Ice Hockey\n9,Swimming\n10,Badminton\n11,Sailing\n12,Biathlon\n13,Gymnastics\n14,Art Competitions\n')]

In [120]:
list_rdd = rdd.mapValues(lambda x: x.split()).collect()

In [122]:
list_rdd = [l[0] for l in list_rdd]
list_rdd.sort()

In [124]:
list_rdd

['file:/home/mau/Documents/spark_project/files/rdd_info/part-00000',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00001',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00002',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00003',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00004',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00005',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00006',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00007',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00008',
 'file:/home/mau/Documents/spark_project/files/rdd_info/part-00009']

In [125]:
rdd2 = spark.sparkContext.textFile(','.join(list_rdd),10)\
    .map(lambda l : l.split(","))

In [126]:
rdd2.take(5)

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football'],
 ['4', 'Tug-Of-War']]

In [127]:
spark.stop()