<a href="https://colab.research.google.com/github/dfercho100/-PruebaAutomatizacionChoucair/blob/main/Semana%203/TALLER%20-%20Extraer%2C%20transformar%20y%20cargar%20datos/TALLER%20-%20Extraer%2C%20transformar%20y%20cargar%20datos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Extracción, transformación y carga de datos: `pyspark`

En este taller extraerás, transformarás y cargarás tablas, haciendo uso de `pyspark` para interactuar con bases de datos relacionales.

## Habilidades en práctica

Al realizar este taller podrás revisar tu progreso para:

**1.** Extraer y transformar tablas de bases de datos relacionales con operaciones de algebra relacional en `pyspark`. <br>
**2.** Crear y cargar tablas en bases de datos relacionales con SQL (_Structured Query Language_).

## Instrucciones

En cada uno de los siguientes ejercicios deberás escribir el código solicitado estrictamente en las celdas indicadas para ello, teniendo en cuenta las siguientes recomendaciones:

* No crear, eliminar o modificar celdas de este Notebook (salvo lo que se te indique), pues puede verse afectado el proceso de calificación automática.

* La calificación se realiza de manera automática con datos diferentes a los proporcionados en este taller. Por consiguiente, tu código debe funcionar para diferentes instancias de cada uno de los ejercicios; una instancia hace referencia a los posibles valores de los parámetros.

* La calificación de cada ejercicio depende del valor que retorne la función especificada en su enunciado. Por lo tanto, aunque implementes funciones adicionales, es escencial que utilices los nombres propuestos en los enunciados de los ejercicios para implementar la función definitiva.

## Ejercicios
En la siguente celda encuentras declarados los paquetes necesarios para el desarollo de este taller.

In [26]:
# Esta celda no es modificable

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, functions

En la siguiente celda encuentras la inicialización de una sesión de `pyspark`. Puedes editar la configuración según tu criterio, pero recomendamos mantener los valores predefinidos.

In [27]:
# Esta celda SÍ es modificable
#debemos descargar los dos programas de la siguiente pagina:
#https://www.oracle.com/java/technologies/downloads/
#https://spark.apache.org/downloads.html
#https://github.com/steveloughran/winutils
#https://www.youtube.com/watch?v=wt2wM8C2SXA            super util

spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("Instancia_Taller_PySpark") \
                    .config("spark.sql.warehouse.dir", "./Archivos/") \
                    .enableHiveSupport() \
                    .getOrCreate()
spark

Si en algún momento deseas restablecer el estado de Spark y borrar las bases de datos, puedes correr el siguiente código. Asegurate de reemplazar la ruta al directorio de tu base de datos.

In [28]:
# Esta celda SÍ es modificable

import shutil


# Esta instrucción permite borrar bases de datos persistentes.
# Si cambiaste el atributo `spark.sql.warehouse.dir` en la configuración
# de la sesión, debes reflejar el cambio en esta instrucción.

# shutil.rmtree('./Archivos/<nombre_db>.db/')

# Esta instrucción borra la base de datos temporal de PySpark.
# shutil.rmtree('./metastore_db/')

Harmonialpes, una empresa dedicada a la distribución al por mayor de harmónicas, ha tenido ventas favorables a lo largo del año. Con el gran volumen de harmónicas, el equipo de ventas se ha dado cuenta de que registrarlas en una hoja de Excel compartida no es una forma viable de hacerle seguimiento al desempeño del negocio y ha decidido migrar a un sistema de bases de datos relacionales. Tú, como experto en analítica y gobierno de datos, has sido encargado con la tarea de esta ambiciosa transformación tecnológica.

### Ejercicio 1


En la siguiente celda encuentras declarada la ruta relativa al archivo de Excel que almacena los datos de todas las órdenes de Harmonialpes, cada una con el respectivo cliente que la realiza y el agente de ventas encargado.

In [29]:
# No modifiques esta celda

ruta = r"Archivos/tabla_ventas.xlsx"


Implementa una función llamada `xlsx_a_dataframe` que reciba por parámetro una cadena de texto como la declarada en la celda anterior y que retorne un `DataFrame` de `pyspark`, resultado de leer el archivo. Debes revisar meticulósamente qué campos deben ser de qué tipo, ya que vas a encontrar valores numéricos, fechas y cadenas de texto.
# Nueva sección
Puedes utilizar métodos de `pandas` para leer los datos, pero no para otros ejercicios del taller.

La función debe retornar un `DataFrame` de `pyspark`.

Ejecuta tu función con la ruta definida como argumento y guarda el resultado en una variable global llamada `spark_ventas_df`.

In [36]:
def xlsx_a_dataframe(ruta):
    """
    Función que convierte un archivo .xlsx a un DataFrame de PySpark.
    :param ruta: Ruta del archivo .xlsx
    :return: DataFrame de PySpark
    """
    # Leer el archivo .xlsx usando pandas
    # If running in Google Colab, prepend '/content/' to the file path
    # if 'google.colab' in str(get_ipython()):
    #     ruta = '/content/' + ruta

    df_pandas = pd.read_excel(ruta, engine='openpyxl')

    # Convertir las columnas de fecha al formato correcto
    date_cols = df_pandas.select_dtypes(include=['datetime64']).columns
    for col in date_cols:
        df_pandas[col] = df_pandas[col].dt.to_pydatetime()

    # Convertir el DataFrame de pandas a un DataFrame de PySpark
    spark_df = SparkSession.builder.getOrCreate().createDataFrame(df_pandas)

    return spark_df

# Ejecutar la función y guardar el resultado en la variable global
# Replace with the actual absolute path to your file if it's not in the same directory as the notebook:
spark_ventas_df = xlsx_a_dataframe("Archivos/tabla_ventas.xlsx") # or "/your_actual_path/Archivos/tabla_ventas.xlsx"

  df_pandas[col] = df_pandas[col].dt.to_pydatetime()


In [37]:
## AUTO-CALIFICADOR

# Base variables
import pyspark
import datetime

ruta = r"Archivos/tabla_ventas.xlsx"

try:
    # Caso 1: no existe la función.
    try:
        xlsx_a_dataframe
        assert callable(xlsx_a_dataframe)
    except:
        raise NotImplementedError("No existe una función llamada xlsx_a_dataframe.",)

    # Caso 2: la función es interrumpida por errores durante su ejecución.
    try:
        resultado = xlsx_a_dataframe(ruta)
    except:
        raise RuntimeError("Tu función produce un error al ejecutarse.")

    # Caso 3: no retorna un DataFrame.
    assert isinstance(resultado, pyspark.sql.dataframe.DataFrame), f"Tu función debe retornar un objeto de tipo '{pyspark.sql.dataframe.DataFrame.__name__}'."

    # Caso 4: retorna un dataframe con cantidad de columnas errada
    assert len(resultado.columns) == 23, "Tu función retorna un DataFrame con cantidad de columnas errada."

    # Caso 5: devuelve un dataframe con cantidad de filas errada
    assert resultado.count() == 35, "Tu función retorna un DataFrame con cantidad de filas errada."

    # Caso 6: retorna valores no acertados
    expected_first_row = [200131, 900, 150, datetime.datetime(2008, 8, 26, 0, 0), 'SOD', 'C00012', 'Steven', 'San Jose', 'San Jose', 'USA', 1, 5000, 7000, 9000, 3000, 'KRFYGJK', 'A012', 'A012', 'Lucida', 'San Jose', 0.12, '044-52981425', 'United States']
    expected_last_row = [200124, 500, 100, datetime.datetime(2008, 6, 20, 0, 0), 'SOD', 'C00017', 'Srinivas', 'Bangalore', 'Bangalore', 'India', 2, 8000, 4000, 3000, 9000, 'AAAAAAB', 'A007', 'A007', 'Ramasundar', 'Bangalore', 0.15, '077-25814763', 'India']

    assert list(resultado.head(1)[0]) == expected_first_row, "Tu función retorna un DataFrame con valores distintos a los esperados."
    assert list(resultado.tail(1)[0]) == expected_last_row, "Tu función retorna un DataFrame con valores distintos a los esperados."

    # Caso 7: no guarda el resultado en la variable indicada
    try:
        spark_ventas_df
        assert isinstance(spark_ventas_df, pyspark.sql.dataframe.DataFrame)
    except:
        raise NotImplementedError("No existe un DataFrame llamado spark_ventas_df.",)

except:
    # Restaurar variable
    ruta = r"Archivos/tabla_ventas.xlsx"
    raise

finally:
    # Restaurar variable
    ruta = r"Archivos/tabla_ventas.xlsx"

print("Felicidades, realizaste este ejercicio correctamente.")

  df_pandas[col] = df_pandas[col].dt.to_pydatetime()


Felicidades, realizaste este ejercicio correctamente.


### Ejercicio 2

La tabla de ventas, en su estado actual, incluye información del cliente y el agente de ventas para cada orden y, por lo tanto, tiene una dimensión que es incómoda de manejar para los usuarios de los datos. También, cada cliente y cada agente está registrado en una o más ventas, lo que quiere decir que hay mucha información redundante en la tabla.

Haciendo uso de la variable que definiste, `spark_ventas_df`, crea una función llamada `desagregar_df` que reciba por parámetro un `DataFrame` con los mismos campos de la variable `spark_ventas_df` y retorne una tupla con tres nuevos `DataFrame` de `pyspark`. Los `DataFrame` deben contener las combinaciones únicas existentes de las columnas de cada categoría (orden, cliente, agente).

Almacena cada `DataFrame` del resultado de la función `desagregar_df` en las siguientes variables globales, según su descripción:

- `spark_ordenes_df`: las columnas que describen las órdenes de compra. Estas columnas tienen nombres de la forma `"Order_<campo>"`. <br><br>
    - La columna `"Order_Number"` define individualmente cada orden de compra.<br><br>
    - Para poder no perder información acerca de qué cliente realizó cada orden, debemos incluir en este `DataFrame` la columna `Customer_Code`.<br><br>
    - Para poder no perder información acerca de qué agente estuvo encargado de cada orden, debemos incluir en este `DataFrame` la columna `Agent_Code`.<br><br>
    - Este `DataFrame` no puede contener información adicional de los clientes ni de los agentes.<br><br>

- `spark_clientes_df`: las columnas que describen a los clientes. Estas columnas tienen nombres de la forma `"Customer_<campo>"`. <br><br>
    - La columna `"Customer_Code"` define individualmente a cada cliente.<br><br>
    - De cada cliente se encarga un único agente. Con el fin de respetar la relación de negocios entre los agentes y sus respectivos clientes, debemos incluir en este `DataFrame` la columna `Agent_Code`.<br><br>
    - Este `DataFrame` no puede contener información adicional de las ordenes ni de los agentes.<br><br>
    
- `spark_agentes_df`: las columnas que describen a los agentes. Estas columnas tienen nombres de la forma `"Agent_<campo>"`. <br><br>
    - La columna `"Agent_Code"` define individualmente a cada agente.<br><br>
    - Este `DataFrame` no puede contener información adicional de las ordenes ni de los clientes.
    
Asegúrate de ordenar las tablas por la columna que define los registros individualmente.

Ayuda: puedes utilizar el método `DataFrame.colRegex`.

In [None]:
# your code here

Py4JJavaError: An error occurred while calling o97.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (LAPTOP-75HTOMDH executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:705)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:749)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:673)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:615)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:572)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:530)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:705)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:749)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:673)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:615)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:572)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:530)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 31 more


In [None]:
## AUTO-CALIFICADOR

# Base variables
import pyspark
import datetime

# Caso 1: no existe la función.
try:
    desagregar_df
    assert callable(desagregar_df)
except:
    raise NotImplementedError("No existe una función llamada desagregar_df.",)

# Caso 2: la función es interrumpida por errores durante su ejecución.
try:
    resultado = desagregar_df(spark_ventas_df)
except:
    raise RuntimeError("Tu función produce un error al ejecutarse.")

# Caso 3: no retorna una tupla.
assert isinstance(resultado, tuple), f"Tu función debe retornar un objeto de tipo '{tuple.__name__}'."

# Caso 4: no retorna objetos tipo DataFrame en la tupla.
for i in range(3):
    assert isinstance(resultado[i], pyspark.sql.dataframe.DataFrame), f"Tu función debe retornar una {tuple.__name__} con elementos de tipo '{pyspark.sql.dataframe.DataFrame.__name__}'."

# Caso 5: retorna un dataframe con cantidad de columnas errada
assert len(resultado[0].columns) == 7, "Tu función retorna un DataFrame en la tupla con cantidad de columnas errada. Revisa que no tengas columnas duplicadas o faltantes."
assert len(resultado[1].columns) == 12, "Tu función retorna un DataFrame en la tupla con cantidad de columnas errada. Revisa que no tengas columnas duplicadas o faltantes."
assert len(resultado[2].columns) == 6, "Tu función retorna un DataFrame en la tupla con cantidad de columnas errada. Revisa que no tengas columnas duplicadas o faltantes."

# Caso 6: devuelve un dataframe con cantidad de filas errada
assert resultado[0].count() == 34, "Tu función retorna un DataFrame en la tupla con cantidad de filas errada. Revisa que no tengas filas duplicadas o faltantes."
assert resultado[1].count() == 25, "Tu función retorna un DataFrame en la tupla con cantidad de filas errada. Revisa que no tengas filas duplicadas o faltantes."
assert resultado[2].count() == 12, "Tu función retorna un DataFrame en la tupla con cantidad de filas errada. Revisa que no tengas filas duplicadas o faltantes."

# Caso 7: los dataframe no contienen las columnas debidas
assert "Agent_Code" in resultado[0].columns, "Tu función retorna un DataFrame en la tupla sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."
assert "Customer_Code" in resultado[0].columns, "Tu función retorna un DataFrame en la tupla sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."
assert sum(i[:len("Order_")] == "Order_" for i in resultado[0].columns), "Tu función retorna un DataFrame en la tupla sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."

assert "Agent_Code" in resultado[1].columns, "Tu función retorna un DataFrame en la tupla sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."
assert sum(i[:len("Customer_")] == "Customer_" for i in resultado[1].columns), "Tu función retorna un DataFrame en la tupla sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."

assert sum(i[:len("Agent_")] == "Agent_" for i in resultado[2].columns), "Tu función retorna un DataFrame en la tupla sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."

# Caso 8: retorna valores no acertados
expected_first_row = [200100, 1000, 600, datetime.datetime(2008, 8, 1, 0, 0), 'SOD', 'A003', 'C00013']
expected_last_row = [200135, 2000, 800, datetime.datetime(2008, 9, 16, 0, 0), 'SOD', 'A010', 'C00007']
assert list(resultado[0].head(1)[0]) == expected_first_row, "Tu función retorna un DataFrame en la tupla con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."
assert list(resultado[0].tail(1)[0]) == expected_last_row, "Tu función retorna un DataFrame en la tupla con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."

expected_first_row = ['C00001', 'Micheal', 'New York', 'New York', 'USA', 2, 3000, 5000, 2000, 6000, 'CCCCCCC', 'A008']
expected_last_row = ['C00025', 'Ravindran', 'Bangalore', 'Bangalore', 'India', 2, 5000, 7000, 4000, 8000, 'AVAVAVA', 'A011']
assert list(resultado[1].head(1)[0]) == expected_first_row, "Tu función retorna un DataFrame en la tupla con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."
assert list(resultado[1].tail(1)[0]) == expected_last_row, "Tu función retorna un DataFrame en la tupla con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."

expected_first_row = ['A001', 'Subbarao', 'Bangalore', 0.14, '077-12346674', 'India']
expected_last_row = ['A012', 'Lucida', 'San Jose', 0.12, '044-52981425', 'United States']
assert list(resultado[2].head(1)[0]) == expected_first_row, "Tu función retorna un DataFrame en la tupla con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."
assert list(resultado[2].tail(1)[0]) == expected_last_row, "Tu función retorna un DataFrame en la tupla con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."

# Caso 9: no existen las variables indicadas
try:
    spark_ordenes_df
    assert isinstance(spark_ordenes_df, pyspark.sql.dataframe.DataFrame)
except:
    raise NotImplementedError("No existe un DataFrame llamado spark_ordenes_df.",)

try:
    spark_clientes_df
    assert isinstance(spark_clientes_df, pyspark.sql.dataframe.DataFrame)
except:
    raise NotImplementedError("No existe un DataFrame llamado spark_clientes_df.",)

try:
    spark_agentes_df
    assert isinstance(spark_agentes_df, pyspark.sql.dataframe.DataFrame)
except:
    raise NotImplementedError("No existe un DataFrame llamado spark_agentes_df.",)

# Caso 10: no guarda el resultado en las variables acertadas
assert spark_ordenes_df.collect() == resultado[0].collect(), "La variable spark_ordenes_df no guarda el primer DataFrame de la tupla."
assert spark_clientes_df.collect() == resultado[1].collect(), "La variable spark_clientes_df no guarda el segundo DataFrame de la tupla."
assert spark_agentes_df.collect() == resultado[2].collect(), "La variable spark_agentes_df no guarda el tercer DataFrame de la tupla."

print("Felicidades, realizaste este ejercicio correctamente.")

### Ejercicio 3

En preparación para la transformación tecnológica, Harmonialpes ha estado recopilando más y más información para almacenar en bases de datos y aprovechar al máximo la inversión en infraestructura y tecnología. En aras de poder acomodar nuevos esquemas de datos y gran diversidad de información, han pedido una función que pueda crear nuevas bases de datos, en las cuales puedan almacenar permanentemente nuevas tablas y sus relaciones.

En la siguiente celda encuentras declarada una lista de cadenas de texto, cada una con el nombre de una base de datos que necesitan para almacenar su información.

In [None]:
# No modifiques esta celda

nombres_a_crear = ["ventas_db", "inventario_db", "activos_db"]

Implementa una función llamada `crear_varios_db` que reciba por parámetro una lista de cadenas de texto como la declarada en la celda anterior y que retorne un `DataFrame` de `pyspark`. El `DataFrame` debe tener una única columna llamada `"namespace"` cuyos valores sean las bases de datos en el directorio de archivos (no debe incluir `default`). La función no debe crear las bases de datos que ya existan en el directorio.

La función debe retornar un `DataFrame` de `pyspark`.

Ejecuta tu función con la lista de nombres a crear definida como argumento y guarda el resultado en una variable global llamada `spark_bases_de_datos_df`.

In [None]:
# your code here

NameError: name 'spark' is not defined

In [None]:
## AUTO-CALIFICADOR

# Base variables
import pyspark


nombres_a_crear = ["ventas_db", "inventario_db", "activos_db"]

try:
    # Caso 1: no existe la función.
    try:
        crear_varios_db
        assert callable(crear_varios_db)
    except:
        raise NotImplementedError("No existe una función llamada crear_varios_db.",)

    # Caso 2: la función es interrumpida por errores durante su ejecución.
    try:
        resultado = crear_varios_db(nombres_a_crear)
        nombres_a_crear = ["ventas_db", "inventario_db", "activos_db"]
    except:
        raise RuntimeError("Tu función produce un error al ejecutarse.")

    # Caso 3: no retorna un DataFrame.
    assert isinstance(resultado, pyspark.sql.dataframe.DataFrame), f"Tu función debe retornar un objeto de tipo '{pyspark.sql.dataframe.DataFrame.__name__}'."

    # Caso 4: retorna un dataframe con cantidad de columnas errada
    assert len(resultado.columns) == 1, "Tu función retorna un DataFrame con cantidad de columnas errada. Revisa que no tengas columnas duplicadas o faltantes."

    # Caso 5: devuelve un dataframe con cantidad de filas errada
    assert resultado.count() >= len(nombres_a_crear), "Tu función retorna un DataFrame con cantidad de filas errada. Revisa que no tengas filas duplicadas o faltantes."

    # Caso 6: los dataframe no contienen las columnas debidas
    assert "namespace" in resultado.columns, "Tu función retorna un DataFrame sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."

    # Caso 7: retorna valores no acertados
    assert resultado.withColumn("was_requested", functions.col("namespace") \
                                         .isin(nombres_a_crear) \
                                         .cast("long")) \
                                         .agg(functions.sum("was_requested")).collect()[0][0] == len(nombres_a_crear), "Tu función retorna un DataFrame con valores distintos a los esperados. Asegúrate de no tener filas repetidas o faltantes."

    # Caso 8: no existen las variables indicadas
    try:
        spark_bases_de_datos_df
        assert isinstance(spark_bases_de_datos_df, pyspark.sql.dataframe.DataFrame)
    except:
        raise NotImplementedError("No existe un DataFrame llamado spark_bases_de_datos_df.",)

    # Caso 9: no crea las bases de datos
    data = [[i] for i in nombres_a_crear]
    assert set(spark.sql("SHOW DATABASES;").collect()).issuperset(set(spark.createDataFrame(data=data, schema=["namespace"]).collect())), "No has creado todas las bases de datos solicitadas."

except:
    nombres_a_crear = ["ventas_db", "inventario_db", "activos_db"]
    raise

nombres_a_crear = ["ventas_db", "inventario_db", "activos_db"]

print("Felicidades, realizaste este ejercicio correctamente.")

### Ejercicio 4

Ahora que existe la bases de datos `ventas_db`, podemos cargar nuestras tablas.

Implementa una función llamada `cargar_a_ventas_db` que no reciba parámetros, cargue las tablas `spark_ordenes_df`, `spark_clientes_df` y `spark_agentes_df` a la base de datos `ventas_db` y retorne un `DataFrame` de `pyspark`. El `DataFrame` debe tener una única columna llamada `"tableName"`, cuyos valores sean las tablas en la base de datos `ventas_db`. Los nombres de las tablas en la base de datos deben ser los mismos de las variables en `pyspark`. La función no debe crear las tablas que ya existan en la base de datos.

La función debe retornar un `DataFrame` de `pyspark`.

Ejecuta tu función y guarda el resultado en una variable global llamada `spark_tablas_df`.

In [None]:
# your code here


In [None]:
## AUTO-CALIFICADOR

# Base variables
import pyspark

# Caso 1: no existe la función.
try:
    cargar_a_ventas_db
    assert callable(cargar_a_ventas_db)
except:
    raise NotImplementedError("No existe una función llamada cargar_a_ventas_db.",)

# Caso 2: la función es interrumpida por errores durante su ejecución.
try:
    resultado = cargar_a_ventas_db()
except:
    raise RuntimeError("Tu función produce un error al ejecutarse.")

# Caso 3: no retorna un DataFrame.
assert isinstance(resultado, pyspark.sql.dataframe.DataFrame), f"Tu función debe retornar un objeto de tipo '{pyspark.sql.dataframe.DataFrame.__name__}'."

# Caso 4: retorna un dataframe con cantidad de columnas errada
assert len(resultado.columns) == 1, "Tu función retorna un DataFrame con cantidad de columnas errada. Revisa que no tengas columnas duplicadas o faltantes."

# Caso 5: devuelve un dataframe con cantidad de filas errada
assert resultado.count() == 3, "Tu función retorna un DataFrame con cantidad de filas errada. Revisa que no tengas filas duplicadas o faltantes y que tu base de datos tenga las tablas estrictamente necesarias."

# Caso 6: los dataframe no contienen las columnas debidas
assert "tableName" in resultado.columns, "Tu función retorna un DataFrame sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar las columnas."

# Caso 7: retorna valores no acertados
tablas = ["spark_ordenes_df", "spark_clientes_df", "spark_agentes_df"]
assert resultado.withColumn("was_requested", functions.col("tableName") \
                                     .isin(tablas) \
                                     .cast("long")) \
                                     .agg(functions.sum("was_requested")).collect()[0][0] == len(tablas), "Tu función retorna un DataFrame con valores distintos a los esperados. Asegúrate de no tener filas repetidas o faltantes."

# Caso 8: no existen las variables indicadas
try:
    spark_tablas_df
    assert isinstance(spark_tablas_df, pyspark.sql.dataframe.DataFrame)
except:
    raise NotImplementedError("No existe un DataFrame llamado spark_tablas_df.",)

# Caso 9: no crea las tablas
data = [[i] for i in tablas]
assert set(spark.sql("SHOW TABLES FROM ventas_db;").select("tableName").collect()) == set(spark.createDataFrame(data=data, schema=["tableName"]).collect()), "Las tablas creadas no son estrictamente las solicitadas."


print("Felicidades, realizaste este ejercicio correctamente.")

### Ejercicio 5

Al equipo de ventas le interesa saber cuáles clientes corresponden a cada agente.

Implementa una función llamada `consulta_join` que no reciba parámetros, que retorne un `DataFrame` de `pyspark`. El `DataFrame` debe tener todas las columnas de las tablas `spark_agentes_df` y `spark_clientes_df`, de la base de datos `ventas_db`, de manera que para cada agente se reporten los clientes que le corresponden o `NULL` si no le corresponde ninguno. Debes ordenar el `DataFrame` por la columna `Customer_Code`.

La función debe retornar un `DataFrame` de `pyspark`.

Ejecuta tu función y guarda el resultado en una variable global llamada `spark_consulta_df`.

In [None]:
# your code here

In [None]:
## AUTO-CALIFICADOR

# Base variables
import pyspark

# Caso 1: no existe la función.
try:
    consulta_join
    assert callable(consulta_join)
except:
    raise NotImplementedError("No existe una función llamada consulta_join.",)

# Caso 2: la función es interrumpida por errores durante su ejecución.
try:
    resultado = consulta_join()
except:
    raise RuntimeError("Tu función produce un error al ejecutarse.")

# Caso 3: no retorna un DataFrame.
assert isinstance(resultado, pyspark.sql.dataframe.DataFrame), f"Tu función debe retornar un objeto de tipo '{pyspark.sql.dataframe.DataFrame.__name__}'."

# Caso 4: retorna un dataframe con cantidad de columnas errada
assert len(resultado.columns) == 17, "Tu función retorna un DataFrame con cantidad de columnas errada. Revisa que no tengas columnas duplicadas o faltantes."

# Caso 5: devuelve un dataframe con cantidad de filas errada
assert resultado.count() == 25, "Tu función retorna un DataFrame con cantidad de filas errada. Revisa que no tengas filas duplicadas o faltantes."

# Caso 6: los dataframe no contienen las columnas debidas
columnas = ['Agent_Code', 'Agent_Name', 'Agent_Working_Area', 'Agent_Commission', 'Agent_Phone_No', 'Agent_Country', 'Customer_Code', 'Customer_Name', 'Customer_City', 'Customer_Working_Area', 'Customer_Country', 'Customer_Grade', 'Customer_Opening_Amount', 'Customer_Receive_Amount', 'Customer_Payment_Amount', 'Customer_Outstanding_Amount', 'Customer_Phone_No']
assert resultado.columns == columnas, "Tu función retorna un DataFrame sin una o más columnas necesarias o con nombre equivocado. Asegúrate de no renombrar ni reordenar las columnas."

# Caso 7: retorna valores no acertados
expected_first_row = ['A008', 'Alford', 'New York', 0.12, '044-25874365', 'United States', 'C00001', 'Micheal', 'New York', 'New York', 'USA', 2, 3000, 5000, 2000, 6000, 'CCCCCCC']
expected_last_row = ['A011', 'Ravi Kumar', 'Bangalore', 0.15, '077-45625874', 'India', 'C00025', 'Ravindran', 'Bangalore', 'Bangalore', 'India', 2, 5000, 7000, 4000, 8000, 'AVAVAVA']

assert list(resultado.head(1)[0]) == expected_first_row, "Tu función retorna un DataFrame con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."
assert list(resultado.tail(1)[0]) == expected_last_row, "Tu función retorna un DataFrame con valores distintos a los esperados. Asegurate de no alterar el orden de los registros únicos en la tabla."

# Caso 8: no existen las variables indicadas
try:
    spark_consulta_df
    assert isinstance(spark_consulta_df, pyspark.sql.dataframe.DataFrame)
except:
    raise NotImplementedError("No existe un DataFrame llamado spark_consulta_df.",)

print("Felicidades, realizaste este ejercicio correctamente.")

## Referencias


SparkBy{Examples}. Spark with Python (PySpark) Tutorial For Beginners. Recuperado el 12 de Agosto de 2022 de:
https://sparkbyexamples.com/pyspark-tutorial/

Apache PySpark. pyspark.sql.DataFrame. Recuperado el 12 de Agosto de 2022 de: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html

w3resource. SQL Table. Recuperado el 20 de Agosto de 2022 de: https://www.w3resource.com/sql/sql-table.php

## Créditos

**Autor(es)**: Alejandro Mantilla Redondo, Diego Alejandro Cely

**Fecha última actualización:** 14/09/2022