# Bronze Notebook
This notebook will contain the data processing steps for creating the bronze level of the data lake. This includes loading in the raw data from different data sources and aggregating the results. We will begin by importing all of the necessary packages and creating variables that will be consistent throughout the entire notebook.

## Imports
We first need to import all of the necessary modules.

In [0]:
# Imports
import io
import os
import re
from datetime import date, timedelta, datetime
from pathlib import Path
from typing import NamedTuple
import numpy as np
import pandas as pd
import requests
import xarray as xr
from netCDF4 import Dataset
from concurrent.futures import ThreadPoolExecutor, as_completed

## Setup
Now we need to setup the necessary classes that will be used to extract the raw data into the bronze layer of the datalake.

In [0]:
# Immutable datatypes that will be used throughout the project
class AzureDataLake(NamedTuple):
    """
    Immutable object that contains all of the necessary data needed to interact with Azure
    """
    BRONZE_ADLS_PATH: str = "abfs://bronze@[ACCOUNT_NAME].dfs.core.windows.net/"
    SILVER_ADLS_PATH: str = "abfss://silver@[ACCOUNT_NAME].dfs.core.windows.net"
    GOLD_ADLS_PATH: str = "abfss://gold@[ACCOUNT_NAME].dfs.core.windows.net"

class Location(NamedTuple):
    """
    Immutable object that contains all of the necessary geographic data
    """
    lat_N: float = 51.3769
    lat_S: float = 50.7726
    lon_E: float = -113.7319
    lon_W: float = -114.3362

class LocationHelper(Location):
    """
    Provides additional methods to assist with location specific operations
    """
    @property
    def get_region(self) -> str:
        """
        Templates the latitudes and longitudes in a string that is expected by EARTHDATA APIs

        :return: region
        """
        return f"[BBOX]N{self.lat_N} S{self.lat_S} W{self.lon_W} E{self.lon_E}"
    
    @property
    def get_lat_center(self) -> float:
        return (0.5 * (self.lat_S + self.lat_N))
    
    @property
    def get_lon_center(self) -> float:
        return (0.5 * (self.lon_W + self.lon_E))

In [0]:
class SingletonBronzeETL:
    """
    Handles the extraction of data from NASA's EARTHDATA website and does minimal transformations necessary to upload the raw data
    to the Bronze layer of the lake.
    """
    def __new__(cls, location_helper, azure_data_lake, local: bool=False):
        # Singleton Pattern
        if not hasattr(cls, 'instance'):
            cls.instance = super(SingletonBronzeETL, cls).__new__(cls)
        return cls.instance

    def __init__(self, location_helper: LocationHelper, azure_data_lake: AzureDataLake, local):
        """
        :param location_helper: Coordinates of the city being analyzed.
        :param azure_data_lake: Contains Paths to the Bronze, Silver, and Gold layers.
        :param local: Flag to determine if the notebook is running in azure cloud or locally. Credential management will change depending on the environment.
        """
        self._LAADS_TOKEN = None
        self._ACCOUNT_NAME = None
        self._ACCOUNT_KEY = None
        self._local = local

        # Get the keys and tokens from a .env file
        if self._local:
            from dotenv import load_dotenv
            load_dotenv()
            self._LAADS_TOKEN = os.environ.get("LAADS_TOKEN")
            self._ACCOUNT_NAME = os.environ.get("ACCOUNT_NAME")
            self._ACCOUNT_KEY = os.environ.get("ACCOUNT_KEY")

            # Ensure credentials are not missing
            missing_credentials = [cred for cred, val in [
                ("LAADS_TOKEN", self._LAADS_TOKEN),
                ("ACCOUNT_NAME", self._ACCOUNT_NAME),
                ("ACCOUNT_KEY", self._ACCOUNT_KEY),
            ] if not val]
            
            if missing_credentials:
                raise ValueError(f"Missing required environment variables: {missing_credentials}")
        
        else:
            # NOTE: LAADS_TOKEN is being taken from a AKV (Azure Back Key Vault) secret
            #       For more information, please read https://learn.microsoft.com/en-us/azure/databricks/security/secrets/
            self._LAADS_TOKEN = dbutils.secrets.get(scope="ensf612_project", key="LAADS-TOKEN")
            self._ACCOUNT_NAME = dbutils.secrets.get(scope="ensf612_project", key="ACCOUNT-NAME")


        self._NASA_REQUEST_HEADERS = {
            "X-Requested-With": "XMLHttpRequest",
            "Authorization": f"Bearer {self._LAADS_TOKEN}"
        }
        self._location_helper = location_helper
        self._azure_data_lake = azure_data_lake
    
    # Helper Methods
    def _upload_parquet(self, df: pd.DataFrame, datalake_path: str):
        datalake_path = datalake_path.replace("[ACCOUNT_NAME]", self._ACCOUNT_NAME)
        if self._local:
            df.to_parquet(datalake_path, 
                        index=False, 
                        storage_options={
                                "account_name": self._ACCOUNT_NAME,
                                "account_key": self._ACCOUNT_KEY
                            }
                        )
        else:
            from pyspark.sql import SparkSession, DataFrame
            spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()
            df = spark.createDataFrame(df)
            df.write.mode("overwrite").format("parquet").save(datalake_path)

In [0]:
class MCD06COSP(SingletonBronzeETL):
    def __init__(self, location_helper: LocationHelper, azure_data_lake: AzureDataLake, local: bool=False):
        super(MCD06COSP, self).__init__(location_helper, azure_data_lake, local)
        self._dataset_info = {
            "base_details_url": "https://ladsweb.modaps.eosdis.nasa.gov/api/v2/content/details",
            "base_archive_url": "https://ladsweb.modaps.eosdis.nasa.gov/api/v2/content/archives",
            "product": "MCD06COSP_D3_MODIS"
        }

    # Helper Methods
    def _compute_cloud_fraction_from_bytes(self, file_bytes: bytes, group_name: str='Cloud_Mask_Fraction') -> float:
        with Dataset('inmem', mode='r', memory=file_bytes) as nc:
            lats = nc.variables['latitude'][:].astype(float)
            lons = nc.variables['longitude'][:].astype(float)
            nlat = lats.size
            nlon = lons.size
            
            if group_name not in nc.groups:
                raise KeyError(f"Group '{group_name}' not in {list(nc.groups.keys())}")

            grp = nc.groups[group_name]
            if 'Mean' not in grp.variables:
                raise KeyError(f"'Mean' not found in group '{group_name}': {list(grp.variables.keys())}")
            
            cloud = grp.variables['Mean'][:].astype(float)
            shape = cloud.shape
            if shape == (nlat, nlon):
                lat_first = True
            elif shape == (nlon, nlat):
                lat_first = False
            else:
                raise ValueError(f'Unexpected cloud array shape {shape} with nlat={nlat}, nlon={nlon}')
            
            lat_idx0 = int(np.argmin(np.abs(lats - self._location_helper.get_lat_center)))
            lon_idx0 = int(np.argmin(np.abs(lons - self._location_helper.get_lon_center)))
            lat_idx = np.arange(max(0, lat_idx0 - 1), min(nlat, lat_idx0 + 2))
            lon_idx = np.arange(max(0, lon_idx0 - 1), min(nlon, lon_idx0 + 2))

            if lat_idx.size == 0 or lon_idx.size == 0:
                return float('nan')
            if lat_first:
                subset = cloud[np.ix_(lat_idx, lon_idx)]
            else:
                subset = cloud[np.ix_(lon_idx, lat_idx)]
            subset = np.where(subset < -1e5, np.nan, subset)
            if np.all(np.isnan(subset)):
                return float('nan')
            return float(np.nanmean(subset))
    
    def _extract_path(self, item: dict) -> str:
        if "downloadsLink" in item:
            url = item["downloadsLink"]
            return url.split("/archives/", 1)[1]
        if "self" in item:
            url = item["self"]
            return url.split("/details/", 1)[1]
        if "name" in item:
            return item["name"]
        raise KeyError(f"No archive/path field in {list(item.keys())}")
    
    def _fetch_summer_details_for_year(self, year: int) -> pd.DataFrame:
        rows = []
        d = date(year, 6, 1)
        end = date(year, 8, 31)

        while d <= end:
            temporal_range = f"{d:%Y-%m-%d}..{d:%Y-%m-%d}"
            params = {
                "products": self._dataset_info["product"],
                "temporalRanges": temporal_range,
                "regions": self._location_helper.get_region,
                "formats": "json",
            }

            try:
                resp = requests.get(
                    self._dataset_info["base_details_url"],
                    params=params,
                    headers=self._NASA_REQUEST_HEADERS,
                    timeout=60,
                )
                
                if resp.status_code >= 500:
                    print(f"Year {year}, date {d}: LAADS {resp.status_code}, skipping")
                    d += timedelta(days=1)
                    continue

                resp.raise_for_status()
                data = resp.json()

            except requests.RequestException as e:
                print(f"Year {year}, date {d}: request failed ({e}), skipping")
                d += timedelta(days=1)
                continue

            items = []
            if isinstance(data, dict) and "content" in data:
                items = data["content"]

            elif isinstance(data, list):
                items = data

            for it in items:
                rel_path = self._extract_path(it)
                fname = it.get("name", rel_path.split("/")[-1])

                if "dataDay" in it:
                    left = it["dataDay"].split("=", 1)[0].strip()
                    year_str, doy_str = left.split("-")
                    dd = datetime.strptime(year_str + doy_str.zfill(3), "%Y%j").date()
                else:
                    dd = self.date_from_mcd06cosp_filename(fname).date()

                rows.append(
                    {
                        "year": dd.year,
                        "date": dd,
                        "rel_path": rel_path,
                        "file_name": fname,
                    }
                )

            d += timedelta(days=1)

        if not rows:
            return pd.DataFrame(columns=["year", "date", "rel_path", "file_name"])

        df = pd.DataFrame(rows)
        df["date"] = pd.to_datetime(df["date"])
        df = df.sort_values("date").reset_index(drop=True)
        return df

    def _upload_year(self, year: int):
        df_raw_year = self._fetch_summer_details_for_year(year)
        if df_raw_year.empty:
            print(f"Year {year}: no data")
            return None
        
        bronze_path_year = str(Path(self._azure_data_lake.BRONZE_ADLS_PATH).joinpath(f"laads_links_summer_{year}.parquet"))
        self._upload_parquet(df_raw_year, bronze_path_year)
        print(f"Wrote RAW {bronze_path_year} with {len(df_raw_year)} rows")
        return bronze_path_year
            
    # Public Methods
    def create_bronze_layer(self, years, max_worker_threads=12):
        paths = []
        # Get all of the RAW data within a given range of years
        with ThreadPoolExecutor(max_workers=max_worker_threads) as ex:
            futures = {ex.submit(self._upload_year, y): y for y in years}
            for fut in as_completed(futures):
                p = fut.result()
                if p is not None:
                    paths.append(p)
        
        # Upload a manifest file of all the years
        paths_df = pd.DataFrame({'raw_path': paths})
        manifest_path = str(Path(self._azure_data_lake.BRONZE_ADLS_PATH).joinpath("laads_links_manifest.parquet"))
        self._upload_parquet(paths_df, manifest_path)
        return manifest_path
        
    def date_from_mcd06cosp_filename(self, fname: str) -> datetime:
        m = re.search(r'\.A(\d{4})(\d{3})', fname)
        if not m:
            raise ValueError(f'Could not parse date from {fname}')
        year = int(m.group(1))
        doy = int(m.group(2))
        return datetime.strptime(f'{year}{doy:03d}', '%Y%j')

## Set the Environment
**NOTE:** Only run the cell pertaining to your environment. The TA's grading this project does not have access to our Azure Cloud, so they must run the notebook locally.

In [0]:
# Azure Cloud Setup
bronze_etl = MCD06COSP(location_helper=LocationHelper(), azure_data_lake=AzureDataLake())

In [0]:
# Local Setup
bronze_etl = MCD06COSP(location_helper=LocationHelper(), azure_data_lake=AzureDataLake(), local=True)

## Create the Bronze Layer

In [0]:
years = list(range(2000, 2026))
manifest_path = bronze_etl.create_bronze_layer(years)


Year 2001: no data
Year 2000: no data


[0;31m---------------------------------------------------------------------------[0m
[0;31mUnknownException[0m                          Traceback (most recent call last)
File [0;32m<command-5542673593405681>, line 2[0m
[1;32m      1[0m years [38;5;241m=[39m [38;5;28mlist[39m([38;5;28mrange[39m([38;5;241m2000[39m, [38;5;241m2026[39m))
[0;32m----> 2[0m manifest_path [38;5;241m=[39m bronze_etl[38;5;241m.[39mcreate_bronze_layer(years)

File [0;32m<command-5542673593405680>, line 151[0m, in [0;36mMCD06COSP.create_bronze_layer[0;34m(self, years, max_worker_threads)[0m
[1;32m    149[0m futures [38;5;241m=[39m {ex[38;5;241m.[39msubmit([38;5;28mself[39m[38;5;241m.[39m_upload_year, y): y [38;5;28;01mfor[39;00m y [38;5;129;01min[39;00m years}
[1;32m    150[0m [38;5;28;01mfor[39;00m fut [38;5;129;01min[39;00m as_completed(futures):
[0;32m--> 151[0m     p [38;5;241m=[39m fut[38;5;241m.[39mresult()
[1;32m    152[0m     [38;5;28;01mif[39;00m 