In [2]:
import os
from pathlib import Path

import re
import requests
import boto3

import zipfile
import py7zr

from tqdm import tqdm

import numpy as np
import pandas as pd
import rasterio as rio
import matplotlib.pyplot as plt

import torch
# Compute device: CUDA when torch reports it as available, CPU otherwise.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

import pytorch_lightning as pl

In [3]:
# Dataset root: the "data" directory that sits beside the current working directory.
ROOT = Path.cwd().parent.joinpath("data")

True

### Base Class

In [11]:
class DatasetETL:
    """Base class for dataset ETL pipelines that stage remote archives on disk.

    Subclasses supply the source URLs and override ``download``, ``extract``
    and ``catalog``. The private helpers defined here cover downloading,
    archive extraction, file validation/cleanup and raster metadata.
    """

    def __init__(self, root:Path, urls:dict, low_storage_mode:bool):
        """Create the root/downloads directories if needed and store the configuration.

        root: dataset directory; a "downloads" directory is created inside it.
        urls: source URLs as Dict[file_name:str, url:str].
        low_storage_mode: stored on the instance for subclasses to consult.
        """
        download_dir = root / "downloads"
        if not root.exists():
            print("Root directory DNE, creating new directory")
            #parents=True so a fresh checkout without the parent data directory still works
            root.mkdir(parents=True, exist_ok=True)
            download_dir.mkdir(exist_ok=True)
        self.root_dir = root
        self.low_storage_mode = low_storage_mode

        if not download_dir.exists():
            print("Downloads directory DNE, creating new directory")
            download_dir.mkdir(parents=True, exist_ok=True)
        self.download_dir = download_dir
        #Source URLs is Dict[file_name:str, url:str]
        self.source_urls:dict = urls

    #Hooks that every concrete pipeline must provide

    def download(self):
        """Fetch the raw source files. Must be overridden by subclasses."""
        raise NotImplementedError(f"{type(self).__name__} must implement download()")

    def extract(self):
        """Unpack the downloaded files. Must be overridden by subclasses."""
        raise NotImplementedError(f"{type(self).__name__} must implement extract()")

    def catalog(self):
        """Build an index of the staged files. Must be overridden by subclasses."""
        raise NotImplementedError(f"{type(self).__name__} must implement catalog()")

    def Extract(self):
        """Download and stage the dataset on disk: download, extract, then catalog."""
        self.download()
        self.extract()
        self.catalog()

    def Transform(self):
        """Placeholder for the transform stage; currently a no-op."""
        pass

    def Load(self):
        """Placeholder for the load stage; currently a no-op."""
        pass

    def clear_downloads_directory(self):
        """Delete every regular file found directly inside the downloads directory."""
        #Only files: _delete_files unlinks paths, which would fail on a directory
        downloaded_files:list = [path for path in self.download_dir.iterdir() if path.is_file()]
        self._delete_files(downloaded_files)

    #Internal Methods

    def _get_source_urls(self, urls_list:list) -> dict:
        """List[url:str] -> Dict[file_name:str, url:str], keyed by the last path component of each URL."""
        return {Path(url).name : url for url in urls_list}

    def _download_file(self, url:str, file_path:Path, chunk_size:int = 1024*1024):
        """Stream url to file_path in chunks, showing a progress bar.

        The data is written to a sibling "<name>.part" file and renamed only
        after the transfer completes, so an interrupted download never leaves
        a truncated file that a later run would skip as already downloaded.
        Raises requests.HTTPError on a non-success status code.
        """
        file_path = Path(file_path)
        partial_path = file_path.with_name(file_path.name + ".part")
        try:
            with requests.get(url, stream=True) as r:
                r.raise_for_status()
                total_size = int(r.headers.get('content-length', 0))
                with open(partial_path, "wb") as f, tqdm(total=total_size, unit="B", unit_scale=True, desc="Downloading") as progress_bar:
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)
                            progress_bar.update(len(chunk))
            partial_path.replace(file_path)
        except BaseException:
            #BaseException so that a KeyboardInterrupt also cleans up the partial file
            if partial_path.exists():
                partial_path.unlink()
            raise

    def _download_source_urls(self, download_dir:Path = Path("")):
        """Download every entry of self.source_urls, skipping files that already exist.

        download_dir: target directory; the default (or None) means self.download_dir.
        """
        if download_dir is None or download_dir == Path(""):
            download_dir = self.download_dir

        for file_name, url in self.source_urls.items():
            file_path = download_dir / file_name

            if file_path.exists():
                #TODO: Check for downloaded file size as well
                print("File Already Exists, Skipping")
                continue

            #Keyword arguments keep this call correct for overrides that order the parameters differently
            self._download_file(url = url, file_path = file_path)

    def _extract_zip(self, zip_file_path:Path, target_dir:Path, dirs_to_be_extracted = None):
        """Extract a zip archive into target_dir.

        dirs_to_be_extracted: optional list of folder names; when given, only
        members whose archive path contains one of them are extracted. When
        empty or None the whole archive is extracted.
        """
        with zipfile.ZipFile(zip_file_path, 'r') as archive:
            if not dirs_to_be_extracted:
                archive.extractall(target_dir)
                return
            for member in archive.infolist():
                #any() extracts each member once even if it matches several folder names
                if any(foldername in member.filename for foldername in dirs_to_be_extracted):
                    #TODO: Add tqdm progress bar for extraction
                    archive.extract(member, target_dir)

    def _extract_7zip(self, zip_file_path:Path, target_dir:Path, dirs_to_be_extracted = None):
        """Extract a 7zip archive into target_dir.

        dirs_to_be_extracted: optional list of folder names; when given, only
        members whose archive path contains one of them are extracted. When
        empty or None the whole archive is extracted.
        """
        with py7zr.SevenZipFile(zip_file_path, 'r') as archive:
            if not dirs_to_be_extracted:
                archive.extractall(target_dir)
                return
            #Collect every matching member and extract them in a single pass over the archive
            targets = [
                member for member in archive.getnames()
                if any(foldername in member for foldername in dirs_to_be_extracted)
            ]
            if targets:
                archive.extract(path = target_dir, targets = targets)

    def _merge_multivolume_archive(self, multivolume_paths:list, target_zip_path:Path, chunk_size:int = 16*1024*1024):
        """Concatenate the volumes, in the order given, into a single archive file.

        The target is overwritten rather than appended to, so leftovers from an
        interrupted merge cannot corrupt the result. Volumes are copied in
        chunk_size pieces to avoid loading a whole volume into memory.
        """
        with open(target_zip_path, 'wb') as outfile:
            for volume_path in multivolume_paths:
                with open(volume_path, 'rb') as infile:
                    while True:
                        chunk = infile.read(chunk_size)
                        if not chunk:
                            break
                        outfile.write(chunk)

    def _validate_files(self, file_paths:list, validation_file_names:list = None, dir:Path = None):
        """Return True when the names of file_paths are exactly the expected names.

        validation_file_names: expected file names; defaults to the names in self.source_urls.
        dir: unused, kept for backward compatibility.
        """
        if validation_file_names is None:
            validation_file_names = list(self.source_urls)
        source_file_names = set(validation_file_names)
        file_names = {Path(path).name for path in file_paths}
        return source_file_names == file_names

    def _delete_files(self, file_paths:list):
        """Delete each path that exists; print a warning for each one that does not."""
        for file_path in file_paths:
            if file_path.exists():
                file_path.unlink()
            else:
                print(f"Error Deleting {file_path.name}")

    def _get_raster_metadata(self, raster_path:Path):
        """Return (shape, crs as str, transform as tuple) of a raster file."""
        with rio.open(raster_path) as raster:
            return raster.shape, str(raster.crs), tuple(raster.transform)

### Inria

In [15]:
class InriaETL(DatasetETL):
    """ETL pipeline for the Inria aerial image labeling dataset.

    The dataset is distributed as a five-volume 7zip archive that contains a
    zip file; only the "train" folder of that zip is extracted into the root.
    """

    def __init__(self, root:Path, low_storage_mode:bool = True):
        """Register the five archive volumes and set up the image/mask paths.

        root: dataset directory.
        low_storage_mode: when True, intermediate archives are deleted as soon
        as they have been consumed by ``extract``.
        """
        super().__init__(
            root = root,
            urls = self._get_source_urls([
                    "https://files.inria.fr/aerialimagelabeling/aerialimagelabeling.7z.001",
                    "https://files.inria.fr/aerialimagelabeling/aerialimagelabeling.7z.002",
                    "https://files.inria.fr/aerialimagelabeling/aerialimagelabeling.7z.003",
                    "https://files.inria.fr/aerialimagelabeling/aerialimagelabeling.7z.004",
                    "https://files.inria.fr/aerialimagelabeling/aerialimagelabeling.7z.005"
                                        ]),
            low_storage_mode = low_storage_mode
        )

        self.locations = ["austin", "chicago", "kitsap", "tyrol-w", "vienna"]
        self.files_list = [f"{location}{num}.tif" for location in self.locations for num in range(1, 37)]

        dataset_dir = self.root_dir / "AerialImageDataset" / "train"
        self.image_dir = dataset_dir / "images"
        self.mask_dir = dataset_dir / "gt"

    def download(self):
        """Download the archive volumes into the downloads directory."""
        self._download_source_urls(self.download_dir)

    def extract(self):
        """Merge the volumes, decompress the 7zip archive and extract the train folder.

        Stages whose output is already on disk are skipped, so the method can
        be re-run after an interruption or after low_storage_mode has removed
        the intermediate files.
        """
        merged_7zip_path = self.download_dir / "aerialimagelabeling-merged.7z"
        dataset_zipfile_name = "NEW2-AerialImageDataset.zip"
        dataset_zipfile_path = self.download_dir / dataset_zipfile_name

        if not dataset_zipfile_path.exists():
            #Sorted so the volumes are concatenated in .001, .002, ... order; glob order is arbitrary
            downloaded_7zip_files_list:list = sorted(self.download_dir.glob("*.7z.*"))

            if merged_7zip_path.exists():
                print("Merged archive found")
            else:
                #The volumes only need to be complete when they still have to be merged
                all_volumes_present = self._validate_files(
                    downloaded_7zip_files_list,
                    list(self.source_urls.keys()),
                    self.download_dir,
                )
                if not all_volumes_present:
                    print("Missing volumes")
                    return
                print("Found all volumes")

                print("Merged archive not found")
                print(f"Merging volumes to {merged_7zip_path.name}")
                self._merge_multivolume_archive(downloaded_7zip_files_list, merged_7zip_path)

            #Delete downloaded volumes
            if self.low_storage_mode:
                print("Deleting downloaded volumes")
                self._delete_files(downloaded_7zip_files_list)

            print("Decompressing merged archive")
            self._extract_7zip(merged_7zip_path, self.download_dir)
            print("Decompression complete")

            if self.low_storage_mode:
                print("Deleting merged archive")
                self._delete_files([merged_7zip_path])

        #Extract Train Folder
        if not dataset_zipfile_path.exists():
            print(f"{dataset_zipfile_name} Not Found")
            return
        print("Extracting Dataset Folder")
        self._extract_zip(dataset_zipfile_path, self.root_dir, ["train"])
        print("Extraction Complete")

        if self.low_storage_mode:
            print("Deleting dataset archive")
            self._delete_files([dataset_zipfile_path])

    #TODO: Implement Verify Extraction
    #TODO: Move Catalog To DatasetETL Class ?

    def catalog(self):
        """Build self.dataset, one row per file found in the mask directory.

        Columns: name, shape, crs, transform, split, mask_path, image_path.
        Raises FileNotFoundError if the mask directory does not exist.
        """
        #Single sorted listing so names and metadata are guaranteed to line up
        mask_paths = sorted(self.mask_dir.iterdir())
        extracted_file_names = [path.name for path in mask_paths]

        #TODO: Convert These to Dataframe ApplyMaps
        metadata = [self._get_raster_metadata(path) for path in mask_paths]
        if metadata:
            shapes, crses, transforms = zip(*metadata)
        else:
            #Empty directory: produce an empty frame instead of failing on the unpack
            shapes, crses, transforms = (), (), ()

        data = {
            "name": extracted_file_names,
            "shape": list(shapes),
            "crs": list(crses),
            "transform": list(transforms),
            "split": [self._get_split(x) for x in extracted_file_names],
            "mask_path": [self.mask_dir / x for x in extracted_file_names],
            "image_path": [self.image_dir / x for x in extracted_file_names],
        }
        self.dataset = pd.DataFrame(data)

    def _get_split(self, file_name:str):
        """First 6 (16.67%) in every region for testing

        The tile number is read from the digits in file_name; a name without
        digits raises ValueError.
        """
        numbers = [char for char in file_name if char.isdigit()]
        if int(''.join(numbers)) <= 6:
            return "test"
        return "train"

In [16]:
# Instantiate the Inria pipeline with its dataset directory at ROOT / "inria".
inria = InriaETL(ROOT / "inria")

In [19]:
# Build the catalog from the extracted mask directory, then display it.
# catalog() lists inria.mask_dir, so the dataset must already be extracted on disk.
inria.catalog()
inria.dataset

RasterioIOError: /home/sambhav/dev/urban-feature-extraction/data/inria/AerialImageDataset/train/gt/austin1.tif: No such file or directory

### Massachusetts

In [8]:
class MassachussetsETL(DatasetETL):
    """ETL pipeline for the Massachusetts buildings dataset.

    File URLs are scraped from the per-split index pages in ``mass_urls``;
    images are stored under <root>/images and masks under <root>/masks.
    """

    #split -> (satellite image index page, building map index page)
    mass_urls = {
        "train": ("https://www.cs.toronto.edu/~vmnih/data/mass_buildings/train/sat/index.html", "https://www.cs.toronto.edu/~vmnih/data/mass_buildings/train/map/index.html"),
        "test" : ("https://www.cs.toronto.edu/~vmnih/data/mass_buildings/test/sat/index.html", "https://www.cs.toronto.edu/~vmnih/data/mass_buildings/test/map/index.html"),
        "val": ("https://www.cs.toronto.edu/~vmnih/data/mass_buildings/valid/sat/index.html", "https://www.cs.toronto.edu/~vmnih/data/mass_buildings/valid/map/index.html"),
    }

    def __init__(self, root:Path, low_storage_mode:bool = True):
        """Scrape the file URLs (network access) and create the image/mask directories.

        root: dataset directory.
        low_storage_mode: forwarded to DatasetETL.
        """
        super().__init__(root, self._get_source_urls(), low_storage_mode)
        self.image_dir = self.root_dir / "images"
        self.image_dir.mkdir(exist_ok=True)
        self.mask_dir = self.root_dir / "masks"
        self.mask_dir.mkdir(exist_ok=True)

        #self.source_urls is the (image_urls, mask_urls) tuple built by _get_source_urls
        image_urls, mask_urls = self.source_urls
        self.files_list = [url.split("/")[-1] for url in image_urls]
        #Mask names come from the mask index itself rather than being assumed equal to the image names
        self.mask_files_list = [url.split("/")[-1] for url in mask_urls]

    def _get_file_urls(self, url) -> list:
        """Return every href found in the anchor tags of the page at url ([] on a non-200 response)."""
        #Timeout so a stalled server cannot hang the constructor indefinitely
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            pattern = r'<a\s+(?:[^>]*?\s+)?href="([^"]*)"'
            matches = re.findall(pattern, response.text)
            return matches
        else:
            print("Error:", response.status_code)
            return []

    def _get_source_urls(self) -> tuple:
        """Return (image_urls, mask_urls), each concatenated over all splits in mass_urls."""
        images = [file_url for key in self.mass_urls for file_url in self._get_file_urls(self.mass_urls[key][0])]
        masks = [file_url for key in self.mass_urls for file_url in self._get_file_urls(self.mass_urls[key][1])]
        return images, masks

    def download(self, urls:list = None, target_dir_path:Path = None):
        """Download urls into target_dir_path, skipping files that already exist.

        Called without arguments (as the base-class ``Extract`` does) it
        downloads all images and then all masks.
        Raises ValueError if only one of the two arguments is given.
        """
        if urls is None and target_dir_path is None:
            self.download_images()
            self.download_masks()
            return
        if urls is None or target_dir_path is None:
            raise ValueError("urls and target_dir_path must be given together")

        for url in urls:
            file_name = url.split("/")[-1]
            downloaded_file_path = target_dir_path / file_name
            if downloaded_file_path.exists():
                print(f"{file_name} Exists, Skipping")
                continue
            self._download_file(url = url, file_path = downloaded_file_path)

    def download_images(self) -> None:
        """Download every satellite image into the image directory."""
        self.download(self.source_urls[0], self.image_dir)

    def download_masks(self) -> None:
        """Download every building mask into the mask directory."""
        self.download(self.source_urls[1], self.mask_dir)

    def _validate_download_dir(self, target_dir_path:Path, file_names:list = None) -> list:
        """Return the expected paths under target_dir_path that are missing on disk.

        file_names: expected file names; defaults to self.files_list.
        """
        if file_names is None:
            file_names = self.files_list
        files_not_downloaded = list()
        for file_name in file_names:
            downloaded_file_path = target_dir_path / file_name
            if not downloaded_file_path.exists():
                files_not_downloaded.append(downloaded_file_path)
        return files_not_downloaded

    def validate_images(self) -> list:
        """Return the image paths that have not been downloaded."""
        return self._validate_download_dir(self.image_dir, self.files_list)

    def validate_masks(self) -> list:
        """Return the mask paths that have not been downloaded."""
        return self._validate_download_dir(self.mask_dir, self.mask_files_list)

    def create_dataframe(self):
        """Return a DataFrame with one row per image: name, shape, crs, transform, image_path, mask_path.

        Raster metadata is read from the mask files, so the masks must be on disk.
        """
        #Pair each image with the mask sharing its stem; fall back to the image name if none matches
        mask_name_by_stem = {Path(name).stem: name for name in self.mask_files_list}
        mask_paths = [self.mask_dir / mask_name_by_stem.get(Path(x).stem, x) for x in self.files_list]

        metadata = [self._get_raster_metadata(path) for path in mask_paths]
        if metadata:
            shapes, crses, transforms = zip(*metadata)
        else:
            shapes, crses, transforms = (), (), ()
        data = {
            "name": self.files_list,
            "shape": list(shapes),
            "crs": list(crses),
            "transform": list(transforms),
            "image_path": [(self.image_dir / x) for x in self.files_list],
            "mask_path": mask_paths
        }
        return pd.DataFrame(data)

In [9]:
# Instantiate the Massachusetts pipeline; the constructor scrapes the index pages, so this needs network access.
massachussets = MassachussetsETL(ROOT / "massachussets")

In [10]:
# Fetch the satellite images, then the building masks; files already on disk are skipped.
massachussets.download_images()
massachussets.download_masks()

22678915_15.tiff Exists, Skipping
22678930_15.tiff Exists, Skipping
22678945_15.tiff Exists, Skipping
22678960_15.tiff Exists, Skipping
22678975_15.tiff Exists, Skipping
22678990_15.tiff Exists, Skipping
22679005_15.tiff Exists, Skipping
22679020_15.tiff Exists, Skipping
22679035_15.tiff Exists, Skipping
22679050_15.tiff Exists, Skipping
22828915_15.tiff Exists, Skipping
22828945_15.tiff Exists, Skipping
22828960_15.tiff Exists, Skipping
22828975_15.tiff Exists, Skipping
22829005_15.tiff Exists, Skipping
22829020_15.tiff Exists, Skipping
22829035_15.tiff Exists, Skipping
22978870_15.tiff Exists, Skipping
22978885_15.tiff Exists, Skipping
22978900_15.tiff Exists, Skipping
22978915_15.tiff Exists, Skipping
22978930_15.tiff Exists, Skipping
22978960_15.tiff Exists, Skipping
22978975_15.tiff Exists, Skipping
22978990_15.tiff Exists, Skipping
22979005_15.tiff Exists, Skipping
22979020_15.tiff Exists, Skipping
22979035_15.tiff Exists, Skipping
22979050_15.tiff Exists, Skipping
22979065_15.ti

### ISPRS

In [11]:
class ISPRSSemanticLabelingETL(DatasetETL):
    """ETL pipeline for the ISPRS semantic labeling archives hosted on a password-protected Seafile share."""

    isprs_urls = {
        "potsdam.zip": "https://seafile.projekt.uni-hannover.de/f/429be50cc79d423ab6c4/",
        "toronto.zip": "https://seafile.projekt.uni-hannover.de/f/fc62f9c20a8c4a34aea1/",
        "vaihingen.zip": "https://seafile.projekt.uni-hannover.de/f/6a06a837b1f349cfa749/",
    }
    #NOTE(review): credential is hardcoded in the notebook; consider loading it from the environment instead
    password = "CjwcipT4-P8g"
    #Name of the cookie whose value is sent back as the CSRF form token
    cookie_name = "sfcsrftoken"

    def __init__(self, root:Path, low_storage_mode:bool = True):
        """Register the share links as source URLs.

        root: dataset directory.
        low_storage_mode: forwarded to DatasetETL.
        """
        super().__init__(root = root,
                         urls = self.isprs_urls,
                         low_storage_mode = low_storage_mode
        )

    def _download_file(self, url:str, file_path:Path, chunk_size = 1024*1024) -> None:
        """Download a password-protected share link to file_path.

        A GET on the share page provides the CSRF cookie; the token and the
        password are then POSTed to "<url>?dl=1" and the response is streamed
        to disk. Parameter order matches DatasetETL._download_file, which
        calls this method with (url, file_path).

        The data is written to a sibling "<name>.part" file and renamed only
        after the transfer completes, so an interrupted download is not later
        skipped as already downloaded. Raises requests.HTTPError on a
        non-success status code of the POST.
        """
        file_path = Path(file_path)
        partial_path = file_path.with_name(file_path.name + ".part")
        try:
            #Context manager so the session's connections are released when done
            with requests.Session() as session:
                cookies = {self.cookie_name: session.get(url).cookies.get(self.cookie_name)}
                payload:dict = {'csrfmiddlewaretoken': cookies[self.cookie_name],
                                'password': self.password}
                with session.post(url+"?dl=1", data = payload, cookies = cookies, stream = True) as r:
                    r.raise_for_status()
                    total_size = int(r.headers.get('content-length', 0))
                    with open(partial_path, "wb") as f, tqdm(total=total_size, unit="B", unit_scale=True, desc="Downloading") as progress_bar:
                        for chunk in r.iter_content(chunk_size=chunk_size):
                            if chunk:
                                f.write(chunk)
                                progress_bar.update(len(chunk))
            partial_path.replace(file_path)
        except BaseException:
            #BaseException so that a KeyboardInterrupt also cleans up the partial file
            if partial_path.exists():
                partial_path.unlink()
            raise

    def download(self):
        """Download every archive in isprs_urls into the downloads directory."""
        self._download_source_urls()


In [12]:
# Instantiate the ISPRS pipeline with its dataset directory at ROOT / "isprs".
isprs = ISPRSSemanticLabelingETL(ROOT / "isprs")

In [13]:
# Download the ISPRS archives. These are multi-gigabyte files, so expect this cell to run for hours.
isprs.download()

Downloading:   9%|▊         | 1.16G/13.3G [16:28<2:52:33, 1.17MB/s]


KeyboardInterrupt: 

### City OSM

In [20]:
class CityOSMETL(DatasetETL):
    """ETL pipeline for the City-OSM archives hosted on Zenodo, one zip per city."""

    #Target file name -> download URL
    city_osm_urls = {
        "Berlin.zip": "https://zenodo.org/record/1154821/files/berlin.zip?download=1",
        "Chicago.zip": "https://zenodo.org/record/1154821/files/chicago.zip?download=1",
        "Paris.zip": "https://zenodo.org/record/1154821/files/paris.zip?download=1",
        "Potsdam.zip": "https://zenodo.org/record/1154821/files/potsdam.zip?download=1",
        "Tokyo.zip": "https://zenodo.org/record/1154821/files/tokyo.zip?download=1",
        "Zurich.zip": "https://zenodo.org/record/1154821/files/zurich.zip?download=1"
    }

    def __init__(self, root:Path, low_storage_mode:bool = True):
        """Register the city archives as source URLs.

        root: dataset directory.
        low_storage_mode: forwarded to DatasetETL.
        """
        super().__init__(root,
                         self.city_osm_urls,
                         low_storage_mode=low_storage_mode)

    def download(self):
        """Download every city archive into the downloads directory, skipping existing files."""
        #The base-class helper is private (_download_source_urls); the unprefixed name does not exist
        self._download_source_urls()
    

In [21]:
# Instantiate the City-OSM pipeline with its dataset directory at ROOT / "city-osm".
city_osm = CityOSMETL(ROOT / "city-osm")

In [22]:
# Download the city archives. These are gigabyte-scale files, so expect this cell to run for a long time.
city_osm.download()

Downloading:  13%|█▎        | 261M/2.00G [25:31<2:50:26, 170kB/s] 


KeyboardInterrupt: 

### SpaceNet

In [None]:
class SpaceNetETL(DatasetETL):
    """Placeholder for the SpaceNet pipeline; nothing is implemented yet."""

    def __init__(self, root:Path):
        """Accept the dataset root but perform no setup; the base initialiser is not called."""