El siguiente notebook de databricks tiene como objetivo la  generacion datos ficticios para el area de departamentos, puestos de trabajo y empleados haciendo uso de pySpark para la integracion con el cluster de spark y databricks, Faker para la generacion de datos dummies,mysql-connector-python y SQLAlchemy para la conexion a una base de datos MYSQL.
Una vez generados estos datos ficticios estos son cargados al sistema de almacenamiento en la nube de azure Blob Storage donde son alamcenados en formato parquet.
Una vez almacenados esto datos en formato parquet son leidos y convertidos a pandas con el fin de ser insertados en batch a la base de datos MYSQL


A continuacion instalamos las librerias a utilizar,
Faker: genracion de datos Dummies
mysql-connector y SQLAlchemy se utilizan con el fin de establecer la conexion a la base de datos


In [None]:
%pip install faker
%pip install mysql-connector-python
%pip install SQLAlchemy

Python interpreter will be restarted.
Collecting faker
  Downloading Faker-24.14.0-py3-none-any.whl (1.8 MB)
Installing collected packages: faker
Successfully installed faker-24.14.0
Python interpreter will be restarted.
Python interpreter will be restarted.
Collecting mysql-connector-python
  Downloading mysql_connector_python-8.3.0-cp39-cp39-manylinux_2_17_x86_64.whl (21.5 MB)
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-8.3.0
Python interpreter will be restarted.
Python interpreter will be restarted.
Collecting SQLAlchemy
  Downloading SQLAlchemy-2.0.29-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
Collecting greenlet!=0.4.17
  Downloading greenlet-3.0.3-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (614 kB)
Collecting typing-extensions>=4.6.0
  Downloading typing_extensions-4.11.0-py3-none-any.whl (34 kB)
Installing collected packages: typing-extensions, greenlet, SQLAlchemy
  Attempting uninstal

Aquí importamos las librerias  necesarias para nuestro script. Utilizamos pyspark.sql para trabajar con Spark DataFrame, Faker para generar datos dummies y random para generar números aleatorios.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
from faker import Faker
import random

fake = Faker()

En esta seccion creamos una sesión de Spark utilizando SparkSession.builder.getOrCreate(), donde esta sesion proporciona una sesion de trabajo con el cluster de Spark.
de igual manera configuramos la conexion al Azure Blob Storage, esto con el fin de poder acceder ay escribir los datos en los containers

In [None]:
session = SparkSession.builder.getOrCreate()

session.conf.set(
    "s.azure.account.key.cwsazure.blob.core.windows.net",
    "oBRTYU3douji12**//==sZTcudddTlQaJdaM6UPZUk5oQ0rGR..-XVVWIpusvHvMBTliZOVwL+AStLxdUwQ=="
)

Se crea un dataframe de pyspark llamado departmentsdf el cual contiene una serie de datos ficticios acerca de los departamentos o areas presentes en una empresa, usamos un list comprehension para iterar los departamentos ficticios y asi enumerarlos y crear el dataframe con los campos de dep_id y deo_name, donde:

* dep_id : representa el id del departamento
* dep_nam : representa el nombre del departamento

In [None]:


departments = ["Human Resources", "Finance", "Marketing", "Operations", "Sales"]
departments_generate = [(i+1, department) for i, department in enumerate(departments)]
departmentsdf = spark.createDataFrame(departments_generate, ["dep_id", "dep_name"])

In [None]:
departmentsdf.show()

+------+---------------+
|dep_id|       dep_name|
+------+---------------+
|     1|Human Resources|
|     2|        Finance|
|     3|      Marketing|
|     4|     Operations|
|     5|          Sales|
+------+---------------+



Se crea un dataframe de pyspark llamado jobsdf  el cual contiene una serie de datos ficticios acerca de los diferentes puestos de trabajo que hay  presentes en una empresa, en esta ocacion creamos una lista la cual contiene tuplas con los datos de cada puesto de trabajo, los datos presentes en estas tuplas son job_id, job_name, min_salary, max_salary donde:

* job_id : representa el id del puesto de trabajo
* job_name : representa el nombre del puesto de trabajo
* min_salary : representa el salario minimo del puesto de trabajo
* max_salary : representa el salario maximo del puesto de trabajo

In [None]:
jobs = [(1, "Manager", 50000, 80000),
             (2, "Developer", 40000, 60000),
             (3, "Designer", 35000, 50000),
             (4, "Analyst", 45000, 70000),
             (5, "Assistant", 30000, 35000)]
jobsdf = spark.createDataFrame(jobs, ["job_id", "job_name", "min_salary", "max_salary"])

Se crea un dataframe llamado employeesdf, el cual utiliza una list comprehension con el fin de crear una lista de 100 tuplas  donde cada tupla representa un empleado, el cual cuenta con los registros de employee_id, department_id,job_id, first_name, last_name, email,phone_number, contract_date,salary  donde :
* employee_id : representa el id del empleado
* department_id  : representa el id del departamento al cual pertenece el empleado
* job_id : representa el id del puesto de trabajo al cual pertenece el empleado
* first_name : representa el primer nombre del empleado, este first_name es generado mediante la libreria fake el cual trae data dummi
* last_name : representa el segundo  nombre del empleado, este last_name es generado mediante la libreria fake el cual trae data dummi
* email : representa el email del empleado, este email es generado mediante la libreria fake el cual trae data dummi
* phone_number  : representa el numero de telefono del empleado,este numero de telefono es generado mediante la libreria fake la cual trae data dummi 
* contract_date  : representa fecha de contratacion del empleado, esta fecha es generado mediante la libreria fake la cual trae data dummi de fechas
* salary : representa el salario que gana el empleado, este salario es generado de manera aleatoria con un rango de valores


In [None]:

n = 100
data = [(k+1,
        random.randint(1, len(departments)),
        random.randint(1, len(jobs)),
        fake.first_name(),
        fake.last_name(),
        fake.email(),
        fake.phone_number(),
        fake.date_between(start_date='-5y', end_date='today'),                   
        random.randint(30000, 80000)) for k in range(n)]

employeesdf = spark.createDataFrame(data, ["employee_id", "department_id","job_id", "first_name", "last_name", "email",
                                                       "phone_number", "contract_date","salary"])

In [None]:
employeesdf.show()

+-----------+-------------+------+----------+---------+--------------------+--------------------+-------------+------+
|employee_id|department_id|job_id|first_name|last_name|               email|        phone_number|contract_date|salary|
+-----------+-------------+------+----------+---------+--------------------+--------------------+-------------+------+
|          1|            5|     3|   Brandon|   Miller|ashleywilcox@exam...|   361.292.3713x6066|   2020-07-08| 54413|
|          2|            1|     1|      John|  Osborne|jimenezbrandon@ex...|  752-398-5954x03932|   2020-09-16| 56586|
|          3|            2|     3|   Micheal|    Brown| amypham@example.com| (342)281-4004x61433|   2021-01-21| 79661|
|          4|            3|     4|      Dean|    Smith|hamiltontom@examp...|        365-210-9127|   2023-05-21| 42101|
|          5|            4|     2|      Mark| Holloway|ycampbell@example...|        301-353-2266|   2024-04-11| 63936|
|          6|            4|     5|      Adam|  A

Escribimos los dataframes generados en formato parquet en el servicio de almacenamiento de azure blob storage

In [None]:
employeesdf.write.parquet("wasbs://data-bronze@cwsazure.blob.core.windows.net/data-employees/")
jobsdf.write.parquet("wasbs://data-bronze@cwsazure.blob.core.windows.net/data-jobs/")
departmentsdf.write.parquet("wasbs://data-bronze@cwsazure.blob.core.windows.net/data-departments/")


Leemos los archivos tipo parquet en formato de dataframes de pyspark, luego los ordenamos por su id y por ultimo los convertimos a dataframes de pandas esto con el fin de hacer un proceso de insersion de datos en batch a una base de datos MySQL

In [None]:
dfemployees = session.read.parquet("wasbs://data-bronze@cwsazure.blob.core.windows.net/data-employees/").orderBy(col("employee_id")).toPandas()
dfjobs = session.read.parquet("wasbs://data-bronze@cwsazure.blob.core.windows.net/data-jobs/").orderBy(col("job_id")).toPandas()
dfdepartments = session.read.parquet("wasbs://data-bronze@cwsazure.blob.core.windows.net/data-departments/").orderBy(col("dep_id")).toPandas()



Definimos las credenciales de acceso a la Base de datos

In [None]:
user = 'admin'
password = 'd/¿12la*Qc1tr__.K'
name_bd = 'dwh_tp'
name_tableEmp = 'employees'
name_tableJob = 'jobs'
name_tableDep = 'departments'

Generamos una url de conexion hacia la base de datos de MySQL

In [None]:
url = f"mysql+mysqlconnector://{user}:{password}@test-facapi-da12sol. 2A122*w3e__wbr2*y.mx-eastus-1.mysql.amazonaws.com:3306/{name_bd}"


Escribimos los dataframes de pandas en la base de datos de MySQL utilizando el metodo to_sql(), donde definimos la cadena de conexion, el nombre de la tabla que va a recibir la informacion, el parametro if_exists='replace' e index=False los cuales indican que si las tablas ya existen estas  deben ser reemplazadas, y que no se debe agregar el índice de fila como una columna en la base de datos, este proceso lo realizamos para los 3 dataframes que fueron leidos desde el azure blob storage 


In [None]:
dfemployees.to_sql(name_tableEmp, url, if_exists='replace', index=False)

Out[8]: 100

In [None]:
dfjobs.to_sql(name_tableJob, url, if_exists='replace', index=False)

Out[11]: 5

In [None]:
dfdepartments.to_sql(name_tableDep, url, if_exists='replace', index=False)

Out[13]: 5