In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [0]:
spark=SparkSession.builder.appName("Spark DataFrames").getOrCreate()

In [0]:
#VAMOS A CREAR UN DATAFRAME
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (8, "HHH", "dept3", 3700),
    (9, "III", "dept3", 4500),
    (10, "JJJ", "dept5", 3400)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]


In [0]:

df_emp=spark.createDataFrame(emp,["id","name","dept","salary"])
df_dept=spark.createDataFrame(dept,["id","name"])

In [0]:
#MOSTRAMOS
df_emp.display()

In [0]:
df_dept.display()

In [0]:
df_dept.dtypes

#### OPERACIONES BASICAS CON DATAFRAMES

#####count
Cuenta el número de filas

In [0]:
df_emp.count()   #cuenta el numero de registros de un dataframe

In [0]:
df_dept.count()

In [0]:
df_emp.select("dept").filter(col("dept")=="dept1").count() #cuenta los departamentos que son dept1

In [0]:
df_emp.select("dept").distinct().count()  #cuenta los departamentos que son diferentes

#### COLUMNS

In [0]:
df_emp.columns  #muestra las columnas del dataframe


In [0]:
df_emp.dtypes  #nos muestra los tipos de datos de las columnas del DFrame

In [0]:
df_emp.schema  # esto nos permite ver el schema del dataframe

In [0]:
df_emp.printSchema()

### select
* Seleccione columnas del DataFrame

In [0]:
df_emp.limit(3).display()

In [0]:
df_emp.select(col("name").alias("nombre"),col("dept").alias("departamento")).display()   #para hacer un select y alias a las columnas del dataframe

### filter

* Filtrar las filas según alguna condición.
* Intentemos encontrar las filas con id = 1.
* Hay diferentes formas de especificar la condición.

In [0]:
df_emp.filter(col("id")==1).display()  # EL REGISTRO CON EL ID 1

In [0]:
df_emp.filter(df_emp["id"]==1).display()

In [0]:
df_emp.filter(df_emp.id==1).display()

### drop
* Elimina una columna en particular

In [0]:
df_emp.drop(col("name")).display()

### Aggregations
* Podemos usar la función groupBy para agrupar los datos y luego usar la función "agg" para realizar la agregación de datos agrupados.

In [0]:
df_emp.groupby(col("dept"))\
.agg(sum("salary").alias("salario_total")).display()  #suma total de departamentos 

In [0]:
df_emp.limit(3).display()

In [0]:
df_emp.groupby(col("dept"))\
    .agg(max("salary").alias("salario_maximo"),
         min("salary").alias("salario_minimo")
         ).display()
    


In [0]:

df_emp.agg(max("salary")).display()  #esto nos permite ver el salario maximo

In [0]:
df_emp.agg(min("salary")).display()  #este nos permite ver el salario minimo

### Sorting

* Ordena los datos según el "salario". De forma predeterminada, la clasificación se realizará en orden ascendente.

In [0]:
df_emp.sort("salary").display()

In [0]:
df_emp.sort(desc("salary")).display()

In [0]:
display(df_emp.sort(desc("dept"), asc("salary")).limit(6))

### Columnas derivadas
* Podemos usar la función "withColumn" para derivar la columna en función de las columnas existentes ...

In [0]:
df_emp.withColumn("bonus",col("salary")*0.1).display()

In [0]:
# WITHCOLUMNREDAMED
df_emp.withColumnRenamed("salary","salario").display()

### Joins

* Podemos realizar varios tipos de combinaciones en múltiples DataFrames.

In [0]:
df_emp.limit(3).display()
df_dept.limit(3).display()

In [0]:
#INNER JOIN
df_emp.join(df_dept,df_emp["dept"]==df_dept["id"],"inner")\
    .select(df_emp["name"],df_emp["salary"],df_dept["name"]).display()

In [0]:
#LEFT OUTER JOIN
df_emp.join(df_dept,df_emp["dept"]==df_dept["id"],"left_outer")\
    .select(df_emp["name"],df_emp["salary"],df_dept["name"]).display()

In [0]:
#RIGHT OUTER JOIN
df_emp.join(df_dept,df_emp["dept"]==df_dept["id"],"right_outer")\
    .select(df_emp["name"],df_emp["salary"],df_dept["name"]).display()

In [0]:
#FULL OUTER JOIN
df_emp.join(df_dept,df_emp["dept"]==df_dept["id"],"full_outer")\
    .select(df_emp["name"],df_emp["salary"],df_dept["name"]).display()

### Consultas SQL
* Ejecución de consultas tipo SQL.
* También podemos realizar análisis de datos escribiendo consultas similares a SQL. Para realizar consultas similares a SQL, necesitamos registrar el DataFrame como una Vista temporal.

In [0]:
df_empleados=spark.sql("SELECT *FROM dbricks_udemy_course.default.empleados")

In [0]:
df_empleados.createOrReplaceTempView("temp_empleados")


In [0]:
%sql
-- ESTA ES UNA MANERA DE CREAR UNA VISTA TEMPOTAL
CREATE TEMPORARY VIEW v_empleados_01
AS SELECT *FROM dbricks_udemy_course.default.empleados

In [0]:
%sql
select * from v_empleados_01

In [0]:
spark.sql("SELECT *FROM temp_empleados").display()

In [0]:
spark.sql("SELECT *FROM temp_empleados where id=1").display()

In [0]:
df_v_empleados=spark.sql("SELECT *FROM temp_empleados")

In [0]:
df_v_empleados.dtypes

In [0]:
df_cast=df_v_empleados.withColumn("salario",col("salario").cast("String"))

In [0]:
df_cast.printSchema() #como vemos ya cambio el tipo de dato

### Leyendo la tabla HIVE como DataFrame

In [0]:
df.write.saveAsTable("DB_NAME.TBL_NAME")

## También podemos seleccionar el argumento "modo" con overwrite", "append", "error" etc.
df.write.saveAsTable("DB_NAME.TBL_NAME", mode="overwrite")

# De forma predeterminada, la operación guardará el DataFrame como una tabla interna / administrada de HIVE


### Crea un DataFrame a partir de un archivo CSV
* Podemos crear un DataFrame usando un archivo CSV y podemos especificar varias opciones como un separador, encabezado, esquema, inferSchema y varias otras opciones.

In [0]:
 df = spark.read.csv("path_to_csv_file", sep="|", header=True, inferSchema=True)

### Guardar un DataFrame como un archivo CSV

In [0]:
df.write.csv("path_to_CSV_File", sep="|", header=True, mode="overwrite")

In [0]:
df_categorias=spark.read.csv("dbfs:/FileStore/categorias.csv",header=True)

In [0]:
df_categorias.limit(3).display()

In [0]:
#Una manera de cambiar el nombre a las columnas  es:
df_categorias.withColumnRenamed("categoria_id","id")\
    .withColumnRenamed("nombre_categoria","nombre")\
        .withColumnRenamed("descripcion","descripcion").display()


In [0]:
from pyspark.sql.functions import col#otra manera es 

df_categorias.select(col("categoria_id").alias("id"),col("nombre_categoria").alias("nombre"),col("descripcion").alias("descripcion")).display()

In [0]:
df_prueba=spark.read.csv("path",header=True,sep="|",inferSchema=True)
df_prueba.write.csv("path",header=True,sep="|")