Camilla Green 

5/16/25

# Asana Task and Attachment Export Script
This script uses the Asana API to export tasks, subtasks, and attachments. It retrieves tasks and subtasks from every project the user has access to, extracts their details, and saves them to one CSV file per project. It also downloads the attachments for each task. All output is saved in the same location as the script itself.

The script creates a folder named "attachments". Each project gets a folder inside "attachments", and each task gets a folder inside its project folder that holds the task's attachments. The script also writes a log file listing any projects that were skipped. For the GRID3 team's use case, all of the projects that the user has access to are downloaded. Because the total output is large, projects are processed in batches of 25. Finally, the script zips all of the output into one zip file.

# To summarize, the output will look like:

attachments.zip 

Inside:

/attachments/<project_name>/<task_name>/<attachment_name.extension>

The CSV is named `<project_name>_export.csv`, sits inside the `<project_name>` folder, and contains one row per task, with that task's subtasks and stories flattened into single fields.
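
As a rough illustration of this layout, the sketch below builds the expected paths for one attachment and for a project's CSV. The helper names `expected_attachment_path` and `expected_csv_path` are hypothetical and exist only for this example, and the task name is made up. `clean_filename` repeats the sanitizing rule defined later in this notebook.

```python
import re
from pathlib import PurePosixPath


def clean_filename(name):
    """Same sanitizing rule as the notebook's clean_filename helper."""
    return re.sub(r'[\\/*?:"<>|]', "_", name.replace(" ", "_"))


def expected_attachment_path(project, task, attachment, root="attachments"):
    """Hypothetical helper: where one attachment should land on disk."""
    return PurePosixPath(
        root, clean_filename(project), clean_filename(task), clean_filename(attachment)
    )


def expected_csv_path(project, root="attachments"):
    """Hypothetical helper: where a project's CSV export should land."""
    folder = clean_filename(project)
    return PurePosixPath(root, folder, f"{folder}_export.csv")


# "Example task" is a made-up task name used only for illustration
print(expected_attachment_path("Settlement base maps", "Example task", "image.png"))
print(expected_csv_path("Settlement base maps"))
```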

In [None]:
#import necessary packages
import requests
import csv
import os
import re
import zipfile
import shutil
import time  # needed by safe_download for its retry backoff
from dotenv import load_dotenv

In my case, I saved my Asana personal access token (PAT) locally in a `.env` file under the name `PAT`, and the script reads it from there. Alternatively, you can paste your own token directly into the following cell.

# First, get your Asana personal access token (PAT) here:
https://app.asana.com/0/my-apps

In [None]:
#pat - personal access token

load_dotenv()  # Load from .env file
PAT = os.getenv("PAT")

#REPLACE THIS WITH YOUR OWN TOKEN
#PAT = "YOUR OWN TOKEN HERE"
headers = {
    "Authorization": f"Bearer {PAT}"
}
#print info about owner of api token
response = requests.get("https://app.asana.com/api/1.0/users/me", headers=headers)
print(response.json())
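
If the `PAT` variable is missing, `os.getenv` returns `None` and every request is sent with the header `Bearer None`. The sketch below shows one way to fail early instead. `build_auth_headers` is a hypothetical helper name, not part of the original script.

```python
import os


def build_auth_headers(env_var="PAT"):
    """Hypothetical helper: build the Authorization header or fail early."""
    token = os.getenv(env_var)
    if not token:
        raise RuntimeError(
            f"Environment variable {env_var!r} is not set; "
            "add it to your .env file or export it before running."
        )
    return {"Authorization": f"Bearer {token}"}
```

It could be called as `headers = build_auth_headers()` right after `load_dotenv()`.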

In [None]:
# Check that the token is valid and print the authenticated user's name
def verify_authentication():
    response = requests.get("https://app.asana.com/api/1.0/users/me", headers=headers)
    response.raise_for_status()  # stop early on an invalid or expired token
    user = response.json().get("data", {}).get("name", "Unknown")
    print(f"🔐 Authenticated as: {user}")

In [None]:
verify_authentication()

🔐 Authenticated as: Camilla Green


This lists all of the Asana projects so you can find the relevant project GID.

In [None]:
headers = {
    "Authorization": f"Bearer {PAT}"
}

response = requests.get("https://app.asana.com/api/1.0/projects", headers=headers)

print(response.json())

{'data': [{'gid': '1147666751701275', 'name': 'Cleaning and processing basemap layers from Haut-Lomami and Tanganyika', 'resource_type': 'project'}, {'gid': '1148360962805936', 'name': 'Production and Delivery of Boundaries (Alpha)', 'resource_type': 'project'}, {'gid': '1153037761383214', 'name': 'Production and Delivery of Boundaries (Beta)', 'resource_type': 'project'}, {'gid': '1148360962805943', 'name': 'Delivery of Settlement layer (microplan, comprehensive and various versions) (Alpha)', 'resource_type': 'project'}, {'gid': '1148360962805948', 'name': 'Delivery of FOSA layer (Alpha)', 'resource_type': 'project'}, {'gid': '1148949750639595', 'name': 'Kwilu Use Case Workshop', 'resource_type': 'project'}, {'gid': '1149607162606753', 'name': 'Weekly Check-in ', 'resource_type': 'project'}, {'gid': '1158312676770821', 'name': 'Settlement base maps', 'resource_type': 'project'}, {'gid': '1158312676770825', 'name': 'Infrastructure maps', 'resource_type': 'project'}, {'gid': '116292262

In [None]:
#this will get info on all projects accessible by the authenticated user
def get_all_accessible_projects():
    projects = {}
    # First get workspaces
    workspaces_url = "https://app.asana.com/api/1.0/workspaces"
    workspaces_response = requests.get(workspaces_url, headers=headers)
    workspaces = workspaces_response.json().get("data", [])

    # Then get projects in each workspace
    for ws in workspaces:
        workspace_gid = ws["gid"]
        workspace_name = ws["name"]
        print(f"🔍 Scanning projects in workspace: {workspace_name}")
        projects_url = f"https://app.asana.com/api/1.0/projects?workspace={workspace_gid}&archived=false"
        while projects_url:
            proj_response = requests.get(projects_url, headers=headers)
            proj_data = proj_response.json().get("data", [])
            for p in proj_data:
                projects[p["name"]] = p["gid"]
            # "next_page" can be null on the last page, so guard before reading "uri"
            projects_url = (proj_response.json().get("next_page") or {}).get("uri")
    
    
    print(f"\n Total projects added: {len(projects)}")
    return projects


# Build the projects dictionary from the API instead of hardcoding it:
projects = get_all_accessible_projects()


🔍 Scanning projects in workspace: ciesin.columbia.edu

 Total projects added: 116
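
The same "follow `next_page` until it runs out" loop appears in several functions in this notebook. As a minimal sketch, it could be factored into one generator. `iter_pages` and its `fetch` argument are hypothetical names, and the demo uses made-up pages instead of real API calls. It assumes, as I understand the Asana pagination behavior, that `next_page` is either missing or null on the last page.

```python
def iter_pages(fetch, first_url):
    """Yield every item from a paginated endpoint.

    fetch(url) must return the decoded JSON body as a dict.
    """
    url = first_url
    while url:
        body = fetch(url)
        yield from body.get("data", [])
        # "next_page" may be missing or null on the last page
        url = (body.get("next_page") or {}).get("uri")


# Made-up pages standing in for API responses
fake_pages = {
    "page-1": {"data": [{"gid": "1"}, {"gid": "2"}], "next_page": {"uri": "page-2"}},
    "page-2": {"data": [{"gid": "3"}], "next_page": None},
}
gids = [item["gid"] for item in iter_pages(fake_pages.__getitem__, "page-1")]
print(gids)
```

With `requests`, `fetch` could be something like `lambda url: requests.get(url, headers=headers).json()`.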


In [None]:
#create batches of 25 projects to process at a time
def batch_projects(projects_dict, batch_size=25):
    items = list(projects_dict.items())
    for i in range(0, len(items), batch_size):
        yield dict(items[i:i + batch_size])
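
To see what the batching does, the sketch below runs the same function on a made-up dictionary of 116 projects, the number reported in the output above. The project names and GIDs are placeholders.

```python
def batch_projects(projects_dict, batch_size=25):
    """Same function as above, repeated so this example runs on its own."""
    items = list(projects_dict.items())
    for i in range(0, len(items), batch_size):
        yield dict(items[i:i + batch_size])


# Placeholder names and GIDs, not real Asana data
demo_projects = {f"Project {n}": str(n) for n in range(116)}
batch_sizes = [len(batch) for batch in batch_projects(demo_projects)]
print(batch_sizes)  # [25, 25, 25, 25, 16]
```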

In [None]:
# Utility Functions
# get project name from Asana API using its GID - this is used to create a folder for the project
def get_project_name(project_gid):
    """Get the project name from Asana API using its GID."""
    url = f"https://app.asana.com/api/1.0/projects/{project_gid}"
    response = requests.get(url, headers=headers)
    return response.json()["data"].get("name", "asana_project")

#clean the name of the project to make it a valid folder name
#this is done by replacing invalid characters with underscores
def clean_filename(name):
    """Sanitize file and folder names."""
    return re.sub(r'[\\/*?:"<>|]', "_", name.replace(" ", "_"))

#create a folder for the project with the project name from asana,
#this folder will go inside the folder named "attachments" 
def create_output_folder(project_gid, base_path="attachments"):
    project_name = clean_filename(get_project_name(project_gid))
    folder_name = f"{project_name}"
    output_path = os.path.join(base_path, folder_name)
    os.makedirs(output_path, exist_ok=True)
    return output_path
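
A quick check of what `clean_filename` produces can help when looking for a folder on disk. The inputs below are partly made up. Note that two different names can map to the same folder name, in which case their files end up in one folder.

```python
import re


def clean_filename(name):
    """Same rule as above: spaces and invalid characters become underscores."""
    return re.sub(r'[\\/*?:"<>|]', "_", name.replace(" ", "_"))


print(clean_filename("Weekly Check-in "))  # Weekly_Check-in_
print(clean_filename("Q1: Plan/Review?"))  # Q1__Plan_Review_

# Two different (made-up) names that end up sharing one folder name
print(clean_filename("A/B") == clean_filename("A B"))  # True
```

If such collisions matter for your projects, one option (not used in this script) is to append the task or project GID to the folder name.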



In [None]:
# Task and Metadata Export
#get all tasks for a project
def get_all_tasks(project_gid):
    tasks = []
    url = f"https://app.asana.com/api/1.0/projects/{project_gid}/tasks"
    params = {"opt_fields": "name,completed,assignee.name,due_on,notes,custom_fields"}
    while url:
        r = requests.get(url, headers=headers, params=params)
        tasks.extend(r.json().get("data", []))
        # "next_page" can be null on the last page, so guard before reading "uri"
        url = (r.json().get("next_page") or {}).get("uri")
    return tasks

#get all subtasks for each task
def get_subtasks_for_task(task_gid):
    subtasks = []
    url = f"https://app.asana.com/api/1.0/tasks/{task_gid}/subtasks"
    params = {"opt_fields": "name,completed"}
    while url:
        r = requests.get(url, headers=headers, params=params)
        subtasks.extend(r.json().get("data", []))
        # "next_page" can be null on the last page, so guard before reading "uri"
        url = (r.json().get("next_page") or {}).get("uri")
    return subtasks

#get stories (comments and system history/events) for each task and subtask
def get_stories_for_task(task_gid):
    stories = []
    url = f"https://app.asana.com/api/1.0/tasks/{task_gid}/stories"
    while url:
        r = requests.get(url, headers=headers)
        for s in r.json().get("data", []):
            created_by = s.get("created_by")
            author = created_by["name"] if created_by else "System"
            stories.append({
                "type": s.get("type"),
                "resource_subtype": s.get("resource_subtype"),
                "author": author,
                "created_at": s.get("created_at"),
                "text": s.get("text", "")
            })
        # "next_page" can be null on the last page, so guard before reading "uri"
        url = (r.json().get("next_page") or {}).get("uri")
    return stories

#format stories (comments) into a single string so they can be written to a csv as 1 field
def flatten_stories(stories):
    return " | ".join(
        f"[{s['created_at']}] {s['author']} ({s['resource_subtype']}): {s['text']}"
        for s in stories
    )

#format subtasks into a single string so they can be written to a csv as 1 field
def flatten_subtasks(subtasks):
    lines = []
    for s in subtasks:
        name = s['name']
        completed = s['completed']
        stories = get_stories_for_task(s['gid'])
        story_text = flatten_stories(stories)
        lines.append(f"{name} (Completed: {completed}) - Stories: [{story_text}]")
    return " | ".join(lines)
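
For reference, here is what the flattened story string looks like. The two stories are made-up samples in the shape that `get_stories_for_task` returns, and `flatten_stories` is repeated so the example runs on its own.

```python
def flatten_stories(stories):
    """Same function as above: join all stories into one string."""
    return " | ".join(
        f"[{s['created_at']}] {s['author']} ({s['resource_subtype']}): {s['text']}"
        for s in stories
    )


# Made-up sample stories, not real Asana data
sample_stories = [
    {"type": "comment", "resource_subtype": "comment_added", "author": "Camilla Green",
     "created_at": "2025-05-01T10:00:00.000Z", "text": "Uploaded the draft map."},
    {"type": "system", "resource_subtype": "marked_complete", "author": "System",
     "created_at": "2025-05-02T09:30:00.000Z", "text": "marked this task complete"},
]
print(flatten_stories(sample_stories))
```

Because `" | "` is the separator, a comment that itself contains `" | "` cannot be split back out reliably. That is fine for reading the CSV but worth knowing before parsing it.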

In [None]:
#export the project data to a csv file
def export_project_data_to_csv(project_gid, output_folder, log_file="skipped_projects.log"):
    """Export full task data to a CSV file or log if no tasks found."""
    project_name = clean_filename(get_project_name(project_gid))
    output_file = os.path.join(output_folder, f"{project_name}_export.csv")
    tasks = get_all_tasks(project_gid)
    all_task_data = []

    for task in tasks:
        subtasks = get_subtasks_for_task(task["gid"])
        stories = get_stories_for_task(task["gid"])
        task_data = {
            "Task GID": task["gid"],
            "Name": task.get("name", "[No name]"),
            "Completed": task.get("completed"),
            "Assignee": task.get("assignee", {}).get("name", "Unassigned") if task.get("assignee") else "Unassigned",
            "Due On": task.get("due_on") or "No due date",
            "Notes": task.get("notes") or "No notes",
            "Custom Fields": ", ".join([
                f"{f.get('name')}: {f.get('text_value') or f.get('number_value') or (f.get('enum_value') or {}).get('name', '')}"
                for f in task.get("custom_fields", []) if f
            ]),
            "Subtasks": flatten_subtasks(subtasks),
            "Stories": flatten_stories(stories)
        }
        all_task_data.append(task_data)

    if not all_task_data:
        msg = f"{project_name} (GID: {project_gid}) - Skipped: No tasks found\n"
        with open(log_file, "a", encoding="utf-8") as log:
            log.write(msg)
        print(f"⚠️ {msg.strip()}")
        return

    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=all_task_data[0].keys())
        writer.writeheader()
        writer.writerows(all_task_data)

    print(f"📄 Exported {len(all_task_data)} tasks to '{output_file}'")
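
Task notes and comments often contain commas and line breaks. The sketch below, using made-up rows and an in-memory buffer instead of a file, shows that `csv.DictWriter` quotes such values so they survive a round trip through `csv.DictReader`.

```python
import csv
import io

# Made-up rows with a comma and an embedded newline
rows = [
    {"Name": "Task with, comma", "Notes": "line one\nline two", "Stories": "a | b"},
    {"Name": "Plain task", "Notes": "No notes", "Stories": ""},
]

buffer = io.StringIO(newline="")  # newline="" mirrors the open(...) call above
writer = csv.DictWriter(buffer, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)

buffer.seek(0)
read_back = list(csv.DictReader(buffer))
print(read_back == rows)
```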


In [None]:
# access attachments for each project, process each task and download into folders based on task name
def get_attachments_for_task(task_gid):
    attachments = []
    url = f"https://app.asana.com/api/1.0/tasks/{task_gid}/attachments"
    while url:
        r = requests.get(url, headers=headers)
        attachments.extend(r.json().get("data", []))
        # "next_page" can be null on the last page, so guard before reading "uri"
        url = (r.json().get("next_page") or {}).get("uri")
    return attachments

#get the download URL for each attachment
def get_download_url(attachment_gid):
    url = f"https://app.asana.com/api/1.0/attachments/{attachment_gid}"
    r = requests.get(url, headers=headers)
    return r.json().get("data", {}).get("download_url") if r.status_code == 200 else None

# Helper function to safely download with retries
def safe_download(url, retries=3, timeout=30):
    for attempt in range(retries):
        try:
            response = requests.get(url, stream=True, timeout=timeout)
            if response.status_code == 200:
                return response
            else:
                print(f"⚠️ Status {response.status_code} for {url}")
                return None
        except requests.exceptions.RequestException as e:
            print(f"⚠️ Attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
            else:
                return None

# Main attachment download function
def download_all_attachments(project_gid, base_folder, log_file="skipped_projects.log"):
    tasks = get_all_tasks(project_gid)

    if not tasks:
        project_name = clean_filename(get_project_name(project_gid))
        msg = f"{project_name} (GID: {project_gid}) - Skipped: No tasks for attachment download\n"
        with open(log_file, "a", encoding="utf-8") as log:
            log.write(msg)
        print(f"⚠️ {msg.strip()}")
        return

    for task in tasks:
        task_gid = task["gid"]
        task_name = clean_filename(task.get("name", "untitled_task"))
        task_folder = os.path.join(base_folder, f"{task_name}")
        os.makedirs(task_folder, exist_ok=True)

        attachments = get_attachments_for_task(task_gid)
        for att in attachments:
            url = get_download_url(att["gid"]) or att.get("permalink_url")
            if not url:
                print(f"⚠️ Skipping {att['name']}: no download URL")
                continue
            try:
                print(f"⬇️ Downloading {att['name']} to {task_folder}...")
                r = safe_download(url)
                if r:
                    file_path = os.path.join(task_folder, clean_filename(att["name"]))
                    with open(file_path, "wb") as f:
                        for chunk in r.iter_content(1024):
                            f.write(chunk)
                    print(f"✅ Saved to {file_path}")
                else:
                    print(f"❌ Failed to download {att['name']} after retries")
            except Exception as e:
                print(f"❌ Error downloading {att['name']}: {e}")
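
The retry logic in `safe_download` waits `2 ** attempt` seconds between attempts. As a minimal sketch of that pattern in isolation, the hypothetical helper `retry_with_backoff` below takes any callable, and the demo records the waits instead of actually sleeping. The failing action is made up.

```python
import time


def retry_with_backoff(action, retries=3, sleep=time.sleep):
    """Hypothetical helper: call action() until it succeeds or retries run out."""
    for attempt in range(retries):
        try:
            return action()
        except Exception as exc:
            print(f"Attempt {attempt + 1} failed: {exc}")
            if attempt < retries - 1:
                sleep(2 ** attempt)  # wait 1s, then 2s, then 4s, ...
    return None


# Demo with a made-up action that fails twice, then succeeds
calls = {"count": 0}


def flaky_action():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network error")
    return "ok"


waits = []
result = retry_with_backoff(flaky_action, sleep=waits.append)  # record waits, do not sleep
print(result, waits)
```

Note that `safe_download` as written retries only on exceptions. A non-200 status returns `None` immediately, so rate-limit or server-error responses are not retried.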

In [None]:
#this will zip the folder containing all the attachments into "attachments.zip" 
def zip_folder(folder_path, zip_name=None, delete_original=False):
    #If you want to delete the original folder after zipping, set delete_original=True.
    if not zip_name:
        zip_name = folder_path + ".zip"

    print(f"🗜️ Zipping folder '{folder_path}' to '{zip_name}'...")
    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, start=folder_path)
                try:
                    # write() streams the file from disk rather than loading it all into memory
                    zipf.write(file_path, arcname)
                except Exception as e:
                    print(f"⚠️ Skipped {file_path} due to error: {e}")

    print(f"✅ Zip created: {zip_name}")

    if delete_original:
        shutil.rmtree(folder_path)
        print(f"🧹 Deleted original folder: {folder_path}")
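
The sketch below applies the same `arcname` rule as `zip_folder` to a small made-up folder tree in a temporary directory, then lists what ends up in the archive. Because paths are stored relative to the zipped folder, entries start at the project folder rather than at "attachments".

```python
import os
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as tmp:
    # Made-up folder tree in the notebook's layout
    root = os.path.join(tmp, "attachments")
    task_folder = os.path.join(root, "Project_A", "Task_1")
    os.makedirs(task_folder)
    with open(os.path.join(task_folder, "image.png"), "wb") as f:
        f.write(b"not a real image")

    zip_path = os.path.join(tmp, "attachments.zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for folder, _, files in os.walk(root):
            for name in files:
                file_path = os.path.join(folder, name)
                # Same arcname rule as zip_folder: path relative to the zipped folder
                zipf.write(file_path, os.path.relpath(file_path, start=root))

    with zipfile.ZipFile(zip_path) as zipf:
        entries = zipf.namelist()

print(entries)
```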


In [None]:
#  Main Loop to process tasks, subtasks, and attachments
#this processes all projects in the dictionary in batches and creates a folder for each project
# It will zip the attachments folder at the end
def process_all_projects(projects, combined_output_root="attachments", combined_zip_name="attachments.zip"):
    os.makedirs(combined_output_root, exist_ok=True)
    verify_authentication() 
    for batch_num, project_batch in enumerate(batch_projects(projects, batch_size=25), start=1):
        print(f"\n📦 Starting Batch {batch_num} (processing {len(project_batch)} projects)...")
        for name, gid in project_batch.items():
            print(f"\nProcessing Project: {name}")
            project_folder = create_output_folder(gid, base_path=combined_output_root)
            export_project_data_to_csv(gid, project_folder)
            download_all_attachments(gid, project_folder)

    # 🔚 Create one zip from the full attachments directory
    zip_folder(combined_output_root, combined_zip_name)


In [None]:
# Run everything
#this will process all the projects listed in the dictionary, providing a csv file and a folder with attachments for each project
#the csv file will be named <project_name>_export.csv and have all the tasks, subtasks, and their metadata
#the attachments will be downloaded into folders named after the tasks
process_all_projects(projects)

🔐 Authenticated as: Camilla Green

📦 Starting Batch 1 (processing 25 projects)...

Processing Project: Cleaning and processing basemap layers from Haut-Lomami and Tanganyika
📄 Exported 1 tasks to 'attachments/Cleaning_and_processing_basemap_layers_from_Haut-Lomami_and_Tanganyika/Cleaning_and_processing_basemap_layers_from_Haut-Lomami_and_Tanganyika_export.csv'

Processing Project: Production and Delivery of Boundaries (Alpha)
📄 Exported 8 tasks to 'attachments/Production_and_Delivery_of_Boundaries_(Alpha)/Production_and_Delivery_of_Boundaries_(Alpha)_export.csv'

Processing Project: Production and Delivery of Boundaries (Beta)
📄 Exported 21 tasks to 'attachments/Production_and_Delivery_of_Boundaries_(Beta)/Production_and_Delivery_of_Boundaries_(Beta)_export.csv'
⬇️ Downloading image.png to attachments/Production_and_Delivery_of_Boundaries_(Beta)/Issues_in_Kayamba...
✅ Saved to attachments/Production_and_Delivery_of_Boundaries_(Beta)/Issues_in_Kayamba/image.png
⬇️ Downloading Capture_Ka