In [1]:
from typing import List
import requests
import re
from bs4 import BeautifulSoup

from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential

def get_table_from_link(url: str, class_: str) -> List[str]:
    """
    Extract table data from a web page by scraping elements with a specific CSS class.
    
    Parameters
    ----------
    url : str
        The URL of the web page to scrape.
    class_ : str
        The CSS class name to search for within table cells.
        
    Returns
    -------
    List[str]
        A list of BeautifulSoup Tag objects containing the matched table cells.
        
    Notes
    -----
    This function assumes the target table has an id="list" attribute.
    It searches for <td> elements within that table matching the specified class.
    """
    page = requests.get(url)
    soup = BeautifulSoup(page.content, "html.parser")
    table_ = soup.find(id = "list")
    list_ = table_.find_all("td", class_=class_)
    return list_

def find_data_storage(url: str, pattern: str) -> float:
    """
    Calculate total storage requirements from size data scraped from a web page.
    
    Parameters
    ----------
    url : str
        The URL of the web page containing size information.
    pattern : str
        Regex pattern parameter (currently unused - function uses hardcoded pattern).
        
    Returns
    -------
    float
        Total storage size converted to megabytes (MB).
        
    Notes
    -----
    The function searches for table cells with class="size", extracts numeric values
    from text matching the pattern numbers with decimal points, and
    sums them. The conversion factor 0.001024 is applied, suggesting conversion
    from KiB to MB using binary conversion (1024 bytes per KiB, then /1000).
    """
    storage_list = get_table_from_link(url, class_="size")
    
    total_storage = 0
    for itr in storage_list:
        pattern = re.compile(pattern)
        if pattern.match(itr.text):
            storage_per_file = float(itr.text.split(" ")[0])
            total_storage += storage_per_file

    return total_storage * 0.001024 # converting to MB


def find_tiff_url(url: str, pattern: str) -> List[str]:
    """
    Extract and construct URLs matching a specified pattern from a web page.
    
    Parameters
    ----------
    url : str
        The base URL of the web page to scrape.
    pattern : str
        Regex pattern to match against href attributes in links.
        
    Returns
    -------
    List[str]
        A list of complete URLs constructed by combining the base URL
        with matching href values.
        
    Notes
    -----
    The function searches for table cells with class="link", extracts href
    attributes from anchor tags within those cells, and filters them using
    the provided regex pattern. Complete URLs are formed by concatenating
    the base URL with the matching href values.
    
    Assumes each link cell contains at least one anchor tag with an href attribute.
    """
    links = get_table_from_link(url, class_ = "link")

    all_url = []
    for link in links:
        temp_url = link.find_all(href = True)[0]['href']
        pattern = re.compile(pattern)
        if pattern.match(temp_url):
            all_url.append(url + temp_url)

    return all_url

url = "https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/"
year_urls = find_tiff_url(url, pattern = r"\d{4}\/")

# get links to all TIFF files
# for year in year_urls:
#     data_urls = find_tiff_url(year, pattern = r"chirps-.*")
#     print(data_urls)


# # get storage requirements for all tiff files
# total_storage = 0
# for year in year_urls:
#     # the storage output from this function is already in MB
#     total_storage += find_data_storage(url = year, pattern = r"\d+\..+")

# total_storage = total_storage * 0.001 # converting to GB
# print(f"Total Storage required by the CHIRPS zip files: {total_storage:.2f} GB")

# print("The above storage is taken up by compressed files, for a better estimate, we use the conversion factor of 12.735(obtained from downloading one file)")
# print(f"Total true storage requirement for the CHIRPS dataset: {(total_storage * 12.735):.2f} GB")

In [2]:
# get links to all TIFF files
# data_urls is a list of a list with 45 years worth of data from 1981-2025
# where index 0 has all data for 1981 and index 1 has 1982 ... index 44 has 2025
# converting to a list of dicts to make it easy to work with downstream
data_urls = []
for i, year in enumerate(year_urls):
    # urls per year
    urls = find_tiff_url(year, pattern = r"chirps-.*")
    data_urls.append({"year": str(i + 1981), "urls": urls})
    

In [19]:
# unzip file
import gzip

def unzip_file(url: str) -> bytes:
    """
    Opens an object at a given url, and returns a decompressed byte object

    Parameters
    -----------
    url : str
        The base url to the source file
    
    Returns
    -------
    bytes
        Decompressed byte object
    """
    unzipped_file = requests.get(url) 
    if unzipped_file.status_code == 200:
        if ".gz" in url:
            output_file = gzip.decompress(unzipped_file.content)
        else:
            output_file = unzipped_file.content
    
    return output_file

In [4]:
# clipping tiff to Nigeria specific bounding box
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
from rasterio.crs import CRS
from rasterio.warp import transform_bounds

def clip_to_cog(input_tiff: str, clipped_tiff: str, bbox: list, bbox_crs: str):
    """
    Clips a GeoTIFF to a specified bounding box, handling differing CRS,
    and saves it as a Cloud-Optimized GeoTIFF (COG).

    Args:
        input_tiff: Path to the source GeoTIFF file.
        clipped_tiff: Path for the output clipped COG file.
        bbox: A list representing the bounding box in the format
              [min_x, min_y, max_x, max_y].
        bbox_crs: The Coordinate Reference System of the provided bounding box,
                  defaulting to WGS84 ('EPSG:4326').
    """
    try:
        with rasterio.open(input_tiff) as src:
        
            # Get the CRS of the source raster
            src_crs = src.crs
            
            # Reproject the bounding box if the CRS are different
            if CRS.from_string(bbox_crs) != src_crs:
                left, bottom, right, top = transform_bounds(
                    CRS.from_string(bbox_crs),
                    src_crs,
                    *bbox
                )
                reprojected_bbox = [left, bottom, right, top]
            else:
                reprojected_bbox = bbox
        
        
            window = from_bounds(*reprojected_bbox, src.transform)
            data = src.read(window=window)
            window_transform = src.window_transform(window)

            profile = src.profile.copy()
            profile.update({
                'height': window.height, 
                'width': window.width, 
                'transform': window_transform,
                'tiled': True, 
                'blockxsize': 512, 
                'blockysize': 512,
                'compress': 'deflate'
            })

            # write COG
            with rasterio.open(clipped_tiff, 'w', **profile) as dst:
                dst.write(data)

                factors =  [2, 4, 8, 16]
                dst.build_overviews(factors, Resampling.average)
                dst.update_tags(ns='rio_overview', resampling='average')
    except Exception as e:
        print(f"An error has occurred: {e}")

In [14]:
import os
def decompress_convert_to_cog(work_item: dict, directory: str):
    """
    Download, decompress, and convert a single CHIRPS rainfall data file to Cloud Optimized GeoTIFF (COG) format.
    
    This function processes one rainfall data file by downloading it from a URL, decompressing the .gz file,
    writing it to disk, and then clipping it to Nigeria's bounding box before converting to COG format.
    
    Parameters
    ----------
    work_item : dict
        Dictionary containing file processing information with the following keys:
        - 'url' : str
            Full URL to the .tif.gz file to be downloaded and processed
        - 'year' : str
            Year string (e.g., '1981') used for filename extraction from URL path
    directory : str
        Base directory path where the processed files will be saved. Should end with '/'.
        The function will save the intermediate .tif file in this directory and the final
        COG file in the 'cogs/' subdirectory.
    
    Returns
    -------
    full_path_to_file: str
        Returns the full local path to the file that is processed
    
    Note
    ----
    The Nigeria bounding box coordinates are hardcoded as:
    [2.316388, 3.837669, 15.126447, 14.153350] in EPSG:4326 CRS.
    """
    url = work_item['url']
    year = work_item['year']
    year_dir = str(year) + "/"
    
    # getting file name from url
    raw_file_name = url.split(year_dir)[1].replace(".gz", "")
    decompressed_file = unzip_file(work_item['url'])
    
    # full path of the output tif files
    raw_file_path = os.path.join(directory, "raw-data", raw_file_name)
    
    with open(raw_file_path, "wb") as f:
        f.write(decompressed_file)
    
    cog_file_name = f"nigeria-cog-{raw_file_name}"
    clipped_tiff_path = os.path.join(f"{directory}processed-cogs", cog_file_name)
    bbox_aoi = [2.316388, 3.837669, 15.126447, 14.153350]
    bbox_crs = "EPSG:4326"
    
    clip_to_cog(raw_file_path, clipped_tiff_path, bbox_aoi, bbox_crs)
    
    # return COG file path
    return (clipped_tiff_path, cog_file_name, raw_file_path, raw_file_name)



# iterate through all the years, and convert to COGS
# local path to TIFs
directory = "../data/nigeria_tifs/"

# for parallel workflow, convert to a flat list from the nested data_urls list
work_items = []
for data in data_urls:
    for url in data['urls']:
        work_items.append({"year": data['year'], "url": url})


In [25]:
# processed in the next cell
work_items[0]['year']
items_2021 = []
for i in work_items:
    if i['year'] == '2021':
        items_2021.append(i)

items_2021


[{'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/2021/chirps-v2.0.2021.01.01.tif.gz'},
 {'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/2021/chirps-v2.0.2021.01.02.tif.gz'},
 {'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/2021/chirps-v2.0.2021.01.03.tif.gz'},
 {'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/2021/chirps-v2.0.2021.01.04.tif.gz'},
 {'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/2021/chirps-v2.0.2021.01.05.tif.gz'},
 {'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/2021/chirps-v2.0.2021.01.06.tif.gz'},
 {'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/africa_daily/tifs/p05/2021/chirps-v2.0.2021.01.07.tif.gz'},
 {'year': '2021',
  'url': 'https://data.chc.ucsb.edu/products

In [71]:
from datetime import datetime
import json
import os
from typing import List, Tuple
    
def update_progress_file(task_id, completed, failed_files):  # Write to progress/task_{id}.json  
    """
    Updates a progress file called {task_id}.json and uploads it to logs on Azure Blob Store

    Parameters:
    -----------
    task_id: str
        Batch number 
    completed: str
        The number of files that were processed in this batch
    failed_files: dict
        A dict of failed files that could have failed due to various reasons

    Returns
    -------
    None
    """
    iso_timestamp = datetime.now().isoformat()
    batch_number = task_id
    completed = completed
    
    json_file = {
        "iso_timestamp": iso_timestamp,
        "batch_number": batch_number,
        "completed": completed,
        "failed_files": failed_files
    }
    
    local_path = "../data/nigeria_tifs/batch-logs"
    file_name = f"{task_id}.json"
    upload_file_path = os.path.join(local_path, file_name)
    container_name = "batch-logs"

    with open(upload_file_path, 'w') as f:
        json.dump(json_file, f)

    # upload_blob_to_azure(container_name=container_name, file_path=upload_file_path, file_name=file_name)
    # cleanup_local_files(upload_file_path)

    
    
def upload_blob_to_azure(container_name: str, file_path: str, file_name: str):
    """
    Uploads a local file at <file_path> to a blob names <file_name> within a container <container_name>

    Parameters:
    ----------
    container_name: str
        Name of the container on Azure Blob Storage account
    
    file_path: str
        Path to the local file

    file_name: str
        Name of the uploaded file in the container

    Returns:
    -------
    None
    """
    
    # uses the default credential option on this machine
    credential = DefaultAzureCredential()

    # Create blob service client
    blob_service_client = BlobServiceClient(
        account_url="https://mpcpstorageaccount.blob.core.windows.net",
        credential=credential
    )
    
    blob_client = blob_service_client.get_blob_client(container = container_name, blob=file_name)
    
    print(f"\nUploading to Azure as blob:\n\t" + file_path)
    with open(file = file_path, mode = "rb") as data:
        blob_client.upload_blob(data)    

def cleanup_local_files(file_paths: List[Tuple] | str):  # Delete local files after uploading them to Azure Blob
    try:
        if type(file_paths) == str:
            os.remove(file_paths)
            print(f"Local {file_paths} removed")
        else:
            for (i, j) in file_paths:
                os.remove(i) # processed file
                os.remove(j) # raw file
                print(f"Local raw file removed: {i} and COG file: {j} removed.")
    except FileNotFoundError:
        print(f"File '{i}' not found.")
        print(f"File '{j}' not found.")
    

def process_batch_with_progress(work_items_chunk, task_id):
    processed_count = 0
    for i, item in enumerate(work_items_chunk):
        try: 
            cog_container_name = "processed-cogs"
            raw_container_name = "raw-data"
            cog_file_path, cog_file_name, raw_file_path, raw_file_name = decompress_convert_to_cog(item, directory)
            # upload_blob_to_azure(container_name=cog_container_name, file_path=cog_file_path, file_name=cog_file_name)
            # upload_blob_to_azure(container_name=raw_container_name, file_path=raw_file_path, file_name=raw_file_name)
            completed.append((cog_file_path, raw_file_path))
        except Exception as e:
            failed_files.append({"item": item, "Error": str(e)})
            print(f"Failed: {item} - Error: {str(e)}")

        processed_count += 1
        
        if processed_count % 10 == 0 or i == len(work_items_chunk) - 1:
            print(f"Task ID: {task_id}, Completed: {completed}, Failed Files: {failed_files}")
            update_progress_file(task_id, len(completed), failed_files)
            cleanup_local_files(completed)



# create chunks of data for processing
batch = 10
# work_items_chunks = [work_items[i:i+batch] for i in range(0, len(work_items), batch)]
work_items_chunks = [items_2021[i:i+batch] for i in range(0, len(work_items), batch)]

failed_files = []
completed = []
for task_id, work_items_chunk in enumerate(work_items_chunks[33:34]):
    process_batch_with_progress(work_items_chunk, task_id)
            

Task ID: 0, Completed: [('../data/nigeria_tifs/processed-cogs/nigeria-cog-chirps-v2.0.2021.11.27.tif', '../data/nigeria_tifs/raw-data/chirps-v2.0.2021.11.27.tif'), ('../data/nigeria_tifs/processed-cogs/nigeria-cog-chirps-v2.0.2021.11.28.tif', '../data/nigeria_tifs/raw-data/chirps-v2.0.2021.11.28.tif'), ('../data/nigeria_tifs/processed-cogs/nigeria-cog-chirps-v2.0.2021.11.29.tif', '../data/nigeria_tifs/raw-data/chirps-v2.0.2021.11.29.tif'), ('../data/nigeria_tifs/processed-cogs/nigeria-cog-chirps-v2.0.2021.11.30.tif', '../data/nigeria_tifs/raw-data/chirps-v2.0.2021.11.30.tif'), ('../data/nigeria_tifs/processed-cogs/nigeria-cog-chirps-v2.0.2021.12.01.tif', '../data/nigeria_tifs/raw-data/chirps-v2.0.2021.12.01.tif'), ('../data/nigeria_tifs/processed-cogs/nigeria-cog-chirps-v2.0.2021.12.02.tif', '../data/nigeria_tifs/raw-data/chirps-v2.0.2021.12.02.tif'), ('../data/nigeria_tifs/processed-cogs/nigeria-cog-chirps-v2.0.2021.12.03.tif', '../data/nigeria_tifs/raw-data/chirps-v2.0.2021.12.03.tif

In [8]:
# from concurrent.futures import ThreadPoolExecutor, as_completed
# import tqdm

# failed_files = []

# with ThreadPoolExecutor(max_workers=20) as executor:
#     # Create futures and map them to work items
#     future_to_item = {}
#     for item in work_items:
#         future = executor.submit(decompress_convert_to_cog, item, directory)
#         future_to_item[future] = item
    
#     for future in tqdm.tqdm(as_completed(future_to_item.keys()), total=len(future_to_item),  desc="Processing files"):
#         work_item = future_to_item[future]
#         try:
#             future.result() 
#         except Exception as e:
#             failed_files.append(work_item)
#             print(f"Failed: {work_item['url']} - Error: {str(e)}")

# print(f"\nCompleted! {len(failed_files)} files failed out of {len(work_items)} total")