In [11]:
import copernicusmarine
from datetime import datetime, timedelta
import xarray as xr
from dotenv import load_dotenv
from kafka import KafkaProducer
import os
import json

In [10]:
!pip install simplejson

Collecting simplejson
  Downloading simplejson-3.20.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (138 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m138.2/138.2 KB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: simplejson
Successfully installed simplejson-3.20.2


# Loading Copernicus Marine credentials from `.env`

In [2]:
# Populate os.environ from a local .env file, then read the Copernicus Marine
# login from it (each value is None when the variable is not set).
load_dotenv()
username, password = (
    os.getenv(var_name) for var_name in ("COPERNICUS_USERNAME", "COPERNICUS_PASSWORD")
)

In [3]:
# Kafka connection settings. Each can be overridden through an environment
# variable of the same name; the defaults are the original hard-coded values.
KAFKA_BROKER = os.getenv('KAFKA_BROKER', 'kafka:9092')
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', 'data_updates')

In [7]:
# --- 1. Setup Kafka Producer ---
print(f"Connecting to Kafka broker at {KAFKA_BROKER}...")
try:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        # Serialize each message value as UTF-8 JSON; default=str converts values
        # json cannot encode natively (e.g. pandas Timestamps) into strings.
        value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
    )
    print("Kafka Producer connected.")
except Exception as e:
    print(f"Error connecting to Kafka: {e}")
    print("Please ensure Kafka is running.")
    # Re-raise rather than exit(1): inside a Jupyter kernel exit() does not act
    # like in a script (depending on the frontend it may tear down the kernel or
    # not halt execution at all, leaving later cells to fail on a missing
    # `producer`). Raising stops this cell and "Run All" with a real traceback.
    raise

Connecting to Kafka broker at kafka:9092...
Kafka Producer connected.


# Retrieving the relevant Copernicus Marine data subset

In [5]:
min_long = 62.76243948249333,
max_long = 95.10309461377467,
min_lat = 3.1228444517411815,
max_lat = 25.964845628380452,
start=datetime.now() - timedelta(days = 2),
end=datetime.now() - timedelta(days = 1),
min_depth=0.49402499198913574,
max_depth=0.49402499198913574

In [6]:
response = copernicusmarine.subset(
    username = username,
    password = password,
    dataset_id="cmems_mod_glo_phy_anfc_0.083deg_PT1H-m",
    variables=["so", "thetao", "uo", "vo", "zos"],
    minimum_longitude=62.7624394824933,
    maximum_longitude=95.10309461377467,
    minimum_latitude=3.1228444517411815,
    maximum_latitude=25.964845628380452,
    start_datetime=datetime.now() - timedelta(days = 2),
    end_datetime=datetime.now() - timedelta(days = 1),
    minimum_depth=0.49402499198913574,
    maximum_depth=0.49402499198913574,
)


INFO - 2025-11-09T19:50:08Z - Selected dataset version: "202406"
INFO - 2025-11-09T19:50:08Z - Selected dataset part: "default"
INFO - 2025-11-09T19:50:12Z - Starting download. Please wait...
100%|██████████| 50/50 [16:06<00:00, 19.33s/it]  
INFO - 2025-11-09T20:06:19Z - Successfully downloaded to cmems_mod_glo_phy_anfc_0.083deg_PT1H-m_multi-vars_62.83E-95.08E_3.17N-25.92N_0.49m_2025-11-07-2025-11-08_(1).nc


In [7]:
# Report where the subset was written and its size (labelled in MB).
print(
    "Data downloaded: {} (Size: {} MB)".format(
        response.file_path, response.file_size
    )
)

Data downloaded: cmems_mod_glo_phy_anfc_0.083deg_PT1H-m_multi-vars_62.83E-95.08E_3.17N-25.92N_0.49m_2025-11-07-2025-11-08.nc (Size: 48.70551908396946 MB)


In [12]:
# --- 4. Open Dataset with xarray ---
print("Opening .nc file with xarray...")
with xr.open_dataset(os.fspath(response.file_path), engine="h5netcdf") as data:
    # --- 5. Convert to DataFrame ---
    print("Converting to DataFrame...")
    # Flatten the multi-dimensional variables into a 2D table indexed by the
    # dataset's dimensions. to_dataframe() materialises the values in memory,
    # so the file can be closed before the (long) send loop below.
    df = data.to_dataframe()

# --- 6. Clean the DataFrame ---
# Move the dimension index levels (e.g. time/latitude/longitude/depth) into columns.
df = df.reset_index()
# Remove any rows with missing data (presumably masked / land grid points).
df = df.dropna()
print(f"Converted to DataFrame with {len(df)} valid data points.")

# --- 7. Iterate and Send to Kafka ---
if not df.empty:
    print(f"Sending {len(df)} messages to Kafka topic '{KAFKA_TOPIC}'...")
    # producer.send() is asynchronous: delivery failures surface on the returned
    # futures. Collect them instead of assuming every send succeeded.
    send_errors = []
    columns = df.columns.tolist()
    # itertuples streams plain tuples, avoiding the per-row Series that
    # iterrows builds (very slow for millions of rows); each message is the
    # same {column: value} dict as before.
    for values in df.itertuples(index=False, name=None):
        message = dict(zip(columns, values))
        producer.send(KAFKA_TOPIC, value=message).add_errback(send_errors.append)

    # Block until every buffered message has been delivered or has failed.
    producer.flush()
    if send_errors:
        print(
            f"{len(send_errors)} of {len(df)} messages failed to send; "
            f"first error: {send_errors[0]!r}"
        )
    else:
        print("All messages sent successfully.")
else:
    print("No data to send.")

Opening .nc file with xarray...
Converting to DataFrame...
Converted to DataFrame with 1748208 valid data points.
Sending 1748208 messages to Kafka topic 'data_updates'...
All messages sent successfully.


In [13]:
# --- 8. Clean up the downloaded file ---
# Delete the downloaded NetCDF subset if a download happened and the file still exists.
if 'response' in globals() and os.path.exists(response.file_path):
    os.remove(response.file_path)
    print(f"Cleaned up file: {response.file_path}")

# Release the producer's connections and background I/O thread; otherwise they
# linger in the kernel (and pile up if the producer cell is re-run).
if 'producer' in globals():
    producer.close()

Cleaned up file: cmems_mod_glo_phy_anfc_0.083deg_PT1H-m_multi-vars_62.83E-95.08E_3.17N-25.92N_0.49m_2025-11-07-2025-11-08_(1).nc
