# Landing Zone

This notebook contains the different steps involved steps in the the extraction of data and its storage in the landing zone of our data management pipeline. Particularly, the following scripts are responsible of the following tasks:
1. Environment setup, such as the preparation of the data lake (based on MinIO)
2. Obtaining of data from datasources
3. Raw data storaging into the temporal landing 
4. Data shipment from temporal landing to persistent landing

## Environment Setup
Before starting to get data from datasources it is needed to prepare our temporal landing. As said before, we will be using MinIO, an S3-compatible object storage implementation, as a data lake to store data as it comes. Before continuing, ensure that a MinIO instance is up and running, which can be done easily with Docker Compose (see the main [README](../../README.md) file for more information).

First of all, we will connect to the MinIo instance, create a bucket for the landing zone and a subfolder that will correspond to the temporal landing. To interact with MinIO programatically we will use the [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-creating-buckets.html) Python library.

In [40]:
import boto3

access_key_id = "minioadmin"
secret_access_key = "minioadmin"

minio_url = 'http://localhost:9000'

minio_client = boto3.client('s3',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key,
    endpoint_url=minio_url
)

bucket = "landing-zone"
try:
    minio_client.head_bucket(Bucket=bucket)
    print(f"Bucket '{bucket}' already exists")
except ClientError:
    print(f"Creando bucket: {bucket}")
    minio_client.create_bucket(Bucket=bucket)


Bucket 'landing-zone' already exists


In [24]:
import pandas as pd

# Login using e.g. `huggingface-cli login` to access this dataset
df = pd.read_json("hf://datasets/Moaaz55/skin_cancer_questions_answers/dataset.json", lines=True)

def limpiar_dataset(df, columna='Answer'):
    # Convertir a string y limpiar
    df_temp = df.copy()
    df_temp[columna] = df_temp[columna].astype(str)
    
    # filter valid answers.
    df_limpio = df_temp[
        df_temp[columna].notna() &
        (df_temp[columna].str.strip() != '') &
        (df_temp[columna].str.strip() != 'nan') &
        (df_temp[columna].str.strip() != 'None') &
        (df_temp[columna].str.strip() != 'null') &
        (df_temp[columna].str.len() > 10)  # Mínimo 10 caracteres
    ]
    
    print(f" Limpieza completada:")
    print(f"  Original: {len(df)} filas")
    print(f"  Limpio: {len(df_limpio)} filas")
    print(f"  Eliminadas: {len(df) - len(df_limpio)} filas")
    
    return df_limpio


df = limpiar_dataset(df)
df = df.sample(n=100, random_state=42)
df_text = df
df_text = df.apply(lambda row: f"Q: {row['Question']}\nA: {row['Answer']}\n", axis=1)
df_text
with open("preguntas_respuestas.txt", "w", encoding="utf-8") as f:
    f.write("\n".join(df_text.tolist()))


 Limpieza completada:
  Original: 798 filas
  Limpio: 757 filas
  Eliminadas: 41 filas


In [8]:
# CONVERT THE TEXT TO AUDIO WITH THE TT'S LIBRARY
import os
import io
import hashlib
from datetime import datetime
from gtts import gTTS
from mutagen.mp3 import MP3

# CONFIGRATION PARAMETERS
OUT_DIR = "output_audio"
LANG = "es"
TEXT_COL = "Answer"

# CREATE AUDIO AND METADATA DIRECTORIES
def ensure_dirs(root):
    audio_dir = os.path.join(root, "audio")
    metadata_dir = os.path.join(root, "metadata")
    os.makedirs(audio_dir, exist_ok=True)
    os.makedirs(metadata_dir, exist_ok=True)
    return audio_dir, metadata_dir

# FUNCTION TO GENERATE MD5 HASH (AVOID DUPLICATES)
def md5(s: str) -> str:
    return hashlib.md5(s.encode("utf-8")).hexdigest()

# CONVERT TEXT TO AUDIO BYTES
def tts_bytes(text: str, lang: str) -> bytes:
    buf = io.BytesIO()
    gTTS(text=text, lang=lang).write_to_fp(buf)
    buf.seek(0)
    return buf.read()

# GET MP3 DURATION
def mp3_duration(b: bytes):
    try:
        return float(MP3(io.BytesIO(b)).info.length)
    except Exception:
        return None

# MAIN FUNCTION TO CONVERT ANSWERS TO AUDIO
def answers_to_audio(df: pd.DataFrame):
    audio_dir, meta_dir = ensure_dirs(OUT_DIR)
    df_out = df.copy()

    # Add columns for audio metadata
    df_out["Answer_audio_relpath"] = None
    df_out["Answer_duration_sec"] = None
    df_out["Answer_size_bytes"] = None
    df_out["Answer_text_md5"] = None

    total = len(df_out)
    for i, text in enumerate(df_out[TEXT_COL].astype(str)):
        if not text.strip():
            continue

        h = md5(text)
        filename = f"answer_{h}.mp3"
        abspath = os.path.join(audio_dir, filename)
        relpath = os.path.join("audio", filename)

        # Only generate if it doesn't exist
        if not os.path.exists(abspath):
            mp3 = tts_bytes(text, LANG)
            with open(abspath, "wb") as f:
                f.write(mp3)
            size = len(mp3)
            dur = mp3_duration(mp3)
        else:
            with open(abspath, "rb") as f:
                data = f.read()
            size = len(data)
            dur = mp3_duration(data)

        df_out.at[i, "Answer_audio_relpath"] = relpath.replace("\\", "/")
        df_out.at[i, "Answer_duration_sec"] = dur
        df_out.at[i, "Answer_size_bytes"] = size
        df_out.at[i, "Answer_text_md5"] = h

        if (i+1) % 50 == 0 or i+1 == total:
            print(f"[{i+1}/{total}] {relpath} ({dur:.2f}s)")

    # Save enriched dataset
    today = datetime.utcnow().strftime("%Y-%m-%d")
    out_latest = os.path.join(meta_dir, "answers_dataset-latest.parquet")
    out_dated = os.path.join(meta_dir, f"answers_dataset-{today}.parquet")

    df_out.to_parquet(out_latest, index=False)
    df_out.to_parquet(out_dated, index=False)

    print("\n Listo")
    print("Audios →", os.path.abspath(audio_dir))
    print("Nuevo dataset →", out_latest)
    return df_out

print(" Funciones de conversión a audio cargadas correctamente")

 Funciones de conversión a audio cargadas correctamente


In [55]:
df_amb_audio = answers_to_audio(df)

[50/100] audio/answer_052634ca40f8ef4a37421a33775ca14d.mp3 (3.79s)
[100/100] audio/answer_3c4ffb08352ccac2ed9478f99c0c2418.mp3 (3.55s)

 Listo
Audios → /Users/carlesaguilera/Desktop/ADSDB/notebooks/landing_zone/output_audio/audio
Nuevo dataset → output_audio/metadata/answers_dataset-latest.parquet


In [56]:
df_amb_audio
# LET'S CLEAN THE DATA, AND FINALLY TAKE THE ONES THAT ARE REALLY COMPLETED.
df_amb_audio = df_amb_audio.dropna()
df_amb_audio

# PRERPARE AUDIOS FOR THE PUSH TO MINIO. 

audio = df_amb_audio["Answer_audio_relpath"]
audio

97    audio/answer_04fa859c680f344018eade336e06fef5.mp3
78    audio/answer_1f8e49d8f875f9c122ef1c8fce7e9682.mp3
54    audio/answer_58d0f1f405489d61fbf7cf9eb7936942.mp3
39    audio/answer_46bbf2aaac121735d55561672a1d228a.mp3
66    audio/answer_e08a77c07bc54b170cb7cbf96b8fd0ac.mp3
33    audio/answer_c0d2ee7dffc8995b2719e6f57bc69bff.mp3
77    audio/answer_22138b57e53bc9f98ea9f00f2c3a2e09.mp3
10    audio/answer_e4ad6acc62ab285c11023019a6f8072a.mp3
81    audio/answer_676ccac3f2890a63d633a797fe7f831d.mp3
31    audio/answer_03c43a00e72c214fe7d11957835e4445.mp3
65    audio/answer_5b3af1fe3cb2959b0abeb4d4711b2bb7.mp3
76    audio/answer_45a2d7224bf20a670e1216ae9abce958.mp3
72    audio/answer_3c91257eeb46e6e23997462e35cb29d2.mp3
63    audio/answer_55dfef06dbc3f01fb0f6156d00a7afaa.mp3
49    audio/answer_052634ca40f8ef4a37421a33775ca14d.mp3
7     audio/answer_49737cdeb9605126ae4a323d5a946c88.mp3
23    audio/answer_cb21df667b30b341660a32aee586f632.mp3
Name: Answer_audio_relpath, dtype: object

In [17]:
df_amb_audio['Answer_audio_relpath']
total_respuestas = len(df_amb_audio)
audios_generados = df_amb_audio['Answer_audio_relpath'].notna().sum()
respuestas_vacias = df_amb_audio['Answer_audio_relpath'].isna().sum()

print(f"📊 Statistics:")
print(f"  Total respuestas: {total_respuestas}")
print(f"  Audios generados: {audios_generados}")
print(f"  Respuestas vacías: {respuestas_vacias}")
print(f"  Tasa de éxito: {audios_generados/total_respuestas*100:.1f}%")

📊 Estadísticas:
  Total respuestas: 17
  Audios generados: 17
  Respuestas vacías: 0
  Tasa de éxito: 100.0%


In [None]:
from transformers import AutoModelForVision2Seq, AutoProcessor
model_id = "abaryan/DrDiag_qwen2vl_Ham10000"

In [2]:
from datasets import load_dataset
# Login using e.g. `huggingface-cli login` to access this dataset
ds = load_dataset("abaryan/ham10000_bbox")

  from .autonotebook import tqdm as notebook_tqdm


In [51]:

# HERE WE'RE GOING TO GET THE DATA FROM THE DATA SET OF HAM10000.

# THE PREPARATION CONSISTS OF GETTING THE IMAGES AND THE TABULAR DATA AND SPLIT THEM TO DIFFERENT LOCATIONS IN MINIO.

data = ds['train']  #HERE WE GET THE DATA FROM THE DATA SET, IN THAT CASE DUE TO THE DATASET HAVE TO PARTS.
                    #ONE FROM THE TRAIN AND THE OTHER FROM THE TEST, WE'LL USE THE TRAIN.
# NOW WE GET THE IMAGES AND THE TABULAR DATA.
images = data['image'] # HERE WE GET THE IMAGES FROM THE DATASET.
tabular_data = data.remove_columns('image') # HERE WE GET THE TABULAR DATA FROM THE DATASET.

# NOW WE PRINT THE FIRST 3 IMAGES AND THE FIRST 3 ROWS OF THE TABULAR DATA.

# NOW WE'RE GOING TO PUSH THE DATA TO MINIO.
images


Column([<PIL.PngImagePlugin.PngImageFile image mode=RGB size=600x450 at 0x1300B1540>, <PIL.PngImagePlugin.PngImageFile image mode=RGB size=600x450 at 0x1300B15D0>, <PIL.PngImagePlugin.PngImageFile image mode=RGB size=600x450 at 0x1300B1600>, <PIL.PngImagePlugin.PngImageFile image mode=RGB size=600x450 at 0x1300B1630>, <PIL.PngImagePlugin.PngImageFile image mode=RGB size=600x450 at 0x1300B14E0>])

In [34]:
# =====================================================
# DATA LAKE EN MINIO - ESTRUCTURA ORGANIZADA
# =====================================================

import boto3
import pandas as pd
import os
import json
from io import BytesIO
from PIL import Image
import io

minio_client = boto3.client('s3',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key,
    endpoint_url=minio_url
)

print("Cliente MinIO configurado correctamente")



Cliente MinIO configurado correctamente


In [41]:
bucket = "landing-zone"
try:
    minio_client.head_bucket(Bucket=bucket)
    print(f"Bucket '{bucket}' already exists.")
except ClientError:
    print(f"Creando bucket: {bucket}")
    minio_client.create_bucket(Bucket=bucket)
    

Bucket 'landing-zone' ya existe


In [42]:
minio_client.upload_file(Filename="preguntas_respuestas.txt", Bucket= bucket, Key = "preguntas_respuestas_bucket.txt")

In [None]:
tabular_data = tabular_data.to_pandas()

In [49]:


tabular_data = tabular_data.to_csv(index=False).encode("utf-8")

minio_client.put_object( 
    Bucket = "landing-zone",
    Key="tabular_data.csv",
    Body = io.BytesIO(tabular_data)
)

{'ResponseMetadata': {'RequestId': '186CEDD4BB4EA361',
  'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'content-length': '0',
   'etag': '"ec7d7119b1ed058dbb28471f7070c9e4"',
   'server': 'MinIO',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'vary': 'Origin, Accept-Encoding',
   'x-amz-checksum-crc32': '+iWcPg==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'x-amz-id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
   'x-amz-request-id': '186CEDD4BB4EA361',
   'x-content-type-options': 'nosniff',
   'x-ratelimit-limit': '1015',
   'x-ratelimit-remaining': '1015',
   'x-xss-protection': '1; mode=block',
   'date': 'Thu, 09 Oct 2025 20:50:52 GMT'},
  'RetryAttempts': 0},
 'ETag': '"ec7d7119b1ed058dbb28471f7070c9e4"',
 'ChecksumCRC32': '+iWcPg==',
 'ChecksumType': 'FULL_OBJECT'}

In [53]:
import io

for i, img in enumerate(images):
    img_bytes = io.BytesIO()
    img.save(img_bytes, format="PNG") 
    img_bytes.seek(0)

    image_name = f"imagen_{i}.png"

    minio_client.put_object(
        Bucket="landing-zone",
        Key=f"{image_name}",
        Body=img_bytes
    )


In [59]:
import os
from tqdm import tqdm

bucket = "landing-zone"


try:
    minio_client.head_bucket(Bucket=bucket)
    print(f"Bucket '{bucket}' ja existeix")
except:
    print(f"Creant bucket: {bucket}")
    minio_client.create_bucket(Bucket=bucket)

# Iterar sobre cada ruta d'àudio del DataFrame
for path in tqdm(df_amb_audio["Answer_audio_relpath"], desc="Pujant àudios"):
    if os.path.exists(path):  # comprovar que el fitxer realment existeix
        audio_name = os.path.basename(path)  # ex: answer_04fa859c6....mp3

        # Pujar l'àudio directament al bucket
        minio_client.upload_file(
            Filename=path,
            Bucket=bucket,
            Key=f"{audio_name}"
        )
    else:
        print(f"Fitxer no trobat: {path}")

print("Tots els àudios pujats correctament a MinIO")


Bucket 'landing-zone' ja existeix


Pujant àudios: 100%|██████████| 17/17 [00:00<00:00, 130.24it/s]

Tots els àudios pujats correctament a MinIO



