# Dataframes
**By Jorge S. Ruiz**
 - This is an introduction about how to use Dataframes in Spark.
 - Dataframes can be used as SQL tables.
 - Dataframes are better optimized because they use Catalyst as the query optimizer and Tungsten as the execution engine.
 

## Creating a dataframe from csv file

In [1]:
# Libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Libraries for Datatypes (dataframes)
from pyspark.sql.types import StructType, StructField 
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.types import Row

# Library for SQL
from pyspark.sql import SQLContext


In [2]:
# Initializing Spark
# NOTE: despite its name, `spark` is a SparkContext (the RDD entry point),
# not a SparkSession; the cells below call spark.textFile(...) on it.
spark = SparkContext(master='local', appName='Dataframes')
# Initializing SQL Context
# sqlContext wraps the SparkContext and is what the notebook uses to read csv
# files, create dataframes and run SQL queries.
sqlContext = SQLContext(spark)

In [3]:
!ls /home/lastorder/Documents/curso-apache-spark-platzi/files

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [4]:
# Linux command to check the file content
!head -n 4 /home/lastorder/Documents/curso-apache-spark-platzi/files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis


In [5]:
# Creating the PATH to csv files.
path = '/home/lastorder/Documents/curso-apache-spark-platzi/files/'


In [6]:
# First we need to create a schema with the information of the columns:
# each StructField describes one column of the DF.
# Its arguments are: the name of the column, the datatype and the nullable flag.
# False marks the column as required (not nullable) and True marks it as optional.
#
# NOTE(review): juegos.csv has five columns (unnamed index, nombre_juego, annio,
# temporada, ciudad) but only four fields are declared here. The recorded output
# shows the values shifted by one column: 'year' holds nombre_juego
# (e.g. '1896 Verano'), 'season' holds annio and 'city' holds temporada, while
# ciudad is not loaded. Confirm the intended mapping before relying on these columns.

gameSchema = StructType([
    StructField('game_id',IntegerType(),False),
    StructField('year',StringType(),False),
    StructField('season',StringType(),False),
    StructField('city',StringType(),False)
])

# Now we can create the dataframe using our previous Schema.

gameDF = sqlContext.read.schema(gameSchema).option('header','true') \
    .csv(path+'juegos.csv')

In [7]:
# In dataframes we can use "show" to obtain a better data visualization
gameDF.show(10)

+-------+-------------+------+--------+
|game_id|         year|season|    city|
+-------+-------------+------+--------+
|      1|  1896 Verano|  1896|  Verano|
|      2|  1900 Verano|  1900|  Verano|
|      3|  1904 Verano|  1904|  Verano|
|      4|  1906 Verano|  1906|  Verano|
|      5|  1908 Verano|  1908|  Verano|
|      6|  1912 Verano|  1912|  Verano|
|      7|  1920 Verano|  1920|  Verano|
|      8|1924 Invierno|  1924|Invierno|
|      9|  1924 Verano|  1924|  Verano|
|     10|1928 Invierno|  1928|Invierno|
+-------+-------------+------+--------+
only showing top 10 rows



In [8]:
# To access to Spark UI console, we use just "spark" command and click on Spark UI
spark

## Using an Extract from the RDDs Notebook

In [9]:
# Loading 2 RDDs: the first file contains the header and the second one only data.
# Every line is split on commas into a list of strings.
OlimpicAthleteRDD2 = spark.textFile(path + 'deportista2.csv').map(lambda row: row.split(','))

# union() appends the rows of the second RDD to the rows of the first one.
OlimpicAthleteRDD = (
    spark.textFile(path + 'deportista.csv')
    .map(lambda row: row.split(','))
    .union(OlimpicAthleteRDD2)
)


In [10]:
# To make sure that the data is not corrupted, we can use count() to verify that spark is working correctly
# with that data
OlimpicAthleteRDD.count()

135572

In [11]:
# To see the first 5 rows of the RDD, we use the take function (similar to LIMIT in SQL)
OlimpicAthleteRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [12]:
# Removing the header of the RDD.
# The function below is written to be passed to mapPartitionsWithIndex.

def removeHeader(index, iterator):
    """Drop the header row from a partitioned dataset or RDD.

    The header is assumed to be the first row of partition 0, so only that
    partition loses its first row. Every other partition is returned
    untouched; skipping the first row of all partitions would silently
    discard one data row per extra partition.

    Args:
        index: Partition index supplied by ``mapPartitionsWithIndex``.
        iterator: Iterable over the rows of that partition.

    Returns:
        An iterator over the rows of the partition without the header.
    """
    rows = iter(iterator)
    if index == 0:
        # Skip lazily instead of materialising the whole partition as a list;
        # the default keeps an empty partition from raising StopIteration.
        next(rows, None)
    return rows

In [13]:
# mapPartitionsWithIndex calls our function once per partition, passing the partition index
# and an iterator over the rows of that partition, and builds a new RDD from what the function returns.
OlimpicAthleteRDD_clean = OlimpicAthleteRDD.mapPartitionsWithIndex(removeHeader)

In [14]:
# To see if the header is gone
OlimpicAthleteRDD_clean.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [15]:
# Casting the values of the RDD:
# every field arrives as text and is converted to the type its column needs.

OlimpicAthleteRDD_clean = OlimpicAthleteRDD_clean.map(
    lambda fields: (
        int(fields[0]),    # deportista_id
        fields[1],         # nombre, kept as text
        int(fields[2]),    # genero
        int(fields[3]),    # edad
        int(fields[4]),    # altura
        float(fields[5]),  # peso
        int(fields[6]),    # equipo_id
    )
)


In [16]:
# We need to define a new schema; every column is declared as not nullable:

schema = (
    StructType()
    .add('athlete_id', IntegerType(), False)
    .add('name', StringType(), False)
    .add('gender', IntegerType(), False)
    .add('age', IntegerType(), False)
    .add('height', IntegerType(), False)
    .add('weight', FloatType(), False)
    .add('team_id', IntegerType(), False)
)

In [17]:
# Creating a dataframe with sqlContext, using an existing RDD and a Schema
AthleteDF = sqlContext.createDataFrame(OlimpicAthleteRDD_clean, schema)

In [18]:
# To prove everything is correct with our new DF, we can use show function
AthleteDF.show(10)

+----------+--------------------+------+---+------+------+-------+
|athlete_id|                name|gender|age|height|weight|team_id|
+----------+--------------------+------+---+------+------+-------+
|         1|           A Dijiang|     1| 24|   180|  80.0|    199|
|         2|            A Lamusi|     1| 23|   170|  60.0|    199|
|         3| Gunnar Nielsen Aaby|     1| 24|     0|   0.0|    273|
|         4|Edgar Lindenau Aabye|     1| 34|     0|   0.0|    278|
|         5|Christine Jacoba ...|     2| 21|   185|  82.0|    705|
|         6|     Per Knut Aaland|     1| 31|   188|  75.0|   1096|
|         7|        John Aalberg|     1| 31|   183|  72.0|   1096|
|         8|Cornelia Cor Aalt...|     2| 18|   168|   0.0|    705|
|         9|    Antti Sami Aalto|     1| 26|   186|  96.0|    350|
|        10|Einar Ferdinand E...|     1| 26|     0|   0.0|    350|
+----------+--------------------+------+---+------+------+-------+
only showing top 10 rows



In [19]:
# To see the explicit values without format we use "take" function
AthleteDF.take(10)

[Row(athlete_id=1, name='A Dijiang', gender=1, age=24, height=180, weight=80.0, team_id=199),
 Row(athlete_id=2, name='A Lamusi', gender=1, age=23, height=170, weight=60.0, team_id=199),
 Row(athlete_id=3, name='Gunnar Nielsen Aaby', gender=1, age=24, height=0, weight=0.0, team_id=273),
 Row(athlete_id=4, name='Edgar Lindenau Aabye', gender=1, age=34, height=0, weight=0.0, team_id=278),
 Row(athlete_id=5, name='Christine Jacoba Aaftink', gender=2, age=21, height=185, weight=82.0, team_id=705),
 Row(athlete_id=6, name='Per Knut Aaland', gender=1, age=31, height=188, weight=75.0, team_id=1096),
 Row(athlete_id=7, name='John Aalberg', gender=1, age=31, height=183, weight=72.0, team_id=1096),
 Row(athlete_id=8, name='Cornelia Cor Aalten Strannood ', gender=2, age=18, height=168, weight=0.0, team_id=705),
 Row(athlete_id=9, name='Antti Sami Aalto', gender=1, age=26, height=186, weight=96.0, team_id=350),
 Row(athlete_id=10, name='Einar Ferdinand Einari Aalto', gender=1, age=26, height=0, we

In [20]:
# Creating another dataframe from an RDD: countries / teams.
CountriesSchema = StructType([
    StructField('team_id', IntegerType(), False),
    StructField('team_name', StringType(), False),
    StructField('country_name', StringType(), False),
])

# Read the file, split each line on commas, drop the header and cast the id to int.
CountriesRDD = (
    spark.textFile(path + "paises.csv")
    .map(lambda line: line.split(","))
    .mapPartitionsWithIndex(removeHeader)
    .map(lambda cols: (int(cols[0]), cols[1], cols[2]))
)

CountriesDF = sqlContext.createDataFrame(CountriesRDD, CountriesSchema)

In [21]:
CountriesDF.show(10)

+-------+--------------------+------------+
|team_id|           team_name|country_name|
+-------+--------------------+------------+
|      1|         30. Februar|         AUT|
|      2|A North American ...|         MEX|
|      3|           Acipactli|         MEX|
|      4|             Acturus|         ARG|
|      5|         Afghanistan|         AFG|
|      6|            Akatonbo|         IRL|
|      7|            Alain IV|         SUI|
|      8|             Albania|         ALB|
|      9|              Alcaid|         POR|
|     10|            Alcyon-6|         FRA|
+-------+--------------------+------------+
only showing top 10 rows



In [22]:
# Creating a dataframe straight from a csv file with an explicit schema (no RDD involved).
OlympicSportsRDDSchema = StructType([
    StructField('sport_id', IntegerType(), False),
    StructField('sport_name', StringType(), False),
])

# The first line of the file is treated as a header and not loaded as data.
sportsDF = (
    sqlContext.read
    .schema(OlympicSportsRDDSchema)
    .option('header', 'true')
    .csv(path + 'deporte.csv')
)

In [23]:
sportsDF.show(10)

+--------+--------------------+
|sport_id|          sport_name|
+--------+--------------------+
|       1|          Basketball|
|       2|                Judo|
|       3|            Football|
|       4|          Tug-Of-War|
|       5|       Speed Skating|
|       6|Cross Country Skiing|
|       7|           Athletics|
|       8|          Ice Hockey|
|       9|            Swimming|
|      10|           Badminton|
+--------+--------------------+
only showing top 10 rows



In [24]:
# Any other dataset can be created the same way:
# define a schema and read the csv file with it.
EventsRDDSchema = StructType([
    StructField('event_id', IntegerType(), False),
    StructField('event_name', StringType(), False),
    StructField('sport_id', IntegerType(), False),
])

OlympicEventsDF = (
    sqlContext.read
    .schema(EventsRDDSchema)
    .option('header', 'true')
    .csv(path + 'evento.csv')
)

In [25]:
OlympicEventsDF.show(10)

+--------+--------------------+--------+
|event_id|          event_name|sport_id|
+--------+--------------------+--------+
|       1|Basketball Men's ...|       1|
|       2|Judo Men's Extra-...|       2|
|       3|Football Men's Fo...|       3|
|       4|Tug-Of-War Men's ...|       4|
|       5|Speed Skating Wom...|       5|
|       6|Speed Skating Wom...|       5|
|       7|Cross Country Ski...|       6|
|       8|Cross Country Ski...|       6|
|       9|Cross Country Ski...|       6|
|      10|Cross Country Ski...|       6|
+--------+--------------------+--------+
only showing top 10 rows



In [26]:
# Schema and dataframe for juegos.csv.
# NOTE(review): the file has five columns (unnamed index, nombre_juego, annio,
# temporada, ciudad) but four fields are declared, so 'year' ends up holding
# nombre_juego values such as '1896 Verano', 'season' holds annio and 'city'
# holds temporada. Other cells read 'year' from GamesDF, so confirm the intended
# mapping before adding the missing field.
GamesRDDSchema = StructType([
    StructField('game_id', IntegerType(),False),
    StructField('year', StringType(),False),
    StructField('season', StringType(),False),
    StructField('city', StringType(),False)
])

GamesDF = sqlContext.read.schema(GamesRDDSchema).option('header','true') \
    .csv(path+'juegos.csv')

In [27]:
GamesDF.show(10)

+-------+-------------+------+--------+
|game_id|         year|season|    city|
+-------+-------------+------+--------+
|      1|  1896 Verano|  1896|  Verano|
|      2|  1900 Verano|  1900|  Verano|
|      3|  1904 Verano|  1904|  Verano|
|      4|  1906 Verano|  1906|  Verano|
|      5|  1908 Verano|  1908|  Verano|
|      6|  1912 Verano|  1912|  Verano|
|      7|  1920 Verano|  1920|  Verano|
|      8|1924 Invierno|  1924|Invierno|
|      9|  1924 Verano|  1924|  Verano|
|     10|1928 Invierno|  1928|Invierno|
+-------+-------------+------+--------+
only showing top 10 rows



In [28]:
# Schema and dataframe for the results file: one row per athlete, games and event.
ResultsRDDSchema = StructType([
    StructField('result_id', IntegerType(), False),
    StructField('medal', StringType(), False),
    StructField('athlete_id', IntegerType(), False),
    StructField('game_id', IntegerType(), False),
    StructField('event_id', IntegerType(), False),
])

ResultsDF = (
    sqlContext.read
    .schema(ResultsRDDSchema)
    .option('header', 'true')
    .csv(path + 'resultados.csv')
)

In [29]:
ResultsDF.take(10)

[Row(result_id=1, medal='NA', athlete_id=1, game_id=39, event_id=1),
 Row(result_id=2, medal='NA', athlete_id=2, game_id=49, event_id=2),
 Row(result_id=3, medal='NA', athlete_id=3, game_id=7, event_id=3),
 Row(result_id=4, medal='Gold', athlete_id=4, game_id=2, event_id=4),
 Row(result_id=5, medal='NA', athlete_id=5, game_id=36, event_id=5),
 Row(result_id=6, medal='NA', athlete_id=5, game_id=36, event_id=6),
 Row(result_id=7, medal='NA', athlete_id=5, game_id=38, event_id=5),
 Row(result_id=8, medal='NA', athlete_id=5, game_id=38, event_id=6),
 Row(result_id=9, medal='NA', athlete_id=5, game_id=40, event_id=5),
 Row(result_id=10, medal='NA', athlete_id=5, game_id=40, event_id=6)]

In [30]:
# A way to see the structure of the schema is with the next command:
sportsDF.printSchema()
# Because sometimes we won't have the schema details.

root
 |-- sport_id: integer (nullable = true)
 |-- sport_name: string (nullable = true)



In [31]:
AthleteDF.printSchema()

root
 |-- athlete_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- gender: integer (nullable = false)
 |-- age: integer (nullable = false)
 |-- height: integer (nullable = false)
 |-- weight: float (nullable = false)
 |-- team_id: integer (nullable = false)



In [32]:
# To rename a column we use withColumnRenamed.
# We can also remove columns with the drop command.

AthleteDF2 = AthleteDF.withColumnRenamed('gender','gender_athlete').drop('height')

In [33]:
AthleteDF2.printSchema()

root
 |-- athlete_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- gender_athlete: integer (nullable = false)
 |-- age: integer (nullable = false)
 |-- weight: float (nullable = false)
 |-- team_id: integer (nullable = false)



In [34]:
# Libraries to work with SQL commands in Spark
# NOTE(review): the wildcard import binds the public names of
# pyspark.sql.functions in this namespace. Later cells rely on it for col, sum
# and avg, so from here on `sum` is the Spark aggregate, not the Python builtin.
from pyspark.sql.functions import *


In [35]:
# col() builds a reference to a single column, so we can operate on that column
# alone (here: rename it with alias) instead of on the whole dataframe.
# Plain strings passed to select() pick the column unchanged.
AthleteDF_select = AthleteDF2.select(
    'athlete_id',
    'name',
    col('age').alias('PlayAge'),
    'team_id',
)

In [36]:
AthleteDF_select.show(10)

+----------+--------------------+-------+-------+
|athlete_id|                name|PlayAge|team_id|
+----------+--------------------+-------+-------+
|         1|           A Dijiang|     24|    199|
|         2|            A Lamusi|     23|    199|
|         3| Gunnar Nielsen Aaby|     24|    273|
|         4|Edgar Lindenau Aabye|     34|    278|
|         5|Christine Jacoba ...|     21|    705|
|         6|     Per Knut Aaland|     31|   1096|
|         7|        John Aalberg|     31|   1096|
|         8|Cornelia Cor Aalt...|     18|    705|
|         9|    Antti Sami Aalto|     26|    350|
|        10|Einar Ferdinand E...|     26|    350|
+----------+--------------------+-------+-------+
only showing top 10 rows



In [37]:
# Sorting by PlayAge values in ascending order (sort's default):
AthleteDF_select.sort('PlayAge').show()
# We can see that there are some incorrect values with 0

+----------+--------------------+-------+-------+
|athlete_id|                name|PlayAge|team_id|
+----------+--------------------+-------+-------+
|       224|     Mohamed AbdelEl|      0|    308|
|       487|      Inni Aboubacar|      0|    721|
|       226|Sanad Bushara Abd...|      0|   1003|
|        58|    Georgi Abadzhiev|      0|    154|
|       230|    Moustafa Abdelal|      0|    308|
|       102|   Sayed Fahmy Abaza|      0|    308|
|       260|  Ahmed Abdo Mustafa|      0|   1003|
|       139|George Ioannis Abbot|      0|   1043|
|       281|      S. Abdul Hamid|      0|    487|
|       163|     Ismail Abdallah|      0|   1095|
|       285|Talal Hassoun Abd...|      0|    497|
|       173| Mohamed Abdel Fatah|      0|   1003|
|       179|Ibrahim Saad Abde...|      0|   1003|
|       378|     Angelik Abebame|      0|      0|
|       294|Mohamed Ghulom Ab...|      0|     81|
|       186| Mohamed Abdel Hafiz|      0|   1095|
|       300|     A. Abdul Razzak|      0|    497|


In [38]:
# To avoid 0 values, we will use a filter with a condition (similar to WHERE in SQL)
AthleteDF_filter = AthleteDF_select.filter(AthleteDF_select.PlayAge != 0)

In [39]:
# We can see that the youngest athlete who participated in the Olympic Games was
# a 10-year-old child.
AthleteDF_filter.sort('PlayAge').show()

+----------+--------------------+-------+-------+
|athlete_id|                name|PlayAge|team_id|
+----------+--------------------+-------+-------+
|     71691|  Dimitrios Loundras|     10|    333|
|     70616|          Liu Luyang|     11|    199|
|    118925|Megan Olwen Deven...|     11|    413|
|     52070|        Etsuko Inada|     11|    514|
|     22411|Magdalena Cecilia...|     11|    413|
|     40129|    Luigina Giavotti|     11|    507|
|     47618|Sonja Henie Toppi...|     11|    742|
|     76675|   Marcelle Matthews|     11|    967|
|     37333|Carlos Bienvenido...|     11|    982|
|     51268|      Beatrice Hutiu|     11|    861|
|    126307|        Liana Vicens|     11|    825|
|     48939|             Ho Gang|     12|    738|
|     49142|        Jan Hoffmann|     12|    302|
|     42835|   Werner Grieshofer|     12|     71|
|     54620|Belita Gladys Lyn...|     12|    413|
|     31203|Patricia Anne Pat...|     12|    967|
|     43528|Antoinette Joyce ...|     12|    172|


## Dataframe JOIN

In [40]:
# First, we need to check the schema's structures
AthleteDF.printSchema()

root
 |-- athlete_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- gender: integer (nullable = false)
 |-- age: integer (nullable = false)
 |-- height: integer (nullable = false)
 |-- weight: float (nullable = false)
 |-- team_id: integer (nullable = false)



In [41]:
ResultsDF.printSchema()

root
 |-- result_id: integer (nullable = true)
 |-- medal: string (nullable = true)
 |-- athlete_id: integer (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- event_id: integer (nullable = true)



In [42]:
GamesDF.printSchema()

root
 |-- game_id: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- season: string (nullable = true)
 |-- city: string (nullable = true)



In [43]:
OlympicEventsDF.printSchema()

root
 |-- event_id: integer (nullable = true)
 |-- event_name: string (nullable = true)
 |-- sport_id: integer (nullable = true)



In [44]:
# join() takes three arguments: the table we want to join with (first ResultsDF),
# the join condition (similar to JOIN - ON in SQL) and the kind of join (inner, left, etc).
(
    AthleteDF
    .join(ResultsDF, AthleteDF.athlete_id == ResultsDF.athlete_id, 'left')
    .join(GamesDF, ResultsDF.game_id == GamesDF.game_id, 'left')
    .join(OlympicEventsDF, ResultsDF.event_id == OlympicEventsDF.event_id, 'left')
    .select(
        AthleteDF.name.alias('athlete_name'),
        col('age').alias('PlayAge'),
        'medal',
        col('year').alias('PlayYear'),
        OlympicEventsDF.event_name.alias('discipline_name'),
    )
    .show()
)

+--------------------+-------+-----+-------------+--------------------+
|        athlete_name|PlayAge|medal|     PlayYear|     discipline_name|
+--------------------+-------+-----+-------------+--------------------+
|           A Dijiang|     24|   NA|  1992 Verano|Basketball Men's ...|
|            A Lamusi|     23|   NA|  2012 Verano|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|     24|   NA|  1920 Verano|Football Men's Fo...|
|Edgar Lindenau Aabye|     34| Gold|  1900 Verano|Tug-Of-War Men's ...|
|Christine Jacoba ...|     21|   NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|     21|   NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|     21|   NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|     21|   NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|     21|   NA|1988 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|     21|   NA|1988 Invierno|Speed Skating Wom...|
|     Per Knut Aaland|     31|   NA|1994 Invierno|Cross Country 

In [45]:
# Platzi Challenge !!
# Keep only the results that earned a medal,
# join them with athletes and then with countries,
# and sort the output by country_name in descending order.
(
    ResultsDF
    .filter(ResultsDF.medal != 'NA')
    .join(AthleteDF, AthleteDF.athlete_id == ResultsDF.athlete_id, 'left')
    .join(CountriesDF, CountriesDF.team_id == AthleteDF.team_id, 'left')
    .select('medal', 'team_name', 'country_name')
    .sort(col('country_name').desc())
    .show()
)

+------+---------+------------+
| medal|team_name|country_name|
+------+---------+------------+
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|Bronze| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
+------+---------+------------+
only showing top 20 rows



In [46]:
# Defining a new table with more info: athletes left-joined with their results,
# the games, the country, the event and the sport of each result.
AthleteMedals = (
    AthleteDF
    .join(ResultsDF, ResultsDF.athlete_id == AthleteDF.athlete_id, 'left')
    .join(GamesDF, GamesDF.game_id == ResultsDF.game_id, 'left')
    .join(CountriesDF, CountriesDF.team_id == AthleteDF.team_id, 'left')
    .join(OlympicEventsDF, OlympicEventsDF.event_id == ResultsDF.event_id, 'left')
    .join(sportsDF, sportsDF.sport_id == OlympicEventsDF.sport_id, 'left')
    .select(
        'country_name',
        'year',
        'medal',
        OlympicEventsDF.event_name.alias('Subdiscipline_name'),
        sportsDF.sport_name.alias('Discipline_name'),
        AthleteDF.name,
    )
)
             

In [47]:
AthleteMedals.show(10)

+------------+-----------+-----+--------------------+---------------+--------------------+
|country_name|       year|medal|  Subdiscipline_name|Discipline_name|                name|
+------------+-----------+-----+--------------------+---------------+--------------------+
|         BRU|2016 Verano|   NA|Athletics Women's...|      Athletics|Maizurah Abdul Rahim|
|         BRU|2000 Verano|   NA|Shooting Men's Skeet|       Shooting|Jefri Kiko Bolkia...|
|         BRU|1996 Verano|   NA|Shooting Men's Skeet|       Shooting|Jefri Kiko Bolkia...|
|         BRU|2004 Verano|   NA|Athletics Men's 1...|      Athletics|     Jimmy Anak Ahar|
|         BRU|2000 Verano|   NA|Athletics Men's 1...|      Athletics|         Haseri Asli|
|         BRU|2016 Verano|   NA|Athletics Men's 1...|      Athletics|Mohamed Fakhri Is...|
|         BRU|2012 Verano|   NA|Swimming Men's 20...|       Swimming|Anderson Chee Wei...|
|         BRU|2012 Verano|   NA|Athletics Women's...|      Athletics|      Maziah Mahusin|

In [48]:
# Keep only podium results (medal other than 'NA'), count the medals per
# country, year and sub-discipline, and order the counts by year.
# The sort runs after the aggregation: groupBy().count() redistributes the rows,
# so an ordering applied before it is not guaranteed to survive.
AthleteMedals2 = (
    AthleteMedals
    .filter(AthleteMedals.medal != 'NA')
    .groupBy('country_name', 'year', 'Subdiscipline_name')
    .count()
    .sort('year')
)

In [49]:
AthleteMedals2.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- Subdiscipline_name: string (nullable = true)
 |-- count: long (nullable = false)



In [50]:
# agg() (aggregate) lets us compute several aggregations on the same grouping.
# Here we build a sum column and an average column of the medal counts,
# grouped by country and year.
(
    AthleteMedals2
    .groupBy('country_name', 'year')
    .agg(
        sum('count').alias('Total Medals'),
        avg('count').alias('Average Medals'),
    )
    .show()
)

+------------+-------------+------------+------------------+
|country_name|         year|Total Medals|    Average Medals|
+------------+-------------+------------+------------------+
|         NED|1992 Invierno|           4|1.3333333333333333|
|         BEL|  2000 Verano|           7|               1.4|
|         MAS|  2012 Verano|           2|               1.0|
|         MGL|  2008 Verano|           5|              1.25|
|         SWE|  1976 Verano|          10|               2.0|
|         SUI|2014 Invierno|          29|3.2222222222222223|
|         ETH|  2004 Verano|           7|              1.75|
|         AUT|  1928 Verano|           5|              1.25|
|         SYR|  1984 Verano|           1|               1.0|
|         ITA|  1996 Verano|          69| 2.225806451612903|
|         THA|  2008 Verano|           4|               1.0|
|         URS|1984 Invierno|          56|               2.8|
|         DEN|  1896 Verano|           6|               1.0|
|         GRN|  2016 Ver

## Normal SQL Queries in Spark
Using the SQL Context is highly recommended for building ordinary tables or dataframes, but for big tables and complex joins it is better to use native Spark functions, because that way we use less of the cluster's processing power.
The SQL context is faster for querying data but requires more processing power. For big data, SQLContext is not worth it.

In [55]:
# First, we register each dataframe as a temporary SQL view with this syntax.
# createOrReplaceTempView is used instead of registerTempTable, which has been
# deprecated since Spark 2.0; the view names are unchanged.
ResultsDF.createOrReplaceTempView('Results')
AthleteDF.createOrReplaceTempView('Athlete')
CountriesDF.createOrReplaceTempView('Countries')

In [56]:
# With SQL Context we can run traditional SQL querys in our temporal tables:
sqlContext.sql('SELECT * FROM Athlete').show(5)
sqlContext.sql('SELECT * FROM Results').show(5)
sqlContext.sql('SELECT * FROM Countries').show(5)

+----------+--------------------+------+---+------+------+-------+
|athlete_id|                name|gender|age|height|weight|team_id|
+----------+--------------------+------+---+------+------+-------+
|         1|           A Dijiang|     1| 24|   180|  80.0|    199|
|         2|            A Lamusi|     1| 23|   170|  60.0|    199|
|         3| Gunnar Nielsen Aaby|     1| 24|     0|   0.0|    273|
|         4|Edgar Lindenau Aabye|     1| 34|     0|   0.0|    278|
|         5|Christine Jacoba ...|     2| 21|   185|  82.0|    705|
+----------+--------------------+------+---+------+------+-------+
only showing top 5 rows

+---------+-----+----------+-------+--------+
|result_id|medal|athlete_id|game_id|event_id|
+---------+-----+----------+-------+--------+
|        1|   NA|         1|     39|       1|
|        2|   NA|         2|     49|       2|
|        3|   NA|         3|      7|       3|
|        4| Gold|         4|      2|       4|
|        5|   NA|         5|     36|       5|
+---

In [59]:
# JOINs with SQL
sqlContext.sql("""
                SELECT  r.medal,
                        c.team_name,
                        c.country_name
                FROM Results AS r
                JOIN Athlete AS a
                ON r.athlete_id = a.athlete_id
                JOIN Countries AS c
                ON c.team_id = a.team_id
                WHERE medal <> 'NA'
                ORDER BY country_name DESC
                """).show()

+------+---------+------------+
| medal|team_name|country_name|
+------+---------+------------+
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|Bronze| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|Silver| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
|  Gold| Zimbabwe|         ZIM|
+------+---------+------------+
only showing top 20 rows



In [60]:
spark

## UDFs

In [62]:
!head -n 5 /home/lastorder/Documents/curso-apache-spark-platzi/files/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [64]:
# Creating the RDD from a .csv file
athleteErrorRDD = spark.textFile(path+'deportistaError.csv')  \
    .map(lambda l: l.split(','))

In [65]:
# Removing the header of the RDD with the function we defined
athleteErrorRDD = athleteErrorRDD.mapPartitionsWithIndex(removeHeader)

In [66]:
# Now we have our RDD without header
athleteErrorRDD.take(2)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199']]

In [69]:
# Mapping the values of the RDD
# Every field is kept as a string (l[x]); each row list becomes a 7-element tuple.
athleteErrorRDD = athleteErrorRDD.map(lambda l:
                                     (l[0],
                                      l[1],
                                      l[2],
                                      l[3],
                                      l[4],
                                      l[5],
                                      l[6])
                                     )

# Generating the schema
# All columns are StringType, presumably so that the empty height/weight values
# in this file load without conversion errors — confirm.
# NOTE: this rebinds the name `schema`, used earlier for the typed athlete schema.

schema = StructType([
    StructField('athlete_id', StringType(), False),
    StructField('name', StringType(), False),
    StructField('gender', StringType(), False),
    StructField('age', StringType(), False),
    StructField('height', StringType(), False),
    StructField('weight', StringType(), False),
    StructField('team_id', StringType(), False),
])

# Creating the dataframe

athleteErrorDF = sqlContext.createDataFrame(athleteErrorRDD, schema)

In [70]:
athleteErrorDF.show()

+----------+--------------------+------+---+------+------+-------+
|athlete_id|                name|gender|age|height|weight|team_id|
+----------+--------------------+------+---+------+------+-------+
|         1|           A Dijiang|     1| 24|   180|    80|    199|
|         2|            A Lamusi|     1| 23|   170|    60|    199|
|         3| Gunnar Nielsen Aaby|     1| 24|      |      |    273|
|         4|Edgar Lindenau Aabye|     1| 34|      |      |    278|
|         5|Christine Jacoba ...|     2| 21|   185|    82|    705|
|         6|     Per Knut Aaland|     1| 31|   188|    75|   1096|
|         7|        John Aalberg|     1| 31|   183|    72|   1096|
|         8|"Cornelia ""Cor""...|     2| 18|   168|      |    705|
|         9|    Antti Sami Aalto|     1| 26|   186|    96|    350|
|        10|"Einar Ferdinand ...|     1| 26|      |      |    350|
|        11|  Jorma Ilmari Aalto|     1| 22|   182|  76.5|    350|
|        12|   Jyri Tapani Aalto|     1| 31|   172|    70|    

In [73]:
# Libraries to create UDFs

from pyspark.sql.functions import udf

# Creating the python function.
# It converts a column value to int and returns None (NULL in Spark) when there is no value.
def convertionINT(value):
    """Convert a text value to ``int``, mapping missing values to ``None``.

    Args:
        value: String taken from the column, or ``None`` for a null cell.

    Returns:
        The integer value, or ``None`` when ``value`` is ``None``, empty or
        made only of whitespace.

    Raises:
        ValueError: If ``value`` holds text that is not a valid integer.
    """
    # A null cell arrives as None; calling len() on it would raise TypeError.
    if value is None:
        return None
    text = value.strip()
    return int(text) if text else None

# Creating the UDF from the previous python function; IntegerType() is passed
# as the UDF's return type.
convertionINT_udf = udf(lambda z: convertionINT(z), IntegerType())

# To register the UDF in Spark:
# Parameters: the name the UDF will have in Spark SQL, and the UDF itself.
# NOTE(review): the registered name is 'convertINT_udf' while the Python
# variable is convertionINT_udf — confirm the difference is intended.
sqlContext.udf.register('convertINT_udf', convertionINT_udf)

<function __main__.<lambda>(z)>

In [76]:
# Applying the UDF on our Dataframe to convert a string column to INT column
athleteErrorDF.select(convertionINT_udf('height').alias('height_UDF')).show()

+----------+
|height_UDF|
+----------+
|       180|
|       170|
|      null|
|      null|
|       185|
|       188|
|       183|
|       168|
|       186|
|      null|
|       182|
|       172|
|       159|
|       171|
|      null|
|       184|
|       175|
|       189|
|      null|
|       176|
+----------+
only showing top 20 rows



# Cluster management

In [77]:
# Importing libraries for storage management
from pyspark.storagelevel import StorageLevel

In [78]:
# To check whether an RDD or DF is stored in memory we can use:
AthleteMedals2.is_cached

False

In [79]:
# Every time you run an action on a table that is not stored in memory or on disk,
# Spark recomputes it to obtain the values; that means your process takes more time
# and uses more processing from the cluster.

# To cache an RDD or dataframe:
# .rdd is used here so that cache() is called as an RDD method.
# NOTE(review): this caches the RDD obtained through .rdd, not the DataFrame
# object itself — confirm this is the intended target.
AthleteMedals2.rdd.cache()

MapPartitionsRDD[228] at javaToPython at NativeMethodAccessorImpl.java:0

In [80]:
# To see what kind of saving is using for that table, we use getStorageLevel
AthleteMedals2.rdd.getStorageLevel()

# Parameters are: (useDisk, useMemory, useOffHeap, deserialized, replication).
# To check details go to official Spark documentation.

StorageLevel(False, True, False, False, 1)

In [82]:
# To add persistence to our data:

# First we need to remove the table from the cache, because it was already stored in memory:
AthleteMedals2.rdd.unpersist()


# Check the complete list of Storage Levels in the Spark documentation.
# Here we want to keep the data in memory and on disk, replicated 2 times.
AthleteMedals2.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[228] at javaToPython at NativeMethodAccessorImpl.java:0

In [84]:
# To create our own storage level:
# As a good practice we replicate the data 3 times to reduce the risk of losing it
# (rationale taken from the course notes — TODO confirm).
# Parameters are: (useDisk, useMemory, useOffHeap, deserialized, replication).

# NOTE: this attaches a new attribute to the StorageLevel class itself, which is
# how StorageLevel.MEMORY_AND_DISK_3 becomes available to later cells.
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [85]:
# Using the persistance we created

AthleteMedals2.rdd.unpersist()
AthleteMedals2.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[228] at javaToPython at NativeMethodAccessorImpl.java:0