In [1]:
"""
-Arman Bazarchi-
Formatted_Metadata
Here we ensure the format of the text-only metadata stored in persistent_landing and move it to the formatted-zone bucket.
We normalize the data and remove any incomplete data.
Connects to MinIO, creates the formatted-zone bucket, and raises an error if the landing-zone bucket or its persistent_landing prefix does not exist.
Reads data from persistent_landing, normalizes it, then stores it in formatted-zone.
Here we save a .csv file, a .parquet file, a .json file, and a schema summary of our text metadata for different future uses.
It avoids storing duplicate data in formatted-zone and also removes any temporary files from local storage.

"""

from minio import Minio
import pandas as pd
import io, json, os, re, shutil
from datetime import datetime

# ==============================
# 1. Configuration
# ==============================
# Connection settings default to the local development values; each one can
# be overridden through an environment variable so credentials do not have
# to be edited in the source.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "localhost:9000")
ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password123")
LANDING_ZONE = "landing-zone"
PERSIST_PREFIX = "persistent_landing"
FORMATTED_ZONE = "formatted-zone"


# Connect to MinIO (secure=False: plain HTTP, no TLS)
client = Minio(
    MINIO_ENDPOINT,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    secure=False
)

# Abort if the landing-zone bucket does not exist.
# SystemExit is raised directly (this is what sys.exit() does) because this
# script never imports `sys`.
if not client.bucket_exists(LANDING_ZONE):
    raise SystemExit(f"❌ ERROR: Root bucket '{LANDING_ZONE}' does not exist in MinIO.")

# Abort if nothing is stored under the persistent-landing prefix.
persistent_objects = list(client.list_objects(LANDING_ZONE, prefix=f"{PERSIST_PREFIX}/", recursive=False))
if not persistent_objects:
    raise Exception(f"❌ Required prefix '{PERSIST_PREFIX}/' not found inside '{LANDING_ZONE}' bucket.")

# Ensure formatted-zone bucket exists (created on first run)
if not client.bucket_exists(FORMATTED_ZONE):
    client.make_bucket(FORMATTED_ZONE)
    print(f"✅ Created Formatted Zone bucket: {FORMATTED_ZONE}")
else:
    print(f"✅ Formatted Zone bucket already exists: {FORMATTED_ZONE}")

# ==============================
# 2. List all metadata files in Persistent Landing
# ==============================
# Walk everything stored below <persistent prefix>/metadata/ and keep the
# object names that end in .csv or .json.
metadata_prefix = f"{PERSIST_PREFIX}/metadata/"
metadata_objects = []
for entry in client.list_objects(LANDING_ZONE, prefix=metadata_prefix, recursive=True):
    if entry.object_name.endswith((".csv", ".json")):
        metadata_objects.append(entry.object_name)

# Nothing to format: stop here.
if not metadata_objects:
    raise Exception("⚠️ No metadata files found in Persistent Landing Zone.")

print(f"📂 Found {len(metadata_objects)} metadata files to process.")

# ==============================
# 3. Read all metadata files from Persistent Landing
# ==============================
# Each metadata object is downloaded into memory and parsed into a DataFrame.
# A file that cannot be parsed is reported and skipped (best effort) so one
# bad file does not block the others.
all_dfs = []

for obj_name in metadata_objects:
    print(f"🔍 Reading: {obj_name}")
    response = client.get_object(LANDING_ZONE, obj_name)
    try:
        data = response.read()
    finally:
        # Always hand the HTTP connection back, even when read() fails.
        response.close()
        response.release_conn()

    try:
        if obj_name.endswith(".csv"):
            df = pd.read_csv(io.BytesIO(data))
        elif obj_name.endswith(".json"):
            df = pd.json_normalize(json.loads(data))
        else:
            # Unknown extension: nothing to parse.
            continue
    except Exception as e:
        # Deliberately broad: any parsing problem only skips this file.
        print(f"⚠️ Error reading {obj_name}: {e}")
        continue

    all_dfs.append(df)

if not all_dfs:
    raise Exception("⚠️ No valid metadata could be loaded.")

# ==============================
# 4. Normalize schema
# ==============================
# Every frame is brought to the same column set, in the same order.
target_columns = [
    "uuid", "kingdom", "phylum", "class", "order", "family",
    "genus", "species", "scientific_name", "common",
    "persistent_path","formatted_path", "image_url"
]

for position in range(len(all_dfs)):
    frame = all_dfs[position]
    # Columns absent from this file are added, filled with None.
    absent = [name for name in target_columns if name not in frame.columns]
    for name in absent:
        frame[name] = None
    # Keep only the target columns, in the target order.
    all_dfs[position] = frame[target_columns]

# Combine all Persistent metadata into one DataFrame
persistent_df = pd.concat(all_dfs, ignore_index=True)

# formatted_path is persistent_path with the persistent images prefix
# replaced by the plain "images" prefix (literal replacement, no regex).
persistent_images_prefix = f"{PERSIST_PREFIX}/images"
persistent_df["formatted_path"] = persistent_df["persistent_path"].str.replace(
    persistent_images_prefix, "images", regex=False
)

# ==============================
# 5. Check for existing general metadata in Formatted Zone
# ==============================
# If a previous run left a general metadata CSV in the formatted zone, load
# the most recent one into `general_df` and remove the old general/schema
# files. Otherwise start from an empty frame with the target columns.
general_metadata_files = [
    obj.object_name for obj in client.list_objects(FORMATTED_ZONE, prefix="metadata/", recursive=True)
    # fullmatch: only names that really end in .csv (not e.g. "*.csv.bak")
    if re.fullmatch(r"metadata/all_metadata_.*\.csv", obj.object_name)
]

if general_metadata_files:
    # Names embed a zero-padded year_month_day_hour:minute timestamp, so the
    # lexicographically largest name is the latest file.
    latest_metadata_file = max(general_metadata_files)

    # Read the existing metadata straight into memory; no local temp file is
    # created, so nothing can be left behind if the download or parse fails.
    response = client.get_object(FORMATTED_ZONE, latest_metadata_file)
    try:
        general_df = pd.read_csv(io.BytesIO(response.read()))
    finally:
        response.close()
        response.release_conn()

    # Remove old general formatted files (all formats) and schema summaries.
    # The listing is materialized first so objects are not deleted while the
    # bucket is still being listed.
    stale_prefixes = ("metadata/all_metadata", "metadata/schema_summary")
    stale_objects = [
        obj.object_name
        for obj in client.list_objects(FORMATTED_ZONE, prefix="metadata/", recursive=True)
        if obj.object_name.startswith(stale_prefixes)
    ]
    for stale_name in stale_objects:
        client.remove_object(FORMATTED_ZONE, stale_name)
        print(f"🗑️ Removed old file: {stale_name}")

    print(f"✅ Loaded and removed existing general metadata with {len(general_df)} rows")
else:
    # No general metadata exists yet
    general_df = pd.DataFrame(columns=target_columns)
    print("⚠️ No existing general metadata found, creating new one")


# ==============================
# 6. Merge new rows and avoid duplicates
# ==============================
# Rows whose uuid is already present in the general metadata are skipped.
unseen_rows = persistent_df[~persistent_df["uuid"].isin(general_df["uuid"])]

# The same uuid can also appear in more than one persistent metadata file of
# this run; keep only its first occurrence. Rows with a missing uuid are left
# untouched (duplicated() alone would treat all missing values as equal).
repeated_uuid = unseen_rows["uuid"].notna() & unseen_rows["uuid"].duplicated(keep="first")
new_rows = unseen_rows[~repeated_uuid]

if not new_rows.empty:
    updated_df = pd.concat([general_df, new_rows], ignore_index=True)
    print(f"✅ Adding {len(new_rows)} new rows to general metadata, total now: {len(updated_df)}")
else:
    updated_df = general_df
    print("⚠️ No new rows to add; general metadata is up to date")


# Schema summary: one row per column with its dtype and count of missing values
schema_summary = pd.DataFrame({
    "column_name": updated_df.columns,
    "dtype": [str(updated_df[c].dtype) for c in updated_df.columns],
    "missing_values": [updated_df[c].isna().sum() for c in updated_df.columns]
})

# ==============================
# 7. Save unified outputs
# ==============================
# The merged metadata is written locally as CSV, Parquet and JSON (one record
# per line), together with the schema summary, then uploaded to the formatted
# zone under timestamped names.
timestamp = datetime.now().strftime("%Y_%m_%d_%H:%M")
temp_dir = "temp_formatted"
os.makedirs(temp_dir, exist_ok=True)

local_csv = "temp_formatted/all_metadata.csv"
local_parquet = "temp_formatted/all_metadata.parquet"
local_json = "temp_formatted/all_metadata.json"
local_schema = "temp_formatted/schema_summary.csv"

try:
    updated_df.to_csv(local_csv, index=False)
    updated_df.to_parquet(local_parquet, index=False)
    updated_df.to_json(local_json, orient="records", lines=True)
    schema_summary.to_csv(local_schema, index=False)

    # Upload to MinIO
    client.fput_object(FORMATTED_ZONE, f"metadata/all_metadata_{timestamp}.csv", local_csv, content_type="text/csv")
    client.fput_object(FORMATTED_ZONE, f"metadata/all_metadata_{timestamp}.parquet", local_parquet, content_type="application/octet-stream")
    client.fput_object(FORMATTED_ZONE, f"metadata/all_metadata_{timestamp}.json", local_json, content_type="application/json")
    client.fput_object(FORMATTED_ZONE, f"metadata/schema_summary_{timestamp}.csv", local_schema, content_type="text/csv")
finally:
    # Cleanup: remove the temporary directory and everything in it, also when
    # a write or an upload failed, so no local temp files are left behind.
    shutil.rmtree(temp_dir, ignore_errors=True)

print("✅ Formatted metadata updated successfully")


✅ Formatted Zone bucket already exists: formatted-zone
📂 Found 1 metadata files to process.
🔍 Reading: persistent_landing/metadata/Animalia_Squamata_metadata_2025_10_20_00:04.csv
⚠️ No existing general metadata found, creating new one
✅ Adding 4932 new rows to general metadata, total now: 4932
✅ Formatted metadata updated successfully
