# Major-TOM Filtering

In [1]:
from pathlib import Path

DATASET_DIR = Path('./data/Major-TOM/')
DATASET_DIR.mkdir(exist_ok=True, parents=True)
ACCESS_URL = 'https://huggingface.co/datasets/Major-TOM/Core-S2L2A/resolve/main/metadata.parquet?download=true'
LOCAL_URL = DATASET_DIR / '{}.parquet'.format(ACCESS_URL.split('.parquet')[0].split('/')[-1])

In [92]:
import urllib.request
LOCAL_URL, RESPONSE = urllib.request.urlretrieve(ACCESS_URL, LOCAL_URL)

In [32]:
import pyarrow.parquet as pq

df = pq.read_table(LOCAL_URL).to_pandas()
df['timestamp'] = pd.to_datetime(df.timestamp)
df.head()

NameError: name 'geopandas' is not defined

In [37]:
import geopandas as gpd

gdf = gpd.GeoDataFrame(
    df, geometry=gpd.points_from_xy(df.centre_lon, df.centre_lat), crs=df.crs.iloc[0]
)

In [57]:
def filter_metadata(df,
                    region=None,
                    daterange=None,
                    cloud_cover=(0,100),
                    nodata=(0, 1.0)
                   ):
    """Filters the Major-TOM dataframe based on several parameters

    Args:
        df (geopandas dataframe): Parent dataframe
        region (shapely geometry object) : Region of interest
        daterange (tuple) : Inclusive range of dates (example format: '2020-01-01')
        cloud_cover (tuple) : Inclusive percentage range (0-100) of cloud cover
        nodata (tuple) : Inclusive fraction (0.0-1.0) of no data allowed in a sample

    Returns:
        df: a filtered dataframe
    """
    # temporal filtering
    if daterange is not None:
        assert (isinstance(daterange, list) or isinstance(daterange, tuple)) and len(daterange)==2
        df = df[df.timestamp >= daterange[0]]
        df = df[df.timestamp <= daterange[1]]
    
    # spatial filtering
    if region is not None:
        idxs = df.sindex.query(region)
        df = df.take(idxs)
    # cloud filtering
    if cloud_cover is not None:
        df = df[df.cloud_cover >= cloud_cover[0]]
        df = df[df.cloud_cover <= cloud_cover[1]]

    # spatial filtering
    if nodata is not None:
        df = df[df.nodata >= nodata[0]]
        df = df[df.nodata <= nodata[1]]

    return df

In [77]:
from shapely.geometry import box

switzerland = box(5.9559111595,45.8179931641,10.4920501709,47.808380127)
gabon = box(8.1283659854,-4.9213919841,15.1618722208,2.7923006325)
napoli = box(14.091710578,40.7915558593,14.3723765416,40.9819258062)

In [78]:
filtered_df = filter_metadata(gdf,
                              cloud_cover = (0,10),
                              region=napoli,
                              daterange=('2020-01-01', '2025-01-01'),
                              nodata=(0.0,0.0)
                              )

In [87]:
from fsspec.parquet import open_parquet_file
import pyarrow.parquet as pq
from io import BytesIO
from PIL import Image
from rasterio.io import MemoryFile

def read_row(row, columns=["thumbnail"]):
    """Reads a row from a Major-TOM dataframe

    Args:
        row (row from geopandas dataframe): The row of metadata
        columns (list): columns to be read from the file

    Returns:
        data (dict): dictionary with returned data from requested columns
    """
    with open_parquet_file(row.parquet_url,columns = columns) as f:
        with pq.ParquetFile(f) as pf:
            row_group = pf.read_row_group(row.parquet_row, columns=columns)

    if columns == ["thumbnail"]:
        stream = BytesIO(row_group['thumbnail'][0].as_py())
        return Image.open(stream)
    else:
        row_output = {}
        for col in columns:
            bytes = row_group[col][0].as_py()

            if col != 'thumbnail':
                with MemoryFile(bytes) as mem_f:
                    with mem_f.open(driver='GTiff') as f:
                        row_output[col] = f.read().squeeze()
            else:
                stream = BytesIO(bytes)
                row_output[col] = Image.open(stream)

        return row_output
        

out = read_row(filtered_df.iloc[0], columns = ['B04', 'thumbnail'])

### Download

In [90]:
filtered_df

Unnamed: 0,grid_cell,grid_row_u,grid_col_r,product_id,timestamp,cloud_cover,nodata,centre_lat,centre_lon,crs,parquet_url,parquet_row,geometry
1593374,454U_120R,454,120,S2B_MSIL2A_20220719T095559_N0400_R122_T33TVF_2...,2022-07-19 09:55:59,0.0,0.0,40.823861,14.292709,EPSG:32633,https://huggingface.co/datasets/Major-TOM/Core...,455,POINT (14.293 40.824)
1595266,455U_120R,455,120,S2B_MSIL2A_20220719T095559_N0400_R122_T33TVF_2...,2022-07-19 09:55:59,0.0,0.0,40.913671,14.311585,EPSG:32633,https://huggingface.co/datasets/Major-TOM/Core...,347,POINT (14.312 40.914)
1595265,455U_119R,455,119,S2A_MSIL2A_20200113T095351_N0500_R079_T33TVF_2...,2020-01-13 09:53:51,0.0,0.0,40.913731,14.19273,EPSG:32633,https://huggingface.co/datasets/Major-TOM/Core...,346,POINT (14.193 40.914)


In [148]:
from tqdm.notebook import tqdm
import os
from fsspec.parquet import open_parquet_file

def filter_download(df, local_dir, source_name, by_row = False, verbose = False):
    """Downloads and unpacks the data of Major-TOM based on a metadata dataframe

    Args:
        df (geopandas dataframe): Metadata dataframe
        local_dir (str or Path) : Path to the where the data is to be stored locally
        source_name (str) : Name alias of the resulting dataset
        by_row (bool): If True, it will access individual rows of parquet via http - otherwise entire parquets are downloaded temporarily
        verbose (bool) : option for potential internal state printing

    Returns:
        None

    """

    if isinstance(local_dir, str):
        local_dir = Path(local_dir)

    temp_file = local_dir / 'temp.parquet'

    # identify all parquets that need to be downloaded (group them)
    urls = df.parquet_url.unique()
    print('Starting download of {} parquet files.'.format(len(urls))) if verbose else None

    for url in tqdm(urls, desc='Downloading and unpacking...'):
        # identify all relevant rows
        rows = df[df.parquet_url == url].parquet_row.unique()
        
        if not by_row: # (downloads entire parquet)        
            # download a temporary file
            temp_path, http_resp = urllib.request.urlretrieve(url, temp_file)            
        else:
            temp_path = open_parquet_file(url)
            
        # populate the bands
        with pq.ParquetFile(temp_path) as pf:
            for row_idx in rows:
                table = pf.read_row_group(row_idx)

                product_id = table['product_id'][0].as_py()
                grid_cell = table['grid_cell'][0].as_py()
                row = grid_cell.split('_')[0]
            
                dest = local_dir / Path("{}/{}/{}/{}".format(source_name, row, grid_cell, product_id))
                dest.mkdir(exist_ok=True, parents=True)

                columns = [col for col in table.column_names if col[0] == 'B'] + ['cloud_mask']
                
                # tifs
                for col in columns:
                    with open(dest / "{}.tif".format(col), "wb") as f:
                        # Write bytes to file
                        f.write(table[col][0].as_py())

                # thumbnail (png)
                col = 'thumbnail'
                with open(dest / "{}.png".format(col), "wb") as f:
                    # Write bytes to file
                    f.write(table[col][0].as_py())
        if not by_row:
            # remove downloaded file
            os.remove(temp_path)
        else:
            temp_path.close()

In [149]:
filter_download(filtered_df, local_dir='./data/mczerkawski', source_name='L2A', by_row=True)

Downloading and unpacking...:   0%|          | 0/2 [00:00<?, ?it/s]

In [150]:
filter_download(filtered_df, local_dir='./data/mczerkawski', source_name='L2A', by_row=False)

Downloading and unpacking...:   0%|          | 0/2 [00:00<?, ?it/s]

In [151]:
filtered_df

Unnamed: 0,grid_cell,grid_row_u,grid_col_r,product_id,timestamp,cloud_cover,nodata,centre_lat,centre_lon,crs,parquet_url,parquet_row,geometry
1593374,454U_120R,454,120,S2B_MSIL2A_20220719T095559_N0400_R122_T33TVF_2...,2022-07-19 09:55:59,0.0,0.0,40.823861,14.292709,EPSG:32633,https://huggingface.co/datasets/Major-TOM/Core...,455,POINT (14.293 40.824)
1595266,455U_120R,455,120,S2B_MSIL2A_20220719T095559_N0400_R122_T33TVF_2...,2022-07-19 09:55:59,0.0,0.0,40.913671,14.311585,EPSG:32633,https://huggingface.co/datasets/Major-TOM/Core...,347,POINT (14.312 40.914)
1595265,455U_119R,455,119,S2A_MSIL2A_20200113T095351_N0500_R079_T33TVF_2...,2020-01-13 09:53:51,0.0,0.0,40.913731,14.19273,EPSG:32633,https://huggingface.co/datasets/Major-TOM/Core...,346,POINT (14.193 40.914)


In [174]:
import os
import pandas as pd
from torch.utils.data import Dataset
from pathlib import Path

class MajorTOM(Dataset):
    def __init__(self,
                 df,
                 local_dir = None,
                 tif_bands=['B04','B03','B02'],
                 png_bands=['thumbnail']
                ):
        super().__init__()
        self.df = df
        self.local_dir = Path(local_dir) if isinstance(local_dir,str) else local_dir
        self.tif_bands = tif_bands
        self.png_bands = png_bands
    
    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        meta = self.df.iloc[idx]

        product_id = meta.product_id
        grid_cell = meta.grid_cell
        row = grid_cell.split('_')[0]
    
        path = self.local_dir / Path("{}/{}/{}".format(row, grid_cell, product_id))
        out_dict = {'meta' : meta}
        
        for band in self.tif_bands:
            with rio.open(path / '{}.tif'.format(band)) as f:
                out = f.read()
            out_dict[band] = out

        for band in self.png_bands:
            out_dict[band] = Image.open(path / '{}.png'.format(band))

        return out_dict
        
ds = MajorTOM(filtered_df, './data/mczerkawski/L2A')

ds[0]

{'meta': grid_cell                                              454U_120R
 grid_row_u                                                   454
 grid_col_r                                                   120
 product_id     S2B_MSIL2A_20220719T095559_N0400_R122_T33TVF_2...
 timestamp                                    2022-07-19 09:55:59
 cloud_cover                                                  0.0
 nodata                                                       0.0
 centre_lat                                             40.823861
 centre_lon                                             14.292709
 crs                                                   EPSG:32633
 parquet_url    https://huggingface.co/datasets/Major-TOM/Core...
 parquet_row                                                  455
 geometry             POINT (14.29270867998482 40.82386055223662)
 Name: 1593374, dtype: object,
 'B04': array([[[1563, 1437, 1466, ..., 2152, 2128, 2482],
         [1463, 1468, 1722, ..., 2418, 2334,