In [0]:
%pip install zstandard

# Leyendo los datos de Crowdsourced Bathymetry (CSB) del programa NOAA Open Data Dissemination (NODD)

In [0]:
from pyspark.sql.types import *
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import pandas as pd

# Intended column layout of the crowdsourced-bathymetry CSVs; every field is nullable.
# TIME is declared as a string here and is converted to a timestamp after loading.
# NOTE(review): this schema is not currently passed to spark.createDataFrame in this
# cell (column types are inferred from the pandas frame instead) — consider applying it.
bathymetry_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("UNIQUE_ID", StringType()),
        ("FILE_UUID", StringType()),
        ("LON", DoubleType()),
        ("LAT", DoubleType()),
        ("DEPTH", DoubleType()),
        ("TIME", StringType()),
        ("PLATFORM_NAME", StringType()),
        ("PROVIDER", StringType()),
    )
])

def _list_csv_keys(s3, bucket, prefix, max_files=None):
    """Collect S3 keys ending in ``.csv`` under ``prefix``.

    Walks every page of ``list_objects_v2`` (a single call returns at most
    1000 keys) and stops as soon as ``max_files`` CSV keys have been found.

    Args:
        s3: boto3 S3 client.
        bucket: Bucket name.
        prefix: Key prefix to list.
        max_files: Maximum number of keys to return; ``None`` means no limit.

    Returns:
        List of matching keys in listing order (possibly empty).
    """
    if max_files is not None and max_files <= 0:
        return []
    keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page with no matching objects has no 'Contents' entry at all.
        for entry in page.get('Contents', []):
            if entry['Key'].endswith('.csv'):
                keys.append(entry['Key'])
                if max_files is not None and len(keys) >= max_files:
                    return keys
    return keys


def _read_s3_csv(s3, bucket, key):
    """Download one S3 object and parse it as CSV with pandas.

    The response body stream is always closed afterwards, even if parsing fails.
    """
    body = s3.get_object(Bucket=bucket, Key=key)['Body']
    try:
        return pd.read_csv(body)
    finally:
        body.close()


def process_bathymetry_efficient(year, month, day, max_files=10):
    """
    Load one day of NOAA crowdsourced bathymetry CSVs into a Spark DataFrame.

    Lists ``csb/csv/YYYY/MM/DD/`` in the ``noaa-dcdb-bathymetry-pds`` bucket
    using anonymous (unsigned) requests, reads up to ``max_files`` CSV objects
    with pandas, concatenates them and converts the result to Spark. If a
    ``TIME`` column is present it is converted with ``to_timestamp``.

    Args:
        year, month, day: Date of the partition to read.
        max_files: Maximum number of CSV files to read (default 10, the demo
            limit); ``None`` reads every CSV under the prefix.

    Returns:
        A Spark DataFrame, or ``None`` if no CSV files exist for that date.
    """
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    bucket = 'noaa-dcdb-bathymetry-pds'
    prefix = f'csb/csv/{year}/{month:02d}/{day:02d}/'

    # Filter to CSVs *before* applying the limit so non-CSV objects neither
    # use up the quota nor get parsed as CSV.
    csv_keys = _list_csv_keys(s3, bucket, prefix, max_files)
    if not csv_keys:
        return None

    all_dfs = [_read_s3_csv(s3, bucket, key) for key in csv_keys]
    # Report columns from the first file already downloaded instead of
    # fetching it a second time just for a 10-row sample.
    print(f"Columns found: {list(all_dfs[0].columns)}")

    combined = pd.concat(all_dfs, ignore_index=True)
    spark_df = spark.createDataFrame(combined)

    if 'TIME' in spark_df.columns:
        # Import `col` explicitly: it was previously used without an import and
        # only worked if a later cell had star-imported pyspark.sql.functions.
        from pyspark.sql.functions import col, to_timestamp
        spark_df = spark_df.withColumn("TIME", to_timestamp(col("TIME")))

    return spark_df

# Load one day of crowdsourced bathymetry and persist it as a table for SQL queries
bathy_year, bathy_month, bathy_day = 2019, 6, 26
df = process_bathymetry_efficient(bathy_year, bathy_month, bathy_day)

# The loader signals "no data" with None; compare against None explicitly
# instead of relying on the truthiness of a Spark DataFrame object.
if df is not None:
    # Save as Delta table for efficient querying; overwriteSchema lets a rerun
    # replace the table even if the inferred columns changed.
    (
        df.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable("bathymetry_data")
    )

    # Per-platform summary: number of readings, mean depth and lat/lon extent
    result = spark.sql("""
        SELECT 
            PLATFORM_NAME,
            COUNT(*) as num_readings,
            ROUND(AVG(DEPTH), 2) as avg_depth,
            ROUND(MIN(LAT), 4) as min_lat,
            ROUND(MAX(LAT), 4) as max_lat,
            ROUND(MIN(LON), 4) as min_lon,
            ROUND(MAX(LON), 4) as max_lon
        FROM bathymetry_data
        GROUP BY PLATFORM_NAME
        ORDER BY num_readings DESC
        LIMIT 20
    """)

    result.show(truncate=False)
else:
    print(f"No bathymetry CSV files found for {bathy_year}-{bathy_month:02d}-{bathy_day:02d}")

# Leyendo los datos de Marine Cadastre

In [0]:
# Show the top-level entries of the Marine Cadastre raw-data volume
marine_cadastre_listing = dbutils.fs.ls("/Volumes/proyecto/default/raw_data/marine-cadastre/")
display(marine_cadastre_listing)

In [0]:
def explore_directory(path, max_depth=3, current_depth=0):
    """Print an indented tree of the folders and files under ``path``.

    Directories are printed with a folder icon and recursed into; files are
    printed with a page icon and their size in MB. Listing errors are printed
    and skipped (best effort) so one unreadable folder does not abort the walk.

    Args:
        path: Directory to list via ``dbutils.fs.ls``.
        max_depth: Number of directory levels to print; nothing is printed once
            ``current_depth`` reaches this value.
        current_depth: Depth of ``path`` in the walk; callers normally leave it at 0.
    """
    if current_depth >= max_depth:
        return

    # Escapes keep the source ASCII-only; the previous literal emoji had been
    # mangled by a wrong encoding round-trip into "üìÅ" / "üìÑ".
    folder_icon = "\N{FILE FOLDER}"
    file_icon = "\N{PAGE FACING UP}"
    indent = "  " * current_depth  # same for every entry at this level

    try:
        items = dbutils.fs.ls(path)
        for item in items:
            if item.isDir():
                print(f"{indent}{folder_icon} {item.name}")
                explore_directory(item.path, max_depth, current_depth + 1)
            else:
                size_mb = item.size / (1024 * 1024)
                print(f"{indent}{file_icon} {item.name} ({size_mb:.2f} MB)")
    except Exception as e:
        print(f"Error exploring {path}: {e}")

# Print a tree view, up to three levels deep, of the Marine Cadastre raw-data folder
explore_directory(path="/Volumes/proyecto/default/raw_data/marine-cadastre/", max_depth=3)

In [0]:
import zstandard as zstd
import pandas as pd
import io
from pyspark.sql.types import *
from pyspark.sql.functions import *

def read_zst_csv_to_spark(file_path):
    """
    Read a Zstandard-compressed (.zst) CSV file into a Spark DataFrame.

    The file is decompressed as a stream and parsed by pandas on the driver,
    then converted with ``spark.createDataFrame``. Streaming avoids keeping the
    compressed bytes, the decompressed bytes and a decoded copy in memory at
    once. It also works for frames whose header omits the content size, which
    the one-shot ``ZstdDecompressor.decompress()`` rejects.

    Args:
        file_path: Local/FUSE path (e.g. ``/Volumes/...``) or a ``dbfs:`` URI;
            a leading ``dbfs:`` scheme is rewritten to the ``/dbfs`` mount so
            the file can be opened with ``open``.

    Returns:
        A Spark DataFrame whose columns come from the CSV header (types are
        inferred by pandas).
    """
    # Rewrite only a leading scheme, not any "dbfs:" substring inside the path.
    if file_path.startswith("dbfs:"):
        local_path = "/dbfs" + file_path[len("dbfs:"):]
    else:
        local_path = file_path

    with open(local_path, 'rb') as compressed:
        with zstd.ZstdDecompressor().stream_reader(compressed) as reader:
            df_pandas = pd.read_csv(io.TextIOWrapper(reader, encoding='utf-8'))

    return spark.createDataFrame(df_pandas)

# Load a single day of AIS data to inspect its structure before processing more files
sample_file = "/Volumes/proyecto/default/raw_data/marine-cadastre/ais-2025-01-01.csv.zst"
sample_df = read_zst_csv_to_spark(sample_file)

print("AIS Data Schema:")
sample_df.printSchema()

record_count = sample_df.count()
print(f"\nNumber of records: {record_count}")
sample_df.show(5, truncate=False)