In [1]:
# TODO: consider saving every output as [GeoParquet](https://geoparquet.org/) in the future to improve read performance (roughly a 30% reduction in read time).

In [1]:
# Enable IPython's autoreload extension in mode 2 so imported project modules
# are reloaded before each cell runs (edits to helpers are picked up live).
%load_ext autoreload
%autoreload 2

In [2]:
from logging import getLogger
import shutil
from pathlib import Path
import geopandas as gpd
import pandas as pd
import requests

# Make the project's `src` folder importable from this notebook.
scripts_dir = Path("../..").joinpath("src")
import sys

# sys.path holds strings, so the membership test must use the resolved string
# form; testing the Path object itself never matches and would insert a
# duplicate entry every time this cell is re-run.
scripts_path = scripts_dir.resolve().as_posix()
if scripts_path not in sys.path:
    sys.path.insert(0, scripts_path)

from helpers.utils import downloadFile, rm_tree, make_archive, writeReadGCP
from helpers.settings import get_settings
from helpers.file_handler import FileConventionHandler
from pipelines.utils import watch
from pipelines.processors import (
    set_wdpa_id,
    protection_level,
    status,
    create_year,
    get_mpas,
    set_location_iso,
    set_fps_classes,
    filter_by_methodology,
    filter_by_terrestrial,
    transform_points,
    clean_geometries,
)

In [3]:
# Project settings object; the upload cells read GCS_KEYFILE_JSON and GCS_BUCKET from it.
mysettings = get_settings()

### eez_intermediate

In [5]:
# Pipe params for the EEZ run:
#   force_clean -> passed as `overwrite` to the downloads and used to wipe stale unzipped folders
#   step / pipe -> fed to FileConventionHandler to resolve the working paths
force_clean, step, pipe = (
    True,
    "preprocess",
    "eez",
)

In [6]:
# Data sources
# Both layers come from the same Marine Regions download form, so the endpoint,
# request headers and form body are declared once and copied per dataset.
# NOTE(review): the session cookie and contact details are hard-coded here;
# consider moving them to settings/environment instead of the notebook.
_marineregions_url = "https://www.marineregions.org/download_file.php"
_marineregions_headers = {
    "content-type": "application/x-www-form-urlencoded",
    "cookie": "PHPSESSID=29190501b4503e4b33725cd6bd01e2c6; vliz_webc=vliz_webc2; jwplayer.captionLabel=Off",
    "dnt": "1",
    "origin": "https://www.marineregions.org",
    "sec-fetch-dest": "document",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "same-origin",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
}
_marineregions_body = {
    "name": "Jason",
    "organisation": "skytruth",
    "email": "hello@skytruth.com",
    "country": "Spain",
    "user_category": "academia",
    "purpose_category": "Conservation",
    "agree": "1",
}

## EEZ
EEZ_url = _marineregions_url
EEZ_file_name = "eez_v11.shp"
EEZ_params = {"name": "World_EEZ_v11_20191118.zip"}
EEZ_headers = dict(_marineregions_headers)  # independent copy per dataset
EEZ_body = dict(_marineregions_body)

## High seas
hs_url = _marineregions_url
hs_file_name = "High_seas_v1.shp"
hs_params = {"name": "World_High_Seas_v1_20200826.zip"}
hs_headers = dict(_marineregions_headers)
hs_body = dict(_marineregions_body)

In [7]:
# Resolve every path used by the EEZ pipe/step through the project's FileConventionHandler.
working_folder = FileConventionHandler(pipe)
input_path = working_folder.pipe_raw_path  # raw folder; downloads are written here
temp_working_path = working_folder.get_temp_file_path(step)  # scratch folder for unpacked archives

output_path = working_folder.get_processed_step_path(step)  # folder archived by make_archive, removed on cleanup
output_file = working_folder.get_step_fmt_file_path(step, "shp")  # shapefile written by the save cell
# NOTE(review): the meaning of the positional `True` is defined in FileConventionHandler — not visible here.
zipped_output_file = working_folder.get_step_fmt_file_path(step, "zip", True)  # archive that gets uploaded
remote_path = working_folder.get_remote_path(step)  # blob name used for the GCS upload

In [8]:
# Extract data
## Fetch the EEZ and High Seas archives into the raw folder.
## `force_clean` is forwarded as `overwrite` for both requests.
downloadFile(EEZ_url, input_path, EEZ_body, EEZ_params, EEZ_headers, overwrite=force_clean)
downloadFile(
    hs_url,
    input_path,
    hs_body,
    hs_params,
    hs_headers,
    overwrite=force_clean,
)

PosixPath('/home/mambauser/data/eez/raw/World_High_Seas_v1_20200826.zip')

In [9]:
## unzip file if needed & load data
unziped_folders = []
# Path.glob gives no ordering guarantee, so sort the archives by name. This
# keeps "World_EEZ_..." at index 0 and "World_High_Seas_..." at index 1, which
# both the `idx == 0` branch below and the later `unziped_folders[1]` lookup
# rely on.
for idx, path in enumerate(sorted(input_path.glob("*.zip"))):
    unziped_folder = temp_working_path.joinpath(path.stem)
    print(unziped_folder)

    # Start from a clean folder when a forced refresh is requested.
    if unziped_folder.exists() and force_clean:
        rm_tree(unziped_folder)

    # The first archive is unpacked into the parent folder, the others into
    # their own folder.
    # NOTE(review): presumably the EEZ zip already contains a top-level
    # directory named after the archive — confirm against the download.
    shutil.unpack_archive(path, unziped_folder.parent if idx == 0 else unziped_folder)

    # Load every shapefile of the archive except the "boundaries" layers.
    files = [
        gpd.read_file(file)
        for file in unziped_folder.rglob("*.shp")
        if "boundaries" not in file.stem
    ]
    unziped_folders.append(pd.concat(files))

/home/mambauser/data/eez/raw/temp_preprocess/World_EEZ_v11_20191118
/home/mambauser/data/eez/raw/temp_preprocess/World_High_Seas_v1_20200826


In [10]:
# Transform data
## Give the High Seas layer (index 1) the same column layout as the EEZ layer.
high_seas_renames = {"name": "GEONAME", "area_km2": "AREA_KM2", "mrgid": "MRGID"}
unziped_folders[1] = (
    unziped_folders[1]
    .rename(columns=high_seas_renames)
    .assign(POL_TYPE="High Seas", ISO_SOV1="ABNJ")
)

# merge datasets and keep only the columns the pipeline uses
eez_columns = [
    "MRGID",
    "GEONAME",
    "POL_TYPE",
    "ISO_SOV1",
    "ISO_SOV2",
    "ISO_SOV3",
    "AREA_KM2",
    "geometry",
]
df = pd.concat(unziped_folders, ignore_index=True)
df = df.drop(columns=df.columns.difference(eez_columns))

In [11]:
# save data
## Re-wrap the merged frame with the CRS of the first loaded dataset and write it as a shapefile.
eez_gdf = gpd.GeoDataFrame(df, crs=unziped_folders[0].crs)
eez_gdf.to_file(filename=output_file.as_posix(), driver="ESRI Shapefile")

# zip data
make_archive(output_path, zipped_output_file)

In [12]:
# clean unzipped files
## Remove the scratch folder and the unzipped output folder when they exist.
for folder in (temp_working_path, output_path):
    if folder.exists():
        rm_tree(folder)

In [13]:
# LOAD
## Write (operation="w") the zipped output to the configured GCS bucket.
writeReadGCP(
    operation="w",
    file=zipped_output_file,
    blob_name=remote_path,
    bucket_name=mysettings.GCS_BUCKET,
    credentials=mysettings.GCS_KEYFILE_JSON,
)

### Mpa Atlas intermediate

In [4]:
# Pipe params for the MPAtlas run (overwrite flag, step name, pipe name).
force_clean, step, pipe = (
    True,
    "preprocess",
    "mpaatlas",
)

In [5]:
# Data source: MPAtlas zone GeoJSON endpoint and the local file name it is stored under.
mpaatlas_url, mpaatlas_file_name = (
    "https://guide.mpatlas.org/api/v1/zone/geojson",
    "mpatlas_assess_zone.geojson",
)

In [6]:
# Resolve every path used by the MPAtlas pipe/step through the project's FileConventionHandler.
working_folder = FileConventionHandler(pipe)
input_path = working_folder.pipe_raw_path  # raw folder; the GeoJSON download is written here
temp_working_path = working_folder.get_temp_file_path(step)  # scratch folder, removed on cleanup

output_path = working_folder.get_processed_step_path(step)  # folder archived by make_archive, removed on cleanup
output_file = working_folder.get_step_fmt_file_path(step, "shp")  # shapefile written by the save cell
# NOTE(review): the meaning of the positional `True` is defined in FileConventionHandler — not visible here.
zipped_output_file = working_folder.get_step_fmt_file_path(step, "zip", True)  # archive that gets uploaded
remote_path = working_folder.get_remote_path(step)  # blob name used for the GCS upload

In [7]:
# Download data
## Store the MPAtlas GeoJSON in the raw folder; the returned value is read back by geopandas below.
input_file = downloadFile(
    mpaatlas_url, input_path, file=mpaatlas_file_name, overwrite=force_clean
)

In [8]:
# Informational only: the transform below still runs when the archive exists.
if not force_clean:
    if zipped_output_file.exists():
        print(f"File {zipped_output_file} already exists")

# Transform data
gdf = gpd.read_file(input_file)

# Run the project processors in sequence.
df = gdf.pipe(set_wdpa_id).pipe(protection_level).pipe(status).pipe(create_year)

# Keep only the columns used downstream, then apply the final column names.
# The drop/rename stay in place because `df` may be the same object as `gdf`.
mpaatlas_columns = [
    "wdpa_id",
    "mpa_zone_id",
    "name",
    "designation",
    "sovereign",
    "establishment_stage",
    "protection_mpaguide_level",
    "protection_level",
    "year",
    "geometry",
]
df.drop(columns=df.columns.difference(mpaatlas_columns), inplace=True)
df.rename(columns={"sovereign": "location_id", "wdpa_pid": "wdpa_id"}, inplace=True)

In [9]:
# save data
## Re-wrap with the source CRS and write a UTF-8 shapefile, then zip the output folder.
# NOTE(review): the recorded output shows `ogr_write` warnings for this write;
# worth checking whether the long column names survive the Shapefile driver.
mpaatlas_gdf = gpd.GeoDataFrame(df, crs=gdf.crs)
mpaatlas_gdf.to_file(
    filename=output_file.as_posix(),
    driver="ESRI Shapefile",
    encoding="utf-8",
)

make_archive(output_path, zipped_output_file)

  ).to_file(filename=output_file.as_posix(), driver="ESRI Shapefile", encoding="utf-8")
  ogr_write(
  ogr_write(
  ogr_write(
  ogr_write(
  ogr_write(
  ogr_write(


In [10]:
# LOAD
## Write (operation="w") the zipped output to the configured GCS bucket.
writeReadGCP(
    operation="w",
    file=zipped_output_file,
    blob_name=remote_path,
    bucket_name=mysettings.GCS_BUCKET,
    credentials=mysettings.GCS_KEYFILE_JSON,
)

In [13]:
# clean unzipped files
## Remove the scratch folder and the unzipped output folder when they exist.
for folder in (temp_working_path, output_path):
    if folder.exists():
        rm_tree(folder)

### Protected seas intermediate

In [22]:
# DEPRECATED
# Pipe params for the ProtectedSeas run (overwrite flag, step name, pipe name).
force_clean, step, pipe = (
    True,
    "preprocess",
    "protectedseas",
)

In [23]:
# Resolve this pipe's raw folder before building local paths. Without this,
# `input_path` still holds the value left by the previous section (or is
# undefined on a fresh kernel), so the files would be written to another
# pipe's raw folder.
input_path = FileConventionHandler(pipe).pipe_raw_path

# Bucket blob with the attribute table, and the local file it is stored as.
ps_csv_url = "ProtectedSeas/ProtectedSeas_06142023.csv"
ps_csv_output = input_path.joinpath(ps_csv_url.split("/")[-1])

# Bucket blob with the zipped geometries, and the local file it is stored as.
ps_geometries_url = (
    "ProtectedSeas/ProtectedSeas_ProtectedSeas_06142023_shp_ProtectedSeas_06142023_shp.zip"
)
ps_geometries_output = input_path.joinpath(ps_geometries_url.split("/")[-1])

In [24]:
# Resolve every path used by the ProtectedSeas pipe/step through the project's FileConventionHandler.
working_folder = FileConventionHandler(pipe)
input_path = working_folder.pipe_raw_path  # raw folder for this pipe
temp_working_path = working_folder.get_temp_file_path(step)  # scratch folder the geometries are unpacked into

output_path = working_folder.get_processed_step_path(step)  # folder archived by make_archive, removed on cleanup
output_file = working_folder.get_step_fmt_file_path(step, "shp")  # shapefile written by the save cell
# NOTE(review): the meaning of the positional `True` is defined in FileConventionHandler — not visible here.
zipped_output_file = working_folder.get_step_fmt_file_path(step, "zip", True)  # archive that gets uploaded
remote_path = working_folder.get_remote_path(step)  # blob name used for the GCS upload

In [25]:
# Informational only: report an existing archive when no forced refresh is requested.
if not force_clean:
    if zipped_output_file.exists():
        print(f"File {zipped_output_file} already exists")

In [26]:
## get the data
# Read (operation="r") the attribute table and the zipped geometries from the bucket
# into their local raw files.
for blob_name, local_file in (
    (ps_csv_url, ps_csv_output),
    (ps_geometries_url, ps_geometries_output),
):
    writeReadGCP(
        credentials=mysettings.GCS_KEYFILE_JSON,
        bucket_name=mysettings.GCS_BUCKET,
        blob_name=blob_name,
        file=local_file,
        operation="r",
    )

In [27]:
# unzip shapefile into the scratch folder
shutil.unpack_archive(filename=ps_geometries_output, extract_dir=temp_working_path)

In [28]:
# transform data
# TODO: Modify the preprocessing steps so we do not eliminate the geometries that do not
# intersect with MPAs - due to a change in the processing methodology
ps_columns = [
    "site_id",
    "iso",
    "FPS_cat",
    "site_name",
    "country",
    "wdpa_id",
    "removal_of_marine_life_is_prohibited",
    "total_area",
]
data_table = (
    pd.read_csv(ps_csv_output)
    .pipe(get_mpas)
    .pipe(set_location_iso)
    .pipe(set_fps_classes)
)
# Keep only the listed columns and expose the prohibition flag as "FPS".
data_table = data_table.drop(columns=data_table.columns.difference(ps_columns)).rename(
    columns={"removal_of_marine_life_is_prohibited": "FPS"}
)

# load geometries & merge

gdf = gpd.read_file(ps_geometries_output)

  return df[mask1][mask2].reset_index()


In [29]:
# save data
## Inner-join geometries with the attribute table on the site id, drop the
## duplicated upper-case columns and write a UTF-8 shapefile.
ps_merged = gdf.merge(data_table, how="inner", left_on="SITE_ID", right_on="site_id")
ps_merged = ps_merged.drop(columns=["SITE_ID", "SITE_NAME"])
ps_merged.to_file(
    filename=output_file.as_posix(),
    driver="ESRI Shapefile",
    encoding="utf-8",
)

# zip data
make_archive(output_path, zipped_output_file)

In [30]:
# clean unzipped files
## Remove the scratch folder and the unzipped output folder when they exist.
for folder in (temp_working_path, output_path):
    if folder.exists():
        rm_tree(folder)

In [31]:
# LOAD
## Write (operation="w") the zipped output to the configured GCS bucket.
writeReadGCP(
    operation="w",
    file=zipped_output_file,
    blob_name=remote_path,
    bucket_name=mysettings.GCS_BUCKET,
    credentials=mysettings.GCS_KEYFILE_JSON,
)

### Mpas protected planet intermediate

In [11]:
# Pipe params for the Protected Planet marine run (overwrite flag, step name, pipe name).
force_clean, step, pipe = (
    True,
    "preprocess",
    "mpa",
)

In [12]:
# Protected Planet download endpoint and the form body requesting the marine shapefile export.
mpa_url = "https://www.protectedplanet.net/downloads"
mpa_body = dict(domain="general", format="shp", token="marine", id=21961)

In [13]:
# Resolve every path used by the marine WDPA pipe/step through the project's FileConventionHandler.
working_folder = FileConventionHandler(pipe)
input_path = working_folder.pipe_raw_path  # raw folder; the WDPA archive is downloaded here
temp_working_path = working_folder.get_temp_file_path(step)  # scratch folder for the unpacked archives

output_path = working_folder.get_processed_step_path(step)  # folder archived by make_archive, removed on cleanup
output_file = working_folder.get_step_fmt_file_path(step, "shp")  # shapefile written by the save cell
# NOTE(review): the meaning of the positional `True` is defined in FileConventionHandler — not visible here.
zipped_output_file = working_folder.get_step_fmt_file_path(step, "zip", True)  # archive that gets uploaded
remote_path = working_folder.get_remote_path(step)  # blob name used for the GCS upload

In [14]:
# download data
## Ask Protected Planet for the export; the JSON reply carries the archive URL and title.
r = requests.post(url=mpa_url, data=mpa_body)
r.raise_for_status()

download_info = r.json()  # parse the reply once and reuse it
download_url = download_info.get("url")
input_file_name = f'{download_info.get("title")}.zip'
print(download_info)

## Fetch the archive itself into the raw folder.
input_file = downloadFile(
    url=download_url,
    output_path=input_path,
    file=input_file_name,
    overwrite=force_clean,
)

{'id': 'marine-shp', 'title': 'WDPA_WDOECM_Aug2024_Public_marine_shp', 'url': 'https://d1gam3xoknrgr2.cloudfront.net/current/WDPA_WDOECM_Aug2024_Public_marine_shp.zip', 'hasFailed': False, 'token': 'marine'}


In [15]:
# unzip file twice due to how the data is provisioned by Protected Planet:
# the outer archive holds further zips, each unpacked into a folder named after it
shutil.unpack_archive(input_file, extract_dir=temp_working_path, format="zip")

for file in temp_working_path.glob("*.zip"):
    shutil.unpack_archive(file, extract_dir=temp_working_path / file.stem, format="zip")

In [16]:
# load data & Transform it
## Every unpacked shapefile goes through the same processors, in this order.
unziped_folders = []
for file in temp_working_path.glob("*/*.shp"):
    df = gpd.read_file(file)
    for processor in (filter_by_methodology, transform_points, clean_geometries):
        df = df.pipe(processor)
    unziped_folders.append(df)

# merge datasets, reusing the CRS of the first part
gdf = gpd.GeoDataFrame(
    pd.concat(unziped_folders, ignore_index=True),
    crs=unziped_folders[0].crs,
)

# keep only the columns used downstream
wdpa_columns = [
    "geometry",
    "WDPAID",
    "WDPA_PID",
    "PA_DEF",
    "NAME",
    "PARENT_ISO",
    "DESIG_ENG",
    "IUCN_CAT",
    "STATUS",
    "STATUS_YR",
    "GIS_M_AREA",
    "AREA_KM2",
]
gdf = gdf.drop(columns=gdf.columns.difference(wdpa_columns))

# store the WDPA id with the smallest integer dtype that fits
gdf["WDPAID"] = pd.to_numeric(gdf["WDPAID"], downcast="integer")

In [17]:
# save data & zip it
## Write a UTF-8 shapefile, then archive the output folder.
gdf.to_file(
    filename=output_file,
    driver="ESRI Shapefile",
    encoding="utf-8",
)

make_archive(output_path, zipped_output_file)

In [18]:
# LOAD
## Write (operation="w") the zipped output to the configured GCS bucket.
writeReadGCP(
    operation="w",
    file=zipped_output_file,
    blob_name=remote_path,
    bucket_name=mysettings.GCS_BUCKET,
    credentials=mysettings.GCS_KEYFILE_JSON,
)

In [19]:
# clean unzipped files
## Remove the scratch folder and the unzipped output folder when they exist.
for folder in (temp_working_path, output_path):
    if folder.exists():
        rm_tree(folder)

### Mpas protected planet intermediate terrestrial

In [4]:
# Pipe params for the Protected Planet terrestrial run (overwrite flag, step name, pipe name).
force_clean, step, pipe = (
    True,
    "preprocess",
    "mpa-terrestrial",
)

In [5]:
# Protected Planet download endpoint and the form body requesting the full WDPA shapefile export.
mpa_url = "https://www.protectedplanet.net/downloads"
mpa_body = dict(domain="general", format="shp", token="wdpa", id=76011)

In [7]:
# Resolve every path used by the terrestrial WDPA pipe/step through the project's FileConventionHandler.
working_folder = FileConventionHandler(pipe)
input_path = working_folder.pipe_raw_path  # raw folder; the WDPA archive is downloaded here
temp_working_path = working_folder.get_temp_file_path(step)  # scratch folder for the unpacked archives

output_path = working_folder.get_processed_step_path(step)  # removed by the cleanup cell
output_file = working_folder.get_step_fmt_file_path(step, "gpkg")  # this pipe writes a GeoPackage, not a shapefile
# NOTE(review): no visible cell in this section uses zipped_output_file; the upload sends output_file.
zipped_output_file = working_folder.get_step_fmt_file_path(step, "zip", True)
remote_path = working_folder.get_remote_path(step)  # blob name used for the GCS upload

In [8]:
# download data
## Ask Protected Planet for the export; the JSON reply carries the archive URL and title.
r = requests.post(url=mpa_url, data=mpa_body)
r.raise_for_status()

download_info = r.json()  # parse the reply once and reuse it
download_url = download_info.get("url")
input_file_name = f'{download_info.get("title")}.zip'
print(download_info)

## Fetch the archive itself into the raw folder.
input_file = downloadFile(
    url=download_url,
    output_path=input_path,
    file=input_file_name,
    overwrite=force_clean,
)

{'id': 'wdpa-shp', 'title': 'WDPA_Sep2024_Public_shp', 'url': 'https://d1gam3xoknrgr2.cloudfront.net/current/WDPA_Sep2024_Public_shp.zip', 'hasFailed': False, 'token': 'wdpa'}


In [8]:
# unzip file twice due to how the data is provisioned by Protected Planet:
# the outer archive holds further zips, each unpacked into a folder named after it
shutil.unpack_archive(input_file, extract_dir=temp_working_path, format="zip")

for file in temp_working_path.glob("*.zip"):
    shutil.unpack_archive(file, extract_dir=temp_working_path / file.stem, format="zip")

In [8]:
# load data & Transform it
## Every unpacked shapefile goes through the same processors, in this order.
unziped_folders = []
for file in temp_working_path.glob("*/*.shp"):
    df = gpd.read_file(file)
    for processor in (
        filter_by_methodology,
        filter_by_terrestrial,
        transform_points,
        clean_geometries,
    ):
        df = df.pipe(processor)
    unziped_folders.append(df)

# merge datasets, reusing the CRS of the first part
gdf = gpd.GeoDataFrame(
    pd.concat(unziped_folders, ignore_index=True),
    crs=unziped_folders[0].crs,
)

# keep only the columns used downstream
wdpa_columns = [
    "geometry",
    "WDPAID",
    "WDPA_PID",
    "PA_DEF",
    "NAME",
    "PARENT_ISO",
    "DESIG_ENG",
    "IUCN_CAT",
    "STATUS",
    "STATUS_YR",
    "GIS_AREA",
    "MARINE",
]
gdf = gdf.drop(columns=gdf.columns.difference(wdpa_columns))

# store the WDPA id with the smallest integer dtype that fits
gdf["WDPAID"] = pd.to_numeric(gdf["WDPAID"], downcast="integer")

In [12]:
# save data
## Written as a GeoPackage with a single layer called "name"; this cell does not zip the result.
gdf.to_file(filename=output_file, driver="GPKG", layer="name", encoding="utf-8")

In [12]:
# LOAD
## Write (operation="w") the GeoPackage itself (`output_file`, not a zip) to the GCS bucket.
writeReadGCP(
    operation="w",
    file=output_file,
    blob_name=remote_path,
    bucket_name=mysettings.GCS_BUCKET,
    credentials=mysettings.GCS_KEYFILE_JSON,
)

: 

In [None]:
# clean unzipped files
## Remove the scratch folder and the output folder when they exist.
for folder in (temp_working_path, output_path):
    if folder.exists():
        rm_tree(folder)

### Habitats

In [4]:
# Pipe params for the habitats run (overwrite flag, step name, pipe name).
force_clean, step, pipe = (
    True,
    "preprocess",
    "habitats",
)

In [5]:
# Download endpoints for the habitat layers.
habitats_download_url = "https://habitats.oceanplus.org/downloads/global_statistics.zip"
seamounts_download_url = "https://datadownload-production.s3.amazonaws.com/ZSL002_ModelledSeamounts2011_v1.zip"
Mangroves_download_url = "https://mangrove-atlas-api.herokuapp.com/admin/widget_protected_areas.csv"

# NOTE(review): hard-coded session cookie for the mangrove admin endpoint —
# consider loading it from settings/environment instead of the notebook.
_mangrove_session_cookie = "_mangrove_atlas_api_session=fJuobvI2fH42WfGfMtRTp%2BksIDdPEpY6DG8uCuITsENtrRGG4AA3nYEeAI7dytzpK%2F0dGIHq84O54MRr6eiPgiwCYXp2XP4IzXM40dFt%2FI6hoB0WXC%2Fwrd81XreNnMZiSEE6IVT5R0fqMcmsZdPn53u0A1d4CGU3FfliOZuWkckBuA%2F7C4upBGuSS8817LqOh1slG%2BsEOGp3nk7WX4fMoPbsHWtARfFwdfoAHz448LO7uWuZdyiu7YOrS0ZxOZEb9JZ8hcUJph4pBFofZLpOvtQQutgZY21T5bhQ7Kwfl56e6Qr0SZ%2B8sIzMfky3h%2FjOA6DNTLoy%2BZLiZBAgFHlTYm2JwlwqWgAZU8D7cE7Zn%2Fxgf3LFF9pZ9Fe3QG4c8LIwH%2FxqjEd8GsZAhBMgBWbxubigQ9gZssZt6CIO--7qiVsTAT8JAKj1jU--U7TI%2Fz9c151bfD8iZdkBDw%3D%3D"
mangroves_request_headers = {"Cookie": _mangrove_session_cookie}

In [6]:
# Resolve every path used by the habitats pipe/step through the project's FileConventionHandler.
working_folder = FileConventionHandler(pipe)
input_path = working_folder.pipe_raw_path  # raw folder; per-dataset subfolders are created under it
temp_working_path = working_folder.get_temp_file_path(step)  # scratch folder for unpacked archives

output_path = working_folder.get_processed_step_path(step)
output_file = working_folder.get_step_fmt_file_path(step, "shp")
# NOTE(review): the meaning of the positional `True` is defined in FileConventionHandler — not visible here.
zipped_output_file = working_folder.get_step_fmt_file_path(step, "zip", True)
remote_path = working_folder.get_remote_path(step)

#### Seamounts

In [8]:
# Seamounts get their own subfolder inside the raw folder.
input_seamounts_path = input_path / "seamounts"
input_seamounts_path.mkdir(parents=True, exist_ok=True)

# download data
input_file_name = "seamounts.zip"
input_file = downloadFile(
    url=seamounts_download_url,
    output_path=input_seamounts_path,
    file=input_file_name,
    overwrite=force_clean,
)

In [9]:
# unzip data into the scratch folder
shutil.unpack_archive(input_file, extract_dir=temp_working_path, format="zip")

In [13]:
# Display the scratch folder the seamounts archive was unpacked into.
temp_working_path

PosixPath('/home/mambauser/data/habitats/raw/temp_preprocess')

In [24]:
# Load the first shapefile under the scratch folder whose name ends in "SeamountsBaseArea.shp".
seamounts_base_area_shp = next(temp_working_path.rglob("*SeamountsBaseArea.shp"))
first = gpd.read_file(seamounts_base_area_shp)

In [25]:
# Preview the seamounts base-area layer held in `first`.
first

Unnamed: 0,PEAKID,DEPTH,HEIGHT,LONG,LAT,AREA2D,FILTER,geometry
0,26000,-2547,1148,2.762500,84.979736,982.028337,0,"POLYGON ((2.91249 84.82976, 2.76249 84.79636, ..."
1,26157,-3084,1296,9.143056,84.935292,348.473055,0,"POLYGON ((9.99309 84.93526, 9.25139 84.82696, ..."
2,26158,-3043,1342,9.183333,84.938070,367.540380,0,"POLYGON ((9.07499 85.04636, 9.18329 85.03806, ..."
3,26228,-3142,1379,8.748611,84.907514,299.443636,0,"POLYGON ((9.79859 84.90756, 8.83199 84.82416, ..."
4,26229,-3146,1383,8.887500,84.913070,309.588492,0,"POLYGON ((8.88749 84.83806, 8.81249 84.83806, ..."
...,...,...,...,...,...,...,...,...
33447,4999430,-298,1376,-142.295833,-74.566097,819.608801,0,"POLYGON ((-142.29582 -74.72444, -142.46251 -74..."
33448,4999462,-295,1274,-142.250000,-74.570264,777.598079,1,"POLYGON ((-142.25001 -74.72864, -142.41671 -74..."
33449,4999913,-348,3288,-164.179167,-74.766097,1000.023088,1,"POLYGON ((-164.01251 -74.93274, -164.17921 -74..."
33450,5000862,-2739,1060,-158.162500,-75.141097,814.426234,0,"POLYGON ((-158.16251 -75.28274, -158.30421 -75..."


In [None]:
# Informational only: report an existing archive when no forced refresh is requested.
if not force_clean:
    if zipped_output_file.exists():
        print(f"File {zipped_output_file} already exists")