In [None]:
%run ../notebooks/includes/Copy-Datasets

In [None]:
# Variables
env = dbutils.widgets.get("environment")  # or whichever widget you define
dataset_bookstore = spark.conf.get("dataset.bookstore")


def _load_bronze_table(table_name, source_format, source_dir):
    """Create or replace one bronze table straight from raw dataset files.

    Args:
        table_name: Table to (re)create inside dev_mayoral.bronze_uc_<env>.
        source_format: File-format prefix used in the FROM clause
            ("json", "parquet" or "csv").
        source_dir: Directory below ``dataset_bookstore`` holding the files.

    Returns:
        Whatever ``spark.sql`` returns for the CTAS statement.
    """
    return spark.sql(f"""
CREATE OR REPLACE TABLE dev_mayoral.bronze_uc_{env}.{table_name} AS
SELECT * 
FROM {source_format}.`{dataset_bookstore}/{source_dir}`
""")


# Customers (JSON)
_load_bronze_table("customers", "json", "customers-json")

# Orders (Parquet)
_load_bronze_table("orders", "parquet", "orders")

# Books unparsed (CSV) -- read with the csv.`path` reader and no options
_load_bronze_table("books_unparsed", "csv", "books-csv")


In [None]:
# Read the semicolon-separated book exports (header row, explicit schema)
# and expose them to SQL as a temporary view.
books_csv_glob = f"{dataset_bookstore}/books-csv/export_*.csv"
books_schema = "book_id STRING, title STRING, author STRING, category STRING, price DOUBLE"
books_tmp_vw = (
    spark.read
    .schema(books_schema)
    .options(header=True, sep=";")
    .csv(books_csv_glob)
)
books_tmp_vw.createOrReplaceTempView("books_tmp_vw")

# Materialise the temporary view as the final books table
books_ctas_sql = f"""
CREATE OR REPLACE TABLE dev_mayoral.bronze_uc_{env}.books AS
SELECT * 
FROM books_tmp_vw
"""
spark.sql(books_ctas_sql)


In [None]:
from pyspark.sql.functions import array, col, concat, element_at, expr, rand, split, udf
from pyspark.sql.types import StringType

# get_url UDF
def get_url(email):
    """Build a website URL from the domain part of an e-mail address.

    Args:
        email: E-mail address as a string, or None (SQL NULL).

    Returns:
        "https://www.<domain>", where <domain> is the text following the
        first "@" (up to the next "@", if any). Returns None when ``email``
        is None or contains no "@", so one bad row cannot fail the whole job.
    """
    if email is None:
        return None
    pieces = email.split("@")
    if len(pieces) < 2:
        # No "@" at all: there is no domain to build a URL from.
        return None
    return f"https://www.{pieces[1]}"

# Expose get_url to Spark SQL under an environment-specific function name.
# NOTE(review): the same function name is registered again further down in
# this notebook; the later registration replaces this one.
spark.udf.register(name=f"get_url_{env}", f=get_url, returnType=StringType())

# get_country UDF
countries = ["EC", "IT", "TR", "ES", "CO", "CH", "NL", "IN", "BR"]
def get_country():
    """Return one code picked at random from the ``countries`` list."""
    from random import choice  # kept local to the function, as before
    picked = choice(countries)
    return picked

# get_country returns a random value on every call, so it is wrapped in a UDF
# flagged as non-deterministic before being registered for SQL. Otherwise
# Spark is allowed to treat it as deterministic when optimising the query.
spark.udf.register(
    f"get_country_{env}",
    udf(get_country, StringType()).asNondeterministic(),
)

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import random

# ---------------------------------------------------------
# Resolve the dataset location and the target environment
# ---------------------------------------------------------
dataset_bookstore = spark.conf.get("dataset.bookstore")
env = dbutils.widgets.get("environment")  # for example 'dev' or 'prod'

# -------------------------------
# UDF: get_url (safe for NULL or malformed e-mails)
# -------------------------------
def get_url(email):
    """Build a website URL from the domain part of an e-mail address.

    Args:
        email: E-mail address as a string, or None (SQL NULL).

    Returns:
        "https://www.<domain>", where <domain> is the text following the
        first "@" (up to the next "@", if any). Returns None when ``email``
        is None or contains no "@", so one bad row cannot fail the whole job.
    """
    if email is None:
        return None
    pieces = email.split("@")
    if len(pieces) < 2:
        # No "@" at all: indexing [1] would raise IndexError inside the UDF.
        return None
    return f"https://www.{pieces[1]}"

# Make get_url callable from Spark SQL as get_url_<env>
spark.udf.register(name=f"get_url_{env}", f=get_url, returnType=StringType())

# -------------------------------
# UDF: get_country (random pick)
# -------------------------------
countries = ["EC", "IT", "TR", "ES", "CO", "CH", "NL", "IN", "BR"]

def get_country():
    """Return one code picked at random from the ``countries`` list."""
    picked = random.choice(countries)
    return picked

# get_country returns a random value on every call, so it is wrapped in a UDF
# flagged as non-deterministic before being registered for SQL. Otherwise
# Spark is allowed to treat it as deterministic when optimising the query.
spark.udf.register(
    f"get_country_{env}",
    udf(get_country, StringType()).asNondeterministic(),
)

# -------------------------------
# Create/refresh the customers table with an iso_code column
# -------------------------------
# The table is rebuilt from itself. On a re-run it already has iso_code, so
# `SELECT *, ... AS iso_code` would produce the column twice and the CTAS
# would fail. Dropping it first makes the cell re-runnable; DataFrame.drop is
# a no-op when the column is absent, so the first run behaves as before.
customers_table = f"dev_mayoral.bronze_uc_{env}.customers"
(
    spark.table(customers_table)
    .drop("iso_code")
    .createOrReplaceTempView("customers_without_iso_tmp_vw")
)

spark.sql(f"""
CREATE OR REPLACE TABLE {customers_table} AS
SELECT *, get_country_{env}() AS iso_code
FROM customers_without_iso_tmp_vw
""")

# -------------------------------
# List each e-mail next to the URL that get_url derives from it
# -------------------------------
email_domains_sql = f"""
SELECT email, get_url_{env}(email) AS domain
FROM dev_mayoral.bronze_uc_{env}.customers
"""
spark.sql(email_domains_sql).show()


In [None]:
# Replace the contents of the orders table with the raw Parquet files
orders_reload_sql = f"""
INSERT OVERWRITE TABLE dev_mayoral.bronze_uc_{env}.orders
SELECT * FROM parquet.`{dataset_bookstore}/orders`
"""
spark.sql(orders_reload_sql)
