## Se inicia la conexión a la base de datos mediante un conector jdbc

In [0]:
jdbcHostname = "sqlservergrupo1.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "dbRetail"
jdbcUsername = "serveradmin"
jdbcPassword = "Passwordgrupo1!"
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

jdbcUrl=f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase};user={jdbcUsername};password={jdbcPassword}"

#### Sección donde se importan las librerías o funciones utilizadas

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql import *
import pandas as pd
import random



### Se inician las lecturas de las tablas desde la base de datos

#### Se lee la tabla Categoría desde la base de datos

In [0]:
# Lectura de la tabla dbo.Categoria
categoria_table = (spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "dbo.Categoria")
  .load()
)

In [0]:
# Vemos su esquema
categoria_table.printSchema()

root
 |-- Cod_Categoria: string (nullable = true)
 |-- Categoria: string (nullable = true)



In [0]:
# Guardamos la tabla Categoría, leída desde la base de datos, como una tabla en el entorno de Databricks
if 'categoria' not in sqlContext.tableNames('default'):
    categoria_table.write.saveAsTable('Categoria')

In [0]:
%sql
select * from Categoria

Cod_Categoria,Categoria
1,Bicicleta
2,Componente
3,Prenda
4,Accesorio


In [0]:
# Vemos su esquema
categoria_table.printSchema()

root
 |-- Cod_Categoria: string (nullable = true)
 |-- Categoria: string (nullable = true)



In [0]:
display(categoria_table)

Cod_Categoria,Categoria
1,Bicicleta
2,Componente
3,Prenda
4,Accesorio


#### Se lee la tabla Producto desde la base de datos

In [0]:
# Lectura de la tabla dbo.Producto
producto_table = (spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "dbo.Producto")
  .load()
)

In [0]:
producto_table.printSchema()

root
 |-- Cod_Producto: string (nullable = true)
 |-- Producto: string (nullable = true)
 |-- Cod_SubCategoria: string (nullable = true)
 |-- Color: string (nullable = true)



In [0]:
# Guardamos la tabla Producto, leída desde la base de datos, como una tabla en el entorno de Databricks
if 'producto' not in sqlContext.tableNames('default'):
    producto_table.write.saveAsTable('Producto')

#### Se lee la tabla SubCategoria desde la base de datos

In [0]:
# Lectura de la tabla dbo.SubCategoria
subcategoria_table = (spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "dbo.SubCategoria")
  .load()
)

In [0]:
subcategoria_table.printSchema()

root
 |-- Cod_SubCategoria: string (nullable = true)
 |-- SubCategoria: string (nullable = true)
 |-- Cod_Categoria: string (nullable = true)



In [0]:
# Guardamos la tabla Subcategoría, leída desde la base de datos, como una tabla en el entorno de Databricks
if 'subcategoria' not in sqlContext.tableNames('default'):
    subcategoria_table.write.saveAsTable('Subcategoria')

#### Se lee la tabla Sucursales desde la base de datos

In [0]:
# Lectura de la tabla dbo.VentasInternet
sucursales_table = (spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "dbo.Sucursales")
  .load()
)

In [0]:
sucursales_table.printSchema()

root
 |-- Cod_Sucursal: string (nullable = true)
 |-- Cod_Sucursal_PK: string (nullable = true)
 |-- Sucursal: string (nullable = true)
 |-- Latitud: string (nullable = true)
 |-- Longitud: string (nullable = true)



In [0]:
# Guardamos la tabla Sucursales, leída desde la base de datos, como una tabla en el entorno de Databricks
if 'sucursales' not in sqlContext.tableNames('default'):
    sucursales_table.write.saveAsTable('Sucursales')

### Se crean las tablas intermedias

In [0]:
%sql
-- Tabla intermedia Producto_Unico
DROP TABLE IF EXISTS Producto_Unico;
CREATE TABLE Producto_Unico (
  Cod_Producto bigint GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
  Producto VARCHAR(50),
  Cod_Subcategoria INT,
  Cod_Categoria INT
  );

In [0]:
%sql
-- Tabla intermedia donde vamos a relacionar los productos con sus sucursales y su stock
DROP TABLE IF EXISTS Productos_Sucursales;
CREATE TABLE Productos_Sucursales (
  Cod_Producto INT,
  Cod_Sucursal INT,
  Stock INT,
  Stock_Inicial INT
  );

### Se llenan las tablas intermedias previamente creadas

In [0]:
%sql
-- Guardamos la consulta, en un view productos_distintos, para tenerla a mano y poder utilizarla más adelante
DROP VIEW IF EXISTS productos_distintos;
CREATE VIEW productos_distintos AS
SELECT MIN(Cod_Producto) AS Cod_Producto, Producto, MIN(Cod_SubCategoria) AS Cod_SubCategoria, COUNT(Producto) AS Stock_Inicial FROM Producto
GROUP BY Producto
ORDER BY Cod_Producto;

In [0]:
%sql
-- Guardamos la consulta, en una view productos_unicos_joined, para tenerla a mano y poder utilizarla más adelante
DROP VIEW IF EXISTS productos_unicos_joined;
CREATE VIEW productos_unicos_joined AS
SELECT t1.Cod_Producto, t1.Producto, t1.Cod_SubCategoria, t2.Cod_Categoria
FROM productos_distintos AS t1
INNER JOIN Subcategoria AS t2
ON t1.Cod_SubCategoria = t2.Cod_SubCategoria
ORDER BY t1.Cod_Producto;

In [0]:
%sql
-- Rellenamos la tabla de Producto_Unico, que ya creamos, con el contenido de la view anteior (productos_unicos_joined)
INSERT INTO Producto_Unico (Producto, Cod_SubCategoria, Cod_Categoria)
SELECT Producto, Cod_SubCategoria, Cod_Categoria FROM productos_unicos_joined;

num_affected_rows,num_inserted_rows
237,237


In [0]:
%sql
-- Select para verificar se que insertó todo bien
SELECT * FROM Producto_Unico;

Cod_Producto,Producto,Cod_Subcategoria,Cod_Categoria
1,"Casco deportivo: 100, rojo",31,4
2,"Casco deportivo: 100, negro",31,4
3,"Calcetines para bicicleta de montaña, M",23,3
4,"Calcetines para bicicleta de montaña, G",23,3
5,"Casco deportivo: 100, azul",31,4
6,"Jersey con logotipo de manga larga, P",21,3
7,"Jersey con logotipo de manga larga, M",21,3
8,"Jersey con logotipo de manga larga, G",21,3
9,"Jersey con logotipo de manga larga, SG",21,3
10,"Cuadro de carretera GA: rojo, 62",14,2


In [0]:
%sql
-- Verificamos como poblar la tabla, se realiza el producto cartesiano entre cod_producto y sucursal y se inicializa el stock en 0
-- Producto cartesiano: aparea cada producto con cada sucursal
SELECT t1.Cod_Producto, t2.Cod_Sucursal, 0 AS Stock FROM Producto_Unico AS t1
CROSS JOIN (SELECT * FROM Sucursales WHERE Cod_Sucursal_PK <> 0) AS t2

Cod_Producto,Cod_Sucursal,Stock
1,1,0
1,2,0
1,3,0
1,4,0
1,5,0
1,6,0
1,7,0
1,8,0
1,9,0
1,10,0


In [0]:
%sql
-- Insertamos el producto cartesiano en la tabla
INSERT INTO Productos_Sucursales
SELECT t1.Cod_Producto, t2.Cod_Sucursal, 0 AS Stock, 0 as Stock_Inicial FROM Producto_Unico AS t1
CROSS JOIN (SELECT * FROM Sucursales WHERE Cod_Sucursal_PK <> 0) AS t2;

num_affected_rows,num_inserted_rows
2370,2370


In [0]:
%sql
-- Se realiza un JOIN entre la view productos_distintos con la tabla intermedia Productos_sucursales para poder poblar el stock de productos sucursales
SELECT t1.Cod_Producto, t2.Stock_Inicial
FROM Producto_Unico AS t1
INNER JOIN productos_distintos AS t2
ON t1.Producto = t2.Producto
ORDER BY t1.Cod_Producto;

Cod_Producto,Stock_Inicial
1,3
2,3
3,1
4,1
5,3
6,3
7,3
8,3
9,3
10,3


In [0]:
%sql
-- Creamos la view anterior, productos_stock_inicial, con la consulta anterior
DROP VIEW IF EXISTS productos_stock_inicial;
CREATE VIEW productos_stock_inicial AS
SELECT t1.Cod_Producto, t2.Stock_Inicial
FROM Producto_Unico AS t1
INNER JOIN productos_distintos AS t2
ON t1.Producto = t2.Producto
ORDER BY t1.Cod_Producto;

In [0]:
# Se crean dataframes de PySpark con las Manage Tables de Databricks
productos_stock_inicial = sqlContext.table('productos_stock_inicial')
productos_sucursales = sqlContext.table('Productos_Sucursales')
productos_stock_inicial.show(5)
productos_sucursales.show(5)

+------------+-------------+
|Cod_Producto|Stock_Inicial|
+------------+-------------+
|           1|            3|
|           2|            3|
|           3|            1|
|           4|            1|
|           5|            3|
+------------+-------------+
only showing top 5 rows

+------------+------------+-----+-------------+
|Cod_Producto|Cod_Sucursal|Stock|Stock_Inicial|
+------------+------------+-----+-------------+
|           1|           1|    0|            0|
|           1|           2|    0|            0|
|           1|           3|    0|            0|
|           1|           4|    0|            0|
|           1|           5|    0|            0|
+------------+------------+-----+-------------+
only showing top 5 rows



In [0]:
# Se convierten los dataframe Pyspark en Pandas
productos_stock_inicial_pd = productos_stock_inicial.toPandas()
productos_sucursales_pd = productos_sucursales.toPandas()

In [0]:
# Se itera sobre cada una de las filas de productos sucursales
for index, row in productos_sucursales_pd.iterrows():
    # A la columna stock de productos sucursales se le asigna un número random entre 0 y el conteo de items duplicados (conteo de items en la tabla de productos)
    row['Stock'] = random.randint(0,productos_stock_inicial_pd[productos_stock_inicial_pd['Cod_Producto'] == row['Cod_Producto']]['Stock_Inicial'].values[0])


In [0]:
productos_sucursales_pd['Stock_Inicial'] = productos_sucursales_pd['Stock'] 

In [0]:
display(productos_sucursales_pd)

Cod_Producto,Cod_Sucursal,Stock,Stock_Inicial
1,1,2,2
1,2,3,3
1,3,2,2
1,4,3,3
1,5,1,1
1,6,3,3
1,7,3,3
1,8,3,3
1,9,1,1
1,10,1,1


In [0]:
%sql
DROP TABLE IF EXISTS Productos_Sucursales_Lleno;

In [0]:
productos_sucursales = spark.createDataFrame(productos_sucursales_pd)
productos_sucursales.write.saveAsTable('Productos_Sucursales_Lleno')

### Se escriben las tablas intermedias llenas en la Azure Database

#### Tabla intermedia llena Producto_Unico

In [0]:
# Se lee la tabla intermedia Producto_Unico, y se guarda el resultado en el dataframe_producto_unico
#dataframe_producto_unico = sqlContext.sql('select * from Producto_Unico')

In [0]:
# Se guarda el dataframe_producto_unico, que contiene la tabla intermedia llena, en la base de datos mediante el conector jdbc
# df_produnico_todb = DataFrameWriter(dataframe_producto_unico)
# df_produnico_todb.jdbc(url=jdbcUrl, table= "Producto_Unico", mode ="overwrite")

#### Tabla intermedia llena Productos_Sucursales

In [0]:
# Se guarda el productos_sucursales_pd, que contiene la tabla intermedia llena, en la base de datos mediante el conector jdbc
# df_prodsucursales_todb = DataFrameWriter(productos_sucursales_pd)
# df_prodsucursales_todb.jdbc(url=jdbcUrl, table= "Productos_Sucursales", mode ="overwrite")

#### Se arma el df_tablasintermedias con cada una de las tablas intermedias llenas para luego escribirlos en el ADLS

In [0]:
# df_tablasintermedias = []
# df_tablasintermedias.append(dataframe_producto_unico)
# df_tablasintermedias.append(productos_sucursales_pd)

## Cargando las tablas intermedias llenas en Azure Data Lake Storage (Opcional si no se hace el Copy Data)

In [0]:
# Cargar las tablas en formato CSV dentro del Azure Data Lake Storage
#datalake_container = 'sink'
#datalake_account_name = ''
#datalake_access_key = ''

# Escribimos la tabla intermedia llena Producto_Unico
#df_tablasintermedias[0].toPandas().to_csv(f'abfs://{datalake_container}@{datalake_account_name}.dfs.core.windows.net/Producto_Unico.csv',storage_options = {'account_key': #datalake_access_key} ,index=False)

# Escribimos la tabla intermedia llena Productos_Sucursales
#df_tablasintermedias[1].toPandas().to_csv(f'abfs://{datalake_container}@{datalake_account_name}.dfs.core.windows.net/Productos_Sucursales.csv',storage_options = {'account_key': datalake_access_key} ,index=False)

## Cargar las tablas en la base de datos

In [0]:
# Tabla local de databricks
Producto_Unico_DB = f'''
CREATE TABLE IF NOT EXISTS Producto_Unico_DB
USING org.apache.spark.sql.jdbc
OPTIONS (
    url '{jdbcUrl}',
    dbtable '{"dbo.Producto_Unico"}',
    user '{jdbcUsername}',
    password '{jdbcPassword}',
    loginTimeout = 120
    ) '''

#print(createLocalTable)
spark.sql(Producto_Unico_DB)

Out[36]: DataFrame[]

In [0]:
%sql
INSERT OVERWRITE TABLE Producto_Unico_DB SELECT * FROM Producto_Unico_DB where 1=2;

INSERT INTO Producto_Unico_DB
SELECT * FROM Producto_Unico;

In [0]:
# Tabla local de databricks
Productos_Sucursales_DB = f'''
CREATE TABLE IF NOT EXISTS Productos_Sucursales_DB
USING org.apache.spark.sql.jdbc
OPTIONS (
    url '{jdbcUrl}',
    dbtable '{"dbo.Productos_Sucursales"}',
    user '{jdbcUsername}',
    password '{jdbcPassword}',
    loginTimeout = 120
    ) '''

#print(createLocalTable)
spark.sql(Productos_Sucursales_DB)

Out[48]: DataFrame[]

In [0]:
%sql
INSERT OVERWRITE TABLE Productos_Sucursales_DB SELECT * FROM Productos_Sucursales_DB where 1=2;

INSERT INTO Productos_Sucursales_DB
SELECT * FROM Productos_Sucursales_Lleno ORDER BY Cod_Producto;