### Trabajo de spark desde cero

Apache Spark es una plataforma de procesamiento de datos distribuida diseñada para manejar grandes volúmenes de información de manera eficiente. Permite realizar análisis de datos, procesamiento en tiempo real, aprendizaje automático y manejo de datos estructurados y no estructurados. Spark es ampliamente utilizado en entornos de big data por su velocidad, escalabilidad y facilidad de uso, facilitando tareas como la transformación, agregación y visualización de datos.

In [0]:
%sql
select * from samples.bakehouse.sales_customers
limit 5

In [0]:
df = spark.read.table("samples.bakehouse.sales_customers")
display(df)

In [0]:
df_select = df.select("*")
df_select.display()

## Uso de la función distinct

In [0]:
df_distinc = df_select.select("country").distinct()
display(df_distinc)


## Uso de Where y Filtros

In [0]:
from pyspark.sql.functions import col ## para poder usar el parametro "col"

df_filtrado = df_select.filter(
    (col("country") == "Japan") &
     (col("gender") == "female"))
display(df_filtrado.limit(5))

### Uso de Order By

In [0]:
df_filtrado = df_select.filter(
    (col("country") == "Japan") &
    (col("gender") == "female")
).orderBy(col("first_name").asc())
display(df_filtrado.limit(5))

### Crear columnas

In [0]:
from pyspark.sql.functions import col, concat, lit

df_columnas = df_select.withColumn(
    "full_name", concat(col("first_name"), lit(" "), col("last_name"))
)
display(df_columnas.limit(5))

### Group by + aggregation

In [0]:
df_group = df_select.groupBy(col("country")).count()
display(df_group)

In [0]:
### Otra forma con orden descendente

from pyspark.sql.functions import count

df_grouped = (
    df_select
    .groupBy("country")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())    
)

display(df_grouped)

### Uso de Joins

In [0]:
df_sales = spark.read.table("samples.bakehouse.sales_transactions")

df_join = (df
           .join(df_sales, on = "customerID", how = "inner")
           .select(
               concat(col("first_name"), lit(" "), col("last_name")).alias("full_name"),
               col("country"),
               col("gender"),
               col("product"),
               col("quantity"),
               col("paymentMethod"),
               col("dateTime")
           )
)
display(df_join)


In [0]:
### Casteo (Transformar el formato de las variables)

df_casteo = df_select.select(
  col("customerID").cast("string"),
  col("first_name").cast("string"),
  col("last_name").cast("string"),
  col("email_address").cast("string"),
  col("phone_number").cast("string"),
  col("address").cast("string"),
  col("city").cast("string"),
  col("state").cast("string"),
  col("country").cast("string"),
  col("continent").cast("string"),
  col("postal_zip_code").cast("string"),
  col("gender").cast("string")
)

print(df_casteo.printSchema())






In [0]:
### para crear una nueva columna
df_casteo = df_casteo.withColumn("Nueva_Columna", lit("Nueva columna")
                                    .cast("string"))
display(df_casteo)



In [0]:
### Uso de drop
df_casteo_drop = df_casteo.drop("Nueva_Columna")
display(df_casteo_drop.limit(5))