<a href="https://colab.research.google.com/github/elephant-xyz/notebook/blob/main/Mining_County.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# County Mining Process

In [None]:
# @title Step 1: Upload .env

In [None]:
# @title Step 2: Upload seed_output.zip
import zipfile
import os
import shutil

def extract_seed_results(zip_path, output_path):
    """Extract one named file from a ZIP archive and place it at ``output_path``.

    The archive is unpacked into a private temporary directory and searched
    recursively for a member whose file name equals the base name of
    ``output_path``. The first match (in ``os.walk`` order) is moved to
    ``output_path``; everything else is discarded with the temporary directory.

    Args:
        zip_path: Path to the ZIP archive to read.
        output_path: Destination path. Its base name is also the file name
            that is looked up inside the archive.

    Raises:
        FileNotFoundError: If the archive contains no file with that name.
    """
    # Local import: this cell only imports zipfile, os and shutil.
    import tempfile

    # Match on the base name so a destination such as "data/seed.csv" still
    # finds "seed.csv" inside the archive.
    wanted_name = os.path.basename(output_path)

    # A unique temporary directory avoids clobbering (and later deleting) an
    # unrelated "./temp_unzip" folder that may already exist.
    with tempfile.TemporaryDirectory(prefix="temp_unzip_") as temp_dir:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(temp_dir)

        target_file = next(
            (
                os.path.join(root, wanted_name)
                for root, _, files in os.walk(temp_dir)
                if wanted_name in files
            ),
            None,
        )
        if target_file is None:
            raise FileNotFoundError(f"{wanted_name} not found inside the zip archive")

        dest_dir = os.path.dirname(output_path)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)

        shutil.move(target_file, output_path)
        print(f"✔ {wanted_name} extracted to {output_path}")


# Pull the two CSVs out of the uploaded archive into the working directory:
# seed.csv is passed to the transform step, seed-results.csv is read by the hash step.
extract_seed_results("seed_output.zip", "seed.csv")
extract_seed_results("seed_output.zip", "seed-results.csv")


✔ seed-results.csv extracted to seed.csv
✔ seed-results.csv extracted to seed-results.csv


In [None]:
# @title Step 3: Prepare
from dataclasses import dataclass
from pathlib import Path
from typing import Any, NotRequired, TypedDict, cast

import json
import logging
import shutil
import tempfile
import zipfile
from urllib.parse import urlencode

import requests
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ---------- Logging ----------
# Module-level logger used by the packager below.
logger = logging.getLogger(__name__)
# Configure root logging (INFO, "timestamp | level | message") only when this
# logger has no handlers of its own attached.
if not logger.handlers:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")


# ---------- Types ----------
class SourceHttpRequest(TypedDict, total=False):
    """Shape of the ``source_http_request`` object in property_seed.json.

    Every key is optional at the type level (``total=False``); the packager
    itself rejects a request without ``url``.
    """

    method: str  # HTTP verb; the packager defaults to GET and supports GET/POST
    url: str  # target URL (required by the packager at runtime)
    multiValueQueryString: dict[str, Any]  # query parameters, possibly as lists of values
    headers: dict[str, str]  # extra request headers
    body: Any  # raw POST body, used when ``json`` is absent
    json: Any  # JSON POST body, takes precedence over ``body``


class PropertySeed(TypedDict):
    """Shape of seed_output/property_seed.json as consumed by the packager.

    ``source_http_request`` is required (the loader rejects seeds without it),
    while the two identifier keys are individually optional. Marking them
    ``NotRequired`` on a total TypedDict expresses exactly that; the previous
    ``total=False`` made ``NotRequired`` redundant and the request optional.
    """

    parcel_id: NotRequired[str]
    request_identifier: NotRequired[str]
    source_http_request: SourceHttpRequest


@dataclass(frozen=True)
class PackagerConfig:
    """Immutable settings for ``PropertyZipPackager``."""

    # ZIP to read; must contain seed_output/property_seed.json.
    input_zip: Path
    # Destination of the repackaged ZIP (its parent directory is created if missing).
    output_zip: Path
    # Per-request timeout, in seconds, passed to the HTTP session.
    timeout_sec: float = 30.0
    # Retry budget applied to total/connect/read/status retries alike.
    retries: int = 3
    # Backoff factor handed to urllib3's Retry.
    backoff_factor: float = 1.6
    # HTTP status codes that trigger a retry.
    status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504)


# ---------- Public API ----------
class PropertyZipPackager:
    """
    Importable packager:
      - Unpacks input ZIP with seed_output/
      - Reads seed_output/property_seed.json
      - Executes described HTTP request
      - Writes HTML to seed_output/<parcel_id>.html (or <request_identifier>.html)
      - Repackages as output ZIP
    """

    def __init__(self, config: PackagerConfig, session: Session | None = None) -> None:
        """Store ``config`` and use ``session`` when given, else build a retrying one."""
        self.cfg = config
        self.session = session or self._build_session()

    def run(self) -> Path:
        """
        Execute the pipeline. Returns the output_zip path.
        Raises on validation/network/IO errors.
        """
        self._validate_paths()

        with tempfile.TemporaryDirectory(prefix="prop_zip_packager_") as tmpdir:
            workdir = Path(tmpdir)
            self._extract_input_zip(workdir)

            seed_dir = workdir / "seed_output"
            seed = self._load_property_seed(seed_dir / "property_seed.json")

            html_text = self._fetch_html_from_seed(seed)

            # ---- derive filename from IDs
            file_stem = self._resolve_identifier(seed)
            out_html_path = seed_dir / f"{file_stem}.html"
            out_html_path.write_text(html_text, encoding="utf-8")
            logger.info("Wrote HTML -> %s", out_html_path.relative_to(workdir).as_posix())

            self._write_output_zip(workdir)
            logger.info("Output ZIP written -> %s", self.cfg.output_zip.resolve())

        return self.cfg.output_zip

    # ---------- Helpers ----------
    def _build_session(self) -> Session:
        """Build a ``requests`` session with retry/backoff and a browser User-Agent."""
        retry = Retry(
            total=self.cfg.retries,
            connect=self.cfg.retries,
            read=self.cfg.retries,
            status=self.cfg.retries,
            backoff_factor=self.cfg.backoff_factor,
            status_forcelist=self.cfg.status_forcelist,
            allowed_methods={"GET", "POST"},
            # Hand back the last response instead of raising inside urllib3;
            # the caller decides via raise_for_status().
            raise_on_status=False,
            respect_retry_after_header=True,
        )
        adapter = HTTPAdapter(max_retries=retry)
        s = requests.Session()
        s.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0 Safari/537.36"
            )
        })
        s.mount("http://", adapter)
        s.mount("https://", adapter)
        return s

    def _validate_paths(self) -> None:
        """Ensure the input ZIP exists and the output directory can be written to.

        Raises:
            FileNotFoundError: If ``cfg.input_zip`` does not exist.
        """
        if not self.cfg.input_zip.exists():
            raise FileNotFoundError(f"Input ZIP not found: {self.cfg.input_zip}")
        self.cfg.output_zip.parent.mkdir(parents=True, exist_ok=True)

    def _extract_input_zip(self, workdir: Path) -> None:
        """Unpack the input ZIP into ``workdir`` and check its expected layout.

        Raises:
            ValueError: If ``seed_output/`` or ``seed_output/property_seed.json``
                is missing after extraction.
        """
        logger.info("Extracting input ZIP: %s", self.cfg.input_zip)
        with zipfile.ZipFile(self.cfg.input_zip, "r") as zf:
            zf.extractall(workdir)

        seed_dir = workdir / "seed_output"
        if not seed_dir.is_dir():
            raise ValueError("Input ZIP must contain a 'seed_output/' directory.")
        if not (seed_dir / "property_seed.json").is_file():
            raise ValueError("Missing 'seed_output/property_seed.json' in input ZIP.")
        logger.info("Input ZIP extracted and validated.")

    def _load_property_seed(self, path: Path) -> PropertySeed:
        """Parse the seed JSON at ``path``.

        Raises:
            ValueError: If the file is not valid JSON, is not a JSON object, or
                lacks the ``source_http_request`` key.
        """
        logger.info("Reading property seed: %s", path)
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in {path}: {e}") from e
        if not isinstance(data, dict) or "source_http_request" not in data:
            raise ValueError("property_seed.json must include 'source_http_request'")
        return cast(PropertySeed, data)

    def _resolve_identifier(self, seed: PropertySeed) -> str:
        """
        Use parcel_id if present, else request_identifier.
        Raise if neither is available or empty, or if the chosen value cannot
        serve as a plain file name.
        """
        # str() tolerates identifiers stored as JSON numbers.
        pid = str(seed.get("parcel_id") or "").strip()
        rid = str(seed.get("request_identifier") or "").strip()
        ident = pid or rid
        if not ident:
            raise ValueError("property_seed.json must include 'parcel_id' or 'request_identifier' for output filename.")
        # The identifier becomes '<ident>.html' inside seed_output/; anything
        # with a directory component would land outside it (or fail to write).
        if Path(ident).name != ident:
            raise ValueError(f"Identifier {ident!r} is not usable as a plain file name.")
        return ident

    def _normalize_query_params(self, mvqs: dict[str, Any] | None) -> dict[str, Any]:
        """Convert a multi-value query-string mapping into ``requests`` params.

        Single-item lists are unwrapped to their only value. Lists with several
        items are kept whole so that ``requests`` emits the key once per value
        instead of silently losing everything after the first one. Any other
        value is passed through unchanged.
        """
        if not mvqs:
            return {}
        params: dict[str, Any] = {}
        for k, v in mvqs.items():
            if isinstance(v, list) and len(v) == 1:
                params[k] = v[0]
            else:
                params[k] = v
        return params

    def _fetch_html_from_seed(self, seed: PropertySeed) -> str:
        """Execute the HTTP request described by the seed and return the body text.

        Raises:
            ValueError: If the request description is not an object, has no
                ``url``, or uses a method other than GET/POST.
            requests.HTTPError: If the final response has an error status.
        """
        # `or {}` also covers an explicit JSON null.
        src = seed.get("source_http_request") or {}
        if not isinstance(src, dict):
            raise ValueError("source_http_request must be a JSON object")

        method = (src.get("method") or "GET").upper()
        url = src.get("url")
        if not url:
            raise ValueError("source_http_request.url is required")

        params = self._normalize_query_params(cast(dict[str, Any] | None, src.get("multiValueQueryString")))
        headers = cast(dict[str, str] | None, src.get("headers")) or {}
        body = src.get("body")
        json_body = src.get("json")

        kwargs: dict[str, Any] = {"timeout": self.cfg.timeout_sec, "headers": headers}
        if method == "GET":
            kwargs["params"] = params
        elif method == "POST":
            kwargs["params"] = params
            # A JSON payload wins over a raw body when both are present.
            if json_body is not None:
                kwargs["json"] = json_body
            elif body is not None:
                kwargs["data"] = body
        else:
            raise ValueError(f"Unsupported HTTP method: {method}")

        # doseq=True renders list values as repeated keys, matching what is sent.
        preview_qs = f"?{urlencode(params, doseq=True)}" if params and method == "GET" else ""
        logger.info("%s %s%s", method, url, preview_qs)

        resp = self.session.request(method, url, **kwargs)
        resp.raise_for_status()
        return resp.text

    def _write_output_zip(self, workdir: Path) -> None:
        """Zip ``workdir/seed_output`` into ``cfg.output_zip``.

        The archive is first written to a sibling ``.tmp`` file and then moved
        into place, so a failed run never leaves a truncated output ZIP. The
        temporary file is removed if anything goes wrong.
        """
        src_dir = workdir / "seed_output"
        tmp_path = self.cfg.output_zip.with_suffix(self.cfg.output_zip.suffix + ".tmp")

        try:
            with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                # Sorted for a stable member order.
                for p in sorted(src_dir.rglob("*")):
                    if p.is_file():
                        arcname = p.relative_to(workdir).as_posix()
                        zf.write(p, arcname)

            shutil.move(tmp_path, self.cfg.output_zip)
        finally:
            # No-op after a successful move; cleans up the partial file otherwise.
            tmp_path.unlink(missing_ok=True)


# Fetch the county page described by the uploaded seed and repackage it,
# together with the seed files, as output.zip (input for the transform step).
cfg = PackagerConfig(
    input_zip=Path("seed_output.zip"),
    output_zip=Path("output.zip"),
)

packager = PropertyZipPackager(cfg)
packager.run()

2025-08-16 17:47:52,442 | INFO | Extracting input ZIP: seed_output.zip
2025-08-16 17:47:52,445 | INFO | Input ZIP extracted and validated.
2025-08-16 17:47:52,446 | INFO | Reading property seed: /var/folders/g0/jndq4b_s1m96bvgtflhbx4lh0000gn/T/prop_zip_packager_2xi3gh4v/seed_output/property_seed.json
2025-08-16 17:47:52,447 | INFO | GET https://esearch.fbcad.org/Property/View/R66725?year=2025
2025-08-16 17:47:53,370 | INFO | Wrote HTML -> seed_output/R66725.html
2025-08-16 17:47:53,376 | INFO | Output ZIP written -> /Users/movsiienko/Projects/elephant/notebook/output.zip


PosixPath('output.zip')

In [None]:
# @title Step 4: Transform
#!/usr/bin/env python3
!npx -y @elephant-xyz/cli transform --group county --input-zip output.zip --input-csv seed.csv --output-zip county_output.zip

[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K[1m[34m🐘 Elephant Network CLI - Transform[39m[22m

[2K[37m⠙[0m [2mbackoff==2.2.1                                                                [0m
[32m✅ Transform process finished[39m
[1m📊 Output:[22m
  Transformed data with HTML: [36mcounty_output.zip[39m

[90mThe output ZIP contains:[39m
[90m  - R66725/ (transformed data with HTML fact sheets)[39m
[1G[0K⠙[1G[0K

In [None]:
# @title Step 4.5: Upload fact sheet to the IPFS
!pip3 install python-dotenv -q
import json
import os
import tempfile
import zipfile
from pathlib import Path
from typing import Iterable, List, Tuple

import requests

from dotenv import load_dotenv
load_dotenv()

PIN_ENDPOINT = "https://api.pinata.cloud/pinning/pinFileToIPFS"
TIMEOUT = 60  # seconds


def _first_extracted_directory(root: Path) -> Path:
    """Return the first subdir, or root itself if files exist directly there."""
    dirs = sorted([p for p in root.iterdir() if p.is_dir()])
    return dirs[0] if dirs else root


def _gather_non_json_files(base_dir: Path) -> Iterable[Tuple[str, Path]]:
    """
    Yield (relative_path, absolute_path) for all non-JSON files under base_dir,
    where relative_path is POSIX-style and RELATIVE TO base_dir (no leading slash).
    """
    for p in base_dir.rglob("*"):
        if p.is_file() and p.suffix.lower() != ".json":
            rel = p.relative_to(base_dir).as_posix()
            yield rel, p


def pin_directory_non_json_from_zip(zip_path: Path) -> str:
    """
    Pin the non-JSON contents of a ZIP archive to IPFS via Pinata as one directory.

    - Extract zip to a temp dir
    - Pick the extracted directory (or root if files are directly inside)
    - Upload all non-JSON files as a *directory* to Pinata, ensuring each part's filename is
      '{dir_name}/{relative_path_within_dir}'
    - Return a public IPFS gateway URL for the resulting CID (CIDv1)

    Args:
        zip_path: Archive whose non-JSON files are uploaded.

    Returns:
        ``https://ipfs.io/ipfs/<cid>/`` for the pinned directory.

    Raises:
        RuntimeError: If ``PINATA_JWT`` is not set, the archive holds no
            non-JSON files, Pinata answers with an error status, or the
            response is not JSON / lacks ``IpfsHash``.
    """
    # Local import: ExitStack is not among this cell's top-level imports.
    from contextlib import ExitStack

    jwt = os.environ.get("PINATA_JWT")
    if not jwt:
        raise RuntimeError("PINATA_JWT environment variable is required")

    headers = {"Authorization": f"Bearer {jwt}"}

    with tempfile.TemporaryDirectory(prefix="pin_zip_") as tmpd:
        tmp_root = Path(tmpd)

        # 1) Extract
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(tmp_root)

        # 2) Locate directory to upload
        target_dir = _first_extracted_directory(tmp_root)
        # When files sit directly at the archive root, target_dir is the temp
        # directory itself; its random name would leak into the upload prefix
        # and the pin name, so fall back to the archive's own name.
        dir_name = target_dir.name if target_dir != tmp_root else Path(zip_path).stem

        # 3) Collect non-JSON files (sorted for a deterministic part order)
        collected = sorted(_gather_non_json_files(target_dir))
        if not collected:
            raise RuntimeError("No non-JSON files found to upload.")

        data = {
            "pinataOptions": json.dumps({"cidVersion": 1}),
            "pinataMetadata": json.dumps({"name": dir_name}),
        }

        # 4) Build multipart payload with '{dir_name}/...' paths. ExitStack
        # closes every opened file once the request is done, even on errors.
        with ExitStack() as stack:
            files_payload: List[Tuple[str, tuple]] = [
                (
                    "file",
                    (
                        # The directory-name prefix lets Pinata rebuild the
                        # directory structure.
                        f"{dir_name}/{rel_path}",
                        stack.enter_context(abs_path.open("rb")),
                        "application/octet-stream",
                    ),
                )
                for rel_path, abs_path in collected
            ]

            # 5) Upload to Pinata
            resp = requests.post(
                PIN_ENDPOINT,
                headers=headers,
                data=data,
                files=files_payload,
                timeout=TIMEOUT,
            )

    try:
        resp.raise_for_status()
    except requests.HTTPError as e:
        # Surface Pinata error response for easier debugging
        raise RuntimeError(f"Pinata error: {e}\nResponse body: {resp.text}") from e

    try:
        payload = resp.json()
    except ValueError as e:
        raise RuntimeError(f"Unexpected Pinata response (not JSON): {resp.text}") from e

    cid = payload.get("IpfsHash")
    if not cid:
        raise RuntimeError(f"Unexpected Pinata response (no IpfsHash): {payload}")

    return f"https://ipfs.io/ipfs/{cid}/"

# Pin the non-JSON files from the transform output and show the gateway URL.
url = pin_directory_non_json_from_zip(Path("county_output.zip"))
print("Uploaded to:", url)

Uploaded to: https://ipfs.io/ipfs/bafybeidhik4aqjlcp3zfve7dqxqimurdzrg2qmqzwsclnvkcwnnjwudc5a/


In [None]:
# @title Step 5: Validate
!pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def has_submit_errors(path="submit_errors.csv"):
    """Return True when the CSV at ``path`` holds at least one data row.

    A header-only file means no errors. A missing file is likewise treated as
    "no errors" rather than raising ``FileNotFoundError``.
    NOTE(review): this assumes the CLI may skip writing the file when nothing
    failed; confirm against the CLI's behaviour.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            return next(csv.DictReader(csvfile), None) is not None
    except FileNotFoundError:
        return False


def run_validate():
    """Run the CLI ``validate`` command on county_output.zip and report the result.

    Prints a failure notice when ``has_submit_errors()`` reports rows in
    submit_errors.csv, and a success line otherwise. If the CLI itself exits
    non-zero, its stderr is echoed and the interpreter exits with the same code.
    """
    print("Validation started")
    # Keep the try body to the one call that can raise CalledProcessError.
    try:
        subprocess.run(
            ["npx", "-y", "@elephant-xyz/cli", "validate", "county_output.zip"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print((e.stderr or "").strip(), file=sys.stderr)
        sys.exit(e.returncode)

    if has_submit_errors():
        print("❌ Validate failed, please check submit_errors.csv for details", file=sys.stderr)
        return

    # Explicit success line, matching the other steps of the notebook.
    print("✅ Validate done\n")


if __name__ == "__main__":
    run_validate()


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
Validation started


In [None]:
# @title Step 6: Hash

!pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def has_submit_errors(path="submit_errors.csv"):
    """Return True when the CSV at ``path`` holds at least one data row.

    A header-only file means no errors. A missing file is likewise treated as
    "no errors" rather than raising ``FileNotFoundError``.
    NOTE(review): this assumes the CLI may skip writing the file when nothing
    failed; confirm against the CLI's behaviour.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            return next(csv.DictReader(csvfile), None) is not None
    except FileNotFoundError:
        return False


def get_seed_cid(path="seed-results.csv"):
    """Return the ``propertyCid`` value from the first data row of the CSV at ``path``.

    Raises:
        ValueError: If the CSV has no data rows.
        KeyError: If the first row has no ``propertyCid`` column.
    """
    with open(path, newline='', encoding='utf-8') as handle:
        # Only the first record matters; return as soon as it is read.
        for record in csv.DictReader(handle):
            return record["propertyCid"]
    raise ValueError("CSV file is empty")


def run_hash():
    """Run the CLI ``hash`` command on county_output.zip.

    The property CID is taken from ``get_seed_cid()`` and passed as
    ``--property-cid``; the command is asked to write hashed-data.zip and
    county-results.csv. Prints a failure notice when ``has_submit_errors()``
    reports rows in submit_errors.csv, otherwise a success line. If the CLI
    exits non-zero, its stderr is echoed and the interpreter exits with the
    same code.
    """
    seed_group_cid = get_seed_cid()
    # Keep the try body to the one call that can raise CalledProcessError.
    try:
        subprocess.run(
            [
                "npx", "-y", "@elephant-xyz/cli",
                "hash", "county_output.zip",
                "--output-zip", "hashed-data.zip",
                "--output-csv", "county-results.csv",
                "--property-cid", seed_group_cid
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print((e.stderr or "").strip(), file=sys.stderr)
        sys.exit(e.returncode)

    if has_submit_errors():
        # This is the hash step; the message previously said "Validate failed".
        print("❌ Hash failed, please check submit_errors.csv for details", file=sys.stderr)
        return

    print("✅ Hash done\n")


if __name__ == "__main__":
    run_hash()




[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
✅ Hash done



In [None]:
# @title Step 7: Upload
from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys

def upload():
    """Run the CLI ``upload`` command on hashed-data.zip.

    The command is asked to write county-results.csv. On success a
    confirmation line is printed; if the CLI exits non-zero, its stderr is
    echoed and the interpreter exits with the same code.
    """
    command = [
        "npx", "-y", "@elephant-xyz/cli",
        "upload", "hashed-data.zip",
        "--output-csv", "county-results.csv",
    ]
    try:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        print(f"Command failed (exit code {err.returncode}):", file=sys.stderr)
        print(err.stderr.strip(), file=sys.stderr)
        sys.exit(err.returncode)
    else:
        print("✅ Upload done\n")


if __name__ == "__main__":
    upload()


✅ Upload done



In [None]:
# @title Step 8: Submit

! pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def get_transaction_hash(path="transaction-status.csv"):
    """Return the ``transactionHash`` value from the first data row of the CSV at ``path``.

    Raises:
        ValueError: If the CSV has no data rows.
        KeyError: If the first row has no ``transactionHash`` column.
    """
    with open(path, newline='', encoding='utf-8') as handle:
        rows = csv.DictReader(handle)
        try:
            head = next(rows)
        except StopIteration:
            # No data row after the header.
            raise ValueError("CSV file is empty") from None
    return head["transactionHash"]


def has_submit_errors(path="submit_errors.csv"):
    """Return True when the CSV at ``path`` holds at least one data row.

    A header-only file means no errors. A missing file is likewise treated as
    "no errors" rather than raising ``FileNotFoundError``.
    NOTE(review): this assumes the CLI may skip writing the file when nothing
    failed; confirm against the CLI's behaviour.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            return next(csv.DictReader(csvfile), None) is not None
    except FileNotFoundError:
        return False


def run_submit_to_contract(
    from_address="0xefAd08946612A15d5De8D4Db7fc03556b6424075",
    api_key="f7e18cf6-5d07-4e4a-ae23-f27b812614e6",
    domain="oracles-69c46050.staircaseapi.com",
    oracle_key_id="7ad26e0b-67c9-4c2f-95a2-2792c7db5ac7",
):
    """Run the CLI ``submit-to-contract`` command on county-results.csv.

    Args:
        from_address: Value passed as ``--from-address``.
        api_key: Value passed as ``--api-key``.
        domain: Value passed as ``--domain``.
        oracle_key_id: Value passed as ``--oracle-key-id``.

    The defaults reproduce the values that used to be hard-coded in the
    command, so calling the function without arguments behaves as before.

    Prints a failure notice when ``has_submit_errors()`` reports rows in
    submit_errors.csv; otherwise prints a success line and a Polygonscan link
    built from ``get_transaction_hash()``. If the CLI exits non-zero, its
    stderr is echoed and the interpreter exits with the same code.
    """
    # NOTE(review): the default api_key / oracle_key_id are credentials
    # committed to the notebook. Prefer passing them in (for example from the
    # .env loaded above) and rotating the committed values.
    command = [
        "npx", "-y", "@elephant-xyz/cli", "submit-to-contract", "county-results.csv",
        "--from-address", from_address,
        "--api-key", api_key,
        "--domain", domain,
        "--oracle-key-id", oracle_key_id,
    ]
    # Keep the try body to the one call that can raise CalledProcessError.
    try:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print((e.stderr or "").strip(), file=sys.stderr)
        sys.exit(e.returncode)

    if has_submit_errors():
        print("❌ Submit failed, please check submit_errors.csv for details", file=sys.stderr)
        return

    transaction_hash = get_transaction_hash()
    transaction_link = f"https://polygonscan.com/tx/{transaction_hash}"

    print("✅ Submit done\n")
    print(f"Transaction link: {transaction_link}")


if __name__ == "__main__":
    run_submit_to_contract()



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
✅ Submit done

Transaction link: https://polygonscan.com/tx/0x4bdfc807b1aef531504cbc9caa55d8cf2553a4e303338a5dbc0fce9215eca1d9


In [None]:
# @title Step 9: Download county-results.csv
import os
from google.colab import files

# Trigger the browser download only when the results file is present.
if os.path.exists('county-results.csv'):
    files.download('county-results.csv')
    print("✅ File was downloaded successfully")
else:
    print("❌ File not found")
