# DF CONCEPTOS PRINCIPALES

Los conceptos core son aquellos que nos permiten realizar transformaciones con metodos y funcionalidades

In [5]:
# definir sesión de spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sesion007").getOrCreate()

## DF READER

DataFrameReader es una API fluida para describir la fuente de datos de entrada que se utilizará para "cargar" datos desde una fuente de datos externa (por ejemplo, archivos, tablas, JDBC o Dataset[String])

sintaxis:
```python
spark.read.[options(key, value)].format
```
references: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html



In [None]:
# lectura desde pandas
import pandas as pd

pd.read_csv("https://gitlab.com/luisvasv/public/-/raw/master/datasets/004.mock.data/001.dependents.csv")

In [None]:

from pyspark import SparkFiles
# usando spark

# csv referencia: https://spark.apache.org/docs/latest/sql-data-sources-csv.html

url: str = "https://gitlab.com/luisvasv/public/-/raw/master/datasets/004.mock.data/001.dependents.csv"
spark.sparkContext.addFile(url)

spark_uri: str = "file://"+SparkFiles.get("001.dependents.csv")
print(spark_uri)
# Nota: para datos almacenados en en file store no son necesario este paso

In [None]:
# forma 1
spark.read.option("sep", ",").option("header", True).csv(spark_uri)


In [None]:
# forma 2
spark.read.csv(spark_uri, header=True, sep=",")

In [None]:
# forma 3
spark.read.format("csv").option("sep", ",").option("header", True).load(spark_uri)

In [None]:
# configurando esquema forma #1
spark.read.option("sep", ",").option("header", True).option("inferSchema", True).csv(spark_uri)


In [None]:
# configurando esquema forma #2
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email_dependents", StringType(), True),
    StructField("age", IntegerType(), True)
])

spark.read.csv(spark_uri, header=True, sep=",", schema=schema)

In [None]:
# configurando esquema forma #3
spark.read.schema(schema).format("csv").option("sep", ",").option("header", True).load(spark_uri)

In [None]:
# configurando esquema forma #4
sql_ddl = "first_name STRING, last_name STRING, email_dependents STRING, age INT"
spark.read.schema(sql_ddl).format("csv").option("sep", ",").option("header", True).load(spark_uri)

## DF WRITER
DataFrameWriter es una API en PySpark que proporciona métodos para escribir datos de un DataFrame en varios formatos de almacenamiento. Permite guardar los datos en sistemas de archivos, bases de datos y otros almacenes de datos compatibles. 



In [None]:
# datos para evaluar
df_from_uri = spark.read.schema(sql_ddl).format("csv").option("sep", ",").option("header", True).load(spark_uri)
df_from_uri.show()

In [None]:
# 1. forma elemental

df_from_uri.write.csv("file:///tmp/unal.csv", header=False)

In [None]:
%sh 
ls /tmp/unal.csv

In [None]:
%sh
cat /tmp/unal.csv/part-00000-tid-8222788517052140540-21f50e54-2c5c-4aaf-98bf-e285e6434977-7-1-c000.csv

In [None]:
# 2. forma básica

df_from_uri.write.format("csv").option("sep", ";").option("header", True).save("file:///tmp/unal2.csv")

In [None]:
# 3. forma para guardar como una tabla
# overwrite = Sobrescribe cualquier tabla existente con el mismo nombre. Si la tabla no existe, la crea.
# append: Añade datos a la tabla existente. Si la tabla no existe, la crea.

modo: str = "overwrite"
df_from_uri.write.mode(modo).saveAsTable("unal.users")

In [None]:
%sql

show databases;

In [None]:
%sql
describe unal.users;

## TRANSFORMACIONES CORE

as transformaciones de DataFrame en PySpark se refieren a las operaciones que se pueden aplicar a un DataFrame para modificar, filtrar, agregar o manipular los datos de alguna manera. Las transformaciones en PySpark son operaciones lazy, lo que significa que no se ejecutan de inmediato cuando se definen, sino que se aplican de manera acumulativa y se ejecutan solo cuando se llama a una acción que requiere resultados concretos (como `show()`,` count()`, `collect()`, etc.).

Las transformaciones tambien pueden verse como:

<img 
     src="https://i.postimg.cc/tgJPjJ3s/001-clase.png" 
     alt="reporte" 
     border="0"
/>



Nota: En la siguiente sección nos centraremos en cargar datos desde una tabla, ya que de ofros formatos fue visto previamente


### CARGAR DESDE UNA BD

Se enfoca en esta parte, ya que es uno de los usos mas comunes

#### TABLE

In [None]:
# selecciona la tabla
spark.table("unal.users")

#### READ

In [None]:
spark.read.table("unal.users")

#### SQL

In [None]:
spark.sql("SELECT * FROM unal.users")

### SELECCIONAR DATOS

#### TABLE

In [None]:
# cargamos la tabla de usuarios

df = spark.table("unal.users")

#### SELECT

In [None]:
# forma 
df.select("first_name", "age").show()

#### TIPO PANDAS

In [None]:
df[["first_name", "age"]].show()

#### TIPO ATRIBUTO

In [None]:
df.select(df.first_name, df.age).show()

#### COL

se usa principalmente para:

- seleccionar
- alias
- crear nuevas columnas
- castear datos
- filtrado de datos

In [None]:
from pyspark.sql.functions import col

In [None]:
df.select(col("first_name"), col("age")).show()

#### ALIAS

In [None]:
df.select(col("first_name").alias("user_full_name"), col("age")).show()

#### CAST

In [None]:
df.select(
    col("first_name").alias("user_full_name"),
    col("age").cast("float").alias("older")
).printSchema()

### FILTRANDO DATOS

#### WHERE

In [None]:
df.where(col("age") > 12).show()

#### FILTER SIMPLE

In [None]:
df.filter(col("age") > 12).show()

#### FILTER MULTIPLE (IGUAL PARA WHERE)

En PySpark, los filtros se aplican a DataFrames para seleccionar filas que cumplan ciertas condiciones. Para construir filtros más complejos, se utilizan operadores lógicos como `AND`, `OR` y `NOT`. Estos operadores permiten combinar expresiones booleanas para especificar criterios de filtrado más detallados.

Nota: al tener condiciones multiples, cada una se debe almacenar en `()`

##### AND (&)

In [None]:
df.where((col("age") > 15) & (col("first_name") == "Alvis")).show()


##### OR (|)

In [None]:
df.where((col("age") > 15) | (col("first_name") == "Alvis")).show()

##### NOT(~)



In [None]:
df.filter(~(col("age") > 20)).show()

### AGRUPANDO DATOS

In [None]:
from pyspark.sql.functions import sum, max, min, avg, count

data = [
  ("producto_A", 10, 100),
  ("Producto_B", 5, 80),
  ("producto_A", 7, 120),
  ("Producto_B", 3, 90),
  ("producto_A", 15, 110)
]
columns = ["producto", "cantidad", "precio"]
dfg = spark.createDataFrame(data, columns)

In [None]:
dfg.groupBy("producto").agg(sum("cantidad").alias("cantidad_total"), sum("precio").alias("pretio_total")).show()

In [None]:
# analisis por producto
dfg.groupBy("producto") \
    .agg(sum("cantidad").alias("cantidad_total"), 
    avg("precio").alias("precio_promedio"),
    min("precio").alias("precio_promedio"),
    max("precio").alias("precio_maximo"),
    count("producto").alias("total_ventas")
).show()

### TRANSFORMACIONES DE DATOS

## Funciones de Agregación
- `avg(column)`: Calcula el promedio de los valores de una columna.
- `count(column)`: Cuenta el número de valores en una columna.
- `max(column)`: Encuentra el valor máximo en una columna.
- `min(column)`: Encuentra el valor mínimo en una columna.
- `sum(column)`: Suma los valores de una columna.

## Funciones de Cadena
- `concat(*cols)`: Concatena varias columnas en una sola columna de cadena.
- `lower(column)`: Convierte los valores de una columna a minúsculas.
- `upper(column)`: Convierte los valores de una columna a mayúsculas.
- `substr(column, start, length)`: Extrae una subcadena de una columna de cadena.
- `length(column)`: Devuelve la longitud de la cadena en una columna.

## Funciones de Fecha y Hora
- `current_date()`: Devuelve la fecha actual.
- `current_timestamp()`: Devuelve la marca de tiempo actual.
- `datediff(end, start)`: Calcula la diferencia en días entre dos fechas.
- `date_add(start, days)`: Suma un número específico de días a una fecha.
- `date_sub(start, days)`: Resta un número específico de días a una fecha.
- `year(column)`: Extrae el año de una fecha.
- `month(column)`: Extrae el mes de una fecha.
- `dayofmonth(column)`: Extrae el día del mes de una fecha.

## Funciones Matemáticas
- `abs(column)`: Devuelve el valor absoluto de una columna.
- `round(column, scale)`: Redondea los valores de una columna a un número específico de decimales.
- `sqrt(column)`: Calcula la raíz cuadrada de los valores de una columna.
- `pow(column, exp)`: Calcula la potencia de los valores de una columna.

## Funciones Condicionales
- `when(condition, value)`: Devuelve un valor cuando la condición es verdadera.
- `otherwise(value)`: Especifica un valor alternativo cuando la condición es falsa (usado junto con `when`).

## Funciones de Ventana
- `row_number()`: Asigna un número de fila secuencial a cada fila dentro de una partición de un DataFrame.
- `rank()`: Asigna un rango a cada fila dentro de una partición de un DataFrame.

## Funciones de Orden Superior
- `array(*cols)`: Crea una columna de tipo array.
- `explode(column)`: Expande una columna de tipo array en filas individuales.
- `map_keys(column)`: Devuelve una columna de tipo array de las claves de un mapa.
- `map_values(column)`: Devuelve una columna de tipo array de los valores de un mapa.


In [None]:
dft = dfg.groupBy("producto") \
    .agg(sum("cantidad").alias("cantidad_total"), 
    avg("precio").alias("precio_promedio"),
    min("precio").alias("precio_promedio"),
    max("precio").alias("precio_maximo"),
    count("producto").alias("total_ventas")
)

#### CREAR NUEVA COLUMNA

In [None]:
from pyspark.sql.functions import lit, col, split, explode

In [None]:
# valor harcodeado , usamos lit
dft.withColumn("xxx", lit("ABC")).show()

In [None]:
# valor computado
dft.withColumn("xxx", col("cantidad_total") /2).show()

#### RENOMBRAR COLUMNA

In [None]:
dft.withColumn(
    "xxx",
    col("cantidad_total") /2
    ).withColumnRenamed("xxx", "my_column").show()


#### ELIMINAR COLUMNA

In [None]:
aaa = dft.withColumn(
    "xxx",
    col("cantidad_total") /2
    ).withColumn("yyy", lit("ABC")).withColumnRenamed("xxx", "my_column")
aaa.drop("yyy").show()

In [None]:
df.show()


#### EXPLODE

In [None]:

df.withColumn("array_emails",  split(col("email_dependents"), ",")).printSchema()
              

In [None]:
xyz = df.withColumn("array_emails",  explode(split(col("email_dependents"), ",")))
xyz.show()


#### UDF

In [None]:
from pyspark.sql.types import MapType, StringType, IntegerType
from pyspark.sql.functions import udf

def size_email(value):
  size_message = len(value)
  upper_cases = len([letter for letter in value if letter.isupper()])
  lower_cases = len([letter for letter in value if letter.islower()])
  digit_cases = len([letter for letter in value if letter.isdigit()])

  return {
    "size": size_message, 
    "upperc" :upper_cases, 
    "lowerc": lower_cases, 
    "digitc": digit_cases
  }

propia_udf = udf(size_email, MapType(StringType(), IntegerType()))

In [None]:
zzz = xyz.withColumn("msmp", propia_udf(col("array_emails")))
zzz.show()

In [None]:
zzz.select(
  zzz.array_emails,
  zzz.msmp.size, 
    zzz.msmp.upperc, 
    zzz.msmp.lowerc, 
    col("msmp.digitc").alias("digitc")
).show()