<a href="https://colab.research.google.com/github/elephant-xyz/photo-meta-data-ai/blob/main/Mining_Photo_MetaData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Photo Mining Process



In [None]:
# @title Step 1: Upload .env

In [None]:
# @title Step 2: Upload county-results.csv

In [None]:
# @title Step 3: Upload seed-results.csv


In [None]:
# @title Step 4: Upload your images as a .zip file named with the parcel ID, like 52434205310037080.zip.

In [62]:
from ctypes import c_void_p
# @title Step 2: Prepare
# @title  {"vertical-output":true}
import os
import subprocess
import shutil
import re
import sys
import csv
import json
import requests
from pathlib import Path


def fetch_schema_cids():
    """Look up the IPFS CIDs of the Seed and County schemas.

    Downloads the schema manifest and returns a dict holding a ``"seed"``
    and/or ``"county"`` entry for each of those data groups that is present in
    the manifest.  Any failure is reported on stdout and then re-raised.
    """
    manifest_url = "https://lexicon.elephant.xyz/json-schemas/schema-manifest.json"
    # Manifest data-group name -> key used in the returned dict.
    wanted_groups = (("Seed", "seed"), ("County", "county"))

    try:
        reply = requests.get(manifest_url, timeout=30)
        reply.raise_for_status()
        manifest = reply.json()

        found = {}
        for group_name, result_key in wanted_groups:
            if group_name in manifest:
                found[result_key] = manifest[group_name]["ipfsCid"]
        return found

    except Exception as err:
        print(f"Error fetching schema manifest: {err}")
        raise

def extract_images(parcel_id, zip_dir="/content"):
    """Extract the JPEG images of a parcel archive into ``images/<parcel_id>``.

    Reads ``<zip_dir>/<parcel_id>.zip`` with the standard-library ``zipfile``
    module (no external ``unzip`` binary is needed), flattens any folder
    structure inside the archive, and skips macOS metadata stored under
    ``__MACOSX/``.

    Args:
        parcel_id: Parcel identifier; also the base name of the archive.
        zip_dir: Directory that holds the archive.  Defaults to the Colab
            upload directory.

    Returns:
        The relative extraction path ``images/<parcel_id>`` when at least one
        image was extracted, otherwise ``None`` (archive missing, unreadable,
        or without JPEG files).
    """
    import zipfile  # local import: keeps this block self-contained

    zip_path = os.path.join(zip_dir, f"{parcel_id}.zip")
    extract_path = f"images/{parcel_id}"

    if not os.path.exists(zip_path):
        return None

    os.makedirs(extract_path, exist_ok=True)

    extracted_count = 0
    try:
        with zipfile.ZipFile(zip_path) as archive:
            for member in archive.infolist():
                # Skip folders and macOS resource-fork entries.
                if member.is_dir() or member.filename.startswith("__MACOSX/"):
                    continue

                # Drop the folder part so all images land directly in extract_path.
                file_name = os.path.basename(member.filename)
                if not file_name.lower().endswith((".jpg", ".jpeg")):
                    continue

                target = os.path.join(extract_path, file_name)
                with archive.open(member) as source, open(target, "wb") as out:
                    shutil.copyfileobj(source, out)
                extracted_count += 1
    except (zipfile.BadZipFile, OSError, RuntimeError, NotImplementedError) as err:
        # Best effort: a broken archive is reported, not raised.
        print(f"Error extracting images from {zip_path}: {err}")
        return None

    return extract_path if extracted_count > 0 else None


def ensure_directory(file_path):
    """Create the parent directory of ``file_path`` if it does not exist yet.

    A path without a directory component (a bare file name) needs no
    directory, so nothing is created for it.
    """
    directory = os.path.dirname(file_path)
    if directory:
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(directory, exist_ok=True)


def create_parcel_folder(parcel_id):
    """Create the ``output/<parcel id>`` folder and return its path.

    Every character of the parcel id that is not a word character, ``-`` or
    ``_`` is replaced by ``_`` before it is used as a folder name.
    """
    unsafe_chars = re.compile(r"[^\w\-_]")
    safe_id = unsafe_chars.sub("_", str(parcel_id))
    parcel_folder = f"output/{safe_id}"
    # The trailing slash makes ensure_directory treat the whole path as the
    # directory to create.
    ensure_directory(parcel_folder + "/")
    return parcel_folder


def install_photo_meta_data_ai():
    """Install the photo-meta-data-ai package from GitHub with pip.

    The pip output and return code are written to
    ``/content/install_log.txt``.

    Returns:
        True when pip exits with status 0.  False when pip fails, when the
        install exceeds the 300-second timeout, or when pip cannot be started
        or the log cannot be written; the reason is printed to stderr.
    """
    command = [
        sys.executable, '-m', 'pip', 'install',
        '--force-reinstall', '--no-cache-dir',
        'git+https://github.com/elephant-xyz/photo-meta-data-ai.git'
    ]
    log_path = '/content/install_log.txt'

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=300)

        with open(log_path, 'w', encoding='utf-8') as f:
            f.write("STDOUT:\n")
            f.write(result.stdout)
            f.write("\nSTDERR:\n")
            f.write(result.stderr)
            f.write(f"\nReturn code: {result.returncode}")
    except subprocess.TimeoutExpired:
        print("photo-meta-data-ai install timed out after 300 seconds", file=sys.stderr)
        return False
    except OSError as err:
        print(f"photo-meta-data-ai install could not run: {err}", file=sys.stderr)
        return False

    if result.returncode != 0:
        print(f"photo-meta-data-ai install failed; see {log_path}", file=sys.stderr)
    return result.returncode == 0


def copy_images_to_output(source_dir="/content/images", output_dir="/content/output"):
    """Copy images from images/<parcel_id>/ to output/<parcel_id>/.

    The whole ``source_dir`` tree is merged into ``output_dir``; existing
    files of the same name are overwritten.

    Args:
        source_dir: Folder holding one image sub-folder per parcel.
        output_dir: Destination folder; created if missing.

    Returns:
        True when the copy succeeded, False when ``source_dir`` does not exist
        or a file-system error occurred (the error is printed).
    """
    if not os.path.exists(source_dir):
        return False

    try:
        shutil.copytree(source_dir, output_dir, dirs_exist_ok=True)
    except OSError as err:  # shutil.Error is an OSError subclass
        print(f"Error copying images to {output_dir}: {err}")
        return False
    return True


def extract_datacids_from_csv(csv_path):
    """Extract all dataCIDs from a CSV file.

    Args:
        csv_path: Path of a CSV file with a header row.

    Returns:
        The stripped, non-empty values of the ``dataCid`` column in file
        order.  Rows without a value (and files without that column)
        contribute nothing.
    """
    # newline="" is what the csv module expects of the file object, so that
    # quoted fields containing line breaks are parsed correctly.
    with open(csv_path, newline="", encoding="utf-8") as f:
        raw_values = (row.get("dataCid") for row in csv.DictReader(f))
        return [value.strip() for value in raw_values if value and value.strip()]


def _iter_linked_cids(data):
    """Yield every CID linked from ``relationships``, ``from`` and ``to``.

    A link is a dict holding the target CID under the ``"/"`` key; each field
    may hold a single link, a list of links, or null.
    """
    if not isinstance(data, dict):
        return

    values = []
    relationships = data.get("relationships")
    if isinstance(relationships, dict):
        values.extend(relationships.values())
    values.extend(data[field] for field in ("from", "to") if field in data)

    for value in values:
        candidates = value if isinstance(value, list) else [value]
        for item in candidates:
            # Only string targets can be fetched and tracked in a set.
            if isinstance(item, dict) and isinstance(item.get("/"), str):
                yield item["/"]


def traverse_and_download(cids, destination_folder):
    """
    Download CIDs and all related CIDs by traversing relationships.
    Creates folder structure: destination_folder/cid/ for each CID in the list.

    Every document is fetched from the ipfs.io gateway and saved as
    ``destination_folder/<root cid>/<document cid>.json``.  A document that
    cannot be fetched, parsed or saved is reported on stdout and skipped; the
    traversal continues with the remaining links.

    Args:
        cids (list): List of root CIDs to start traversing from
        destination_folder (str): The destination folder name

    Returns:
        int: Total number of documents saved, summed over all root CIDs.
    """
    total_downloaded = 0

    for cid in cids:
        downloaded = set()

        # Create folder structure for this CID
        main_folder = Path(f"{destination_folder}/{cid}")
        main_folder.mkdir(parents=True, exist_ok=True)

        # Explicit stack instead of recursion: no recursion-depth limit on
        # long relationship chains.
        pending = [cid]
        while pending:
            current_cid = pending.pop()
            if current_cid in downloaded:
                continue

            try:
                response = requests.get(f"https://ipfs.io/ipfs/{current_cid}", timeout=30)
                # Do not save a gateway error body as if it were the document.
                response.raise_for_status()
                data = response.json()

                file_path = main_folder / f"{current_cid}.json"
                with open(file_path, 'w') as f:
                    json.dump(data, f, indent=2)
            except (requests.RequestException, ValueError, OSError) as e:
                print(f"Error downloading CID {current_cid}: {e}")
                continue

            downloaded.add(current_cid)
            pending.extend(_iter_linked_cids(data))

        total_downloaded += len(downloaded)

    return total_downloaded



import json
from pathlib import Path

import json
import os
from pathlib import Path

import json
import os
from pathlib import Path

def rename_and_update_references(cids, parent_folder):
    """
    Rename downloaded JSON files to meaningful names and rewrite CID links as
    relative paths.

    For every root CID, the files in ``<parent_folder>/<cid>/`` that are
    reachable from ``<cid>.json`` through ``relationships``, ``from`` and
    ``to`` links are renamed, and every link to a renamed file is replaced by
    ``{"/": "./<new name>"}``.  A root CID whose folder or root file is missing
    is skipped.

    Names are chosen in this order:
      1. by content: ``property`` (has ``parcel_identifier``),
         ``property_seed`` (has ``parcel_id``), ``address_data`` (has
         ``full_address``), ``connection`` (has ``from`` or ``to``), or the
         Seed / County schema CID when ``label`` contains "Seed" / "County";
      2. by the relationship key that linked to the file, plus the list index
         for list-valued relationships;
      3. by the first 8 characters of the file's CID.
    A repeated name gets a numeric suffix (``_1``, ``_2``, ...).

    The root file goes through the same naming rules, so it is renamed too.

    Args:
        cids (str or list): Single CID or list of CIDs to process
        parent_folder (str): The parent folder path (e.g., 'seed')

    Returns:
        int: Number of files processed, summed over all root CIDs.
    """
    # Convert single CID to list for uniform processing
    if isinstance(cids, str):
        cids = [cids]

    schema_cids = None  # schema manifest result, fetched lazily and at most once

    def schema_cid_for(group):
        """Return the schema CID of ``group`` ("seed" or "county")."""
        nonlocal schema_cids
        if schema_cids is None:
            schema_cids = fetch_schema_cids()
        return schema_cids[group]

    def content_based_name(data):
        """Return the name implied by the file content, or None."""
        if not isinstance(data, dict):
            return None
        if "parcel_identifier" in data:
            return "property"
        if "parcel_id" in data:
            return "property_seed"
        if "full_address" in data:
            return "address_data"
        if "from" in data or "to" in data:
            return "connection"
        label = data.get("label") or ""
        if "Seed" in label:
            return schema_cid_for("seed")
        if "County" in label:
            # County data groups are named after the County schema, not the Seed one.
            return schema_cid_for("county")
        return None

    def iter_links(data):
        """Yield (field name, target, list index or None) for each link in data."""
        if not isinstance(data, dict):
            return
        fields = []
        relationships = data.get("relationships")
        if isinstance(relationships, dict):
            fields.extend(relationships.items())
        fields.extend((field, data[field]) for field in ("from", "to") if field in data)

        for name, value in fields:
            if isinstance(value, dict) and "/" in value:
                yield name, value["/"], None
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, dict) and "/" in item:
                        yield name, item["/"], i

    def is_cid(target):
        """True for link targets that are CIDs, not already-converted paths."""
        return isinstance(target, str) and not target.startswith("./")

    def localize_link(item, names):
        """Return ``item`` with a known CID target replaced by a relative path."""
        if isinstance(item, dict) and "/" in item:
            target = item["/"]
            if is_cid(target) and target in names:
                return {"/": f"./{names[target]}"}
        return item

    def localize(value, names):
        """Apply localize_link to a single link or to each item of a list."""
        if isinstance(value, list):
            return [localize_link(item, names) for item in value]
        return localize_link(value, names)

    total_processed = 0

    for cid in cids:
        # Construct the full folder path
        folder = Path(f"{parent_folder}/{cid}")
        if not folder.exists():
            continue

        # The root file should match the CID
        root_file = folder / f"{cid}.json"
        if not root_file.exists():
            continue

        cid_to_filename = {}  # CID -> new file name; doubles as the "visited" set
        name_counters = {}    # base name -> number of duplicates seen so far

        def unique_name(base_name):
            """Return ``base_name``.json, suffixed with a counter on repeats."""
            if base_name in name_counters:
                name_counters[base_name] += 1
                return f"{base_name}_{name_counters[base_name]}.json"
            name_counters[base_name] = 0
            return f"{base_name}.json"

        def process_file(file_cid, relationship_key=None, index=None):
            """Assign a name to one file, then to every file it links to."""
            if file_cid in cid_to_filename:
                return

            file_path = folder / f"{file_cid}.json"
            if not file_path.exists():
                return

            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            base_name = content_based_name(data)
            # Only use the relationship key if no content pattern matched
            if not base_name and relationship_key:
                if index is not None:
                    base_name = f"{relationship_key}_{index}"
                else:
                    base_name = relationship_key
            # Final fallback to shortened CID
            if not base_name:
                base_name = file_cid[:8]

            cid_to_filename[file_cid] = unique_name(base_name)

            for key, target, position in iter_links(data):
                if is_cid(target):
                    process_file(target, key, position)

        # Pass 1: walk the link graph from the root and pick all names.
        process_file(cid)

        # Pass 2: rewrite the links in every file and store it under its new name.
        for file_cid, new_filename in cid_to_filename.items():
            old_file_path = folder / f"{file_cid}.json"
            new_file_path = folder / new_filename

            if not old_file_path.exists():
                continue

            with open(old_file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if isinstance(data, dict):
                relationships = data.get("relationships")
                if isinstance(relationships, dict):
                    for key in list(relationships):
                        relationships[key] = localize(relationships[key], cid_to_filename)
                for field in ("from", "to"):
                    if field in data:
                        data[field] = localize(data[field], cid_to_filename)

            with open(new_file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)

            # Remove old file if name changed
            if old_file_path != new_file_path:
                old_file_path.unlink()

        total_processed += len(cid_to_filename)

    return total_processed




def main():
    """Run the preparation step for one parcel.

    Unpacks the uploaded images, creates the parcel's output folder, installs
    the processing package and copies the images into the output tree.  Then
    downloads the seed and county data referenced by the results CSV files and
    renames the downloaded files.
    """
    parcel_id = "52434205310037080" # @param {"type":"string"}

    # Image and tooling setup.
    extract_images(parcel_id)
    create_parcel_folder(parcel_id)
    install_photo_meta_data_ai()
    copy_images_to_output()

    # Root data CIDs per destination folder, in processing order.
    root_cids = {
        "seed": extract_datacids_from_csv("seed-results.csv"),
        "county": extract_datacids_from_csv("county-results.csv"),
    }

    # Download everything first, then rename, group by group.
    for folder, group_cids in root_cids.items():
        traverse_and_download(group_cids, folder)
    for folder, group_cids in root_cids.items():
        rename_and_update_references(group_cids, folder)


if __name__ == "__main__":
    main()



In [63]:
# @title Step 5: Transform
! pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv
import os
import shutil
import zipfile
from pathlib import Path
from datetime import datetime



def get_photo_cid_and_html_link(path="photo-results.csv"):
    """Return ``(dataGroupCid, htmlLink)`` from the first data row of the upload results CSV.

    Raises:
        ValueError: The CSV has no data rows.
    """
    with open(path, newline='', encoding='utf-8') as results_file:
        for record in csv.DictReader(results_file):
            # Only the first row is needed.
            return record["dataGroupCid"], record["htmlLink"]
        raise ValueError("CSV file is empty")


def has_submit_errors(path="submit_errors.csv"):
    """
    Report whether submit_errors.csv holds any error rows.

    Returns True when at least one row follows the header, and False when the
    file has no data rows or does not exist.
    """
    try:
        csvfile = open(path, newline='', encoding='utf-8')
    except FileNotFoundError:
        return False

    with csvfile:
        for _row in csv.DictReader(csvfile):
            return True
    return False



def process_photo_data(input_dir="output", property_filename="my_property.json"):
    """Step 1: Process photo data

    Changes the working directory to ``/content`` and runs the
    ``process-photo-data`` command there.

    Args:
        input_dir: Value passed as ``--input-dir``.
        property_filename: Value passed as ``--property-filename``.

    Returns:
        True when the command exits with status 0, False when it fails or
        cannot be started (the reason is printed to stderr).
    """
    os.chdir("/content")

    # Argument list instead of a shell string: values containing spaces or
    # shell metacharacters are passed to the command verbatim.
    cmd = [
        "process-photo-data",
        "--input-dir", str(input_dir),
        "--property-filename", str(property_filename),
    ]
    try:
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, OSError) as err:
        print(f"process-photo-data failed: {err}", file=sys.stderr)
        return False
    return True

def copy_group_to_output(output_folder="output", seed_folder="seed", rename_to_cid=True):
    """Merge downloaded data-group files into the matching parcel folders.

    Each sub-folder of ``seed_folder`` is named after a root CID.  Its JSON
    files are scanned for a ``parcel_id`` / ``parcel_identifier`` value; with
    dashes removed, that value is matched against the dash-less names of the
    parcel folders in ``output_folder``.  For every match the contents of the
    CID folder are copied into the parcel folder, replacing files and
    sub-folders of the same name.

    Args:
        output_folder: Folder holding one sub-folder per parcel.
        seed_folder: Folder holding one sub-folder per root CID.
        rename_to_cid: When True, the parcel folder is afterwards renamed to
            the CID; if a folder with that name already exists, the parcel
            folder is merged into it and removed.

    Returns:
        True when processing finished.  False when either folder is missing
        or a file-system error interrupted the copy (the error is printed).
    """
    import json  # local import: this notebook cell does not import json itself

    def read_parcel_id(json_path):
        """Return the dash-less parcel id stored in a JSON file, or None."""
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Unreadable or invalid JSON: this file cannot identify a parcel.
            return None

        if not isinstance(data, dict):
            return None
        if 'parcel_id' in data:
            raw_id = data['parcel_id']
        elif 'parcel_identifier' in data:
            raw_id = data['parcel_identifier']
        else:
            return None
        return str(raw_id).strip().replace('-', '') or None

    def copy_contents(source_dir, dest_dir):
        """Copy every entry of source_dir into dest_dir, replacing sub-folders."""
        for item in os.listdir(source_dir):
            source_path = os.path.join(source_dir, item)
            dest_path = os.path.join(dest_dir, item)

            if os.path.isfile(source_path):
                shutil.copy2(source_path, dest_path)
            elif os.path.isdir(source_path):
                if os.path.exists(dest_path):
                    shutil.rmtree(dest_path)
                shutil.copytree(source_path, dest_path)

    if not os.path.exists(output_folder) or not os.path.exists(seed_folder):
        return False

    try:
        # Parcel folders present before any renaming takes place.
        output_parcel_folders = [f for f in os.listdir(output_folder)
                                 if os.path.isdir(os.path.join(output_folder, f))]

        # Map each dash-less parcel id to the CID folder that mentions it.
        parcel_to_cid_mapping = {}
        for cid in os.listdir(seed_folder):
            cid_folder_path = os.path.join(seed_folder, cid)
            if not os.path.isdir(cid_folder_path):
                continue

            for file in os.listdir(cid_folder_path):
                if not file.endswith('.json'):
                    continue
                normalized_parcel_id = read_parcel_id(os.path.join(cid_folder_path, file))
                if normalized_parcel_id:
                    parcel_to_cid_mapping[normalized_parcel_id] = cid
                    break  # the first file with a parcel id decides

        for parcel_id in output_parcel_folders:
            corresponding_cid = parcel_to_cid_mapping.get(parcel_id.replace('-', ''))
            if not corresponding_cid:
                continue

            output_parcel_path = os.path.join(output_folder, parcel_id)
            seed_cid_path = os.path.join(seed_folder, corresponding_cid)
            copy_contents(seed_cid_path, output_parcel_path)

            if rename_to_cid:
                new_output_cid_path = os.path.join(output_folder, corresponding_cid)

                if not os.path.exists(new_output_cid_path):
                    os.rename(output_parcel_path, new_output_cid_path)
                else:
                    # CID folder already exists: merge into it, drop the parcel folder.
                    copy_contents(output_parcel_path, new_output_cid_path)
                    shutil.rmtree(output_parcel_path)

        return True

    except OSError as err:  # shutil.Error is an OSError subclass
        print(f"Error copying {seed_folder} data into {output_folder}: {err}")
        return False

def run_validate_and_upload():
    """Validate and upload the ``output`` folder, then print the resulting CID and link.

    Runs the ``validate-and-upload`` command of ``@elephant-xyz/cli`` through
    ``npx`` with ``photo-results.csv`` as the results file, then reads that
    file.  Returns True on success; any failure is printed and yields False.
    """
    cli_command = [
        "npx", "-y", "@elephant-xyz/cli", "validate-and-upload", "output",
        "--output-csv", "photo-results.csv",
    ]
    succeeded = False

    try:
        # stdout is discarded; stderr is captured as text for error reporting.
        subprocess.run(
            cli_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )

        photo_group_cid, html_link = get_photo_cid_and_html_link()
        print("✅ Transform done\n")
        print(f"Photo group CID: {photo_group_cid}\n")
        print(f"HTML link: {html_link}")
        succeeded = True

    except subprocess.CalledProcessError as failure:
        # The CLI exited with a non-zero status.
        print(f"Command failed (exit code {failure.returncode}):", file=sys.stderr)
        if failure.stderr:
            print(failure.stderr.strip(), file=sys.stderr)
    except Exception as e:
        print(f"❌ Validation and upload failed: {str(e)}")

    return succeeded


def main():
    """Main processing pipeline: process photos, merge county and seed data, upload."""
    process_photo_data(input_dir="output", property_filename="property_seed.json")

    # County data is merged first and leaves the parcel folder name alone;
    # the seed pass then renames the parcel folder to the seed CID.
    copy_group_to_output(output_folder="output", seed_folder="county", rename_to_cid=False)
    copy_group_to_output(output_folder="output", seed_folder="seed")

    run_validate_and_upload()


if __name__ == "__main__":
    main()




✅ Transform done

Photo group CID: bafkreicmbnr6u6onlqyrhewewzzbil54rpveyknbvlwudx56zclyapmsp4

HTML link: http://dweb.link/ipfs/bafybeicmgbjxqidphyedaxjesztjauzqzphpviaklbbwi5yuolby6cddgq


In [64]:
# @title Step 6: Validate
! pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def get_photo_cid_and_html_link(path="/content/photo-results.csv"):
    """Read the first data row of the results CSV.

    Returns the row's ``dataGroupCid`` and ``htmlLink`` values as a tuple and
    raises ValueError when the file has no data rows.
    """
    with open(path, newline='', encoding='utf-8') as csvfile:
        rows = csv.DictReader(csvfile)
        try:
            first = next(rows)
        except StopIteration:
            raise ValueError("CSV file is empty") from None
        return first["dataGroupCid"], first["htmlLink"]


def has_submit_errors(path="/content/submit_errors.csv"):
    """
    Return True if submit_errors.csv has at least one row (after the header).

    A missing file means no errors were recorded, so it yields False instead
    of raising FileNotFoundError.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            return next(reader, None) is not None
    except FileNotFoundError:
        return False


def run_validate_and_upload():
    """Run validate-and-upload on ``output`` and print the resulting CID and link.

    When the CLI exits with a non-zero status, its stderr is echoed and the
    interpreter exits with the same status code.
    """
    cli_command = [
        "npx", "-y", "@elephant-xyz/cli", "validate-and-upload", "output",
        "--output-csv", "photo-results.csv",
    ]

    try:
        subprocess.run(
            cli_command,
            stdout=subprocess.DEVNULL,    # hide stdout
            stderr=subprocess.PIPE,       # capture stderr in a buffer
            check=True,
            text=True,                    # stderr as a string
        )
    except subprocess.CalledProcessError as failure:
        # The command itself failed.
        print(f"Command failed (exit code {failure.returncode}):", file=sys.stderr)
        print(failure.stderr.strip(), file=sys.stderr)
        sys.exit(failure.returncode)
    else:
        # The command succeeded: read the results.
        photo_group_cid, html_link = get_photo_cid_and_html_link()
        print("✅ Validate done\n")
        print(f"Photo group CID: {photo_group_cid}\n")
        print(f"HTML link: {html_link}")


if __name__ == "__main__":
    run_validate_and_upload()

✅ Validate done

Photo group CID: bafkreicmbnr6u6onlqyrhewewzzbil54rpveyknbvlwudx56zclyapmsp4

HTML link: http://dweb.link/ipfs/bafybeiafdnhn3am7tdaufpyvonn5d5alcfkgdjgpggt3ts2eyjnhghrlom


In [None]:
# @title Step 4: Upload
! pip3 install python-dotenv requests -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv

import requests


def get_photo_info(path="photo-results.csv"):
    """Return the second data row of the results CSV as a dict.

    Raises:
        ValueError: The CSV has no data rows, or only one.
    """
    with open(path, newline='', encoding='utf-8') as csvfile:
        rows = csv.DictReader(csvfile)
        if next(rows, None) is None:
            raise ValueError("CSV file is empty")

        photo_row = next(rows, None)

    if photo_row is None:
        raise ValueError("CSV file has only one row")
    return photo_row


def has_submit_errors(path="submit_errors.csv"):
    """
    Return True if submit_errors.csv has at least one row (after the header).

    A missing file means no errors were recorded, so it yields False instead
    of raising FileNotFoundError.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            return next(reader, None) is not None
    except FileNotFoundError:
        return False


def count_upload_records(path="photo-results.csv"):
    """Return the number of data rows (header excluded) in the results CSV."""
    record_count = 0
    with open(path, newline='', encoding='utf-8') as csvfile:
        for _record in csv.DictReader(csvfile):
            record_count += 1
    return record_count


def collect_data_ipfs_links(data_cid):
    """Resolve the IPFS gateway links related to uploaded photo data.

    Fetches the photo data document (the first one when ``data_cid`` is a
    list), follows its first ``property_seed_has_file`` relationship, and
    reads the ``from`` / ``to`` links of that relationship document.

    Args:
        data_cid: A single CID string or a list of CIDs.

    Returns:
        ``(photo_data_links, property_photo_link, property_link, address_link)``.
        ``photo_data_links`` is a list of URLs when ``data_cid`` is a list,
        otherwise a single URL string.

    Raises:
        ValueError: ``data_cid`` is an empty list.
        requests.RequestException: A gateway request failed, timed out, or
            returned an error status.
    """
    def fetch_json(url):
        """GET a gateway URL and decode the JSON body."""
        # A timeout keeps an unresponsive gateway from blocking the cell forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()

    print(data_cid)
    # Handle data_cid as either a single CID or a list of CIDs
    if isinstance(data_cid, list):
        if not data_cid:
            raise ValueError("data_cid list is empty")
        photo_data_links = [f"https://ipfs.io/ipfs/{cid}" for cid in data_cid]
        # Use the first CID to get the structure data
        photo_data = fetch_json(photo_data_links[0])
    else:
        photo_data_links = f"https://ipfs.io/ipfs/{data_cid}"
        photo_data = fetch_json(photo_data_links)

    # The relationship holds a list of links; only the first one is followed.
    property_photo_cid = photo_data["relationships"]["property_seed_has_file"][0]["/"]
    property_photo_link = f"https://ipfs.io/ipfs/{property_photo_cid}"

    # The relationship document links its two ends through "from" and "to".
    property_seed_data = fetch_json(property_photo_link)
    property_cid, address_cid = property_seed_data["from"]["/"], property_seed_data["to"]["/"]

    property_link = f"https://ipfs.io/ipfs/{property_cid}"
    address_link = f"https://ipfs.io/ipfs/{address_cid}"

    return photo_data_links, property_photo_link, property_link, address_link


def run_validate_and_upload():
    """Run validate-and-upload on ``output`` and print a summary of the uploaded data.

    The summary is built from the second row of ``photo-results.csv`` and the
    IPFS links derived from its data CID.  When the CLI exits with a non-zero
    status, its stderr is echoed and the interpreter exits with that status.
    """
    cli_command = [
        "npx", "-y", "@elephant-xyz/cli", "validate-and-upload", "output",
        "--output-csv", "photo-results.csv",
    ]

    try:
        subprocess.run(
            cli_command,
            stdout=subprocess.DEVNULL,    # hide stdout
            stderr=subprocess.PIPE,       # capture stderr in a buffer
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as failure:
        print(f"Command failed (exit code {failure.returncode}):", file=sys.stderr)
        print(failure.stderr.strip(), file=sys.stderr)
        sys.exit(failure.returncode)
    else:
        photo_info = get_photo_info()
        photo_group_cid = photo_info["dataGroupCid"]
        data_cid = photo_info["dataCid"]
        html_link = photo_info["htmlLink"]

        files_uploaded = count_upload_records()
        (photo_data_links, property_photo_link,
         property_link, address_link) = collect_data_ipfs_links(data_cid)

        print("✅ Upload done\n")
        print(f"{files_uploaded} files uploaded\n")

        print(f"Photo group CID: {photo_group_cid}\n")
        print(f"HTML link: {html_link}\n")

        # photo_data_links is a single URL or a list of URLs.
        if not isinstance(photo_data_links, list):
            print(f"Photo data IPFS link: {photo_data_links}")
        else:
            print("Photo data IPFS links:")
            for position, link in enumerate(photo_data_links, 1):
                print(f"  Photo {position}: {link}")

        print(f"Relationship IPFS link: {property_photo_link}")
        print(f"Property seed IPFS link: {property_link}")
        print(f"Unnormalized address IPFS link: {address_link}")


if __name__ == "__main__":
    run_validate_and_upload()

In [60]:
!rm -r output seed county

rm: cannot remove 'output': No such file or directory


In [None]:
# @title Step 5: Submit

! pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def get_transaction_hash(path="transaction-status.csv"):
    """Return the ``transactionHash`` value of the first data row of the status CSV.

    Raises:
        ValueError: The CSV has no data rows.
    """
    with open(path, newline='', encoding='utf-8') as status_file:
        first_record = next(csv.DictReader(status_file), None)

    if first_record is None:
        raise ValueError("CSV file is empty")
    return first_record["transactionHash"]


def has_submit_errors(path="submit_errors.csv"):
    """
    Return True if submit_errors.csv has at least one row (after the header).

    A missing file means no errors were recorded, so it yields False instead
    of raising FileNotFoundError.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            return next(reader, None) is not None
    except FileNotFoundError:
        return False


def run_submit_to_contract():
    """Submit ``photo-results.csv`` with the Elephant CLI and print the outcome.

    The CLI runs with stdout discarded and stderr captured. When it exits
    with a non-zero status, its stderr is echoed and the interpreter exits
    with the same code. Otherwise the function prints either a failure notice
    (when ``has_submit_errors()`` is true) or a polygonscan link built from
    ``get_transaction_hash()``.
    """
    # NOTE(review): the API key and oracle key id are hard-coded below;
    # consider moving them to the .env file this cell already loads.
    submit_command = [
        "npx", "-y", "@elephant-xyz/cli", "submit-to-contract", "photo-results.csv",
        "--from-address", "0xefAd08946612A15d5De8D4Db7fc03556b6424075",
        "--api-key", "f7e18cf6-5d07-4e4a-ae23-f27b812614e6",
        "--domain", "oracles-69c46050.staircaseapi.com",
        "--oracle-key-id", "7ad26e0b-67c9-4c2f-95a2-2792c7db5ac7",
    ]

    try:
        subprocess.run(
            submit_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as failure:
        print(f"Command failed (exit code {failure.returncode}):", file=sys.stderr)
        print(failure.stderr.strip(), file=sys.stderr)
        sys.exit(failure.returncode)

    if has_submit_errors():
        print("❌ Submit failed, please check submit_errors.csv for details", file=sys.stderr)
        return

    tx_hash = get_transaction_hash()
    print("✅ Submit done\n")
    print(f"Transaction link: https://polygonscan.com/tx/{tx_hash}")


if __name__ == "__main__":
    run_submit_to_contract()

✅ Submit done

Transaction link: https://polygonscan.com/tx/0xb64df442346f8102a8756c83069ccf2af5db83362e40cac18f50261a2ff5df80


# Photo Metadata Mining Process


In [None]:
# @title Step 1: Prepare
# @title  {"vertical-output":true}
import os
import subprocess
import shutil
import re
import sys
import csv
import json
import requests
from pathlib import Path
from ctypes import c_void_p



def cleanup_folders(base_path="/content"):
    """Delete the pipeline's working folders found under *base_path*.

    The folders removed (with everything inside them) are ``output``,
    ``images``, ``seed``, ``county`` and ``county-data``. Folders that do not
    exist are skipped.

    Args:
        base_path (str): Directory that contains the folders
            (default: "/content").
    """
    for name in ("output", "images", "seed", "county", "county-data"):
        doomed = os.path.join(base_path, name)
        if not os.path.exists(doomed):
            continue
        shutil.rmtree(doomed)

def extract_images(parcel_id):
    """Unpack the JPEG files of ``/content/{parcel_id}.zip`` into ``images/{parcel_id}``.

    The archive is unzipped (paths flattened, macOS metadata excluded) into a
    staging folder under /tmp; every ``.jpg``/``.jpeg`` file found there is
    then copied to the image folder and the staging folder is removed.

    Returns:
        str | None: The image folder path when at least one file was copied;
        None when the archive is missing, nothing was copied, or any step
        raised.
    """
    archive = f"/content/{parcel_id}.zip"
    image_dir = f"images/{parcel_id}"
    staging_dir = f"/tmp/{parcel_id}_temp"

    if not os.path.exists(archive):
        return None

    for needed in (image_dir, staging_dir):
        os.makedirs(needed, exist_ok=True)

    try:
        # -j drops archive paths, -o overwrites; the exit status is not checked.
        subprocess.run(
            [
                'unzip', '-j', '-o', archive, '-d', staging_dir,
                '*.jpg', '*.jpeg', '*.JPG', '*.JPEG',
                '-x', '__MACOSX/*', '*.DS_Store',
            ],
            capture_output=True,
            text=True,
        )

        copied = 0
        if os.path.exists(staging_dir):
            for name in os.listdir(staging_dir):
                if not name.lower().endswith(('.jpg', '.jpeg')):
                    continue
                staged = os.path.join(staging_dir, name)
                if os.path.exists(staged):
                    shutil.copy2(staged, os.path.join(image_dir, name))
                    copied += 1

        if os.path.exists(staging_dir):
            shutil.rmtree(staging_dir)

        if copied > 0:
            return image_dir
        return None

    except Exception:
        # Best effort: drop the staging folder and report "no images".
        if os.path.exists(staging_dir):
            shutil.rmtree(staging_dir)
        return None


def ensure_directory(file_path):
    """Create the directory part of *file_path* if it is not there yet.

    The directory part is whatever ``os.path.dirname`` returns, so a path
    that ends with a slash names the directory itself. A path without a
    directory part is left alone.
    """
    parent = os.path.dirname(file_path)
    if not parent or os.path.exists(parent):
        return
    os.makedirs(parent)


def create_parcel_folder(parcel_id):
    """Create ``output/{sanitized parcel id}`` and return that relative path.

    Every character of the id that is not a word character or a hyphen is
    replaced with an underscore before it is used as a folder name.
    """
    safe_id = re.sub(r"[^\w\-_]", "_", str(parcel_id))
    parcel_dir = "output/" + safe_id
    # The trailing slash makes ensure_directory treat parcel_dir itself as
    # the directory to create.
    ensure_directory(f"{parcel_dir}/")
    return parcel_dir


def install_photo_meta_data_ai():
    """Force-reinstall photo-meta-data-ai from GitHub with pip.

    pip's stdout, stderr and return code are written to
    ``/content/install_log.txt``.

    Returns:
        bool: True when pip exited with status 0; False when it failed,
        exceeded the 300 second timeout, or anything else raised.
    """
    pip_command = [
        sys.executable, '-m', 'pip', 'install',
        '--force-reinstall', '--no-cache-dir',
        'git+https://github.com/elephant-xyz/photo-meta-data-ai.git',
    ]

    try:
        outcome = subprocess.run(pip_command, capture_output=True, text=True, timeout=300)

        log_sections = (
            "STDOUT:\n",
            outcome.stdout,
            "\nSTDERR:\n",
            outcome.stderr,
            f"\nReturn code: {outcome.returncode}",
        )
        with open('/content/install_log.txt', 'w') as log_file:
            for section in log_sections:
                log_file.write(section)
    except Exception:
        # subprocess.TimeoutExpired is an Exception subclass, so the timeout
        # case lands here as well.
        return False

    return outcome.returncode == 0


def copy_images_to_output(source_dir="/content/images", output_dir="/content/output"):
    """Copy the whole image tree into the output tree, merging with what exists.

    Each ``PARCEL_ID/`` folder under *source_dir* ends up as the same-named
    folder under *output_dir*; folders that already exist are merged into.

    Args:
        source_dir (str): Folder holding one sub-folder of images per parcel.
        output_dir (str): Folder that receives the copy.

    Returns:
        bool: True when the copy completed; False when *source_dir* is not a
        directory or the copy failed with an OS-level error.
    """
    if not os.path.isdir(source_dir):
        return False

    try:
        shutil.copytree(source_dir, output_dir, dirs_exist_ok=True)
    except OSError:
        # shutil.Error derives from OSError, so partial-copy failures are
        # covered too; KeyboardInterrupt is no longer swallowed.
        return False
    return True


def extract_datacids_from_csv(csv_path):
    """Return every non-blank ``dataCid`` value of a CSV file, in row order.

    Args:
        csv_path (str): CSV file with a header row.

    Returns:
        list[str]: The stripped ``dataCid`` values. Rows whose value is
        missing, empty or whitespace-only are skipped; a file without a
        ``dataCid`` column yields an empty list.
    """
    # newline='' is what the csv module asks for, and the explicit encoding
    # matches the other CSV readers in this notebook.
    with open(csv_path, newline='', encoding='utf-8') as csv_file:
        raw_values = [row.get('dataCid') for row in csv.DictReader(csv_file)]

    return [value.strip() for value in raw_values if value and value.strip()]


def _iter_linked_cids(document):
    """Yield the CIDs that *document* links to.

    Links are dicts with a string under the ``"/"`` key. The values of
    ``document["relationships"]`` are scanned first, then the top-level
    ``"from"`` and ``"to"`` fields; each value may be a single link or a list
    of links. Null values and anything else are skipped.
    """
    if not isinstance(document, dict):
        return

    relationships = document.get("relationships")
    values = list(relationships.values()) if isinstance(relationships, dict) else []
    values.extend(document[field] for field in ("from", "to") if field in document)

    for value in values:
        candidates = value if isinstance(value, list) else [value]
        for link in candidates:
            if isinstance(link, dict) and isinstance(link.get("/"), str):
                yield link["/"]


def traverse_and_download(cids, destination_folder):
    """
    Download CIDs and all related CIDs by traversing relationships.
    Creates folder structure: destination_folder/cid/ for each CID in the list.

    Every document is fetched from the ipfs.io gateway and saved as
    ``{cid}.json`` in the folder of the root CID it was reached from.
    Failures are printed and do not stop the traversal.

    Args:
        cids (list): List of root CIDs to start traversing from
        destination_folder (str): The destination folder name

    Returns:
        int: Number of documents saved, summed over all root CIDs.
    """
    total_downloaded = 0

    for cid in cids:
        downloaded = set()

        # Create folder structure for this CID
        main_folder = Path(f"{destination_folder}/{cid}")
        main_folder.mkdir(parents=True, exist_ok=True)

        def traverse_cid(current_cid):
            """Recursively fetch CID and traverse relationships"""
            if current_cid in downloaded:
                return

            try:
                response = requests.get(f"https://ipfs.io/ipfs/{current_cid}", timeout=30)
                # Without this, a JSON error body from the gateway would be
                # saved as if it were the document.
                response.raise_for_status()
                data = response.json()

                # Save the JSON file
                file_path = main_folder / f"{current_cid}.json"
                with open(file_path, 'w') as f:
                    json.dump(data, f, indent=2)

                # Mark before recursing so reference cycles terminate.
                downloaded.add(current_cid)

                for related_cid in _iter_linked_cids(data):
                    traverse_cid(related_cid)

            except Exception as e:
                print(f"Error downloading CID {current_cid}: {e}")

        # Start traversal for this root CID
        traverse_cid(cid)

        total_downloaded += len(downloaded)

    return total_downloaded



import json
from pathlib import Path

import json
import os
from pathlib import Path

import json
import os
from pathlib import Path

def _iter_link_slots(document):
    """Yield ``(key, index, holder, slot, target)`` for each CID link in *document*.

    A link is a dict with a string under the ``"/"`` key. Links are looked
    for in the values of ``document["relationships"]`` first and then in the
    top-level ``"from"`` and ``"to"`` fields; each value may be one link or a
    list of links. ``index`` is the list position (``None`` for a single
    link) and ``holder[slot]`` is where the link dict is stored, so a caller
    can replace it in place. Null values and anything not shaped like a link
    are skipped.
    """
    if not isinstance(document, dict):
        return

    fields = []
    relationships = document.get("relationships")
    if isinstance(relationships, dict):
        fields.extend((relationships, key) for key in relationships)
    fields.extend((document, key) for key in ("from", "to") if key in document)

    for holder, key in fields:
        value = holder[key]
        if isinstance(value, list):
            for position, item in enumerate(value):
                if isinstance(item, dict) and isinstance(item.get("/"), str):
                    yield key, position, value, position, item["/"]
        elif isinstance(value, dict) and isinstance(value.get("/"), str):
            yield key, None, holder, key, value["/"]


def _rename_cid_folder(folder, root_cid):
    """Rename the documents reachable from ``{root_cid}.json`` and relink them.

    Returns the number of documents read while walking the links.
    """
    # Root CID keeps its original name
    cid_to_filename = {root_cid: f"{root_cid}.json"}
    processed_files = set()
    # Counts how often each base name was handed out, to suffix duplicates
    name_counters = {}
    # Content markers are checked in this order; the first match wins
    content_names = (
        ("parcel_identifier", "property"),
        ("parcel_id", "property_seed"),
        ("space_type", "layout"),
        ("full_address", "address_data"),
    )

    def get_meaningful_name(file_cid, relationship_key, file_data, index):
        """Generate meaningful filename based on content or relationship"""
        base_name = None

        # Always prioritize content-based naming for specific types
        if file_data:
            for marker, name in content_names:
                if marker in file_data:
                    base_name = name
                    break
            else:
                if "from" in file_data or "to" in file_data:
                    base_name = "connection"

        # Only use relationship key if no content pattern matched
        if not base_name and relationship_key:
            base_name = relationship_key if index is None else f"{relationship_key}_{index}"

        # Final fallback to shortened CID
        if not base_name:
            base_name = file_cid[:8]

        # Handle duplicate names by adding counter
        if base_name in name_counters:
            name_counters[base_name] += 1
            return f"{base_name}_{name_counters[base_name]}.json"
        name_counters[base_name] = 0
        return f"{base_name}.json"

    def process_file(file_cid, relationship_key=None, index=None):
        """Assign a name to one document, then follow its links depth-first"""
        if file_cid in processed_files:
            return

        file_path = folder / f"{file_cid}.json"
        if not file_path.exists():
            return

        with open(file_path, 'r') as f:
            data = json.load(f)

        if file_cid != root_cid:
            cid_to_filename[file_cid] = get_meaningful_name(file_cid, relationship_key, data, index)

        processed_files.add(file_cid)

        for key, position, _holder, _slot, target in _iter_link_slots(data):
            # Only follow actual CIDs, not already converted paths
            if not target.startswith("./"):
                process_file(target, key, position)

    # Pass 1: walk from the root and decide every new filename
    process_file(root_cid)

    # Pass 2: rewrite links to relative paths and rename the files
    for file_cid, new_filename in cid_to_filename.items():
        old_file_path = folder / f"{file_cid}.json"
        new_file_path = folder / new_filename

        if not old_file_path.exists():
            continue

        with open(old_file_path, 'r') as f:
            data = json.load(f)

        # Materialize the slots first because they are replaced while looping
        for _key, _position, holder, slot, target in list(_iter_link_slots(data)):
            if target in cid_to_filename and not target.startswith("./"):
                holder[slot] = {"/": f"./{cid_to_filename[target]}"}

        with open(new_file_path, 'w') as f:
            json.dump(data, f, indent=2)

        # Remove old file if name changed
        if old_file_path != new_file_path:
            old_file_path.unlink()

    return len(processed_files)


def rename_and_update_references(cids, parent_folder):
    """
    Rename JSON files to meaningful names and update all CID references to relative paths.
    Keep the root CID filename unchanged. Can process single CID or list of CIDs.

    For each CID the folder ``parent_folder/cid`` is processed; CIDs whose
    folder or root file ``cid.json`` is missing are skipped. Names are chosen
    from document content first (property, property_seed, layout,
    address_data, connection), then from the relationship key that led to the
    document, then from the first 8 characters of the CID. Repeated names get
    a numeric suffix. Links to documents that are not present in the folder
    are left untouched.

    Args:
        cids (str or list): Single CID or list of CIDs to process
        parent_folder (str): The parent folder path (e.g., 'seed')

    Returns:
        int: Number of documents processed across all CIDs.
    """
    # Convert single CID to list for uniform processing
    if isinstance(cids, str):
        cids = [cids]

    total_processed = 0

    for cid in cids:
        folder = Path(f"{parent_folder}/{cid}")

        # The root file should match the CID; without it there is nothing to walk
        if not (folder / f"{cid}.json").exists():
            continue

        total_processed += _rename_cid_folder(folder, cid)

    return total_processed

import json
import shutil
from pathlib import Path

import json
import shutil
from pathlib import Path

def copy_cid_to_parcel_structure(source_folder, target_folder_prefix):
    """
    Copy each CID folder of *source_folder* into a folder named after its parcel id.

    The parcel id is read from the CID folder's root file (``CID.json``) or,
    failing that, from any other JSON file in the folder; hyphens are removed
    before it is used as the folder name under *target_folder_prefix*.

    Args:
        source_folder (str): Source folder containing CID subfolders (e.g., 'seed')
        target_folder_prefix (str): Target folder that receives one subfolder per parcel

    Returns:
        tuple[int, int] | None: ``(copied, failed)`` folder counts, or None
        when *source_folder* does not exist.
    """
    parcel_fields = ('parcel_id', 'parcel_identifier', 'parcelId', 'parcelIdentifier')

    def first_parcel_value(document):
        # Value of the first parcel field present in the document, else None
        for name in parcel_fields:
            if name in document:
                return document[name]
        return None

    source_root = Path(source_folder)
    if not source_root.exists():
        return

    copied = 0
    failed = 0

    for cid_dir in source_root.iterdir():
        if not cid_dir.is_dir():
            continue

        root_name = f"{cid_dir.name}.json"
        root_json = cid_dir / root_name
        if not root_json.exists():
            failed += 1
            continue

        try:
            with open(root_json, 'r') as handle:
                parcel_id = first_parcel_value(json.load(handle))

            if not parcel_id:
                # Fall back to the other JSON files; unreadable ones are skipped
                for sibling in cid_dir.glob("*.json"):
                    if sibling.name == root_name:
                        continue
                    try:
                        with open(sibling, 'r') as handle:
                            candidate = first_parcel_value(json.load(handle))
                    except Exception:
                        continue
                    if candidate:
                        parcel_id = candidate
                        break

            if not parcel_id:
                failed += 1
                continue

            destination = Path(f"{target_folder_prefix}") / str(parcel_id).replace('-', '')
            destination.mkdir(parents=True, exist_ok=True)

            for entry in cid_dir.iterdir():
                if entry.is_file():
                    shutil.copy2(entry, destination / entry.name)
                elif entry.is_dir():
                    shutil.copytree(entry, destination / entry.name, dirs_exist_ok=True)

            copied += 1

        except Exception:
            failed += 1
            continue

    return copied, failed



def main():
    """Run the Prepare step for one parcel.

    Resets the working folders, unpacks the parcel's images, installs the
    photo-meta-data-ai package, mirrors the images into ``output``, downloads
    and relinks the seed and county documents listed in the results CSVs, and
    finally copies the county folders into ``county-data``.
    """
    parcel_id = "52434205310037080" # @param {"type":"string"}

    # Local workspace
    cleanup_folders()
    extract_images(parcel_id)
    create_parcel_folder(parcel_id)
    install_photo_meta_data_ai()
    copy_images_to_output()

    # Both CSVs are read before any download starts
    seed_cids = extract_datacids_from_csv("seed-results.csv")
    county_cids = extract_datacids_from_csv("county-results.csv")
    groups = ((seed_cids, "seed"), (county_cids, "county"))

    for cid_list, folder in groups:
        traverse_and_download(cid_list, folder)
    for cid_list, folder in groups:
        rename_and_update_references(cid_list, folder)

    copy_cid_to_parcel_structure("county", "county-data")



if __name__ == "__main__":
    main()

In [None]:
# @title Step 2: Transform
! pip3 install python-dotenv -q
import subprocess
from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv
import os
import shutil
import zipfile
from pathlib import Path
from datetime import datetime


def copy_csv(source_path, destination_path):
    """
    Copy one file to a new location, creating the destination folder if needed.

    A message describing the outcome is printed in every case.

    Args:
        source_path (str): Path to the source CSV file
        destination_path (str): Path to the destination CSV file

    Returns:
        bool: True if the copy succeeded, False if the source is missing or
        any step raised.
    """
    try:
        if not os.path.exists(source_path):
            print(f"Error: Source file '{source_path}' does not exist.")
            return False

        target_dir = os.path.dirname(destination_path)
        needs_dir = bool(target_dir) and not os.path.exists(target_dir)
        if needs_dir:
            os.makedirs(target_dir)

        shutil.copy2(source_path, destination_path)
        print(f"Successfully copied '{source_path}' to '{destination_path}'")
        return True

    except Exception as error:
        print(f"Error copying file: {error}")
        return False

def get_photo_cid_and_html_link(path="photometadata-results.csv"):
    """Return ``(dataGroupCid, htmlLink)`` from the first data row of *path*.

    Raises:
        ValueError: If the CSV has no data rows.
    """
    with open(path, newline='', encoding='utf-8') as results_file:
        record = next(csv.DictReader(results_file), None)

    if record is None:
        raise ValueError("CSV file is empty")
    return record["dataGroupCid"], record["htmlLink"]


def has_submit_errors(path="submit_errors.csv"):
    """
    Tell whether submit_errors.csv contains any row below its header.

    A file that does not exist counts as having no errors.
    """
    try:
        with open(path, newline='', encoding='utf-8') as errors_file:
            first_error = next(csv.DictReader(errors_file), None)
    except FileNotFoundError:
        return False

    return first_error is not None



def process_photo_data(input_dir="output", property_filename="my_property.json"):
    """Step 1: Process photo data

    Changes the working directory to /content and runs the
    ``process-photo-data`` command on *input_dir*.

    Args:
        input_dir (str): Value passed as ``--input-dir``.
        property_filename (str): Value passed as ``--property-filename``.

    Returns:
        bool: True when the command exited with status 0; False when it
        exited with an error or could not be started.
    """
    os.chdir("/content")

    # An argument list (no shell) keeps spaces or shell metacharacters in the
    # two values from being reinterpreted.
    command = [
        "process-photo-data",
        "--input-dir", str(input_dir),
        "--property-filename", str(property_filename),
    ]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers the command not being installed / not executable.
        return False
    return True

import os
import shutil
import json

def _merge_directory_contents(source_dir, dest_dir):
    """Copy every entry of *source_dir* into *dest_dir*, replacing same-named entries."""
    for entry in os.listdir(source_dir):
        source_path = os.path.join(source_dir, entry)
        dest_path = os.path.join(dest_dir, entry)

        if os.path.isfile(source_path):
            shutil.copy2(source_path, dest_path)
        elif os.path.isdir(source_path):
            if os.path.exists(dest_path):
                shutil.rmtree(dest_path)
            shutil.copytree(source_path, dest_path)


def _read_parcel_id(json_path):
    """Return the dash-less parcel id stored in a JSON file, or None.

    ``parcel_id`` is preferred over ``parcel_identifier``. Files that cannot
    be read or parsed, or that do not hold a JSON object, yield None.
    """
    try:
        with open(json_path, 'r') as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError
        return None

    if not isinstance(data, dict):
        return None

    for field in ('parcel_id', 'parcel_identifier'):
        if field in data:
            return str(data[field]).strip().replace('-', '') or None
    return None


def copy_group_to_output(output_folder="output", seed_folder="seed", rename_to_cid=True):
    """Merge downloaded CID folders into the matching parcel folders of the output tree.

    Each sub-folder of *seed_folder* (named by CID) is matched to a parcel by
    the parcel id found in one of its JSON files; dashes are ignored on both
    sides. The CID folder's content is copied into the sub-folder of
    *output_folder* named after that parcel. With *rename_to_cid* the parcel
    folder is then renamed to the CID, or merged into the CID folder when one
    already exists.

    Args:
        output_folder (str): Folder holding one sub-folder per parcel id.
        seed_folder (str): Folder holding one sub-folder per CID.
        rename_to_cid (bool): Rename each matched parcel folder to its CID.

    Returns:
        bool: True when processing finished (even if nothing matched); False
        when either folder is missing or any step raised.
    """
    if not os.path.exists(output_folder) or not os.path.exists(seed_folder):
        return False

    try:
        output_parcel_folders = [
            name for name in os.listdir(output_folder)
            if os.path.isdir(os.path.join(output_folder, name))
        ]
        seed_cid_folders = [
            name for name in os.listdir(seed_folder)
            if os.path.isdir(os.path.join(seed_folder, name))
        ]

        # Map normalized parcel id -> CID; the first JSON file in a CID folder
        # that carries a parcel id decides.
        parcel_to_cid_mapping = {}
        for cid in seed_cid_folders:
            cid_folder_path = os.path.join(seed_folder, cid)
            for file_name in os.listdir(cid_folder_path):
                if not file_name.endswith('.json'):
                    continue
                parcel_id = _read_parcel_id(os.path.join(cid_folder_path, file_name))
                if parcel_id:
                    parcel_to_cid_mapping[parcel_id] = cid
                    break

        for parcel_folder in output_parcel_folders:
            corresponding_cid = parcel_to_cid_mapping.get(parcel_folder.replace('-', ''))
            if corresponding_cid is None:
                continue

            output_parcel_path = os.path.join(output_folder, parcel_folder)
            _merge_directory_contents(
                os.path.join(seed_folder, corresponding_cid), output_parcel_path
            )

            if not rename_to_cid:
                continue

            new_output_cid_path = os.path.join(output_folder, corresponding_cid)
            if not os.path.exists(new_output_cid_path):
                os.rename(output_parcel_path, new_output_cid_path)
            else:
                # If CID folder already exists, merge content and remove parcel folder
                _merge_directory_contents(output_parcel_path, new_output_cid_path)
                shutil.rmtree(output_parcel_path)

        return True

    except Exception:
        # Callers only look at the boolean result, so failures are reported
        # through it rather than raised.
        return False



def run_validate_and_upload():
    """Validate and upload ``output`` via the Elephant CLI, then print the result.

    The CLI writes its results to ``photometadata-results.csv``; the group
    CID and HTML link printed afterwards come from
    ``get_photo_cid_and_html_link()``.

    Returns:
        bool: True on success; False when the CLI exits with an error or any
        other step raises (the reason is printed).
    """
    cli_command = [
        "npx", "-y", "@elephant-xyz/cli", "validate-and-upload", "output",
        "--output-csv", "photometadata-results.csv",
    ]

    try:
        # stdout is discarded; stderr is captured as text for error reporting
        subprocess.run(
            cli_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )

        group_cid, link = get_photo_cid_and_html_link()
        print("✅ Transform done\n")
        print(f"Photo group CID: {group_cid}\n")
        print(f"HTML link: {link}")
        return True

    except subprocess.CalledProcessError as failure:
        print(f"Command failed (exit code {failure.returncode}):", file=sys.stderr)
        if failure.stderr:
            print(failure.stderr.strip(), file=sys.stderr)
        return False
    except Exception as error:
        print(f"❌ Validation and upload failed: {str(error)}")
        return False
def run_shell_commands():
    """
    Run the photo-processing commands in order, stopping at the first failure.

    The commands are: bucket-manager, upload-to-s3, photo-categorizer,
    ai-analyzer (local folders, parallel categories, all properties, county
    data from ./county-data) and fix-schema-validation. Their output is
    captured; when a command fails, its name and captured stderr are printed
    to stderr so the failure is visible even if the caller ignores the
    return value.

    Returns:
        bool: True if all commands succeeded, False if any failed
    """
    commands = [
        "bucket-manager",
        "upload-to-s3",
        "photo-categorizer",
        "ai-analyzer --local-folders --parallel-categories --all-properties --county-data-dir ./county-data ",
        "fix-schema-validation"
    ]

    for command in commands:
        try:
            subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as error:
            print(f"Command failed (exit code {error.returncode}): {command}", file=sys.stderr)
            if error.stderr:
                print(error.stderr.strip(), file=sys.stderr)
            return False
        except Exception as error:
            print(f"Could not run {command!r}: {error}", file=sys.stderr)
            return False

    return True

def main():
    """Run the Transform step from /content.

    Order: the shell command pipeline, merging the county and seed documents
    into ``output``, then validation and upload. Return values of the
    individual steps are not inspected.
    """
    os.chdir("/content")
    run_shell_commands()

    # County documents are merged in place; the seed merge also renames each
    # matched parcel folder to its CID.
    for group_folder, rename in (("county", False), ("seed", True)):
        copy_group_to_output("output", group_folder, rename)

    run_validate_and_upload()

if __name__ == "__main__":
    main()


✅ Transform done

Photo group CID: bafkreih226p5vjhx33jwgq7trblyplfw7yhkununuuahgpfok3hnh5mjwq

HTML link: http://dweb.link/ipfs/bafybeid3unil7orix7cqadb6xpkt7m74bhb5xwez22k2jkyhxw7mvwtunm


In [None]:
# @title Step 3: Validate
! pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def get_photo_cid_and_html_link(path="/content/photometadata-results.csv"):
    """Read the first data row of the results CSV and return its
    ``dataGroupCid`` and ``htmlLink`` values as a tuple.

    Raises:
        ValueError: If the CSV has no data rows.
    """
    with open(path, newline='', encoding='utf-8') as results_file:
        # Only the first record is needed
        for record in csv.DictReader(results_file):
            return record["dataGroupCid"], record["htmlLink"]
    raise ValueError("CSV file is empty")


def has_submit_errors(path="/content/submit_errors.csv"):
    """
    Return True if submit_errors.csv has at least one row (after the header).

    A missing file is treated as "no errors recorded" and yields False
    instead of raising FileNotFoundError.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            return next(reader, None) is not None
    except FileNotFoundError:
        return False


def run_validate_and_upload():
    """Validate ``output`` with the Elephant CLI and print the resulting CIDs.

    The CLI writes its results to ``photo-results.csv``; the group CID and
    HTML link printed afterwards are read back from that same file, so the
    output always describes this run. If the CLI exits with an error, its
    stderr is echoed and the interpreter exits with the same code.
    """
    results_csv = "photo-results.csv"

    try:
        subprocess.run(
            ["npx", "-y", "@elephant-xyz/cli", "validate-and-upload", "output", "--output-csv", results_csv],
            stdout=subprocess.DEVNULL,    # hide stdout
            stderr=subprocess.PIPE,       # capture stderr in a buffer
            check=True,
            text=True                     # stderr as a string
        )

        # Read the file the command just wrote, not the reader's default
        # path, which points at a different CSV.
        photometa_group_cid, html_link = get_photo_cid_and_html_link(results_csv)
        print("✅ Validate done\n")
        print(f"Photometadata group CID: {photometa_group_cid}\n")
        print(f"HTML link: {html_link}")

    except subprocess.CalledProcessError as e:
        # the command itself failed
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print(e.stderr.strip(), file=sys.stderr)
        sys.exit(e.returncode)


if __name__ == "__main__":
    run_validate_and_upload()

✅ Validate done

Photometadata group CID: bafkreih226p5vjhx33jwgq7trblyplfw7yhkununuuahgpfok3hnh5mjwq

HTML link: http://dweb.link/ipfs/bafybeicr3coifenubbuawewkrpzonm44z4strnumegkjpsmcqlupvvhr7a


In [None]:
# @title Step 4: Upload
! pip3 install python-dotenv requests -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv

import requests


def get_photo_info(path="photometadata-results.csv"):
    """Load the first data row of the results CSV.

    Args:
        path: Location of the results CSV; its first line is the header.

    Returns:
        The first data row as a mapping from column name to value.

    Raises:
        ValueError: If the CSV has no data rows.
    """
    with open(path, newline='', encoding='utf-8') as handle:
        for row in csv.DictReader(handle):
            # Only the first data row is needed.
            return row
    raise ValueError("CSV file is empty")


def has_submit_errors(path="submit_errors.csv"):
    """Tell whether ``submit_errors.csv`` contains at least one data row.

    The header line does not count; a file with only a header (or nothing at
    all) yields False.
    """
    with open(path, newline='', encoding='utf-8') as handle:
        return any(True for _ in csv.DictReader(handle))


def count_upload_records(path="photometadata-results.csv"):
    """Count the data rows (header excluded) in the results CSV.

    Args:
        path: Location of the results CSV; its first line is the header.

    Returns:
        The number of rows yielded by ``csv.DictReader`` for the file.
    """
    total = 0
    with open(path, newline='', encoding='utf-8') as handle:
        for _ in csv.DictReader(handle):
            total += 1
    return total


def collect_data_ipfs_links(data_cid, timeout=30):
    """Build the ipfs.io gateway links related to an uploaded photo data document.

    The photo data JSON is fetched from the gateway, its first
    ``relationships.property_has_file`` entry is followed to a relationship
    document, and that document's ``from`` and ``to`` CIDs are turned into
    links.

    Args:
        data_cid: A single CID string, or a list of CID strings. When a list
            is given, only the first CID is fetched to follow relationships.
        timeout: Seconds passed to ``requests.get`` for each gateway request.

    Returns:
        A tuple ``(photo_data_links, property_photo_link, property_link,
        address_link)``. ``photo_data_links`` is a string when ``data_cid`` is
        a string and a list of strings when it is a list. ``property_link``
        comes from the relationship's ``from`` CID and ``address_link`` from
        its ``to`` CID.
        NOTE(review): for a ``property_has_file`` relationship the ``to`` side
        may be a file rather than an address - confirm the naming.

    Raises:
        ValueError: If ``data_cid`` is an empty list.
        requests.RequestException: On a network failure, timeout, or non-2xx
            gateway response.
        KeyError, IndexError: If the fetched JSON lacks the expected fields.
    """
    gateway = "https://ipfs.io/ipfs/"

    def fetch_json(url):
        # A bounded timeout keeps a stalled gateway from hanging the cell, and
        # raise_for_status() reports HTTP errors directly instead of letting
        # them surface later as a JSON decoding failure.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()

    # Accept data_cid as either a single CID or a list of CIDs.
    if isinstance(data_cid, list):
        if not data_cid:
            raise ValueError("data_cid list is empty")
        photo_data_links = [f"{gateway}{cid}" for cid in data_cid]
        # Only the first document is used to discover the relationship.
        first_photo_link = photo_data_links[0]
    else:
        photo_data_links = f"{gateway}{data_cid}"
        first_photo_link = photo_data_links

    photo_data = fetch_json(first_photo_link)

    # Follow the first property_has_file relationship of the photo data.
    property_photo_cid = photo_data["relationships"]["property_has_file"][0]["/"]
    property_photo_link = f"{gateway}{property_photo_cid}"

    relationship = fetch_json(property_photo_link)

    # The relationship document points at two CIDs via "from" and "to".
    property_cid = relationship["from"]["/"]
    address_cid = relationship["to"]["/"]

    property_link = f"{gateway}{property_cid}"
    address_link = f"{gateway}{address_cid}"

    # photo_data_links keeps the shape (string or list) of the input.
    return photo_data_links, property_photo_link, property_link, address_link


def run_validate_and_upload():
    """Run the CLI ``validate-and-upload`` command and print a summary of the upload.

    The command's stdout is discarded and its stderr is captured. On a
    non-zero exit status the captured stderr is printed and the process exits
    with the same status. On success the first row of
    ``photometadata-results.csv`` supplies the group CID, data CID and HTML
    link, and the row count is reported as the number of uploaded files.

    The upload summary is printed before the IPFS gateway is queried, so a
    gateway failure cannot hide the CIDs of an upload that already succeeded.

    Raises:
        SystemExit: If the CLI command exits with a non-zero status.
        ValueError: If the results CSV has no data rows.
    """
    try:
        subprocess.run(
            ["npx", "-y", "@elephant-xyz/cli", "validate-and-upload", "output", "--output-csv", "photometadata-results.csv"],
            stdout=subprocess.DEVNULL,    # hide stdout
            stderr=subprocess.PIPE,       # capture stderr into a buffer
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print(e.stderr.strip(), file=sys.stderr)
        sys.exit(e.returncode)

    photo_info = get_photo_info()
    photo_group_cid, data_cid, html_link = photo_info["dataGroupCid"], photo_info["dataCid"], photo_info["htmlLink"]

    files_uploaded = count_upload_records()

    # Report what the CLI produced first; nothing below is needed for it.
    print("✅ Upload done\n")
    print(f"{files_uploaded} files uploaded\n")

    print(f"Photometadata group CID: {photo_group_cid}\n")
    print(f"HTML link: {html_link}\n")

    # Gateway lookups go last because they depend on network availability.
    # The fourth returned link is not printed by this step.
    photo_data_links, property_photo_link, property_link, _ = collect_data_ipfs_links(data_cid)

    # photo_data_links is either a single string or a list of strings.
    if isinstance(photo_data_links, list):
        print("Photo data IPFS links:")
        for i, link in enumerate(photo_data_links, 1):
            print(f"  Photo {i}: {link}")
    else:
        print(f"Photo data IPFS link: {photo_data_links}")

    print(f"Relationship IPFS link: {property_photo_link}")
    print(f"Property IPFS link: {property_link}")


# Entry point: run the upload step when this code executes as the main module.
if __name__ == "__main__":
    run_validate_and_upload()

✅ Upload done

1 files uploaded

Photometadata group CID: bafkreih226p5vjhx33jwgq7trblyplfw7yhkununuuahgpfok3hnh5mjwq

HTML link: http://dweb.link/ipfs/bafybeigmqhc3xnnrhuob6t7xv4b7bffe35rxgc6zb37u7ftcqzmirrixke

Photo data IPFS link: https://ipfs.io/ipfs/bafkreie7yignhnctd3i3h5n2pff3xhnq72x4c5i4yzcw244pdvbil77w6u
Relationship IPFS link: https://ipfs.io/ipfs/bafkreia3do6r7w6tgcvcjbyviz77tnoymg3pcjs7rpxztsgppc75ytkkm4
Property IPFS link: https://ipfs.io/ipfs/bafkreifb6el25q5wl4n5gsiwdojmmxn67m6ez3xmjtprwz4cww7m7kwjvu


In [None]:
# @title Step 5: Submit

! pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def get_transaction_hash(path="transaction-status.csv"):
    """Return the ``transactionHash`` value of the first CSV data row.

    Args:
        path: Location of the transaction status CSV; its first line is the
            header.

    Raises:
        ValueError: If the CSV has no data rows.
    """
    with open(path, newline='', encoding='utf-8') as handle:
        first = next(csv.DictReader(handle), None)
        if first is not None:
            return first["transactionHash"]
        raise ValueError("CSV file is empty")


def has_submit_errors(path="submit_errors.csv"):
    """Return True when the submit-errors CSV has any row after its header."""
    no_row = object()  # sentinel distinguishing "no data row" from a real row
    with open(path, newline='', encoding='utf-8') as handle:
        first = next(csv.DictReader(handle), no_row)
    return first is not no_row


def run_submit_to_contract():
    """Run the CLI ``submit-to-contract`` command and print the transaction link.

    The command's stdout is discarded and its stderr is captured. On a
    non-zero exit status the captured stderr is printed and the process exits
    with the same status. If ``submit_errors.csv`` holds any data row, a
    failure message is printed and the function returns without a link.
    Otherwise the first ``transactionHash`` from ``transaction-status.csv`` is
    printed as a polygonscan.com link.

    Raises:
        SystemExit: If the CLI command exits with a non-zero status.
    """
    # NOTE(review): the from-address, API key and oracle key ID below are
    # hard-coded in source. They look like credentials and probably belong in
    # the .env file loaded by load_dotenv() - confirm, and rotate if exposed.
    command = [
        "npx", "-y", "@elephant-xyz/cli", "submit-to-contract", "photometadata-results.csv",
        "--from-address", "0xefAd08946612A15d5De8D4Db7fc03556b6424075",
        "--api-key", "f7e18cf6-5d07-4e4a-ae23-f27b812614e6",
        "--domain", "oracles-69c46050.staircaseapi.com",
        "--oracle-key-id", "7ad26e0b-67c9-4c2f-95a2-2792c7db5ac7",
    ]

    try:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        print(f"Command failed (exit code {err.returncode}):", file=sys.stderr)
        print(err.stderr.strip(), file=sys.stderr)
        sys.exit(err.returncode)

    # The CLI can exit cleanly while still recording per-row errors in the CSV.
    if has_submit_errors():
        print("❌ Submit failed, please check submit_errors.csv for details", file=sys.stderr)
        return

    tx_hash = get_transaction_hash()
    tx_link = f"https://polygonscan.com/tx/{tx_hash}"

    print("✅ Submit done\n")
    print(f"Transaction link: {tx_link}")


# Entry point: run the submit step when this code executes as the main module.
if __name__ == "__main__":
    run_submit_to_contract()

✅ Submit done

Transaction link: https://polygonscan.com/tx/0xade1227d4e36fcac2b57775c82deca01b63842e3d8bcbceaae77599aab637a76
