In [0]:
%sql
CREATE DATABASE IF NOT EXISTS db_bronze;

In [0]:
%sql
DROP TABLE IF EXISTS db_bronze.hired_employees;
CREATE TABLE db_bronze.hired_employees(
  id bigint,
  name string,
  datetime string,
  department_id bigint,
  job_id bigint
);

DROP TABLE IF EXISTS db_bronze.departments;
CREATE TABLE db_bronze.departments(
  id bigint,
  department string
);

DROP TABLE IF EXISTS db_bronze.jobs;
CREATE TABLE db_bronze.jobs(
  id bigint,
  job string
)

In [0]:
%python
# coding: utf-8

# ||********************************************************************************************************
# || PROYECTO   		: POC -CHALLENGE GLOBLANT 
# || NOMBRE     		: challenge.py
# || TABLA DESTINO	: db_bronze.hired_employees
# ||                  db_bronze.departments 
# ||                  db_bronze.jobs
# || TABLAS FUENTES	: departments.csv
# ||                  hired_employees.csv
# ||                  jobs.csv
# || OBJETIVO   		: ETL - big data migrati
# || TIPO       		: pyspark
# || REPROCESABLE	  : NA
# || SCHEDULER		  : NA
# || JOB  		      : NA
# || VERSION   DESARROLLADOR           FECHA        DESCRIPCION
# || 1.1       ALEXIS DAVILA        21/03/23     Creacion del proceso
# *************************************************************************************************************

###
 # @section Import
 ##

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

###
 # @section configuracion de recursos
 ##
spark = SparkSession.builder.\
appName("CSV to Database").\
config("spark.driver.cores","8").\
config("spark.driver.memoryOverhead","20g").\
config("spark.driver.memory", "16g").\
config("spark.executor.cores","15").\
config("spark.executor.memory","20g").\
config("spark.executor.memoryOverhead","20g").\
config("spark.dynamicAllocation.enabled","true").\
config("spark.dynamicAllocation.maxExecutors","20").\
config("spark.debug.maxToStringFields", "100").\
config("spark.shuffle.service.enabled","true").\
config("spark.sql.shuffle.partitions", "500").\
config("spark.default.parallelism", "500").\
config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.join.preferSortMergeJoin","false").\
config("spark.maxRemoteBlockSizeFetchToMem","2147483135").\
config("spark.shuffle.spill.compress","true").\
config("spark.shuffle.compress","true").\
config("spark.ui.enabled","true").\
enableHiveSupport().\
getOrCreate()

###
 # @section funciones
 ##


def insert_csv_to_db(csv_file,table_name,nombre_columnas):
    """"
    Esta función inserta data de archivos csv a una base de datos especificando las columnas.
    :param csv_file: archivos csv
    :param table_name: nombre y esquema de la tabla en donde se inserta la data
    :param nombre_columnas: la lista de columnas de la tabla
    """
    # Lee la tabla y guarda su contenido en un DataFrame
    df = spark.read.format("csv").option("header", "false").option("inferSchema","True").load(csv_file)
    nombres_columnas_final = nombre_columnas
    #Asigna las columnas en el dataframe
    df = df.toDF(*nombres_columnas_final)
    # Guarda el DataFrame en formato delta en la la tabla especificada
    df.write.format("delta").mode("append").option("batchsize", "1000").option("isolationLevel", "NONE").option("numPartitions", "10").option("mergeSchema", "true").saveAsTable(table_name)

def backup_table(table_name, backup_path, backup_format="avro"):
    """
    Esta función crea una copia de seguridad de una tabla en formato AVRO y la guarda en el sistema de archivos.
    :param table_name: nombre de la tabla a respaldar
    :param backup_path: ruta donde se guardará el archivo de respaldo
    :param backup_format: formato en que se guardará el archivo de respaldo. Por defecto es "avro".
    """
    # Lee la tabla y guarda su contenido en un DataFrame
    df = spark.table(table_name)
    # Guarda el DataFrame en formato AVRO en la ruta especificada
    df.write.format(backup_format).save(backup_path)
    
def restore_table(table_name, backup_path, backup_format="avro"):
    """
    Esta función restaura una tabla a partir de su copia de seguridad en formato AVRO.
    :param table_name: nombre de la tabla a restaurar
    :param backup_path: ruta donde se encuentra el archivo de copia de seguridad
    :param backup_format: formato del archivo de copia de seguridad. Por defecto es "avro".
    """
    # Lee el archivo de copia de seguridad en formato AVRO y carga su contenido en un DataFrame
    df = spark.read.format(backup_format).load(backup_path)
    # Escribe el contenido del DataFrame en la tabla especificada
    df.write.mode("overwrite").saveAsTable(table_name)

def main():
    
    csv_file_1 = "https://raw.githubusercontent.com/alexis18daes/databricks_challenge_repo/dev/hired_employees.csv" 
    csv_file_2 = "https://raw.githubusercontent.com/alexis18daes/databricks_challenge_repo/dev/departments.csv"
    csv_file_3 = "https://raw.githubusercontent.com/alexis18daes/databricks_challenge_repo/dev/jobs.csv"
    
    table_name_1 = "db_bronze.hired_employees"
    table_name_2 = "db_bronze.departments"
    table_name_3 = "db_bronze.jobs"
    
    nombre_columnas_1 = ["id","name","datetime","department_id","job_id"]
    nombre_columnas_2 = ["id","name","department"]
    nombre_columnas_3 = ["id","job"]
    
    insert_csv_to_db(csv_file_1,table_name_1,nombre_columnas_1)
    insert_csv_to_db(csv_file_2,table_name_2,nombre_columnas_2)
    insert_csv_to_db(csv_file_3,table_name_3,nombre_columnas_3)
    
#Ejecucion
main()

spark.stop()

#Salida
exit()

