In [1]:
import pandas as pd
import numpy as np
import sqlalchemy
import geopandas as gpd
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
from geoalchemy2 import Geometry
from dotenv import load_dotenv
from urllib.parse import quote 
import os, glob
from collections import defaultdict
from shapely.geometry import Point, LineString, MultiLineString
from itertools import groupby
from operator import itemgetter


In [2]:
def adjust_longitude(lon):
    return lon - 360 if lon > 180 else lon

# Function to create MultiLineString by splitting lines crossing the 180-degree longitude
def split_at_antimeridian(coords):
    """
    Enhanced version to handle single point and multiple coordinate scenarios
    
    :param coords: List of coordinate tuples
    :return: Geometry (Point or MultiLineString)
    """
    # Handle empty or insufficient coordinates
    if not coords or len(coords) == 0:
        return None
    
    # Single coordinate case
    if len(coords) == 1:
        return Point(coords[0])
    
    # Multiple coordinates case
    if len(coords) == 2:
        # If only two points, create a simple LineString
        try:
            return LineString(coords)
        except Exception as e:
            print(f"Error creating LineString with coords {coords}: {e}")
            return None
    
    # Original multi-point handling with antimeridian splitting
    multilines = []
    line = [coords[0]]
    for i in range(1, len(coords)):
        prev_lon = coords[i-1][0]
        curr_lon = coords[i][0]
        
        # Check for significant longitude change
        if abs(curr_lon - prev_lon) > 180:
            if len(line) > 1:
                multilines.append(LineString(line))
            line = [coords[i]]
        else:
            line.append(coords[i])
    
    # Add final line segment
    if len(line) > 1:
        multilines.append(LineString(line))
    
    # Return appropriate geometry type
    if not multilines:
        return Point(coords[0])
    
    return MultiLineString(multilines) if len(multilines) > 1 else multilines[0]


# Enhanced connection parameters for SQLAlchemy
def create_enhanced_engine():
    load_dotenv()
    DBUSER = os.getenv('DBUSER')
    DBPASS = os.getenv('DBPASS')
    DBHOST = os.getenv('DBHOST')
    DBPORT = os.getenv('DBPORT')
    DBNAME = os.getenv('DBNAME')

    connection_string = 'postgresql://' + DBUSER + ':%s@' + DBHOST + ':' + DBPORT + '/' + DBNAME
    
    # Use explicit connection arguments compatible with psycopg2
    connect_args = {
        'keepalives': 1,
        'keepalives_idle': 30,
        'keepalives_interval': 10,
        'keepalives_count': 5,
        # Potentially add other specific psycopg2 connection parameters here
    }

    return create_engine(
        connection_string % quote(DBPASS), 
        echo=True, 
        poolclass=QueuePool,
        pool_size=5,  # Adjust based on your needs
        max_overflow=10,  # Allows temporary connections beyond pool_size
        pool_timeout=30,  # Wait time for getting a connection from the pool
        pool_recycle=1800,  # Recycle connections after 30 minutes
        connect_args=connect_args
    )

In [3]:
data_dir = '../data_src/'
data_file = 'GLODAPv2.2023_Pacific_Ocean.csv' #Indian /Pacific -> try cruises cross the meridian
df = pd.read_csv(data_dir + data_file)
# Remove G2 prefix
df.columns = [col[2:] if col.startswith('G2') else col for col in df.columns]
# Replace missing values
df.replace(-9999, np.nan, inplace=True)

df['longitude'] = df['longitude'].apply(adjust_longitude)

# Combine date and time columns into UTC datetime
df['datetime'] = pd.to_datetime(df[['year', 'month', 'day', 'hour', 'minute']], errors='coerce', utc=True)

# Drop original datetime columns
df.drop(['day', 'hour', 'minute'], axis=1, inplace=True)

def rename_flags(col):
    if col.endswith('f'):
        return f"flag_{col[:-1]}"
    elif col.endswith('qc'):
        return f"qc_{col[:-2]}"
    elif col.endswith('err'):
        return f"err_{col[:-3]}"
    else:
        return col

df.columns = [rename_flags(col) for col in df.columns]

# Create Point geometries
df['geom'] = df.apply(lambda row: Point(row['longitude'], row['latitude']), axis=1)


  df = pd.read_csv(data_dir + data_file)


In [4]:
print(df.head())

       expocode  cruise  station  region  cast    year  month  latitude  \
0  09AR19930404    66.0      1.0     8.0   1.0  1993.0    4.0   -43.219   
1  09AR19930404    66.0      1.0     8.0   1.0  1993.0    4.0   -43.219   
2  09AR19930404    66.0      1.0     8.0   1.0  1993.0    4.0   -43.219   
3  09AR19930404    66.0      1.0     8.0   1.0  1993.0    4.0   -43.219   
4  09AR19930404    66.0      1.0     8.0   1.0  1993.0    4.0   -43.219   

   longitude  bottomdepth  ...  flag_doc  don  flag_don  tdn  flag_tdn  chla  \
0      148.1        170.0  ...       9.0  NaN       9.0  NaN       9.0   NaN   
1      148.1        170.0  ...       9.0  NaN       9.0  NaN       9.0   NaN   
2      148.1        170.0  ...       9.0  NaN       9.0  NaN       9.0   NaN   
3      148.1        170.0  ...       9.0  NaN       9.0  NaN       9.0   NaN   
4      148.1        170.0  ...       9.0  NaN       9.0  NaN       9.0   NaN   

   flag_chla                                           doi  \
0     

In [11]:
print(df.columns.tolist())

['expocode', 'cruise', 'station', 'region', 'cast', 'year', 'month', 'latitude', 'longitude', 'bottomdepth', 'maxsampdepth', 'bottle', 'pressure', 'depth', 'temperature', 'theta', 'salinity', 'flag_salinity', 'qc_salinity', 'sigma0', 'sigma1', 'sigma2', 'sigma3', 'sigma4', 'gamma', 'oxygen', 'flag_oxygen', 'qc_oxygen', 'aou', 'flag_aou', 'nitrate', 'flag_nitrate', 'qc_nitrate', 'nitrite', 'flag_nitrite', 'silicate', 'flag_silicate', 'qc_silicate', 'phosphate', 'flag_phosphate', 'qc_phosphate', 'tco2', 'flag_tco2', 'qc_tco2', 'talk', 'flag_talk', 'qc_talk', 'fco2', 'flag_fco2', 'fco2temp', 'phts25p0', 'flag_phts25p0', 'phtsinsitutp', 'flag_phtsinsitutp', 'qc_phts', 'cfc11', 'pcfc11', 'flag_cfc11', 'qc_cfc11', 'cfc12', 'pcfc12', 'flag_cfc12', 'qc_cfc12', 'cfc113', 'pcfc113', 'flag_cfc113', 'qc_cfc113', 'ccl4', 'pccl4', 'flag_ccl4', 'qc_ccl4', 'sf6', 'psf6', 'flag_sf6', 'qc_sf6', 'c13', 'flag_c13', 'qc_c13', 'c14', 'flag_c14', 'err_c14', 'h3', 'flag_h3', 'err_h3', 'he3', 'flag_he3', 'err_

In [8]:
print(df[df['expocode'] == '49NZ1998129'])

Empty DataFrame
Columns: [expocode, cruise, station, region, cast, year, month, latitude, longitude, bottomdepth, maxsampdepth, bottle, pressure, depth, temperature, theta, salinity, flag_salinity, qc_salinity, sigma0, sigma1, sigma2, sigma3, sigma4, gamma, oxygen, flag_oxygen, qc_oxygen, aou, flag_aou, nitrate, flag_nitrate, qc_nitrate, nitrite, flag_nitrite, silicate, flag_silicate, qc_silicate, phosphate, flag_phosphate, qc_phosphate, tco2, flag_tco2, qc_tco2, talk, flag_talk, qc_talk, fco2, flag_fco2, fco2temp, phts25p0, flag_phts25p0, phtsinsitutp, flag_phtsinsitutp, qc_phts, cfc11, pcfc11, flag_cfc11, qc_cfc11, cfc12, pcfc12, flag_cfc12, qc_cfc12, cfc113, pcfc113, flag_cfc113, qc_cfc113, ccl4, pccl4, flag_ccl4, qc_ccl4, sf6, psf6, flag_sf6, qc_sf6, c13, flag_c13, qc_c13, c14, flag_c14, err_c14, h3, flag_h3, err_h3, he3, flag_he3, err_he3, he, flag_he, err_he, neon, flag_neon, err_neon, o18, flag_o18, toc, flag_toc, doc, flag_doc, don, ...]
Index: []

[0 rows x 108 columns]


In [5]:
print(df[df['expocode'] == '33KK20020701'].drop_duplicates(['longitude','latitude']))

Empty DataFrame
Columns: [expocode, cruise, station, region, cast, year, month, latitude, longitude, bottomdepth, maxsampdepth, bottle, pressure, depth, temperature, theta, salinity, flag_salinity, qc_salinity, sigma0, sigma1, sigma2, sigma3, sigma4, gamma, oxygen, flag_oxygen, qc_oxygen, aou, flag_aou, nitrate, flag_nitrate, qc_nitrate, nitrite, flag_nitrite, silicate, flag_silicate, qc_silicate, phosphate, flag_phosphate, qc_phosphate, tco2, flag_tco2, qc_tco2, talk, flag_talk, qc_talk, fco2, flag_fco2, fco2temp, phts25p0, flag_phts25p0, phtsinsitutp, flag_phtsinsitutp, qc_phts, cfc11, pcfc11, flag_cfc11, qc_cfc11, cfc12, pcfc12, flag_cfc12, qc_cfc12, cfc113, pcfc113, flag_cfc113, qc_cfc113, ccl4, pccl4, flag_ccl4, qc_ccl4, sf6, psf6, flag_sf6, qc_sf6, c13, flag_c13, qc_c13, c14, flag_c14, err_c14, h3, flag_h3, err_h3, he3, flag_he3, err_he3, he, flag_he, err_he, neon, flag_neon, err_neon, o18, flag_o18, toc, flag_toc, doc, flag_doc, don, ...]
Index: []

[0 rows x 108 columns]


In [13]:
# Convert to GeoDataFrame for easy handling and direct export to PostGIS
gdf = gpd.GeoDataFrame(df, geometry='geom', crs="EPSG:4326")


In [12]:
# Load cruise metadata
cruise_df = pd.read_csv('../data/glodapv2_2023_cruise_metadata.csv')
cruise1 = cruise_df[cruise_df['expocode'].isin(df['expocode'])]
print(cruise1)

          expocode  start_date    end_date  \
478   09AR19930404  1993-04-04  1993-05-09   
479   09FA19930624  1993-06-24  1993-07-17   
480   09FA20010524  2001-05-24  2001-07-07   
481   09SS20090203  2009-02-03  2009-03-24   
482   18DD19851029  1985-10-29  1985-11-12   
...            ...         ...         ...   
1106  49NZ20210713  2021-07-13  2021-08-26   
1107  49UP20210728  2021-07-28  2021-07-28   
1108  49UP20210827  2021-08-27  2021-08-27   
1109  49UP20210920  2021-09-20  2021-09-20   
1113  74EQ20191202  2019-12-02  2020-01-09   

                                                    map                 legs  \
478   https://www.ncei.noaa.gov/access/ocean-carbon-...         09AR19930404   
479   https://www.ncei.noaa.gov/access/ocean-carbon-...         09FA19930624   
480   https://www.ncei.noaa.gov/access/ocean-carbon-...         09FA20010524   
481   https://www.ncei.noaa.gov/access/ocean-carbon-...         09SS20090203   
482   https://www.ncei.noaa.gov/access/ocean-ca

In [14]:
#print(df[df['expocode'] == '318M19730822'].drop_duplicates(['longitude','latitude']))
test1 = df[df['expocode'] == '318M19730822']
print(test1['longitude'].unique()) #cross meridian

[-127.9  -139.57 -142.1  -150.03 -152.95 -153.84 -154.52 -155.03 -155.5
 -156.07 -158.32 -159.84 -168.47 -177.   -177.32 -176.97 -176.84 -176.58
 -177.31  170.45  169.43  160.5   151.84  141.97  161.93  170.64  170.08
  169.35  173.47  177.44 -178.63 -176.22 -169.13 -163.25 -161.33 -163.97
 -165.43 -167.07 -172.01 -177.18  179.    178.93  178.99  178.98  179.03
  179.02 -178.1  -175.05 -173.81 -169.9  -169.01 -169.97 -170.35 -170.38
 -170.03 -169.18 -168.48 -167.06 -166.   -164.99 -168.05 -171.41 -172.81
 -174.52 -175.25 -175.48 -175.72 -175.73 -176.31 -176.91 -177.19 -177.58
 -179.6   170.11  170.05  169.78  170.1   170.    169.97  173.67 -173.5
 -173.98 -174.   -175.58 -178.03 -179.98 -171.49 -166.66 -166.75 -166.93
 -166.77 -166.83 -166.7  -170.07 -168.6  -166.78 -163.63 -161.97 -160.35
 -158.8  -157.16 -156.41 -155.64 -154.43 -153.62 -134.87 -126.6  -127.15
 -127.83 -127.78 -128.4  -129.37 -129.95 -138.13 -146.07 -130.92 -126.26
 -125.91 -125.54 -125.46 -125.29 -125.13 -124.8  -124

In [15]:
cruise_dict = cruise1.set_index('expocode').to_dict(orient='index')

# Dictionary to accumulate coordinates for cruises across files
cruise_coords = defaultdict(list)
print(cruise_dict)

{'09AR19930404': {'start_date': '1993-04-04', 'end_date': '1993-05-09', 'map': 'https://www.ncei.noaa.gov/access/ocean-carbon-acidification-data-system/oceans/PACIFICA/maps/09AR19930311.png', 'legs': '09AR19930404', 'region': 'Pacific', 'alias': '09AR9391_2, WOCE P11A', 'ship': 'Aurora Australis', 'chief_scientist': 'S. Rintoul', 'carbon_PI': 'B. Tilbrook', 'hydrography_PI': 'S. Rintoul', 'oxygen_PI': 'S. Rintoul', 'nutrients_PI': 'S. Rintoul', 'cfc_PI': nan, 'organics_PI': nan, 'isotopes_PI': nan, 'other_pi': nan, 'measurements': 'CTDPRS, TMP, CTDTMP, CTDSAL, SALNTY, CTDOXY, OXYGEN, SILCAT, NO2+NO3, PHSPHT, TCARBN, THETA', 'cruise_references': nan, 'data_files': 'https://www.ncei.noaa.gov/data/oceans/ncei/ocads/data/0115004/', 'metadata_report': 'https://www.ncei.noaa.gov/data/oceans/ncei/ocads/metadata/0115004.html', 'QC_details': 'https://glodapv2.geomar.de/adjustments/show/09AR19930404'}, '09FA19930624': {'start_date': '1993-06-24', 'end_date': '1993-07-17', 'map': 'https://www.nce

In [16]:
# Accumulate points for LineString per cruise
for expocode, group in gdf.groupby('expocode'):
    sorted_group = group.sort_values('datetime')
    cruise_coords[expocode].extend(
        # Old code to use linestring: sorted_group[['longitude', 'latitude', 'expocode', 'station', 'region', 'cast', 'datetime']].values.tolist()
        sorted_group[['longitude', 'latitude', 'datetime']].values.tolist()
    )

In [None]:
# Old code to create cruise linestring
lines = []
for expocode, coords in cruise_coords.items():
    # Convert collected coordinates into a DataFrame to sort and deduplicate
    coords_df = pd.DataFrame(coords, columns=['longitude', 'latitude', 'expocode', 'station', 'region', 'cast', 'datetime'])
    # print(coords_df)
    
    # Sort the DataFrame as required
    coords_df.sort_values(by=['expocode', 'cast', 'datetime'], inplace=True)
    
    # Remove consecutive duplicate coordinates, preserving revisits later
    unique_coords = [
        (float(lon), float(lat)) for lon, lat in [
            next(group) for _, group in groupby(
                coords_df[['longitude', 'latitude']].itertuples(index=False, name=None),
                key=itemgetter(0, 1)
            )
        ]
    ]

    # Create LineString geometry from cleaned coordinates
    if len(unique_coords) >= 2:
        line_geom = LineString(unique_coords)
    else:
        line_geom = Point(unique_coords[0])

    meta = cruise_dict.get(expocode, {})
    lines.append({
        'expocode': expocode,
        'region': meta.get('region'),
        'ship': meta.get('ship'),
        'chief_scientist': meta.get('chief_scientist'),
        'start_date': meta.get('start_date'),
        'end_date': meta.get('end_date'),
        'geom': line_geom
    })

print(lines)

In [59]:
print(lines[1]['geom'])


LINESTRING (103.65 -36.364, 99.413 -39.483, 79.366 -48.28, 78.599 -49.894, 77.729 -50.24, 77.024 -50.542, 76.188 -50.691, 76.014 -50.724, 75.959 -50.736, 75.782 -50.789, 75.617 -50.817, 75.38 -50.899, 74.594 -51.098, 73.81 -51.287, 73.001 -51.506, 71.362 -52.927, 72.659 -53.032, 72.627 -52.998, 72.552 -53.035, 72.593 -53.073, 72.555 -52.984, 73.665 -54.167, 73.99 -53.06, 73.64 -53.213, 73.316 -53.281, 73.239 -52.96, 73.722 -53.005, 73.607 -53.003, 72.66 -53.032, 72.661 -53.033, 72.662 -53.035, 72.661 -53.029, 72.669 -53.034, 72.662 -53.039, 72.653 -53.034, 72.661 -53.034, 72.662 -53.039, 74.021 -52.922, 74.402 -52.81, 74.792 -52.697, 75.262 -52.542, 75.607 -52.41, 76.006 -52.302, 74.323 -52.838, 73.709 -53.007, 73.716 -53.012, 73.724 -53.007, 73.717 -53.003, 73.716 -53.008)


In [18]:
# Create MultiLineStrings for cruises
lines = []
for expocode, coords in cruise_coords.items():
    coords_df = pd.DataFrame(coords, columns=['longitude', 'latitude', 'datetime']).sort_values('datetime')

    unique_coords = [
        (float(lon), float(lat)) for lon, lat in [
            next(group) for _, group in groupby(
                coords_df[['longitude', 'latitude']].itertuples(index=False, name=None),
                key=itemgetter(0, 1)
            )
        ]
    ]

    line_geom = split_at_antimeridian(unique_coords)
    lines.append({'expocode': expocode, 'geom': line_geom})

# Update cruise geometries dynamically
lines_gdf = gpd.GeoDataFrame(lines, geometry='geom', crs="EPSG:4326")

In [19]:
expocode = '318M19730822'
geom = lines_gdf[lines_gdf['expocode'] == expocode].iloc[0].geom

if isinstance(geom, MultiLineString):
    full_coords = [list(line.coords) for line in geom.geoms]
elif isinstance(geom, LineString):
    full_coords = list(geom.coords)
else:
    full_coords = geom.wkt  # For cases where it's a Point

print(full_coords)
# got at least three lines:
# [[(-127.9, 34.175), (-139.57, 33.1), .., (-177.18, 3.385)], [(179.0, 4.562), (178.93, 3.078), ..., (178.98, -2.983), (179.0, -4.567)], [(-178.1, -8.483), (-175.05, -12.68), ..., (-121.85, 25.48), (-121.49, 28.512)]]

[[(-127.9, 34.175), (-139.57, 33.1), (-142.1, 32.717), (-150.03, 31.378), (-152.95, 22.368), (-153.84, 22.16), (-152.95, 22.368), (-153.84, 22.16), (-152.95, 22.368), (-153.84, 22.16), (-152.95, 22.368), (-153.84, 22.16), (-152.95, 22.368), (-153.84, 22.16), (-152.95, 22.368), (-153.84, 22.16), (-152.95, 22.368), (-153.84, 22.16), (-152.95, 22.368), (-153.84, 22.16), (-154.52, 22.03), (-155.03, 21.852), (-154.52, 22.03), (-155.03, 21.852), (-154.52, 22.03), (-155.03, 21.852), (-154.52, 22.03), (-155.03, 21.852), (-154.52, 22.03), (-155.03, 21.852), (-154.52, 22.03), (-155.5, 21.687), (-155.03, 21.852), (-155.5, 21.687), (-155.03, 21.852), (-155.5, 21.687), (-155.03, 21.852), (-155.5, 21.687), (-155.03, 21.852), (-155.5, 21.687), (-155.03, 21.852), (-155.5, 21.687), (-156.07, 21.502), (-158.32, 24.278), (-159.84, 30.0), (-168.47, 30.967), (-177.0, 32.025), (-177.32, 37.475), (-176.97, 40.77), (-176.84, 44.612), (-176.58, 50.445), (-177.31, 53.105)], [(170.45, 46.375), (169.43, 45.228), 

In [13]:
# Create engine with enhanced connection parameters
engine = create_enhanced_engine()

# Initialize PostGIS and create tables
with engine.begin() as conn:  # use engine.begin() to ensure the commands run in a transaction
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS postgis;"))

'''
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS cruisev2_2023 (
        expocode TEXT PRIMARY KEY,
        region TEXT,
        ship TEXT,
        chief_scientist TEXT,
        start_date TEXT,
        end_date TEXT,
        geom GEOMETRY(LINESTRING, 4326)
    );
    """))

    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS glodapv2_2023 (
        id SERIAL PRIMARY KEY,
        expocode TEXT REFERENCES cruisev2_2023(expocode),
        cruise INT,
        station INT,
        region INT,
        cast_number INT,
        year INT,
        month INT,
        datetime TIMESTAMP WITH TIME ZONE,
        latitude FLOAT,
        longitude FLOAT,
        bottomdepth FLOAT,
        maxsampdepth FLOAT,
        bottle FLOAT,
        pressure FLOAT,
        depth FLOAT,
        geom GEOMETRY(POINT, 4326)
    );
    CREATE INDEX IF NOT EXISTS idx_glodap_expocode ON glodapv2_2023(expocode);
    CREATE INDEX IF NOT EXISTS idx_glodap_datetime ON glodapv2_2023(datetime);
    CREATE INDEX IF NOT EXISTS idx_glodap_longitude ON glodapv2_2023(longitude);
    CREATE INDEX IF NOT EXISTS idx_glodap_latitude ON glodapv2_2023(latitude);
    CREATE INDEX IF NOT EXISTS idx_glodap_depth ON glodapv2_2023(depth);
    CREATE INDEX IF NOT EXISTS idx_cruise_expocode ON cruisev2_2023(expocode);
    """))
'''


2025-03-25 15:50:43,372 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-03-25 15:50:43,373 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-03-25 15:50:43,375 INFO sqlalchemy.engine.Engine select current_schema()
2025-03-25 15:50:43,375 INFO sqlalchemy.engine.Engine [raw sql] {}


2025-03-25 15:50:43,376 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-03-25 15:50:43,377 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-03-25 15:50:43,378 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-25 15:50:43,379 INFO sqlalchemy.engine.Engine CREATE EXTENSION IF NOT EXISTS postgis;
2025-03-25 15:50:43,379 INFO sqlalchemy.engine.Engine [generated in 0.00055s] {}
2025-03-25 15:50:43,381 INFO sqlalchemy.engine.Engine COMMIT


'\n    conn.execute(text("""\n    CREATE TABLE IF NOT EXISTS cruisev2_2023 (\n        expocode TEXT PRIMARY KEY,\n        region TEXT,\n        ship TEXT,\n        chief_scientist TEXT,\n        start_date TEXT,\n        end_date TEXT,\n        geom GEOMETRY(LINESTRING, 4326)\n    );\n    """))\n\n    conn.execute(text("""\n    CREATE TABLE IF NOT EXISTS glodapv2_2023 (\n        id SERIAL PRIMARY KEY,\n        expocode TEXT REFERENCES cruisev2_2023(expocode),\n        cruise INT,\n        station INT,\n        region INT,\n        cast_number INT,\n        year INT,\n        month INT,\n        datetime TIMESTAMP WITH TIME ZONE,\n        latitude FLOAT,\n        longitude FLOAT,\n        bottomdepth FLOAT,\n        maxsampdepth FLOAT,\n        bottle FLOAT,\n        pressure FLOAT,\n        depth FLOAT,\n        geom GEOMETRY(POINT, 4326)\n    );\n    CREATE INDEX IF NOT EXISTS idx_glodap_expocode ON glodapv2_2023(expocode);\n    CREATE INDEX IF NOT EXISTS idx_glodap_datetime ON glodap

In [15]:
def create_cruise_geometries(cruise_coords):
    """
    Create cruise geometries with extensive logging and debugging
    
    :param cruise_coords: Dictionary of cruise coordinates
    :return: GeoDataFrame with cruise geometries
    """
    lines = []
    problematic_cruises = []
    
    for expocode, coords in cruise_coords.items():
        # If no coordinates for this cruise, skip
        if not coords:
            problematic_cruises.append((expocode, "No coordinates"))
            continue
        
        # Convert coordinates to DataFrame and sort by datetime
        coords_df = pd.DataFrame(coords, columns=['longitude', 'latitude', 'datetime']).sort_values('datetime')
        
        # Simplify coordinate selection
        unique_coords = coords_df[['longitude', 'latitude']].drop_duplicates().values.tolist()
        
        # Skip if not enough unique coordinates
        if len(unique_coords) < 1:
            problematic_cruises.append((expocode, "Insufficient unique coordinates"))
            continue
        
        try:
            # Create geometry, handling potential antimeridian crossing
            line_geom = split_at_antimeridian(unique_coords)
            
            # Validate geometry
            if line_geom is None:
                problematic_cruises.append((expocode, "Geometry creation returned None"))
                continue
            
            lines.append({'expocode': expocode, 'geom': line_geom})
        except Exception as e:
            problematic_cruises.append((expocode, str(e)))
            print(f"Error creating geometry for {expocode}: {e}")
    
    # Detailed logging of problematic cruises
    if problematic_cruises:
        print("Problematic Cruises:")
        for expocode, reason in problematic_cruises:
            print(f"Expocode: {expocode}, Reason: {reason}")
    
    # Create GeoDataFrame
    lines_gdf = gpd.GeoDataFrame(lines, geometry='geom', crs="EPSG:4326")
    
    # Print summary
    print(f"Total cruises processed: {len(cruise_coords)}")
    print(f"Cruises with valid geometries: {len(lines_gdf)}")
    print(f"Problematic cruises: {len(problematic_cruises)}")
    
    return lines_gdf

def update_cruise_geometries(cruise_coords):
    # Create geometries
    lines_gdf = create_cruise_geometries(cruise_coords)
    
    # Update geometries
    with engine.begin() as conn:
        # Add geometry column if not exists
        conn.execute(text("""
            DO $$ 
            BEGIN
                BEGIN
                    ALTER TABLE cruisev2_2023 ADD COLUMN geom geometry(GEOMETRY, 4326);
                EXCEPTION WHEN duplicate_column THEN
                    -- Column already exists, do nothing
                    NULL;
                END;
            END $$;
        """))
        
        # Batch update geometries
        update_stmt = text("""
            UPDATE cruisev2_2023 
            SET geom = ST_GeomFromText(:geom, 4326) 
            WHERE expocode = :expocode
        """)
        
        # Additional debug logging for batch update
        successful_updates = 0
        failed_updates = []
        
        # Prepare batch parameters
        batch_params = []
        for _, row in lines_gdf.iterrows():
            try:
                # Ensure WKT is valid
                wkt = row['geom'].wkt
                batch_params.append({'geom': wkt, 'expocode': str(row['expocode'])})
            except Exception as e:
                failed_updates.append((row['expocode'], str(e)))
        
        # Execute batch update
        try:
            result = conn.execute(update_stmt, batch_params)
            successful_updates = result.rowcount
        except Exception as e:
            print(f"Batch update failed: {e}")
        
        # Logging
        print(f"Successful geometry updates: {successful_updates}")
        if failed_updates:
            print("Failed Updates:")
            for expocode, reason in failed_updates:
                print(f"Expocode: {expocode}, Reason: {reason}")

def process_glodap_data():
    # Previous code remains the same until cruise metadata insertion
    # Load cruise metadata
    cruise_df = pd.read_csv('../data/glodapv2_2023_cruise_metadata.csv')
    cruise_df.columns = cruise_df.columns.str.lower().str.replace(' ', '_').str.replace('/', '_')
    #cruise_df['cruise_number'] = range(1, len(cruise_df) + 1)  # Assign unique cruise numbers

    # Processing CSV files
    unique_cruises = {}
    cruise_coords = {}
    data_dir = '../data_src/'
    csv_files = sorted(glob.glob(data_dir + '*_Ocean.csv'))

    # First, process and insert GLODAP data
    with engine.connect() as conn:
        for csv_file in csv_files:
            print(f'Processing {csv_file}')
            for chunk in pd.read_csv(csv_file, chunksize=500000):
                # Data preprocessing steps (same as your original code)
                chunk.columns = [col[2:] if col.startswith('G2') else col for col in chunk.columns]
                chunk.replace(-9999, np.nan, inplace=True)

                chunk['longitude'] = chunk['longitude'].apply(adjust_longitude)
                chunk['datetime'] = pd.to_datetime(chunk[['year', 'month', 'day', 'hour', 'minute']], utc=True, errors='coerce')
                chunk.drop(['day', 'hour', 'minute', 'cruise'], axis=1, inplace=True)

                chunk.columns = [
                    f"flag_{col[:-1]}" if col.endswith('f') else
                    f"qc_{col[:-2]}" if col.endswith('qc') else
                    f"err_{col[:-3]}" if col.endswith('err') else col
                    for col in chunk.columns
                ]

                chunk.rename(columns={'cast': 'cast_number'}, inplace=True)

                int_cols = ['region', 'cast_number', 'year', 'month']
                for col in int_cols:
                    chunk[col] = pd.to_numeric(chunk[col], errors='coerce').round().astype('Int64')

                chunk['station'] = chunk['station'].astype(str)
                chunk['geom'] = chunk.apply(lambda row: Point(row['longitude'], row['latitude']), axis=1)
                gdf_chunk = gpd.GeoDataFrame(chunk, geometry='geom', crs="EPSG:4326")

                # Dynamically create glodap data table with explicit types
                if csv_file == csv_files[0] and chunk.index.start == 0:
                    text_columns = ['expocode', 'doi', 'station']
                    column_types = {col: sqlalchemy.types.FLOAT for col in gdf_chunk.columns if col not in text_columns}

                    for col in text_columns:
                        column_types[col] = sqlalchemy.types.TEXT

                    column_types.update({
                        'region': sqlalchemy.types.INTEGER,
                        'cast_number': sqlalchemy.types.INTEGER,
                        'year': sqlalchemy.types.INTEGER,
                        'month': sqlalchemy.types.INTEGER,
                        'datetime': sqlalchemy.types.DateTime(timezone=True),
                        'geom': Geometry('POINT', srid=4326)
                    })

                    gdf_chunk.head(0).to_postgis('glodapv2_2023', engine, if_exists='replace', index=False,
                                                 dtype=column_types)

                # Insert data dynamically
                gdf_chunk.to_postgis('glodapv2_2023', engine, if_exists='append', index=False,
                                     dtype={'geom': Geometry('POINT', srid=4326)})

                for expocode, group in gdf_chunk.groupby('expocode'):
                    sorted_group = group.sort_values('datetime')
                    cruise_coords.setdefault(expocode, []).extend(
                        sorted_group[['longitude', 'latitude', 'datetime']].values.tolist()
                    )

        # Insert cruise metadata into PostgreSQL without geometry
        conn.execute(text("DROP TABLE IF EXISTS cruisev2_2023"))
        cruise_df.to_sql('cruisev2_2023', engine, if_exists='replace', index=False,
            dtype={
                'expocode': sqlalchemy.types.TEXT,
                #'cruise_number': sqlalchemy.types.INTEGER,
                'region': sqlalchemy.types.TEXT,
                'ship': sqlalchemy.types.TEXT,
                'chief_scientist': sqlalchemy.types.TEXT,
                'start_date': sqlalchemy.types.TEXT,
                'end_date': sqlalchemy.types.TEXT,
            })
    
    return cruise_coords
    # Call geometry update after main processing
    # update_cruise_geometries(cruise_coords)

In [None]:
cruise_coords = process_glodap_data()
# print(cruise_coords)

In [17]:
update_cruise_geometries(cruise_coords)

Total cruises processed: 1113
Cruises with valid geometries: 1113
Problematic cruises: 0
2025-03-25 15:59:12,568 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-25 15:59:12,569 INFO sqlalchemy.engine.Engine 
            DO $$ 
            BEGIN
                BEGIN
                    ALTER TABLE cruisev2_2023 ADD COLUMN geom geometry(GEOMETRY, 4326);
                EXCEPTION WHEN duplicate_column THEN
                    -- Column already exists, do nothing
                    NULL;
                END;
            END $$;
        
2025-03-25 15:59:12,570 INFO sqlalchemy.engine.Engine [generated in 0.00075s] {}
2025-03-25 15:59:12,667 INFO sqlalchemy.engine.Engine 
            UPDATE cruisev2_2023 
            SET geom = ST_GeomFromText(%(geom)s, 4326) 
            WHERE expocode = %(expocode)s
        
2025-03-25 15:59:12,668 INFO sqlalchemy.engine.Engine [generated in 0.00290s] [{'geom': 'LINESTRING (7.2267 80.567, 9.46 80.633, 12.853 80.733, 18.588 80.905, 17.668 81.052, 1

In [None]:
# Read and transform variables definition CSV
var_def = pd.read_csv('../data_src/glodapv2_2023_variables_def_new.csv')

# Function to convert WOCE_flag and QC_flag columns
def convert_flags(val):
    if pd.isna(val):
        return val
    val = val.strip('()')
    if val.endswith('f'):
        return f'flag_{val[:-1]}'
    if val.endswith('qc'):
        return f'qc_{val[:-2]}'
    if val.endswith('err'):
        return f'err_{val[:-3]}'    
    return val

var_def['variables'] = var_def['variables'].apply(convert_flags)
var_def['WOCE_flag'] = var_def['WOCE_flag'].apply(convert_flags)
var_def['QC_flag'] = var_def['QC_flag'].apply(convert_flags)

print(var_def)



                          definition       units variables  WOCE_flag QC_flag  \
0                           EXPOCODE         NaN  expocode        NaN     NaN   
1           Digital objectidentifier         NaN       doi        NaN     NaN   
2   Assigned sequentialcruise number         NaN    cruise        NaN     NaN   
3                  Basin identifierc         NaN    region        NaN     NaN   
4                            Station         NaN   station        NaN     NaN   
..                               ...         ...       ...        ...     ...   
59              Total organic carbon  µmol L−1 f       toc   flag_toc     NaN   
60           Dissolved organiccarbon  µmol L−1 f       doc   flag_doc     NaN   
61         Dissolved organicnitrogen  µmol L−1 f       don   flag_don     NaN   
62          Dissolved total nitrogen  µmol L−1 f       tdn   flag_tdn     NaN   
63                     Chlorophyll a   µg kg−1 f      chla  flag_chla     NaN   

   WHP_exchange  
0        

In [101]:
# Save transformed data
var_def.to_csv('../data/glodapv2_2023_variables_def_new.csv', index=False)

In [102]:
# Insert transformed data into PostgreSQL
var_def.to_sql('variablesv2_2023', engine, if_exists='replace', index=False)

2025-03-24 09:29:29,334 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-03-24 09:29:29,335 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-03-24 09:29:29,335 INFO sqlalchemy.engine.Engine select current_schema()
2025-03-24 09:29:29,336 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-03-24 09:29:29,336 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-03-24 09:29:29,337 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-03-24 09:29:29,337 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-24 09:29:29,339 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname

64