# Edinburgh Office Dataset Downloader & Google Drive Mounter

**Dataset:** https://homepages.inf.ed.ac.uk/rbf/OFFICEDATA/

This notebook downloads the Edinburgh office monitoring video dataset to local storage and, when run in Google Colab, mounts Google Drive and copies the dataset there for easy access.

**License:** CC BY-NC-SA (Attribution-NonCommercial-ShareAlike)

**Citation required:** T. Qasim, R. B. Fisher, N. Bhatti; Ground-truthing Large Human Behavior Monitoring Datasets, Proc. 2020 Int. Conf. on Pattern Recognition, online, 2021.

In [None]:
import os
import requests
import urllib.parse
from pathlib import Path
import zipfile
import tarfile
from bs4 import BeautifulSoup
import time
from tqdm.notebook import tqdm  # Use notebook-friendly tqdm
import hashlib

# shutil is part of the standard library, so it is imported unconditionally;
# keeping it inside the try block below would leave the name undefined
# whenever the google.colab import fails.
import shutil

# Google Colab specific imports
# COLAB_ENV records whether the google.colab package could be imported; the
# rest of the notebook uses it to decide between Colab paths and local paths.
try:
    from google.colab import drive, files
    COLAB_ENV = True
    print("✅ Detected Google Colab environment")
except ImportError:
    COLAB_ENV = False
    print("❌ Not in Google Colab - will download locally")

## Downloader Class

In [None]:
class EdinburghOfficeDownloader:
    """Download the Edinburgh office dataset and copy it to Google Drive.

    The pipeline implemented by :meth:`run` is: mount Google Drive (Colab
    only), scrape the dataset index page for downloadable links, download
    every file into ``local_dir``, unpack zip/tar archives, write a SHA-256
    manifest, and finally copy ``local_dir`` to ``gdrive_dir``.

    The class relies on names provided by the surrounding notebook:
    ``COLAB_ENV``, ``requests``, ``BeautifulSoup``, ``tqdm`` and, in Colab,
    ``drive`` and ``shutil``.

    Attributes:
        base_url: URL of the dataset index page.
        session: ``requests.Session`` used for every HTTP request.
        local_dir: Directory the files are downloaded into (created on init).
        gdrive_dir: Target directory on Google Drive, or ``None`` outside
            Colab.
    """

    # Lower-case suffixes of links that are treated as downloadable files.
    FILE_EXTENSIONS = (
        '.zip', '.tar.gz', '.tar', '.rar', '.7z', '.mp4', '.avi',
        '.mov', '.jpg', '.jpeg', '.png', '.mat', '.txt', '.csv',
    )
    # Suffixes recognised as archives, and the subset this class can unpack
    # with the standard library (zipfile / tarfile).
    ARCHIVE_EXTENSIONS = ('.zip', '.tar.gz', '.tar', '.rar', '.7z')
    EXTRACTABLE_EXTENSIONS = ('.zip', '.tar.gz', '.tar')
    # Name of the manifest written at the top of ``local_dir``.
    MANIFEST_NAME = 'file_manifest.txt'

    def __init__(self, base_url="https://homepages.inf.ed.ac.uk/rbf/OFFICEDATA/"):
        """Create the HTTP session and the local download directory.

        Args:
            base_url: URL of the dataset index page to scrape.
        """
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

        # Setup directories
        if COLAB_ENV:
            self.local_dir = Path('/content/edinburgh_office_dataset')
            self.gdrive_dir = Path('/content/drive/MyDrive/Datasets/EdinburghOffice')
        else:
            self.local_dir = Path('./edinburgh_office_dataset')
            self.gdrive_dir = None

        self.local_dir.mkdir(parents=True, exist_ok=True)

    def mount_google_drive(self):
        """Mount Google Drive if in Colab.

        Returns:
            True when the drive was mounted and ``gdrive_dir`` was created,
            False outside Colab or when mounting failed.
        """
        if not COLAB_ENV:
            print("⚠️ Not in Colab environment - skipping Google Drive mount")
            return False

        try:
            print("📁 Mounting Google Drive...")
            drive.mount('/content/drive')

            # Create dataset directory in Google Drive
            self.gdrive_dir.mkdir(parents=True, exist_ok=True)
            print(f"✅ Google Drive mounted. Dataset will be saved to: {self.gdrive_dir}")
            return True
        except Exception as e:
            print(f"❌ Failed to mount Google Drive: {e}")
            return False

    def _collect_file_links(self, soup, page_url, subdir=None):
        """Return one entry per link on a parsed page that points at a file.

        Args:
            soup: Parsed page (``BeautifulSoup`` object).
            page_url: URL the page was fetched from; relative links are
                resolved against it.
            subdir: When given, stored under the ``'subdir'`` key of every
                entry so the file is saved in that sub-directory.

        Returns:
            List of dicts with ``'url'``, ``'filename'``, ``'link_text'`` and,
            if ``subdir`` was given, ``'subdir'``.
        """
        entries = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            if not href.lower().endswith(self.FILE_EXTENSIONS):
                continue
            entry = {
                'url': urllib.parse.urljoin(page_url, href),
                'filename': os.path.basename(href),
                'link_text': link.get_text().strip(),
            }
            if subdir is not None:
                entry['subdir'] = subdir
            entries.append(entry)
        return entries

    def get_file_list(self):
        """Scrape the dataset webpage to get list of available files.

        Files linked directly from the index page are collected, then every
        sub-directory link that lives underneath the index page's own
        directory is scanned once with :meth:`scan_directory`.

        Returns:
            List of file-entry dicts (see :meth:`_collect_file_links`); an
            empty list when the page could not be fetched or parsed.
        """
        print(f"🔍 Scanning dataset webpage: {self.base_url}")

        try:
            response = self.session.get(self.base_url, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Named ``found`` rather than ``files`` to avoid shadowing the
            # ``files`` helper imported from google.colab.
            found = self._collect_file_links(soup, self.base_url)

            # Directory that contains the index page; only links below it are
            # followed, so parent/absolute links (e.g. "/rbf/", "../", "./")
            # cannot drag the crawl outside the dataset.
            base_dir = urllib.parse.urljoin(self.base_url, '.')
            visited = set()
            for link in soup.find_all('a', href=True):
                href = link['href']
                if not href.endswith('/'):
                    continue
                subdir_url = urllib.parse.urljoin(self.base_url, href)
                if subdir_url == base_dir or not subdir_url.startswith(base_dir):
                    continue
                if subdir_url in visited:
                    continue
                visited.add(subdir_url)
                print(f"🔍 Found subdirectory: {subdir_url}")
                found.extend(self.scan_directory(subdir_url))

            print(f"✅ Found {len(found)} files to download")
            return found

        except Exception as e:
            print(f"❌ Error scanning webpage: {e}")
            return []

    def scan_directory(self, dir_url):
        """Scan a single directory page for files (sub-directories of it are not followed).

        Args:
            dir_url: URL of the directory listing to scan.

        Returns:
            List of file-entry dicts whose ``'subdir'`` is ``dir_url`` with
            ``base_url`` removed; empty when the page could not be read.
        """
        try:
            response = self.session.get(dir_url, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            return self._collect_file_links(
                soup, dir_url, subdir=dir_url.replace(self.base_url, '')
            )

        except Exception as e:
            print(f"⚠️ Error scanning directory {dir_url}: {e}")
            return []

    def download_file(self, file_info, destination_dir):
        """Download a single file with progress bar.

        The data is streamed into ``<name>.part`` and renamed onto the final
        path only after the transfer finished, so an interrupted run never
        leaves a truncated file under the real name.

        Args:
            file_info: Entry produced by :meth:`get_file_list`.
            destination_dir: ``Path`` of the directory to download into.

        Returns:
            True when the file is present afterwards (downloaded or skipped
            because a same-sized copy already exists), False on failure.
        """
        url = file_info['url']
        filename = file_info['filename']

        # Handle subdirectories
        subdir = file_info.get('subdir')
        target_dir = destination_dir / subdir if subdir else destination_dir
        target_dir.mkdir(parents=True, exist_ok=True)
        filepath = target_dir / filename

        # Skip if file already exists and has the same size
        if filepath.exists():
            try:
                # Session.head does not follow redirects by default; follow
                # them so the size of the real file is compared.
                head_response = self.session.head(url, timeout=30, allow_redirects=True)
                remote_size = int(head_response.headers.get('content-length', 0))
                local_size = filepath.stat().st_size

                if remote_size > 0 and local_size == remote_size:
                    print(f"⏭️ Skipping {filename} (already downloaded)")
                    return True
            except (requests.RequestException, ValueError, OSError) as e:
                # The check is best-effort: fall through and download again.
                print(f"⚠️ Could not verify existing {filename} ({e}) - downloading again")

        print(f"📥 Downloading: {filename}")

        partial_path = filepath.with_name(filepath.name + '.part')
        try:
            # The context manager releases the connection even on errors.
            with self.session.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()

                total_size = int(response.headers.get('content-length', 0))

                with open(partial_path, 'wb') as f:
                    if total_size > 0:
                        with tqdm(total=total_size, unit='B', unit_scale=True, desc=filename) as pbar:
                            for chunk in response.iter_content(chunk_size=8192):
                                if chunk:
                                    f.write(chunk)
                                    pbar.update(len(chunk))
                    else:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)

            # Replaces any older copy of the file in one step.
            partial_path.replace(filepath)
            print(f"✅ Downloaded: {filename}")
            return True

        except Exception as e:
            print(f"❌ Failed to download {filename}: {e}")
            if partial_path.exists():
                partial_path.unlink()  # Remove partial download
            return False

    def extract_archives(self, directory):
        """Extract any downloaded archive files.

        Zip and tar archives found anywhere below ``directory`` are unpacked
        into a sibling directory named after the archive's stem. ``.rar`` and
        ``.7z`` files are reported and skipped because the standard library
        cannot unpack them.

        Args:
            directory: ``Path`` of the directory tree to search.
        """
        print("📦 Extracting archive files...")

        # Only regular files count: the extraction directory of "x.tar.gz" is
        # named "x.tar" and must not be mistaken for an archive on a re-run.
        # Matching is case-insensitive, like the download filter.
        archive_files = sorted(
            path for path in directory.rglob('*')
            if path.is_file() and path.name.lower().endswith(self.ARCHIVE_EXTENSIONS)
        )

        for archive_path in archive_files:
            lowered_name = archive_path.name.lower()

            if not lowered_name.endswith(self.EXTRACTABLE_EXTENSIONS):
                print(f"⚠️ Skipping {archive_path.name}: unsupported archive format, extract it manually")
                continue

            print(f"📦 Extracting: {archive_path.name}")

            extract_dir = archive_path.parent / archive_path.stem

            try:
                extract_dir.mkdir(exist_ok=True)

                if lowered_name.endswith('.zip'):
                    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                        zip_ref.extractall(extract_dir)
                else:
                    with tarfile.open(archive_path, 'r:*') as tar_ref:
                        # Use the "data" extraction filter where the running
                        # Python provides it, so archive members cannot be
                        # written outside extract_dir.
                        if hasattr(tarfile, 'data_filter'):
                            tar_ref.extractall(extract_dir, filter='data')
                        else:
                            tar_ref.extractall(extract_dir)

                print(f"✅ Extracted: {archive_path.name}")

            except Exception as e:
                print(f"❌ Failed to extract {archive_path.name}: {e}")

    def sync_to_gdrive(self):
        """Copy downloaded files to Google Drive.

        WARNING: an existing ``gdrive_dir`` is deleted before ``local_dir``
        is copied over it, so anything stored there that is not part of the
        local copy is lost. A ``README.md`` describing the dataset is written
        into ``gdrive_dir`` afterwards. Errors are printed, not raised.
        """
        if not COLAB_ENV or not self.gdrive_dir:
            print("⚠️ Google Drive not available - files saved locally only")
            return

        print("☁️ Syncing to Google Drive...")

        try:
            # Copy entire dataset directory to Google Drive
            if self.gdrive_dir.exists():
                print(f"Path {self.gdrive_dir} exists. Removing before copying.")
                shutil.rmtree(self.gdrive_dir)

            shutil.copytree(self.local_dir, self.gdrive_dir)
            print(f"✅ Dataset synced to Google Drive: {self.gdrive_dir}")

            # Create a README with dataset info
            readme_content = """
# Edinburgh Office Monitoring Dataset

This dataset contains low frame rate video of people doing their normal activities
in an office setting. The data is acquired using a fixed camera as a set of
1280*720 pixel color images captured at an average of about 1 FPS.

## License
Attribution-NonCommercial-ShareAlike (CC BY-NC-SA)

## Citation
T. Qasim, R. B. Fisher, N. Bhatti; Ground-truthing Large Human Behavior
Monitoring Datasets, Proc. 2020 Int. Conf on Pattern Recognition, online, 2021.

## Acknowledgment
"We thank the University of Edinburgh for the use of the low resolution
video and ground truth data."

## Contact
Robert Fisher at rbf@inf.ed.ac.uk
School of Informatics, University of Edinburgh

## Downloaded
Dataset downloaded and mounted on: """ + time.strftime("%Y-%m-%d %H:%M:%S")

            readme_path = self.gdrive_dir / 'README.md'
            readme_path.write_text(readme_content)

        except Exception as e:
            print(f"❌ Failed to sync to Google Drive: {e}")

    def generate_file_manifest(self):
        """Generate a manifest of downloaded files.

        Writes ``file_manifest.txt`` at the top of ``local_dir`` listing the
        relative path, size and SHA-256 digest of every file below it. The
        manifest itself is excluded, so re-running does not record a stale
        entry for the previous manifest. Entries are sorted by path.
        """
        print("📋 Generating file manifest...")

        manifest_path = self.local_dir / self.MANIFEST_NAME

        manifest = []
        for filepath in sorted(self.local_dir.rglob('*')):
            if not filepath.is_file() or filepath == manifest_path:
                continue

            # Calculate file hash for integrity checking
            sha256_hash = hashlib.sha256()
            with open(filepath, "rb") as f:
                for byte_block in iter(lambda: f.read(4096), b""):
                    sha256_hash.update(byte_block)

            manifest.append({
                'file': str(filepath.relative_to(self.local_dir)),
                'size': filepath.stat().st_size,
                'sha256': sha256_hash.hexdigest()
            })

        # Save manifest
        with open(manifest_path, 'w') as f:
            f.write("Edinburgh Office Dataset - File Manifest\n")
            f.write("=" * 50 + "\n\n")

            total_size = 0
            for item in manifest:
                f.write(f"File: {item['file']}\n")
                f.write(f"Size: {item['size']:,} bytes\n")
                f.write(f"SHA256: {item['sha256']}\n")
                f.write("-" * 30 + "\n")
                total_size += item['size']

            f.write(f"\nTotal files: {len(manifest)}\n")
            f.write(f"Total size: {total_size:,} bytes ({total_size/1024/1024:.1f} MB)\n")

        print(f"✅ File manifest saved: {manifest_path}")

    def run(self):
        """Main execution function.

        Returns:
            True when at least one file was downloaded (or already present)
            and the remaining steps were run; False when no files were found
            or every download failed.
        """
        print("🚀 Starting Edinburgh Office Dataset Download")
        print("=" * 60)

        # Mount Google Drive if in Colab; remember whether it worked so a
        # failed mount is not followed by a "sync" onto local disk.
        drive_ready = self.mount_google_drive() if COLAB_ENV else False

        # Get list of files to download
        files_to_download = self.get_file_list()

        if not files_to_download:
            print("❌ No files found to download. Please check the dataset URL.")
            return False

        # Download files
        print(f"\n📥 Starting download of {len(files_to_download)} files...")
        successful_downloads = 0

        for i, file_info in enumerate(files_to_download, 1):
            print(f"\n[{i}/{len(files_to_download)}] ", end="")
            if self.download_file(file_info, self.local_dir):
                successful_downloads += 1

            # Add a small delay to be respectful to the server
            time.sleep(1)

        if successful_downloads == 0:
            print("\n❌ None of the files could be downloaded.")
            return False

        print(f"\n✅ Successfully downloaded {successful_downloads}/{len(files_to_download)} files")

        # Extract archives
        self.extract_archives(self.local_dir)

        # Generate file manifest
        self.generate_file_manifest()

        # Sync to Google Drive
        if drive_ready:
            self.sync_to_gdrive()
        elif COLAB_ENV:
            print("⚠️ Google Drive was not mounted - files saved locally only")

        print("\n🎉 Dataset download and setup complete!")
        print(f"📁 Local directory: {self.local_dir}")
        if drive_ready:
            print(f"☁️ Google Drive directory: {self.gdrive_dir}")

        return True

## Execute Download

In [None]:
# Build a downloader for the default dataset URL and run the full pipeline;
# `success` holds the boolean returned by run().
downloader = EdinburghOfficeDownloader()
success = downloader.run()

# Report the outcome; the quick-start hints are only shown in Colab.
if not success:
    print("\n❌ Download failed. Please check the error messages above.")
else:
    print("\n✅ All done! Your dataset is ready to use.")
    if COLAB_ENV:
        print("\n📋 Quick Start Guide:")
        print(
            "1. Navigate to your Google Drive",
            "2. Open the 'Datasets/EdinburghOffice' folder",
            "3. Check the README.md for dataset information",
            "4. Use the file_manifest.txt to verify file integrity",
            sep="\n",
        )

## Utility Functions

In [None]:
def load_dataset_info():
    """Locate the downloaded dataset and print its manifest when present.

    The dataset is looked up on Google Drive when ``COLAB_ENV`` is true and
    in ``./edinburgh_office_dataset`` otherwise.

    Returns:
        The dataset ``Path``, or ``None`` (after printing a hint) when the
        directory does not exist.
    """
    location = (
        '/content/drive/MyDrive/Datasets/EdinburghOffice'
        if COLAB_ENV
        else './edinburgh_office_dataset'
    )
    dataset_path = Path(location)

    if not dataset_path.exists():
        print("❌ Dataset not found. Please run the downloader first.")
        return None

    # Echo the manifest, if the downloader wrote one.
    manifest_path = dataset_path / 'file_manifest.txt'
    if manifest_path.exists():
        print(f"📋 Dataset manifest: {manifest_path}")
        print(manifest_path.read_text())

    return dataset_path

def list_video_files():
    """Find and print every video file stored in the dataset.

    Returns:
        List of ``Path`` objects grouped by extension in the order
        ``.mp4``, ``.avi``, ``.mov``, ``.mkv``, ``.flv``; an empty list when
        the dataset directory is missing.
    """
    dataset_path = load_dataset_info()
    if dataset_path is None:
        return []

    # One recursive search per extension, concatenated in extension order.
    video_files = [
        match
        for suffix in ('.mp4', '.avi', '.mov', '.mkv', '.flv')
        for match in dataset_path.rglob(f"*{suffix}")
    ]

    print(f"🎥 Found {len(video_files)} video files:")
    for video in video_files:
        print(f"  - {video.relative_to(dataset_path)}")

    return video_files

def list_image_files():
    """Find and print every image file stored in the dataset.

    Returns:
        List of ``Path`` objects grouped by extension in the order
        ``.jpg``, ``.jpeg``, ``.png``, ``.bmp``, ``.tiff``; an empty list
        when the dataset directory is missing.
    """
    dataset_path = load_dataset_info()
    if dataset_path is None:
        return []

    # One recursive search per extension, concatenated in extension order.
    image_files = [
        match
        for suffix in ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')
        for match in dataset_path.rglob(f"*{suffix}")
    ]

    print(f"🖼️ Found {len(image_files)} image files:")
    for image in image_files:
        print(f"  - {image.relative_to(dataset_path)}")

    return image_files

In [None]:
# Example of using the utility functions: each call prints the manifest (if
# one exists) followed by the matching files it found.
# NOTE(review): the trailing semicolons are presumably there to stop the
# notebook from echoing the returned lists - confirm before removing them.
list_video_files();
list_image_files();