# Trabajando con Dataframes

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("Dataframes") \
        .master('local[*]') \
        .getOrCreate()

#spark = SparkSession.builder.appName("Dataframes") \
#        .getOrCreate()

In [None]:
spark

## Creación 

In [None]:
data = [('Alicia', 25), ('Bob', 40), ("Charlie",35)]

In [None]:
columns = ['Name', 'Age']

In [None]:
# El sistema infiere el esquema
df = spark.createDataFrame(data, columns)
type(df)

In [None]:
df.printSchema()

df.dtypes

In [None]:
# Le puedo proporcionar el esquema
schema = 'Name: string, Age: int'
df = spark.createDataFrame(data, schema)

In [None]:
df.printSchema()

## Contenido

In [None]:
df.show()

In [None]:
df.take(2)

In [None]:
df.head()

In [None]:
df.tail(2)

### Podemos obtener un resumén estadístico del **DataFrame**

In [None]:
# Resumen estadístico (el método describe regresa un nuevo DataFrame)
df.describe().show()

## Particiones

In [None]:
df.rdd.getNumPartitions()

In [None]:
#res = df.repartition(4)
#res.rdd.getNumPartitions()

## Seleccionar columnas

### Seleccionar una columna

In [None]:
df.Age

In [None]:
df['Name']

### Crear **DataFrame** a partir de una columna

In [None]:
df.select('Name').show()

In [None]:
df.select(df.Age).show()

### Crear **DataFrame** a partir de multiples columnas (observa como escogí el orden de columnas distinto al original)

In [None]:
df.select('Age', 'Name').show()

### Seleccionar todas las columnas en un solo paso

In [None]:
df.select('*').show()

### Dado que el resultado de `select` es un nuevo **DataFrame**, podemos obtener una descripción estadística de columnas deseadas

In [None]:
df.select(df.Age).describe().show()

## Filtrado

#### Utilizando una columna y una condición

In [None]:
df.filter(df.Age > 30).show()

#### Utilizando un `string` con el nombre de una columna y una condición

In [None]:
df.filter('Age > 30').show()

#### El statement `where` es un alias para `filter`

In [None]:
df.where(df['Age'] > 30).show()

#### Podemos emplear los operadores lógicos `and` (`&`), `or` (`|`) y `not` (`~`)

In [None]:
df.where( (df.Age > 30) & (df.Age < 40) ).show()

In [None]:
df.where( (df.Age < 30) | (df.Age > 36) ).show()

In [None]:
df.where( ~( (df.Age < 30) | (df.Age > 36) ) ).show()

In [None]:
df.where( ~(df.Name == 'Bob') ).show()

#### También podemos utilizar expresiones utilizando la sintaxis de SQL

In [None]:
df.filter( 'Age > 30 AND Age < 40' ).show()

In [None]:
df.where( 'Age < 30 OR Age > 36' ).show()

In [None]:
df.where( 'NOT (Name = "Bob")' ).show()

## Modificar

### Renombrar

In [None]:
# Renombrar una columna
df.withColumnRenamed('Name', 'name').show()

In [None]:
# Renombrar multiples columnas
df.withColumnsRenamed({'Name':'name', 'Age':'age'}).show()

### Eliminar

#### Para eliminar una o múltiples columnas

In [None]:
df.drop("Age").show()

In [None]:
df.drop('Name', 'Age').show()

### Lidiar con valores `NULL`

In [None]:
data = [('Gilberto', 22, 'Informatico'), 
        ('Raul', None, 'Ingeniero'), 
        ('Karla', 25, 'Abogada'), 
        ('Rocio', None, None), 
        (None, None, None),
        ('Perla', 10, None)]
schema = 'name: string, age: int, carrera: string'

new_df = spark.createDataFrame(data, schema=schema)
new_df.show()

#### Eliminar renglones con valores nulos (`NULL`)

##### Eliminar aquellos con al menos un valor `NULL`

In [None]:
new_df.na.drop().show()

##### Eliminar cuando todos los valores sean `NULL`

In [None]:
new_df.na.drop(how = 'all').show()

##### Eliminar cuando el número de valores no nulos sea menor a un limite deseado; en otras palabras asegurar que contemos con un número mínimo de valores (2 en este caso)

In [None]:
# Al menos dos valores no nulos
new_df.na.drop(thresh = 2).show()

##### Asegurarse que no haya valores nulos en columnas deseadas

In [None]:
# Solo los renglones donde name y age no sean nulos
new_df.na.drop(subset = ['name', 'age']).show()

#### Reemplazar valores `NULL`

##### Reemplazar en todas las columnas de tipo `string` con el valor 'unknown'

In [None]:
new_df.na.fill('unknown').show()

##### Reemplazar en todas las columnas numéricas con `-1`

In [None]:
new_df.na.fill(-1).show()

##### Reemplazar en las columnas deseadas

In [None]:
new_df.na.fill({'age':-1, 'carrera':'unknown'}).show()

### Modificar el tipo de datos de columnas

#### Modifiquemos la columna `df.Age` de **integer** a **float**

In [None]:
# DataFrame original
df.printSchema()
df.show()

In [None]:
# Nuevo DataFrame con schema deseado
from pyspark.sql.types import FloatType
new_df = df.withColumn('Age', df.Age.cast(FloatType()))

new_df.printSchema()
new_df.show()

### Agregar una columna con un valor constante

In [None]:
from pyspark.sql.functions import lit

new_df.withColumn("ConstantValue", lit(100)).show()

## Aplicar una función

### Combinar matemáticamente columnas mediante un string con la función deseada

In [None]:
from pyspark.sql.functions import expr
import numpy as np

In [None]:
data = [(3.0, 1.0, 0.0), (2.0, 2.0, np.pi/2), (4.0, 0.5, np.pi)]
schema = 'amp:float, omega:float, phase:float'

waves = spark.createDataFrame(data, schema)

In [None]:
t = 1
exp = f'amp * cos( omega * {t} + phase)'

In [None]:
exp

### El resultado se puede almacenar en una columna nueva o en una ya existente (en este caso una columna nueva llamada `value`)

In [None]:
waves.withColumn('value', expr(exp)).show()

### Checando que el resultado sea el correcto

In [None]:
t = 1
mat = np.array(data)
A = mat[:,0]; w = mat[:,1]; phi = mat[:,2]
A * np.cos(w * t + phi)

## Agrupar y agregación

In [None]:
df.groupBy?

In [None]:
df = spark.createDataFrame( [("Roberto", 15), ("Ana", 6), 
                             ("Roberto", 10), ("Ana", 6), 
                             ("Ana", 15)], 
                            'name:string, age:int')
df.show()

### La reducción en el paradigma Mapa-Reducción se puede obtener mediante `groupBy`.

#### Se da primero la llave para que se realize la agrupación y, posteriormente, se llama la función de reducción (la cual aquí se llama agregación)

In [None]:
df.groupBy('name').count().show()

In [None]:
df.groupBy('name').sum().show()

### Existen varias operaciones útiles

In [None]:
df.groupBy('name').avg().show()

df.groupBy('name').max().show()

df.groupBy('name').min().show()

In [None]:
from pyspark.sql.functions import std

In [None]:
df.groupby(df.name).agg({'age':'std'}).show()

### Podemos realizar varias agregaciones a la vez

In [None]:
from pyspark.sql import functions as F

In [None]:
df.groupby(df.name).agg(F.avg(df.age), F.max(df.age), F.min(df.age)).show()

### La llave es quién yo eliga

In [None]:
df.groupBy('age').count().show()

### Continuando con la analogía con llaves en Mapa-Reduccion, aquí podemos generar llaves con más de una columna, es decir, agrupar cuando dos columnas toman el mismo valor 

In [None]:
df.groupBy(['name', 'age']).count().show()

### También podemos llamar a las columnas mediante su número de columna; en este caso ¡las columnas empiezan con el número uno, no el cero!

In [None]:
df.groupBy(1).sum().show()

df.groupby([1, 2]).count().show()

df.groupBy([df.name, 2]).count().show()

### Si lo preferimos podemos utilizar la sintaxis de **SQL**

#### Primero registramos el **DataFrame** que queremos utilizar con `spark.sql`

In [None]:
# Registramos una vista temporal del DataFrame con el nombre que deseemos 
# (df_sql en este caso)
df.createOrReplaceTempView("df_sql")

#### Posteriormente podemos seleccionar columnas y aplicar filtros con **SQL**

In [None]:
spark.sql("SELECT name from df_sql").show()

In [None]:
spark.sql("SELECT age,name from df_sql").show()

In [None]:
spark.sql("SELECT * from df_sql").show()

In [None]:
spark.sql("SELECT * from df_sql WHERE age > 7").show()

#### También podemos aplicar expresiones a columnas empleando la sintaxis de **SQL**

In [None]:
df.selectExpr("3 * age", "2 * abs(age) as twice_age", "name").show()

In [None]:
df.selectExpr("name", "age <= 10 as kid").show()

## Ordenando por columnas

In [None]:
df.orderBy('age').show()

In [None]:
df.orderBy(df.age.desc()).show()

In [None]:
df.orderBy('age', 'name').show()

## Aplicaciones de **ML** 

In [None]:
data = [("Frankestein", 1.5, 0.8, 3.4, 9), 
        ("Alice", 2.9, 4.1, 0.4, 10),
        ("Beowulf", 0.8, 1.7, 2.4, 9),
        ("Moby_Dick", 3.9, 0.1, 2.6, 8),
        ("Romeo_Juliet", 0.9, 2.1, 4.4, 10),
        ("Dorian_Gray", 4.0, 1.2, 2.4, 8),
        ("Pride_Prejudice", 0.5, 0.3, 1.4, 7),
        ("Two_Cities", 5.9, 2.6, 2.1, 9)]

schema = "book: string, v1:float, v2:float, v3:float, value:int"

df_books = spark.createDataFrame(data, schema=schema)
df_books.show()

###  Crear vectores **feature** (mezclar múltiples columnas en un vector)

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
columns_to_mix = ['v1', 'v2', 'v3']
vec_assembler = VectorAssembler(inputCols=columns_to_mix, 
                                outputCol='features')

type(vec_assembler)

In [None]:
df_with_feat = vec_assembler.transform(df_books)
df_with_feat.show()

### Puedo almacenar la columna en un nuevo **DataFrame** aunque no será necesario para aplicar un modelo de **ML**

In [None]:
# cada renglon representa un vector feature
X = df_with_feat.select('features')
X.show(truncate=False)

### En **ML** se require dividir aleatoriamente los datos en: datos de entrenamiento (**train set**) y datos de prueba (**test set**). Los **DataFrames** cuentan con una función para lograr esto

In [None]:
#El seed es opcional 
train_set, test_set = df_with_feat.randomSplit([0.8, 0.2], seed=50)

In [None]:
train_set.show()
test_set.show()

### Es posible aplicar modelos de **ML** siempre y cuando instalemos el modulo correspondiente ()

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
model = LinearRegression(featuresCol='features', labelCol='value')

In [None]:
trained_model = model.fit(train_set)

In [None]:
print(model.coefficients)
print(model.intercept)

In [None]:
model.evaluate(test_set).predictions

In [None]:
spark.stop()