# Testing the different formats...

In [1]:
using HTTP
using JSON3
using Dates
using NCDatasets
using JupyterFormatter
using DIVAnd
using Parquet2
using Tables
using DataFrames
using CSV
using BenchmarkTools
include("./DIVAndFairEase.jl")
enable_autoformat()

[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mPrecompiling Parquet2 [98572fba-bba0-415d-956f-fa77e587d26d] 
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mPrecompiling CSV [336ed68f-0bac-5ca0-87d4-7b16caf5d00b] (cache misses: wrong dep version loaded (6), incompatible header (12))


1-element Vector{Function}:
 format_current_cell (generic function with 1 method)

<div class="alert alert-block alert-info">
Here the API token is stored as an environment variable.<br>
It is also possible to store it in a local file.
</div>

In [2]:
token = ENV["beaconAPItoken"];

## Configuration
Set the region and the variable of interest.   
The variable `datasourcelist` contains the `Beacon` instances that will be used for the data queries.

In [3]:
regionname = "North_Adriatic"
varname = "sea_water_temperature"
varname = "sea_water_salinity"
doplot = true

datasourcelist = [
    "Euro-Argo",
    "CORA Profile",
    "CORA Timeseries",
    "World Ocean Database",
    "SeaDataNet CDI TS",
]

5-element Vector{String}:
 "Euro-Argo"
 "CORA Profile"
 "CORA Timeseries"
 "World Ocean Database"
 "SeaDataNet CDI TS"

In [4]:
datadir = joinpath("../data/", regionname)
outputdir = joinpath("../output/", regionname)
figdir = joinpath("../figures/", regionname)
mkpath.([datadir, outputdir, figdir]);

### Spatial extent
The bounding box is set according to the region name.

In [5]:
domaininfo = Dict(
    "North_Adriatic" => [12.0, 18.0, 43.0, 46.0],
    "Arctic_region" => [-44.25, 70.0, 56.5, 83.0],
    "North_East_Atlantic" => [-42.0, -0.1, 24.9, 48.0],
    "Baltic_Sea" => [9.4, 30.9, 53.0, 65.9],
    "Black_Sea" => [26.5, 41.95, 40.0, 47.95],
    "Mediterranean_Sea" => [-7.0, 36.375, 30.0, 45.875],
    "North_Sea" => [-100.0, 50.0, -80.0, 80.0],
    "Canary_Islands" => [-20.0, -9.0, 25.0, 31.5],
    "World_Ocean" => [-180.0, 180.0, -90.0, 90.0],
);

### Depth and time ranges

In [6]:
mindepth = 0.0 #Minimum water depth
maxdepth = 1000.0 #Maximum water depth
datestart = DateTime(1960, 1, 1)
dateend = DateTime(2024, 12, 31)

2024-12-31T00:00:00

### Variable units

In [7]:
variableunits = Dict(
    "sea_water_temperature" => "degree_Celsius",
    "sea_water_salinity" => "psu",
    "mass_concentration_of_chlorophyll_a_in_sea_water" => "mg/m3",
    "moles_of_nitrate_per_unit_mass_in_sea_water" => "micromole/kg",
);

## Data query
Different data queries will be performed to get the observations in the selected region.
### Health check [optional]
One can check of the `Beacon` instances are up and running.

In [8]:
for datasource in datasourcelist
    r = HTTP.get(
        joinpath(DIVAndFairEase.beacon_services[datasource], "api/health"),
        ["Authorization" => "Bearer $(token)"],
    )
    if r.status == 200
        @info("$(datasource) Beacon instance is working")
    else
        @warn("$(datasource) Beacon is down or not reachable.")
    end
end

[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mEuro-Argo Beacon instance is working
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mCORA Profile Beacon instance is working
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mCORA Timeseries Beacon instance is working
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mWorld Ocean Database Beacon instance is working
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mSeaDataNet CDI TS Beacon instance is working


### Prepare queries and download data
The observations are saved as netCDF files, so that they can be later re-used.
<div class="alert alert-block alert-info">
It is also possible to get the data in <code>Parquet</code> format.
</div>

In [9]:
minlon = domaininfo[regionname][1]
maxlon = domaininfo[regionname][2]
minlat = domaininfo[regionname][3]
maxlat = domaininfo[regionname][4]

paramname = Dict(
    "World Ocean Database" => "Temperature",
    "EMODnet Chemistry" => "ITS_90_water_temperature",
    "SeaDataNet CDI TS" => "TEMPPR01",
    "Euro-Argo" => "TEMP",
    "CORA Profile" => "TEMP",
    "CORA Timeseries" => "TEMP",
)

vmin = 0.0
vmax = 40.0
datasource = datasourcelist[1]
for dataformat in ["parquet", "netcdf", "csv", "ipc"]

    @info("Working with file format $(dataformat)")

    parameter1 = "TEMP"


    datasource_name = replace(datasource, " " => "-")

    # Build query
    query = DIVAndFairEase.prepare_query(
        datasource,
        "TEMP",
        Dates.Date(datestart),
        Dates.Date(dateend),
        mindepth,
        maxdepth,
        minlon,
        maxlon,
        minlat,
        maxlat,
        vmin = vmin,
        vmax = vmax,
        outputformat = dataformat,
    )

    # Construct file and figure names
    filename = joinpath(
        datadir,
        "$(regionname)_$(datasource_name)_$(varname)_$(Dates.format(datestart, "yyyymmdd"))-$(Dates.format(dateend, "yyyymmdd"))_$(Int(mindepth))-$(Int(maxdepth))m.$(dataformat)",
    )

    # Write the data in netCDF files
    @info("Data will be written in file:\n$(filename)")
    if isfile(filename)
        @info("File already downloaded")
    else
        @time open(filename, "w") do io
            r = HTTP.request(
                "POST",
                joinpath(DIVAndFairEase.beacon_services[datasource], "api/query"),
                [
                    "Content-type" => "application/json",
                    "Authorization" => "Bearer $(token)",
                ],
                query,
                response_stream = io,
            )
            @info(r.status)
        end
    end
    @info("File size: $(round(filesize(filename)/1000^2, digits=1))M")

end

[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mWorking with file format parquet
[36m[1m┌ [22m[39m[36m[1mInfo: [22m[39mData will be written in file:
[36m[1m└ [22m[39m../data/North_Adriatic/North_Adriatic_Euro-Argo_sea_water_salinity_19600101-20241231_0-1000m.parquet
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFile already downloaded
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFile size: 0.1M
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mWorking with file format netcdf
[36m[1m┌ [22m[39m[36m[1mInfo: [22m[39mData will be written in file:
[36m[1m└ [22m[39m../data/North_Adriatic/North_Adriatic_Euro-Argo_sea_water_salinity_19600101-20241231_0-1000m.netcdf
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFile already downloaded
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFile size: 3.0M
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mWorking with file format csv
[36m[1m┌ [22m[39m[36m[1mInfo: [22m[39mData will be written in file:
[36m[1m└ [22m[39m../data/Nort

In [10]:
function read_nc(datafile::String)
    NCDataset(datafile, "r") do ds
        lon = ds["LONGITUDE"][:]
        lat = ds["LATITUDE"][:]
        depth = ds["DEPTH"][:]
        dates = ds["datetime"][:]
        T = ds["TEMP"][:]
        return lon, lat, depth, dates, T
    end
end;

In [11]:
@benchmark lon, lat, depth, dates, T = read_nc(
    joinpath(
        datadir,
        "North_Adriatic_Euro-Argo_sea_water_salinity_19600101-20241231_0-1000m.netcdf",
    ),
)

BenchmarkTools.Trial: 1122 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m3.674 ms[22m[39m … [35m 10.142 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 44.81%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m4.300 ms               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m4.433 ms[22m[39m ± [32m464.557 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m2.26% ±  5.00%

  [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▁[39m▇[39m▁[39m▅[39m▅[34m█[39m[39m▃[39m [39m [32m [39m[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m 
  [39m▂[39m▁[39m▂[39m▂[3

In [12]:
function read_parquet(datafile::String)
    ds = Parquet2.Dataset(datafile)
    T = Tables.getcolumn(ds, :TEMP)
    dates = Tables.getcolumn(ds, :datetime)
    # Note: dates (in this case) are expressed as unixtimestamp
    lon = Tables.getcolumn(ds, :LONGITUDE)
    lat = Tables.getcolumn(ds, :LATITUDE)
    depth = Tables.getcolumn(ds, :DEPTH)
    close(ds)
    return lon, lat, depth, dates, T
end

read_parquet (generic function with 1 method)

In [13]:
@benchmark lon, lat, depth, dates, T = read_parquet(
    joinpath(
        datadir,
        "North_Adriatic_Euro-Argo_sea_water_salinity_19600101-20241231_0-1000m.parquet",
    ),
)

BenchmarkTools.Trial: 2028 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m1.983 ms[22m[39m … [35m 12.294 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 69.97%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m2.252 ms               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m2.455 ms[22m[39m ± [32m712.596 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m3.62% ±  8.52%

  [39m [39m▁[39m [39m█[39m [39m [39m [34m▅[39m[39m [39m [39m [32m [39m[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m 
  [39m▇[39m█[39m▆[39m█[3

In [14]:
function read_csv(datafile::String)
    df = DataFrame(CSV.File(datafile))
    lon = df.LONGITUDE
    lat = df.LATITUDE
    depth = df.DEPTH
    dates = df.datetime
    T = df.TEMP
    return lon, lat, depth, dates, T
end

read_csv (generic function with 1 method)

In [15]:
@benchmark lon, lat, depth, dates, T = read_csv(
    joinpath(
        datadir,
        "North_Adriatic_Euro-Argo_sea_water_salinity_19600101-20241231_0-1000m.csv",
    ),
)

BenchmarkTools.Trial: 200 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m23.857 ms[22m[39m … [35m30.570 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 4.85%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m24.416 ms              [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m25.000 ms[22m[39m ± [32m 1.139 ms[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m1.10% ± 2.53%

  [39m [39m [39m█[39m█[39m▅[39m▂[34m [39m[39m [39m [39m [39m [39m [32m [39m[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m 
  [39m▄[39m▆[39m█[39m█[39m█

In [16]:
using Arrow
arrowfile = joinpath(
    datadir,
    "North_Adriatic_Euro-Argo_sea_water_salinity_19600101-20241231_0-1000m.arrow",
)
isfile(arrowfile)

true

In [17]:
@benchmark table = Arrow.Table(arrowfile)

BenchmarkTools.Trial: 10000 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m15.729 μs[22m[39m … [35m255.988 μs[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m17.813 μs               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m19.707 μs[22m[39m ± [32m 10.990 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m [39m█[39m▄[39m▁[34m [39m[39m [39m [32m [39m[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m 
  [39m▄[39m█[39m█

In [18]:
table1 = Arrow.Table("../data/North_Adriatic/my_data.arrow")

Arrow.Table with 30968 rows, 2 columns, and schema:
 :LONGITUDE  Union{Missing, Float64}
 :LATITUDE   Union{Missing, Float64}

with metadata given by a Base.ImmutableDict{String, String} with 1 entry:
  "pandas" => "{\"index_columns\": [{\"kind\": \"range\", \"name\": null, \"sta…

In [19]:
outputformatODV = Dict(
    "odv" => Dict(
        "longitude_column" => Dict("column_name" => "LONGITUDE"),
        "latitude_column" => Dict("column_name" => "LATITUDE"),
        "time_column" => Dict("column_name" => "JULD"),
        "depth_column" => Dict("column_name" => "PRES"),
        "data_columns" => [Dict("column_name" => "TEMP")],
    ),
)

Dict{String, Dict{String, Any}} with 1 entry:
  "odv" => Dict("time_column"=>Dict("column_name"=>"JULD"), "depth_column"=>Dic…

In [20]:
datasource = datasourcelist[1]
datasource_name = replace(datasource, " " => "-")

query = DIVAndFairEase.prepare_query(
    datasource,
    "TEMP",
    Dates.Date(datestart),
    Dates.Date(dateend),
    mindepth,
    maxdepth,
    minlon,
    maxlon,
    minlat,
    maxlat,
    vmin = vmin,
    vmax = vmax,
    outputformat = JSON3.write(outputformatODV),
)

# Construct file and figure names
filename = joinpath(
    datadir,
    "$(regionname)_$(datasource_name)_$(varname)_$(Dates.format(datestart, "yyyymmdd"))-$(Dates.format(dateend, "yyyymmdd"))_$(Int(mindepth))-$(Int(maxdepth))m.odv",
)

# Write the data in netCDF files
@info("Data will be written in file:\n$(filename)")
if isfile(filename)
    @info("File already downloaded")
    rm(filename)
end

@time open(filename, "w") do io
    r = HTTP.request(
        "POST",
        joinpath(DIVAndFairEase.beacon_services[datasource], "api/query"),
        ["Content-type" => "application/json", "Authorization" => "Bearer $(token)"],
        query,
        response_stream = io,
    )
    @info(r.status)
end
@info("File size: $(round(filesize(filename)/1000^2, digits=1))M")

[36m[1m┌ [22m[39m[36m[1mInfo: [22m[39mData will be written in file:
[36m[1m└ [22m[39m../data/North_Adriatic/North_Adriatic_Euro-Argo_sea_water_salinity_19600101-20241231_0-1000m.odv
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFile already downloaded


LoadError: HTTP.Exceptions.StatusError(422, "POST", "/api/query", HTTP.Messages.Response:
"""
HTTP/1.1 422 Unprocessable Entity
Content-Type: text/plain; charset=utf-8
X-Powered-By: ARR/3.0
Date: Tue, 10 Jun 2025 13:15:55 GMT
Content-Length: 394

[Message Body was streamed]""")

In [21]:
JSON3.write(outputformatODV)

"{\"odv\":{\"time_column\":{\"column_name\":\"JULD\"},\"depth_column\":{\"column_name\":\"PRES\"},\"data_columns\":[{\"column_name\":\"TEMP\"}],\"longitude_column\":{\"column_name\":\"LONGITUDE\"},\"latitude_column\":{\"column_name\":\"LATITUDE\"}}}"

In [22]:
vmin = 0.0
vmax = 40.0

40.0