# Repaso de los conceptos básicos de Spark SQL

### Convierte en un Data Frame el siguiente array de tuplas de Python

Es una lista de personas:
    - Id
    - Nombre
    - Edad
    - Color de ojos

In [3]:
import findspark

findspark.init()

In [4]:
from pyspark.sql import *

spark: SparkSession = SparkSession.builder \
    .appName('IntroSparkSQLAlumno') \
    .config('spark.sql.repl.eagerEval.enabled', True) \
    .config('spark.sql.repl.eagerEval.maxNumRows', 10) \
    .getOrCreate()

In [5]:
listaPersonas = [(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')]

In [6]:
type(listaPersonas)

list

### 1. Sin crear esquema, infiriendo el esquema

In [7]:
dfPersonas = spark.createDataFrame(listaPersonas)

In [8]:
dfPersonas.show()

                                                                                

+---+-------+---+-----+
| _1|     _2| _3|   _4|
+---+-------+---+-----+
|123|  Katie| 19|brown|
|234|Michael| 22|green|
|345| Simone| 23| blue|
+---+-------+---+-----+



In [11]:
dfPersonas

_1,_2,_3,_4
123,Katie,19,brown
234,Michael,22,green
345,Simone,23,blue


In [12]:
dfPersonas.display()

AttributeError: 'DataFrame' object has no attribute 'display'

### 2. Creando un esquema previamente

In [16]:
from pyspark.sql.types import *

In [17]:
esquema = StructType([
    StructField("id", LongType(), True),    
    StructField("nombre", StringType(), True),
    StructField("edad", IntegerType(), True),
    StructField("colorOjos", StringType(), True)
])

In [18]:
dfPersonasConEsquema = spark.createDataFrame(listaPersonas,esquema)

In [21]:
dfPersonasConEsquema

id,nombre,edad,colorOjos
123,Katie,19,brown
234,Michael,22,green
345,Simone,23,blue


### 3. Modifica ahora el esquema anterior y el array inicial para añadir una columna que almacene el peso de la persona en formato Integer

In [32]:
esquemaPeso = StructType([
    StructField("id", LongType(), True),    
    StructField("nombre", StringType(), True),
    StructField("edad", ByteType(), True),
    StructField("colorOjos", StringType(), True),
    StructField("peso", IntegerType(), True)
])

In [33]:
listaPersonasPeso = [(123, 'Katie', 19, 'brown', 50), (234, 'Michael', 22, 'green', 68), (345, 'Simone', 23, 'blue', 90)]

In [34]:
dfPersonasPeso = spark.createDataFrame(listaPersonasPeso,esquemaPeso)
#Llama al método read de la sesión de spark para crear el Data Frame como hicimos previamente

In [27]:
dfPersonasPeso.show()
dfPersonasPeso.printSchema()

+---+-------+----+---------+----+
| id| nombre|edad|colorOjos|peso|
+---+-------+----+---------+----+
|123|  Katie|  19|    brown|  50|
|234|Michael|  22|    green|  68|
|345| Simone|  23|     blue|  90|
+---+-------+----+---------+----+

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: byte (nullable = true)
 |-- colorOjos: string (nullable = true)
 |-- peso: integer (nullable = true)



### Consultas básicas

### 1. Selecciona las columnas nombre y peso

In [36]:
#Pista: Usa el método select(columna1, columna2,...) sobre el Data Frame dfPersonasPeso
#dfPersonasPeso.select('Nombre', 'Peso')
dfPersonasPeso.select(dfPersonasPeso['nombre'],dfPersonasPeso['peso'])

nombre,peso
Katie,50
Michael,68
Simone,90


### 2. Selecciona las columnas 2 y 5

In [39]:
dfPersonasPeso.select(dfPersonasPeso[1],dfPersonasPeso[4])

nombre,peso
Katie,50
Michael,68
Simone,90


### 3. Renombra la columna peso a pesoKg

In [44]:
#Pista: Usa el método withColumnRenamed(nombreCol, nuevoNombreCol)
dfPersonasPeso.withColumnRenamed('peso', 'pesoKg')

id,nombre,edad,colorOjos,pesoKg
123,Katie,19,brown,50
234,Michael,22,green,68
345,Simone,23,blue,90


### 4. Añade una columna "edadXpeso" resultado de multiplicar la edad por el peso

In [47]:
from pyspark.sql.functions import expr

In [48]:
dfPersonasPeso.withColumn('edadXPeso',expr('edad * peso'))

id,nombre,edad,colorOjos,peso,edadXPeso
123,Katie,19,brown,50,950
234,Michael,22,green,68,1496
345,Simone,23,blue,90,2070


### 5. Obten las personas que tengan más de 20 años

In [49]:
#Pista: Usa el método where (o filter, que es equivalente) para indicar la condición de dfPersonasPeso['edad'] > 20
dfPersonasPeso.filter(dfPersonasPeso['edad'] > 20)

id,nombre,edad,colorOjos,peso
234,Michael,22,green,68
345,Simone,23,blue,90


### 6. Genera un nuevo Data Frame que solo tenga las personas que tengan más de 20 años y pesen menos de 90 Kg

In [56]:
#Pista: Para que sea más sencillo encadena dos filtros Where, uno por condición
dfRes = dfPersonasPeso.select("*").filter(dfPersonasPeso['edad'] > 20).filter(dfPersonasPeso['peso'] < 90)
dfRes

id,nombre,edad,colorOjos,peso
234,Michael,22,green,68


### 7. Ordena dfPersonasPeso por Edad

In [57]:
#Pista: Usa el método df.orderBy(col)
dfPersonasPeso.orderBy('edad')

id,nombre,edad,colorOjos,peso
123,Katie,19,brown,50
234,Michael,22,green,68
345,Simone,23,blue,90


### 8. Ordena dfPersonasPeso por Edad descendente

In [58]:
#Pista: Sobre la columna a ordenar aplica el método .desc()
dfPersonasPeso.orderBy('edad', ascending = False)

id,nombre,edad,colorOjos,peso
345,Simone,23,blue,90
234,Michael,22,green,68
123,Katie,19,brown,50


### 9. Crea un segundo Data Frame con el mismo esquema que dfPersonasPeso y con 3 personas nuevas + 1 con los mismos datos que otra que exista en listaPersonasPeso (copia ese elemento en la nueva lista).

In [66]:
listaPersonasPeso2 = [(235, 'Sheila', 19, 'brown', 51), (734, 'Alicia', 22, 'green', 68), (145, 'Pedro', 23, 'blue', 84), (123, 'Katie', 19, 'brown', 50)]

In [67]:
dfPersonasPeso2 = spark.createDataFrame(listaPersonasPeso2,esquemaPeso)
#crear el data frame a partir de la lista anterior y el esquema

In [68]:
dfPersonasPeso2

id,nombre,edad,colorOjos,peso
235,Sheila,19,brown,51
734,Alicia,22,green,68
145,Pedro,23,blue,84
123,Katie,19,brown,50


### 10. Concatena los datos (filas) de dfPersonasPeso2 con dfPersonasPeso generando un nuevo DF con nombre dfMasPersonas y que incluya todas las filas de ambos

In [69]:
dfMasPersonas = dfPersonasPeso2.union(dfPersonasPeso)

In [70]:
dfMasPersonas

id,nombre,edad,colorOjos,peso
235,Sheila,19,brown,51
734,Alicia,22,green,68
145,Pedro,23,blue,84
123,Katie,19,brown,50
123,Katie,19,brown,50
234,Michael,22,green,68
345,Simone,23,blue,90


### 11. Haz un recuento de filas del DF dfMasPersonas aplicando df.count()

In [71]:
#Pista, usa el método count()
dfMasPersonas.count()

7

### 12. Elimina del dfPersonasPeso las filas duplicadas y genera un nuevo DF dfSinDup

In [74]:
#Pista, usa el método distinct()
dfSinDup = dfMasPersonas.distinct()
dfSinDup

                                                                                

id,nombre,edad,colorOjos,peso
123,Katie,19,brown,50
234,Michael,22,green,68
145,Pedro,23,blue,84
235,Sheila,19,brown,51
345,Simone,23,blue,90
734,Alicia,22,green,68


### 13. Haz un recuento de filas del DF dfSinDup aplicando df.count()

In [75]:
dfSinDup.count()

                                                                                

6