In [None]:
import os

import awswrangler as wr
import fsspec
import pandas as pd
from aodndata.soop.soop_xbt_nrt import parse_bufr_file
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch_dsl.connections import connections


In [None]:
# Raw input location: a later cell globs s3://{BUCKET}/{PATH}/*/*.csv to find
# the files to process.
BUCKET = "imos-data-lab-raw"
PATH = "IMOS/SOOP/SOOP-XBT/REALTIME_BUFR"

# NOTE(review): these two are not referenced by any cell visible in this part of
# the notebook; presumably the bucket/prefix for optimised output - confirm.
BUCKET_OPTIMISED = "imos-data-lab-optimised"
PATH_OPTIMISED = "TEST_DATA/XBT"

In [None]:
# Enumerate the CSV objects that sit one directory level below PATH in the raw
# bucket. The "s3://" scheme is prepended to every path the glob returns so the
# entries in `files` are full S3 URLs.
fs = fsspec.filesystem("s3")

files = [
    f"s3://{key}"
    for key in fs.glob(f"s3://{BUCKET}/{PATH}/*/*.csv")
]
# Display the first match as a quick check of the listing.
files[0]

In [None]:
def process_file(file, test=False, index="soop_xbt"):
    """Parse one XBT file and yield an Elasticsearch bulk action per sample.

    The file is handed to ``parse_bufr_file``; every profile it returns is
    flattened so that each (depth, temp) sample becomes one document carrying
    the profile's id, position and time.

    Parameters
    ----------
    file : str
        Path or URL of the file to parse (passed straight to ``parse_bufr_file``).
    test : bool, optional
        Currently unused; kept so existing callers that pass it keep working.
    index : str, optional
        Value written to the ``_index`` field of every action.
        Defaults to ``"soop_xbt"``.

    Yields
    ------
    dict
        One bulk action with the keys ``_index``, ``temp``, ``depth``,
        ``glob_gtspp``, ``glob_gtspp_depth``, ``glob_gtspp_temp``, ``uid``,
        ``location`` (``[longitude, latitude]``) and ``datetime``.

    Notes
    -----
    This is a generator: nothing (including the progress message) runs until
    the caller starts iterating. The parsed profiles are not modified.
    """
    print(f"Processing file {file}")

    data_headers = ["depth", "temp"]
    data_headers_array = ["glob_gtspp", "glob_gtspp_depth", "glob_gtspp_temp"]
    columns = data_headers + data_headers_array

    for profile in parse_bufr_file(file):
        geotime = profile["profile_geotime"]
        uid = profile["profile_metadata"]["XBT_uniqueid"]
        coords = [geotime["longitude"], geotime["latitude"]]

        # The format string hard-codes ":00", so the timestamp is truncated to
        # the minute. It is parsed once per profile rather than once per row.
        timestamp = pd.to_datetime(f'{geotime["date_utc"]:%Y-%m-%dT%H:%M:00}')

        samples = profile["profile_data"]
        to_zip = [list(samples[var].values) for var in data_headers] + [
            list(samples[var]) for var in data_headers_array
        ]
        # zip() truncates to the shortest column, matching the row-wise pairing
        # of the per-sample values.
        df = pd.DataFrame(list(zip(*to_zip)), columns=columns)
        df = df.astype({var: int for var in data_headers_array})

        # itertuples iterates column by column, so the integer flag columns are
        # not upcast alongside the float columns and no per-row Series is built.
        for row in df.itertuples(index=False):
            yield {
                "_index": index,
                "temp": row.temp,
                "depth": row.depth,
                "glob_gtspp": row.glob_gtspp,
                "glob_gtspp_depth": row.glob_gtspp_depth,
                "glob_gtspp_temp": row.glob_gtspp_temp,
                "uid": uid,
                "location": coords,
                "datetime": timestamp,
            }


In [None]:
# Connect to the workshop Elasticsearch cluster. The password comes from the
# ES_KEY environment variable; if it is unset an empty password is sent.
client = Elasticsearch(
    "https://foss4g-workshop.es.europe-west3.gcp.cloud.es.io",
    http_auth=("elastic", os.environ.get("ES_KEY", "")),
)

# Create the target index with "location" mapped as a geo_point.
# ignore=400 suppresses HTTP 400 responses, which is what Elasticsearch returns
# when the index already exists, so the cell can be re-run.
# The index is passed by keyword because newer elasticsearch-py clients reject
# positional API arguments.
client.indices.create(
    index="soop_xbt",
    ignore=400,
    body={"mappings": {"properties": {"location": {"type": "geo_point"}}}},
)

# Stream each file's documents into the index. process_file prints the
# "Processing file ..." line itself once bulk starts consuming the generator,
# so no extra progress message is emitted here.
for file in files:
    bulk(client, process_file(file))

