En esta práctica veremos los siguientes elementos usando pyspark
1) Crear DataFrames usando:  
  a) Matrices  
  b) Diccionarios (Veremos dos formas de crear, usando columnas como diccionarios o usando diccionario para armar tabla estructurada)  
2) Trabajaremos con union all de dataframes  
3) Leeremos un archivo y lo convertiremos en un DataFrame  
4) Veremos como mostrar el dataFrame y como limitar lo que estoy visualizando. (Explicaremos brevemente la forma en la que Spark ejecuta)  
5) Trabajaremos con la manipulación de datos:  
  a) Aprenderemos a filtrar DataFrames  
  b) Crear Columnas 
  c) Renombrar columnas 
  d) Revisaremos el módulo functions de pyspark  
  e) Trabajaremos agrupando DataFrames y creando sumarizaciones  

In [0]:

from pyspark.sql import SparkSession #Para poder trabajar con Spark vamos a tener que levantar una sesión
import pyspark.sql.functions as f #Esta línea es muy importante ya que aqui se encuentran todas las funciones que vamos a poder usar
from pyspark.sql.types import IntegerType

In [0]:
#Creo la Sesion
spark = SparkSession.builder.appName('abc').getOrCreate()

In [0]:
#Como vimos con Pandas, puedo crear un DataFrame usando una matriz de datos
matrix = [["Python", 1], ["php", 2], ["java", 3]]
lenguajes = spark.createDataFrame(data=matrix, schema = ["lenguaje","ranking"])
lenguajes.printSchema()
lenguajes.show()


root
 |-- lenguaje: string (nullable = true)
 |-- ranking: long (nullable = true)

+--------+-------+
|lenguaje|ranking|
+--------+-------+
|  Python|      1|
|     php|      2|
|    java|      3|
+--------+-------+



In [0]:
dataDictionary = [
    {"lenguaje": "javascript", "ranking":4},
    {"lenguaje": "R", "ranking":5}
    ]
lenguajes2 = spark.createDataFrame(data=dataDictionary)
lenguajes2.printSchema()
lenguajes2.show()


root
 |-- lenguaje: string (nullable = true)
 |-- ranking: long (nullable = true)

+----------+-------+
|  lenguaje|ranking|
+----------+-------+
|javascript|      4|
|         R|      5|
+----------+-------+



In [0]:
#Ahora lo que quiero hacer es contatenar ambos dataFrames, usando union all
from functools import reduce
from pyspark.sql import DataFrame

display(reduce(DataFrame.unionAll, [lenguajes, lenguajes2]))

lenguaje,ranking
Python,1
php,2
java,3
javascript,4
R,5


In [0]:
#Ahora quiero crear un DataFrame con valores anidados, usando un diccionario
peopleDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'red','eye':'grey'}),
        ('Jefferson',{'hair':'red','eye':''})
        ]
df = spark.createDataFrame(data=peopleDictionary, schema = ["name","properties"])
df.printSchema()
display(df)

root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



name,properties
James,"Map(eye -> brown, hair -> black)"
Michael,"Map(eye -> null, hair -> brown)"
Robert,"Map(eye -> black, hair -> red)"
Washington,"Map(eye -> grey, hair -> red)"
Jefferson,"Map(eye -> , hair -> red)"


In [0]:
#usando la funcion getItem puedo sacar elementos de una columna tipo diccionario
df.withColumn("hair", df.properties.getItem("hair")).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-4145016560760976>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mproperties[49m[38;5;241;43m.[39;49m[43mgetItem[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mhair[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mshow[49m[43m([49m[43m)[49m

[0;31mTypeError[0m: 'Column' object is not callable

In [0]:
#Leo una tabla de un fileStore
file_location = "/FileStore/tables/titanic/titanic_train.txt"
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load(file_location)



In [0]:
#Para visualizar una base puedo usar dos métodos
#1) Usar la funcion display
#2) Usar la funcion show

display(df)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


In [0]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [0]:
#El head() de pandas en pyspark es .limit
display(df.limit(10))

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


In [0]:
#Hago un describe de la base
display(df.describe())

summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
max,99.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,9.0,8.0,6.0,WE/P 5735,93.5,T,S


In [0]:
#Para realizar filtros usamos funcion filter
clase3 = df.filter(f.col("Pclass") == 3)
#Como veran al ejecutar la notebook no van a ver la base. Esto se debe a la forma de procesar Spark


In [0]:
clase3.limit(3).show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171| 7.25| null|       S|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|7.925| null|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450| 8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-----+-----+--------+



In [0]:
#Puedo hacer un count de la base
df.count()

Out[64]: 891

In [0]:
display(df.groupBy(f.col("Pclass")).count())

Pclass,count
3,491
1,216
2,184


In [0]:
#Ahora quiero crear un nuevo campo en el dataset, que contenga por ejemplo un valor físico (cantidad de registros de la base)
total = df.count()
df = df.withColumn("totalDF", f.lit(total))
display(df)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,totalDF
1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S,891
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38.0,1,0,PC 17599,71.2833,C85,C,891
3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,891
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S,891
5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S,891
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q,891
7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S,891
8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S,891
9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S,891
10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C,891


In [0]:
#ahora quiero renombrar el campo que acabamos de crear y asignarle el nombre totalTitanic
#uso funcion withColumnRenamed("nombreViejo", "nombreNuevo")
df = df.withColumnRenamed("totalDF", "totalTitanic")
display(df.limit(4))

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,totalTitanic
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S,891
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C,891
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S,891
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S,891


In [0]:
#Quiero seleccionar un subset de columnas
display(df.select(f.col("PassengerId"), f.col("Sex"), f.col("Age")))

PassengerId,Sex,Age
1,male,22.0
2,female,38.0
3,female,26.0
4,female,35.0
5,male,35.0
6,male,
7,male,54.0
8,male,2.0
9,female,27.0
10,female,14.0


In [0]:
dataSetAgeSex = df.select(f.col("PassengerId"), f.col("Sex"), f.col("Age"))
dataSetAgeSex = dataSetAgeSex.groupBy(f.col("Sex")).mean("Age").withColumnRenamed("mean(Age)", "meanAge")
display(dataSetAgeSex)

#Tirará error porque el tipo de datos definido es String 

In [0]:
#Vamos a cambiar el tipo de datos

dataSetAgeSex = dataSetAgeSex.withColumn("Age", dataSetAgeSex["Age"].cast(IntegerType()))
dataSetAgeSex = dataSetAgeSex.groupBy(f.col("Sex")).mean("Age").withColumnRenamed("avg(Age)", "meanAge")
display(dataSetAgeSex)


Sex,meanAge
female,27.90421455938697
male,30.70198675496689


In [0]:
#Ahora vamos a hacer un Join entre df y dataSetAgeSex
df = df.join(dataSetAgeSex, on="Sex", how="inner")
display(df.limit(10))

Sex,PassengerId,Survived,Pclass,Name,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,totalTitanic,meanAge
male,1,0,3,"Braund, Mr. Owen Harris",22.0,1,0,A/5 21171,7.25,,S,891,30.70198675496689
female,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",38.0,1,0,PC 17599,71.2833,C85,C,891,27.90421455938697
female,3,1,3,"Heikkinen, Miss. Laina",26.0,0,0,STON/O2. 3101282,7.925,,S,891,27.90421455938697
female,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",35.0,1,0,113803,53.1,C123,S,891,27.90421455938697
male,5,0,3,"Allen, Mr. William Henry",35.0,0,0,373450,8.05,,S,891,30.70198675496689
male,6,0,3,"Moran, Mr. James",,0,0,330877,8.4583,,Q,891,30.70198675496689
male,7,0,1,"McCarthy, Mr. Timothy J",54.0,0,0,17463,51.8625,E46,S,891,30.70198675496689
male,8,0,3,"Palsson, Master. Gosta Leonard",2.0,3,1,349909,21.075,,S,891,30.70198675496689
female,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",27.0,0,2,347742,11.1333,,S,891,27.90421455938697
female,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",14.0,1,0,237736,30.0708,,C,891,27.90421455938697


In [0]:
#Ahora vamos a imputar los valores nulos de Age
df = df.withColumn("Age", df["Age"].cast(IntegerType()))#Cambio primero el tipo de datos a Int
df = df.withColumn("Age", f.when(f.col("Age").isNull(), f.col("meanAge")).otherwise(f.col("Age")))
display(df.limit(8))

Sex,PassengerId,Survived,Pclass,Name,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,totalTitanic,meanAge
male,1,0,3,"Braund, Mr. Owen Harris",22.0,1,0,A/5 21171,7.25,,S,891,30.70198675496689
female,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",38.0,1,0,PC 17599,71.2833,C85,C,891,27.90421455938697
female,3,1,3,"Heikkinen, Miss. Laina",26.0,0,0,STON/O2. 3101282,7.925,,S,891,27.90421455938697
female,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",35.0,1,0,113803,53.1,C123,S,891,27.90421455938697
male,5,0,3,"Allen, Mr. William Henry",35.0,0,0,373450,8.05,,S,891,30.70198675496689
male,6,0,3,"Moran, Mr. James",30.70198675496689,0,0,330877,8.4583,,Q,891,30.70198675496689
male,7,0,1,"McCarthy, Mr. Timothy J",54.0,0,0,17463,51.8625,E46,S,891,30.70198675496689
male,8,0,3,"Palsson, Master. Gosta Leonard",2.0,3,1,349909,21.075,,S,891,30.70198675496689


In [0]:
#Ahora vamos a eliminar dos columnas, totalTitanic y meanAge
df = df.drop("totalTitanic", "meanAge")
display(df.limit(10))

Sex,PassengerId,Survived,Pclass,Name,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
male,1,0,3,"Braund, Mr. Owen Harris",22.0,1,0,A/5 21171,7.25,,S
female,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",38.0,1,0,PC 17599,71.2833,C85,C
female,3,1,3,"Heikkinen, Miss. Laina",26.0,0,0,STON/O2. 3101282,7.925,,S
female,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",35.0,1,0,113803,53.1,C123,S
male,5,0,3,"Allen, Mr. William Henry",35.0,0,0,373450,8.05,,S
male,6,0,3,"Moran, Mr. James",30.70198675496689,0,0,330877,8.4583,,Q
male,7,0,1,"McCarthy, Mr. Timothy J",54.0,0,0,17463,51.8625,E46,S
male,8,0,3,"Palsson, Master. Gosta Leonard",2.0,3,1,349909,21.075,,S
female,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",27.0,0,2,347742,11.1333,,S
female,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",14.0,1,0,237736,30.0708,,C


In [0]:
#Usando la funcion select podemos hacer muchas cosas, cómo por ejemplo hacer un count de nulos
#Para esto vamos a necesitar:
#1) funcion f.count (nos contara los registros)
#2) f.when (la usaremos para traer solo valores que estan en nulos)
#3) f.col (traer la columna que queremos)
#4) hacemos un loop en una linea usando listas [x for x in iterador]
df.select([f.count(f.when(f.isnan(c) | f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+-----------+--------+------+----+---+-----+-----+------+----+-----+--------+
|Sex|PassengerId|Survived|Pclass|Name|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+---+-----------+--------+------+----+---+-----+-----+------+----+-----+--------+
|  0|          0|       0|     0|   0|  0|    0|    0|     0|   0|  687|       2|
+---+-----------+--------+------+----+---+-----+-----+------+----+-----+--------+



In [0]:
#Vamos a usar la funcion agg para poder crear muchas agregaciones al mismo tiempo
#Vamos a usar las funciones:
#1) f.min y f.max, nos darán los mínimos y máximos del campo que deseamos
#2) f.alias, esta función servirá para renombrar el campo 
display(df.groupBy("Sex").agg(f.min(f.col("Age")).alias("minAge"), f.max(f.col("Age")).alias("maxAge"), 
                      f.median(f.col("Age")).alias("medianAge"), f.mean("Fare").alias("meanFare"),
                      f.mean(f.col("Age"))))



Sex,minAge,maxAge,medianAge,meanFare,avg(Age)
female,0.0,63.0,27.90421455938697,44.47981783439487,27.90421455938696
male,0.0,80.0,30.70198675496689,25.523893414211415,30.70198675496684


In [0]:
#Crear un row_number como lo vemos en SQL
from pyspark.sql.window import *
windowSpec = Window.partitionBy("Sex").orderBy(f.desc("Age"))
nuevaDF = df.withColumn("rank", f.row_number().over(windowSpec))\
    .filter(f.col("rank") == 1)
display(nuevaDF)
