#### Montaje del mount

In [0]:
# Quick configuration for mounting the "raw" blob container.
account_name = "datalakeprojectrebrickab"
container_name = "raw"
# NOTE(review): the key is blank here, presumably redacted before sharing —
# confirm how the real key is supplied (e.g. a secret scope).
account_key = ""

mount_point = f"/mnt/{container_name}"

# Mount points currently reported by dbutils.fs.mounts().
mounted_points = {entry.mountPoint for entry in dbutils.fs.mounts()}

if mount_point in mounted_points:
    # Nothing to do: the container is already available under mount_point.
    print(f"El punto {mount_point} ya está montado")
else:
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={
            f"fs.azure.account.key.{account_name}.blob.core.windows.net": account_key,
        },
    )
    print("Montaje realizado exitosamente")

El punto /mnt/raw ya está montado


In [0]:
# Parameters that come from ADF, exposed to the notebook as text widgets.
param_names = ("Year", "Month", "Day")

# Declare every widget with an empty default value.
for param_name in param_names:
    dbutils.widgets.text(param_name, "")

# Read the current value of each widget, in the same order as declared.
year, month, day = [dbutils.widgets.get(param_name) for param_name in param_names]

#### Carga de datos

In [0]:
# Load every .csv file found in the year/month/day folder of the LEGO catalog
# into `dataframes`, keyed by the file name without its extension.
csv_suffix = ".csv"
file_path = f"/mnt/raw/rebrickable/LEGO_Catalog_Database/year={year}/month={month}/day={day}/"
csv_files = [f for f in dbutils.fs.ls(file_path) if f.name.endswith(csv_suffix)]

dataframes = {}

for file_info in csv_files:
    # Strip only the trailing extension; str.replace would also remove any
    # ".csv" that happens to appear earlier in the file name.
    file_name = file_info.name[: -len(csv_suffix)]

    # Read straight from file_info.path so the folder path kept in
    # `file_path` is not overwritten inside the loop.
    dataframes[file_name] = spark.read.csv(file_info.path, header=True, inferSchema=True)

    print(f"DataFrame '{file_name}' cargado con {dataframes[file_name].count()} filas")

DataFrame 'colors' cargado con 272 filas
DataFrame 'elements' cargado con 102068 filas
DataFrame 'inventories' cargado con 42124 filas
DataFrame 'inventory_minifigs' cargado con 23331 filas
DataFrame 'inventory_parts' cargado con 1352622 filas
DataFrame 'inventory_sets' cargado con 4686 filas
DataFrame 'minifigs' cargado con 15479 filas
DataFrame 'part_categories' cargado con 67 filas
DataFrame 'part_relationships' cargado con 32987 filas
DataFrame 'parts' cargado con 57647 filas
DataFrame 'sets' cargado con 24849 filas
DataFrame 'themes' cargado con 476 filas


In [0]:
# Load every .txt file found in the year/month/day folder of the OLTP export
# into `dataframes1`, keyed by the file name without its extension.
# The files are read with the CSV reader (header row, inferred schema).
txt_suffix = ".txt"
file_path = f"/mnt/raw/rebrickable/oltp_db/year={year}/month={month}/day={day}/"
txt_files = [f for f in dbutils.fs.ls(file_path) if f.name.endswith(txt_suffix)]

dataframes1 = {}

for file_info in txt_files:
    # Strip only the trailing extension; str.replace would also remove any
    # ".txt" that happens to appear earlier in the file name.
    file_name = file_info.name[: -len(txt_suffix)]

    # Read straight from file_info.path so the folder path kept in
    # `file_path` is not overwritten inside the loop.
    dataframes1[file_name] = spark.read.csv(file_info.path, header=True, inferSchema=True)

    print(f"DataFrame '{file_name}' cargado con {dataframes1[file_name].count()} filas")

DataFrame 'dbo.order_details' cargado con 1000 filas
DataFrame 'dbo.orders' cargado con 1000 filas
DataFrame 'dbo.reviews' cargado con 1000 filas
DataFrame 'dbo.sets' cargado con 5937 filas
DataFrame 'dbo.shipments' cargado con 1000 filas
DataFrame 'dbo.users' cargado con 1000 filas


#### Transformaciones

In [0]:
# Show the name and schema of each catalog DataFrame.
for table_name, table_df in dataframes.items():
    print(f'nombre dataframe: {table_name}')
    table_df.printSchema()

nombre dataframe: colors
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- rgb: string (nullable = true)
 |-- is_trans: boolean (nullable = true)
 |-- num_parts: integer (nullable = true)
 |-- num_sets: integer (nullable = true)
 |-- y1: integer (nullable = true)
 |-- y2: integer (nullable = true)

nombre dataframe: elements
root
 |-- element_id: long (nullable = true)
 |-- part_num: string (nullable = true)
 |-- color_id: integer (nullable = true)
 |-- design_id: integer (nullable = true)

nombre dataframe: inventories
root
 |-- id: integer (nullable = true)
 |-- version: integer (nullable = true)
 |-- set_num: string (nullable = true)

nombre dataframe: inventory_minifigs
root
 |-- inventory_id: integer (nullable = true)
 |-- fig_num: string (nullable = true)
 |-- quantity: integer (nullable = true)

nombre dataframe: inventory_parts
root
 |-- inventory_id: integer (nullable = true)
 |-- part_num: string (nullable = true)
 |-- color_id: integer (nullable = true)

In [0]:
# Columns that are discarded from the catalog tables, per table.
unused_columns = {
    'colors': ['y1', 'y2'],
    'inventory_parts': ['img_url'],
    'minifigs': ['img_url'],
    'sets': ['img_url'],
}

# Discard the unused columns BEFORE removing rows with nulls: dropna() drops a
# row when ANY column is null, so running it first would delete otherwise
# valid rows only because a column that is thrown away anyway was empty.
for table_name, columns in unused_columns.items():
    dataframes[table_name] = dataframes[table_name].drop(*columns)

# Remove rows containing nulls and exact duplicate rows from every table.
for table_name in dataframes:
    dataframes[table_name] = dataframes[table_name].dropna().dropDuplicates()


In [0]:
# Show the name and schema of each OLTP DataFrame.
for table_name, table_df in dataframes1.items():
    print(f'nombre dataframe: {table_name}')
    table_df.printSchema()
    

nombre dataframe: dbo.order_details
root
 |-- order_detail_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- set_num: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

nombre dataframe: dbo.orders
root
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- total_amount: double (nullable = true)

nombre dataframe: dbo.reviews
root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- set_num: string (nullable = true)
 |-- rating: integer (nullable = true)

nombre dataframe: dbo.sets
root
 |-- set_num: string (nullable = true)
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- theme_id: integer (nullable = true)
 |-- num_parts: integer (nullable = true)
 |-- img_url: string (nullable = true)

nombre dataframe: dbo.shipments
root
 |-- ship_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- city_ship: string (nullable = true)

In [0]:
import pyspark.sql.functions as F

# Discard unused columns BEFORE removing rows with nulls: dropna() drops a row
# when ANY column is null, so running it first would delete otherwise valid
# rows only because a column that is thrown away anyway was empty.
# total_amount does not match the sum of price*quantity (the correct values
# are in the order_details table), so it is removed.
dataframes1['dbo.orders'] = dataframes1['dbo.orders'].drop('total_amount')
dataframes1['dbo.sets'] = dataframes1['dbo.sets'].drop('img_url')

# Remove rows containing nulls and exact duplicate rows from every table.
for table_name in dataframes1:
    dataframes1[table_name] = dataframes1[table_name].dropna().dropDuplicates()

# Keep only order lines with a strictly positive price.
dataframes1['dbo.order_details'] = dataframes1['dbo.order_details'].filter(F.col('price') > 0)

# NOTE: the step that created a date column in dbo.order_details was removed
# here; it still has to be checked whether that column is in date format.

# Keep only reviews whose rating lies in the range 1..5 (inclusive).
dataframes1['dbo.reviews'] = dataframes1['dbo.reviews'].filter(
    (F.col('rating') >= 1) & (F.col('rating') <= 5)
)

# Rewrite country_ship: "Canada" and "Mexico" become "USA"; every other value
# is kept as it is.
dataframes1['dbo.shipments'] = dataframes1['dbo.shipments'].withColumn(
    "country_ship",
    F.when(F.col("country_ship").isin(["Canada", "Mexico"]), "USA")
     .otherwise(F.col("country_ship")),
)



#### Escribir resultados en silver layer

In [0]:
# Quick configuration for mounting the "silver" blob container.
account_name = "datalakeprojectrebrickab"
container_name = "silver"
# NOTE(review): the key is blank here, presumably redacted before sharing —
# confirm how the real key is supplied (e.g. a secret scope).
account_key = ""

mount_point = f"/mnt/{container_name}"

# Mount points currently reported by dbutils.fs.mounts().
mounted_points = {entry.mountPoint for entry in dbutils.fs.mounts()}

if mount_point in mounted_points:
    # Nothing to do: the container is already available under mount_point.
    print(f"El punto {mount_point} ya está montado")
else:
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={
            f"fs.azure.account.key.{account_name}.blob.core.windows.net": account_key,
        },
    )
    print("Montaje realizado exitosamente")

El punto /mnt/silver ya está montado


In [0]:
# Write each cleaned catalog table in Delta format, replacing existing data.
for table_name, table_df in dataframes.items():
    target_path = f"/mnt/silver/rebrickable/LEGO_Catalog_Database_cleaned/{table_name}"
    catalog_writer = table_df.write.mode('overwrite').format('delta')
    catalog_writer.save(target_path)

# Write each cleaned OLTP table in Delta format, replacing existing data,
# with the mergeSchema option enabled.
for table_name, table_df in dataframes1.items():
    target_path = f"/mnt/silver/rebrickable/oltp_db_cleaned/{table_name}"
    oltp_writer = table_df.write.mode('overwrite').format('delta').option("mergeSchema", "true")
    oltp_writer.save(target_path)


# This is a full load; for an incremental load of the OLTP data the write
# mode should presumably be 'append' (or similar) instead of 'overwrite'.