# ZIP Flatten + Unzip PoC

This notebook provides a repeatable proof-of-concept for **unzipping ZIP files and flattening their contents** into a **single output folder**. It is designed to validate that our approach can:
- Discover ZIPs in nested directory structures
- Unzip archives (including ZIPs that contain internal folder paths)
- Flatten all extracted files into a single destination directory
- Avoid filename collisions using a deterministic naming strategy
- Continue processing when corrupt ZIPs are encountered (skip + continue)

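The skip-and-continue behaviour in the last bullet can be illustrated in plain Python. This is a minimal sketch that swaps the notebook's `unzip` CLI for the standard-library `zipfile` module. The function name `extract_all` and the per-archive output folder (named after the ZIP stem, assumed unique here) are illustrative assumptions, not part of the PoC.

```python
import shutil
import zipfile
import zlib
from pathlib import Path


def extract_all(zip_paths, dst_root):
    """Extract each ZIP under dst_root/<zip stem>/, skipping ones that fail.

    Returns (ok, failed): ok lists the paths that extracted cleanly,
    failed lists (path, error message) pairs.
    """
    ok, failed = [], []
    for zp in map(Path, zip_paths):
        # Hypothetical layout: one folder per archive stem (assumed unique)
        target = Path(dst_root) / zp.stem
        try:
            with zipfile.ZipFile(zp) as zf:
                zf.extractall(target)
            ok.append(zp)
        except (zipfile.BadZipFile, zlib.error, OSError) as exc:
            # Corrupt or unreadable archive: drop partial output, record it, keep going
            shutil.rmtree(target, ignore_errors=True)
            failed.append((zp, str(exc)))
    return ok, failed
```

As in the shell cell, one bad archive only adds an entry to `failed`; the loop moves on to the next ZIP.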
### How it works
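1. Recursively finds all ZIP files (`*.zip`, case-insensitive) under the **source folder**
2. For each ZIP (up to `parallelism` at a time):
   - Extracts its contents into a **temporary folder** (`tmp_<hash>_<pid>`) under the destination directory
   - If extraction fails (e.g. a corrupt ZIP), logs the error, removes the temporary folder, and skips to the next ZIP
   - Moves each extracted file into the destination root as `<ZipBaseName>_<Hash(zip_path)>_<FlattenedInternalPath>`, where `<Hash(zip_path)>` is the first 12 hex characters of the SHA-256 of the ZIP's full path and `<FlattenedInternalPath>` is the file's path inside the ZIP with `/` replaced by `_`
   - Removes the temporary folder to keep the destination clean
3. Merges the per-worker failure logs into a single failure log for the run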

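The naming rule in step 2 can be reproduced in plain Python, e.g. to predict where a given ZIP member will land. This sketch mirrors the shell logic (`printf "%s" "$zip" | shasum -a 256 | cut -c1-12`, the case-insensitive `.zip` strip, and the `/` to `_` substitution) and assumes UTF-8 paths; `flattened_name` is a hypothetical helper, not part of the notebook.

```python
import hashlib
import re


def flattened_name(zip_path: str, internal_path: str) -> str:
    """Build <ZipBaseName>_<Hash(zip_path)>_<FlattenedInternalPath>."""
    # Base name of the ZIP with a trailing .zip removed, any case
    zname = zip_path.rsplit("/", 1)[-1]
    zbase = re.sub(r"\.zip$", "", zname, flags=re.IGNORECASE)
    # SHA-256 of the full path (no trailing newline), first 12 hex chars
    digest = hashlib.sha256(zip_path.encode("utf-8")).hexdigest()[:12]
    # Flatten the member's internal folder structure
    safe = internal_path.replace("/", "_")
    return f"{zbase}_{digest}_{safe}"
```

Because the hash covers the full ZIP path, two `A.zip` files in different folders get different prefixes. Replacing `/` with `_` is not injective, though: `x/y_z` and `x_y/z` inside the same ZIP both become `x_y_z`, and `mv` without `-n` overwrites the earlier file.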
### Inputs (widgets)
- `src_vol`: Source directory containing ZIPs (recursive)
- `dst_vol`: Destination directory for extracted files (flat output)
- `parallelism`: Max number of ZIPs processed in parallel (`xargs -P`)

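`parallelism` is passed unchecked to `xargs -P`, which starts a separate `bash` worker per ZIP. The sketch below shows, as an assumption rather than the notebook's behaviour, how the widget string could be validated and used to bound concurrency in Python; `parse_parallelism` and `run_parallel` are hypothetical helpers, and threads stand in for the `xargs` worker processes. Note that GNU `xargs` treats `-P 0` as "as many as possible", whereas this sketch falls back to the default.

```python
from concurrent.futures import ThreadPoolExecutor


def parse_parallelism(raw: str, default: int = 4) -> int:
    """Turn the widget string into a positive worker count."""
    try:
        value = int(raw.strip())
    except ValueError:
        return default
    return value if value > 0 else default


def run_parallel(func, items, parallelism: int):
    """Apply func to items with at most `parallelism` concurrent workers.

    Roughly analogous to `xargs -P`; results are returned in input order.
    """
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        return list(pool.map(func, items))
```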
### Outputs
- Extracted files in: `dst_vol/`
- Failure log in: `dst_vol/_log/unzip_failures_<timestamp>.log` (UTC timestamp `YYYYMMDDTHHMMSSZ`; created on every run and empty if no ZIP failed)

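Each line of the failure log is written by the shell cell as `<timestamp> | UNZIP_FAIL | <zip path> | <error message>`, with newlines in the `unzip` error collapsed to spaces and the message truncated to 800 characters. A minimal parsing sketch for summarising a run, assuming the ZIP path never contains ` | ` (the `UnzipFailure` and `parse_failure_log` names are hypothetical):

```python
from typing import List, NamedTuple


class UnzipFailure(NamedTuple):
    timestamp: str
    kind: str
    zip_path: str
    message: str


def parse_failure_log(text: str) -> List[UnzipFailure]:
    """Parse '<ts> | UNZIP_FAIL | <zip> | <msg>' lines, skipping anything else."""
    failures = []
    for line in text.splitlines():
        if not line.strip():
            continue
        # maxsplit=3 keeps any ' | ' inside the message intact
        parts = line.split(" | ", 3)
        if len(parts) != 4:
            continue  # not in the expected format
        failures.append(UnzipFailure(*parts))
    return failures
```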


In [0]:
import os

# Notebook parameters 
dbutils.widgets.text("src_vol", "<path-to-zip-files>", "Source")
dbutils.widgets.text("dst_vol", "<path-for-extracted-files>", "Destination")
dbutils.widgets.text("parallelism", "4", "Parallel ZIP Workers")

# Read widget values
src_vol = dbutils.widgets.get("src_vol")
dst_vol = dbutils.widgets.get("dst_vol")
parallelism = dbutils.widgets.get("parallelism")

# Pass values into the shell environment for the %sh cell
os.environ["SRC_VOL"] = src_vol
os.environ["DST_VOL"] = dst_vol
os.environ["PARALLELISM"] = parallelism

In [0]:
%sh
# Folder_Hash_File naming strategy (ZIP base name, path hash, flattened file path)
# - Recursively find ZIP files in the source directory
# - Unzip each one into a temporary folder under the destination
# - Move extracted files into the destination root (flattened)
# - Rename to avoid collisions: <zipBase>_<hash(zip_path)>_<internalPathFlattened>
# - Clean up temp folder
#
# Note: xargs -P "$PARALLELISM" controls ZIP-level parallelism (default 4)

set -euo pipefail

src_vol="$SRC_VOL"
dst_vol="$DST_VOL"
PARALLELISM="${PARALLELISM:-4}"

# Create the output and log directories; abort if this fails (set -e)
log_dir="$dst_vol/_log"
mkdir -p "$dst_vol" "$log_dir"

# UTC run timestamp, used to name this run's failure log
run_ts=$(date -u +"%Y%m%dT%H%M%SZ")
final_log="$log_dir/unzip_failures_${run_ts}.log"

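# One bash worker per ZIP, at most $PARALLELISM at a time.
# Worker args: $1 = zip path, $2 = destination root, $3 = log directory
find "$src_vol" -type f -iname "*.zip" -print0 |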
  xargs -0 -P "$PARALLELISM" -I {} bash -c '
    zip="$1"
    dst="$2"
    log_dir="$3"

    # Per-worker log file; $$ is the PID of this worker, so concurrent workers never share a file
    log_file="$log_dir/tmp_$$.log"

    log() { echo "[$(date -u +%H:%M:%S)] [pid=$$] $*"; }
    log "START  $zip"

    # Deterministic short hash of the full zip path (first 12 hex chars of SHA-256), used for uniqueness
    hash=$(printf "%s" "$zip" | shasum -a 256 | cut -c1-12)

    # Zip filename and base name (strip .zip/.ZIP, any case)
    zname=$(basename "$zip")
    zbase=${zname%.[Zz][Ii][Pp]}

    # Temporary extraction folder specific to this zip + worker PID,
    # so parallel workers never clash
    tmp="$dst/tmp_${hash}_$$"
    mkdir -p "$tmp"

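    # Extract quietly into the temp folder. If unzip fails (e.g. corrupt zip),
    # append the error to the per-worker log, clean up, and skip this zip.
    # exit 0 so xargs does not count this as a failed command (exit 123),
    # which would stop the outer script (set -e) before the logs are merged.
    if ! err_out=$(unzip -oq "$zip" -d "$tmp" 2>&1); then
      ts=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
      # Collapse newlines and cap the length so each failure stays on one log line
      msg=$(printf "%s" "$err_out" | tr "\n" " " | cut -c1-800)
      printf "%s | UNZIP_FAIL | %s | %s\n" "$ts" "$zip" "$msg" >> "$log_file"
      rm -rf "$tmp"
      log "FAIL   $zip"
      exit 0
    fi
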
    # For each extracted file:
    # - compute its path relative to the temp extraction folder
    # - replace "/" with "_" to flatten internal folder structures
    # - move it into the destination root with a collision-safe name
    while IFS= read -r -d "" f; do
      rel=${f#"$tmp"/}
      safe=${rel//\//_}
      mv "$f" "$dst/${zbase}_${hash}_$safe"
    done < <(find "$tmp" -type f -print0)

    log "OK     $zip"

    # Remove the temp extraction folder
    rm -rf "$tmp"
  ' bash {} "$dst_vol" "$log_dir"

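# Merge the per-worker logs (if any) into the run log, then remove them.
# The >> redirection creates $final_log even when no zip failed (empty file).
cat "$log_dir"/tmp_*.log 2>/dev/null >> "$final_log" || true
rm -f "$log_dir"/tmp_*.log 2>/dev/null || true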

[10:46:17] [pid=11665] START  /Volumes/raw_wilson/erp/incoming/_stage_unzip_behavior_pack/unzip_behavior_pack/COLLISION_ACROSS_ZIPS__A.zip
[10:46:17] [pid=11668] START  /Volumes/raw_wilson/erp/incoming/_stage_unzip_behavior_pack/unzip_behavior_pack/INTERNAL_STRUCTURE_IN_ZIP__nested_paths_and_colliding_basenames.zip
[10:46:17] [pid=11667] START  /Volumes/raw_wilson/erp/incoming/_stage_unzip_behavior_pack/unzip_behavior_pack/CORRUPT_ZIP__invalid_bytes.zip
[10:46:17] [pid=11666] START  /Volumes/raw_wilson/erp/incoming/_stage_unzip_behavior_pack/unzip_behavior_pack/COLLISION_ACROSS_ZIPS__B.zip
[10:46:18] [pid=11667] FAIL   /Volumes/raw_wilson/erp/incoming/_stage_unzip_behavior_pack/unzip_behavior_pack/CORRUPT_ZIP__invalid_bytes.zip
[10:46:18] [pid=11717] START  /Volumes/raw_wilson/erp/incoming/_stage_unzip_behavior_pack/unzip_behavior_pack/MANY_SMALL_FILES__1000_members.zip
[10:46:19] [pid=11665] OK     /Volumes/raw_wilson/erp/incoming/_stage_unzip_behavior_pack/unzip_behavior_pack/COLLISI