# ArcGIS Survey123 Attachment Downloader

- Preview downloads to test naming
- User-selected attributes (up to 4) for naming files in your preferred order
- Automatic filename cleaning to remove long descriptive text before dates

**Requirements:**
- `arcgis` library (install via `conda install -c esri arcgis` or `!pip install arcgis`)
- Valid ArcGIS Online/Enterprise credentials
- Write access to `save_path`
- To find the **survey_item_id**: 
  - Log in and open the survey123 project
  - URL will have: https://survey123.arcgis.com/surveys/ **32 letters and numbers** /
  - Copy the survey id and add to Section 2
- Update the save path to align with the survey name

## 0. Installing Required Libraries

Remove the hashtag below and run the cell to import the libraries.

**If there is an error after running the setup and import section, try installing with conda instead:**
```
conda install -c esri arcgis
```


In [None]:
#!pip install arcgis pandas

## 1. Setup and Imports

Import libraries and define connection variables. **Replace with your details.**


In [None]:
import arcgis
from arcgis.gis import GIS
import os
import re
import csv
import pandas as pd
import logging
from pathlib import Path
from typing import Dict, Any, List
from IPython.display import display

# Configure logging for error tracking
logging.basicConfig(
    level=logging.INFO, 
    format='%(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)

print("✓ Libraries imported successfully")


## 2. Performance Optimizations Setup

Compile regex patterns once and define helper functions

This section sets up the core optimizations:
- Pre-compiled regex patterns (avoid recompiling for each file)
- Reusable utility functions
- OID lookup builder (key to fast downloads)


In [None]:
# Compile regex patterns
DATE_PATTERN = re.compile(r'(\d{8}-\d{4,6})')
FILENAME_SANITIZER = re.compile(r'[^A-Za-z0-9_-]')
LAYER_NAME_SANITIZER = re.compile(r'[^A-Za-z0-9]+')

# ===== UTILITY FUNCTIONS =====

 # Single pass through columns
def find_objectid_field(df: pd.DataFrame) -> str:
    # First try exact matches (most common)
    exact_matches = ['OBJECTID', 'FID', 'OID']
    for col in df.columns:
        if col.upper() in exact_matches:
            return col
    
    # Then try partial matches
    for col in df.columns:
        if 'OBJECTID' in col.upper() or 'FID' in col.upper():
            return col
    
    # Fallback to index
    return df.index.name or 'index'

# Reuse pre-compiled regex

def sanitize_filename(text: str, max_length: int = 30) -> str:
    safe_val = FILENAME_SANITIZER.sub('_', str(text))
    return safe_val[:max_length]

# Clean filename to just the name (no path) and remove descriptive text before date

def clean_original_filename(filename: str) -> str:
    """Remove descriptive text before date in Survey123 filenames.
    
    Examples:
        'image_of_plants_in_the_hoop-20250805-173647.jpg' → '20250805-173647.jpg'
        'canopy_closure_photo-20260114-103045.jpg' → '20260114-103045.jpg'
        'IMG_0234.jpg' → 'IMG_0234.jpg' (unchanged if no date pattern)
    """
    # Use pre-compiled pattern
    match = DATE_PATTERN.search(filename)
    if match:
        return filename[match.start():]
    
    # Fallback: split from right, take last part
    parts = filename.rsplit('-', 2)
    return parts[-1] if len(parts) > 1 else filename

# Build OID lookup dictionary for O(1) access 
def build_oid_lookup(df: pd.DataFrame, objectid_field: str, 
                     naming_attributes: List[str]) -> Dict[int, Dict[str, Any]]:

    oid_to_attrs = {}
    unique_oids = df[objectid_field].unique()
    
    for oid in unique_oids:
        row = df[df[objectid_field] == oid]
        if len(row) > 0:
            oid_to_attrs[oid] = {attr: row[attr].iloc[0] for attr in naming_attributes}
    
    return oid_to_attrs

print("✓ Optimization functions defined")


## 3. Connect to ArcGIS

Connecting to Survey123 and configuring the download location for photos.


In [None]:
# Define variables
portalURL = "https://www.arcgis.com"
username = "Please Add Your Username"
password = "Please Add Your Password"
survey_item_id = "Please Add Your Survey Item ID"
save_path = r"C:\temp\(Please Add Folder Name)"  # Use short path to avoid Windows 260-char limit
keep_org_item = False

# Create save directory
Path(save_path).mkdir(parents=True, exist_ok=True)

# Connect to GIS
try:
    gis = GIS(portalURL, username, password, verify_cert=True)
    print(f"✓ Connected to survey: {gis.users.me.username}")
    
    survey_by_id = gis.content.get(survey_item_id)
    print(f"✓ Survey: {survey_by_id.title}")
    
    # Get related feature service
    rel_fs = survey_by_id.related_items('Survey2Service','forward')[0]
    print(f"✓ Feature service: {rel_fs.title}")
except Exception as e:
    logger.error(f"Failed to connect to ArcGIS: {str(e)}")
    raise


## 4. Export and View Data CSV

Export survey data to Excel, then load main layer data for inspection.


In [None]:
try:
    # Export data as Excel
    print("Exporting survey data...")
    item_excel = rel_fs.export(title=survey_by_id.title + '_data', export_format='Excel')
    excel_path = item_excel.download(save_path=save_path)[0]
    print(f"✓ Excel exported to: {excel_path}")
    
    if not keep_org_item:
        item_excel.delete()
    
    # Find main data layer (usually first layer with ObjectID)
    main_layer = next((l for l in rel_fs.layers if hasattr(l, 'properties') and l.properties.name != 'Attachments'), None)
    
    if main_layer:
        print(f"\n✓ Found main layer: {main_layer.properties.name}")
        data_df = main_layer.query(as_df=True)
        print(f"  Total records: {len(data_df)}")
        print(f"  Total fields: {len(data_df.columns)}")
        
        display(data_df.head())
        print("\nAvailable columns:")
        print(list(data_df.columns))
        
        # Find the ObjectID field using optimized function (OPTIMIZATION 1)
        objectid_field = find_objectid_field(data_df)
        print(f"\n✓ Using '{objectid_field}' as ObjectID field")
    else:
        raise ValueError("Could not find main data layer")
        
except Exception as e:
    logger.error(f"Failed to export/load data: {str(e)}")
    raise


## 5. Filename Cleaning Test

Test the optimized filename cleaning function (uses pre-compiled regex).


In [None]:
print("Filename cleaning examples (using compiled regex patterns):\n")

examples = [
    "image_of_plants_in_the_hoop-20250805-173647.jpg",
    "plot32_180_Plant_stand_Distance_between_plants-20250704-1931.jpg",
    "canopy_photo-20260114-103045.png",
    "IMG_0234.jpg"
]

for fname in examples:
    cleaned = clean_original_filename(fname)
    savings = len(fname) - len(cleaned)
    print(f"Original: {fname}")
    print(f"Cleaned:  {cleaned}")
    print(f"Saved:    {savings} characters\n")


## 6. Select Naming Attributes (with Order)

Choose up to **4 attributes** to name photos in your preferred order.

**Please select Object ID as the first attribute.**

Use column names OR numbers from the list above.

Format: `{attribute1}_{attribute2}_{attribute3}_{cleaned_filename}.jpg`


In [None]:
# Show numbered list of columns for easy selection
print("\nAvailable columns for naming:")
for i, col in enumerate(data_df.columns, 1):
    print(f" {i:2d}. {col}")

# Get user input with enhanced prompts
naming_input = input(
    f"\nEnter column name(s) for photo naming (comma-separated, or press Enter for '{objectid_field}'):\n"
    "Example: 'plot_id,treatment,date' or use numbers: '3,5,7'\n>>> "
)

# Parse input - split by comma and strip whitespace
if naming_input.strip():
    raw_inputs = [attr.strip() for attr in naming_input.split(',')]
    naming_attributes = []
    
    # Check if user entered numbers or column names
    for attr in raw_inputs:
        if attr.isdigit():
            # User entered column index number
            idx = int(attr) - 1  # Convert to 0-based index
            if 0 <= idx < len(data_df.columns):
                naming_attributes.append(data_df.columns[idx])
            else:
                print(f"Warning: Index {attr} out of range. Skipping.")
        else:
            # User entered column name
            naming_attributes.append(attr)
else:
    naming_attributes = [objectid_field]

# Validate all attributes exist
valid_attributes = []
for attr in naming_attributes:
    if attr in data_df.columns:
        valid_attributes.append(attr)
    else:
        print(f"Warning: '{attr}' not found in columns. Skipping.")

# Fallback to objectid_field if no valid attributes
if not valid_attributes:
    print(f"No valid columns found. Using '{objectid_field}'.")
    naming_attributes = [objectid_field]
else:
    naming_attributes = valid_attributes

# Limit to first 4 attributes to keep filenames reasonable
if len(naming_attributes) > 4:
    print(f"\nWarning: More than 4 attributes selected. Using first 4: {naming_attributes[:4]}")
    naming_attributes = naming_attributes[:4]

print(f"\n✓ Using {naming_attributes} for naming photos (in this order).")
print(f"Format: {('_'.join(['{' + attr + '}' for attr in naming_attributes]) + '_{cleaned_date}.jpg')}")

# Show example with actual data from first row
if len(data_df) > 0:
    sample_parts = []
    for attr in naming_attributes:
        sample_val = str(data_df[attr].iloc[0])[:20]  # First 20 chars
        safe_val = sanitize_filename(sample_val)
        sample_parts.append(safe_val)
    
    example_original = "image_of_faba_beans_in_the_hoop-20250805-173647.jpg"
    example_cleaned = clean_original_filename(example_original)
    
    print(f"\nExample with your data:")
    print(f" {'_'.join(sample_parts)}_{example_cleaned}")
    print(f"\nOriginal filename would have been:")
    print(f" {'_'.join(sample_parts)}_{example_original}")
    print(f" Savings: {len(example_original) - len(example_cleaned)} characters per file")


## 7. Preview: Download 2-3 Sample Images

Test download first few images from layers with attachments with cleaned filenames.

**Review the preview folder to verify naming before running full download.**


In [None]:
try:
    preview_path = Path(save_path) / 'preview'
    preview_path.mkdir(parents=True, exist_ok=True)
    
    preview_count = 0
    layers_with_att = [l for l in rel_fs.layers + rel_fs.tables if l.properties.hasAttachments == True]
    
    print(f"Downloading preview images with naming: {' → '.join(naming_attributes)}\n")
    
    for layer in layers_with_att:
        if preview_count >= 5:
            break
        
        sanitized_name = LAYER_NAME_SANITIZER.sub('', layer.properties.name)
        layer_folder = preview_path / f'{sanitized_name}_attachments'
        layer_folder.mkdir(parents=True, exist_ok=True)
        
        oids = layer.query(where="1=1", return_ids_only=True)['objectIds'][:2]  # 2 per layer max
        
        for oid in oids:
            if preview_count >= 5:
                break
            
            try:
                atts = layer.attachments.get_list(oid=oid)
                for att in atts[:1]:  # 1 per OID
                    att_id = att['id']
                    orig_path = layer.attachments.download(oid=oid, attachment_id=att_id, save_path=str(layer_folder))
                    orig_name = Path(orig_path[0]).name
                    
                    # Clean original filename (remove descriptive text before date)
                    clean_name = clean_original_filename(orig_name)
                    
                    # Build name from multiple attributes in specified order
                    name_parts = []
                    for attr in naming_attributes:
                        # Find matching row by ObjectID
                        matching_rows = data_df[data_df[objectid_field] == int(oid)]
                        if len(matching_rows) > 0:
                            att_val = matching_rows[attr].iloc[0]
                        else:
                            att_val = 'unknown'
                        # Sanitize value for filename
                        safe_val = sanitize_filename(str(att_val), max_length=20)
                        name_parts.append(safe_val)
                    
                    # Combine attributes with cleaned filename
                    new_name = f"{'_'.join(name_parts)}_{clean_name}"
                    new_path = layer_folder / new_name
                    Path(orig_path[0]).rename(new_path)
                    preview_count += 1
                    
                    print(f"Preview {preview_count}:")
                    print(f" Original: {orig_name} ({len(orig_name)} chars)")
                    print(f" New:      {new_name} ({len(new_name)} chars)")
                    print(f" Saved:    {len(orig_name) - len(clean_name)} chars from filename cleaning\n")
            except Exception as e:
                logger.error(f"Error downloading preview for OID {oid}: {str(e)}")
                continue
    
    print(f"\n✓ Preview complete. Check preview folder: {preview_path}")
    print(f"If naming looks correct, proceed to Section 8 for full download.")
    
except Exception as e:
    logger.error(f"Preview download failed: {str(e)}")
    raise


## 8. Full Download

Download ALL images with custom naming and cleaned filenames.

**Run only after preview looks correct.**


In [None]:
import time

start_time = time.time()

try:
    # Build OID lookup dictionary ONCE before downloads
    print("Building OID lookup dictionary...")
    oid_to_attrs = build_oid_lookup(data_df, objectid_field, naming_attributes)
    print(f"✓ Built lookup for {len(oid_to_attrs)} unique OIDs\n")
    
    # Full download with custom naming and filename cleaning
    layers = rel_fs.layers + rel_fs.tables
    total_downloaded = 0
    total_chars_saved = 0
    errors_encountered = 0
    
    for layer in layers:
        if not layer.properties.hasAttachments:
            continue
        
        print(f"\n{'='*70}")
        print(f"Processing layer: {layer.properties.name}")
        print(f"{'='*70}")
        
        # Sanitize layer name for folder
        sanitized_name = LAYER_NAME_SANITIZER.sub('', layer.properties.name)
        layer_path = Path(save_path) / f"{sanitized_name}_attachments"
        layer_path.mkdir(parents=True, exist_ok=True)
        
        csv_path = layer_path / f"{layer.properties.name}_attachments.csv"
        csv_fields = ['Parent ObjectId'] + naming_attributes + ['Original Filename', 'New Filename', 'Attachment path']
        
        try:
            with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
                csvwriter = csv.writer(csvfile)
                csvwriter.writerow(csv_fields)
                
                # Get all feature OIDs
                query_result = layer.query(where="1=1", return_ids_only=True, order_by_fields='objectid ASC')
                feature_oids = query_result['objectIds']
                total_features = len(feature_oids)
                
                print(f"Total features: {total_features}")
                print(f"Downloading attachments...\n")
                
                for idx, oid in enumerate(feature_oids, 1):
                    try:
                        current_oid_attachments = layer.attachments.get_list(oid=oid)
                        
                        if current_oid_attachments:
                            # Use dictionary lookup (O(1)) instead of DataFrame filter (O(n))
                            attr_vals = oid_to_attrs.get(oid, {attr: 'unknown' for attr in naming_attributes})
                            attr_values = [attr_vals.get(attr, 'unknown') for attr in naming_attributes]
                            name_parts = [sanitize_filename(attr_vals.get(attr, 'unknown')) for attr in naming_attributes]
                            
                            for att in current_oid_attachments:
                                try:
                                    attachment_id = att['id']
                                    orig_path = layer.attachments.download(
                                        oid=oid, 
                                        attachment_id=attachment_id, 
                                        save_path=str(layer_path)
                                    )
                                    orig_name = Path(orig_path[0]).name
                                    
                                    # Clean original filename
                                    clean_name = clean_original_filename(orig_name)
                                    chars_saved = len(orig_name) - len(clean_name)
                                    total_chars_saved += chars_saved
                                    
                                    # Build new filename
                                    new_name = f"{'_'.join(name_parts)}_{clean_name}"
                                    new_path = layer_path / new_name
                                    Path(orig_path[0]).rename(new_path)
                                    
                                    # Get relative path for CSV
                                    att_relative_path = new_path.relative_to(Path(save_path))
                                    csvwriter.writerow([oid] + attr_values + [orig_name, new_name, str(att_relative_path)])
                                    
                                    total_downloaded += 1
                                    
                                except Exception as e:
                                    logger.error(f"Failed to download attachment {att.get('id')} for OID {oid}: {str(e)}")
                                    errors_encountered += 1
                                    continue
                    
                    except Exception as e:
                        logger.error(f"Error processing OID {oid}: {str(e)}")
                        errors_encountered += 1
                        continue
                    
                    # Progress reporting (~every 5%)
                    progress_interval = max(1, total_features // 20)
                    if idx % progress_interval == 0 or idx == total_features:
                        percent = (idx / total_features) * 100
                        print(f"  [{idx:6d}/{total_features:6d}] {percent:5.1f}% complete...")
        
        except Exception as e:
            logger.error(f"Error processing layer {layer.properties.name}: {str(e)}")
            errors_encountered += 1
            continue
    
    # Summary report
    elapsed_time = time.time() - start_time
    
    print("\n" + "="*70)
    print("✓ FULL DOWNLOAD COMPLETE")
    print("="*70)
    print(f"Total attachments downloaded: {total_downloaded:,}")
    print(f"Total characters saved by filename cleaning: {total_chars_saved:,}")
    if total_downloaded > 0:
        print(f"Average savings per file: {total_chars_saved // total_downloaded} characters")
    print(f"\nExecution time: {elapsed_time:.1f} seconds")
    print(f"Errors encountered: {errors_encountered}")
    print(f"\nFiles saved to: {save_path}")
    print(f"Naming order: {' → '.join(naming_attributes)}")
    print("="*70 + "\n")
    
    # Display summary (only first 10 rows to save memory)
    if total_downloaded > 0:
        try:
            summary_df = pd.read_csv(csv_path, nrows=10)
            print("First 10 attachments:")
            display(summary_df)
        except Exception as e:
            logger.warning(f"Could not read CSV for summary: {str(e)}")

except Exception as e:
    logger.error(f"Download process failed: {str(e)}")
    raise


## Summary

**Output locations:**
- Excel data: `{save_path}/{survey_title}_data.xlsx`
- Preview images: `{save_path}/preview/`
- Full images: `{save_path}/LayerName_attachments/`
- Mapping CSVs: Link object IDs to original and new filenames with all selected attributes
