In [1]:
import os
from pyais import decode
import pandas as pd

In [2]:
# Decode single or multi-part messages
def decode_message(lines, start_idx, multi_part_buffer):
    line = lines[start_idx].strip()
    parts = line.split(',')
    if len(parts) < 6 or not parts[5]:
        raise ValueError("Empty or malformed payload")
    
    if parts[1] == "2" and parts[2] == "1":  # First part
        seq_id = parts[3]
        if start_idx + 1 < len(lines) and lines[start_idx + 1].split(',')[1] == "2" and lines[start_idx + 1].split(',')[2] == "2" and lines[start_idx + 1].split(',')[3] == seq_id:
            return decode(line.split(',2024-')[0], lines[start_idx + 1].split(',2024-')[0])
        else:
            multi_part_buffer[seq_id] = line
            raise ValueError("Missing second part")
    elif parts[1] == "2" and parts[2] == "2":  # Second part
        seq_id = parts[3]
        if seq_id in multi_part_buffer:
            decoded = decode(multi_part_buffer[seq_id].split(',2024-')[0], line.split(',2024-')[0])
            del multi_part_buffer[seq_id]
            return decoded
        raise ValueError("Missing first part")
    return decode(line.split(',2024-')[0])

# Process a chunk of lines
def process_chunk(chunk, multi_part_buffer, error_log):
    decoded_chunk = []
    i = 0
    while i < len(chunk):
        try:
            decoded = decode_message(chunk, i, multi_part_buffer)
            raw = chunk[i] if "2,1" not in chunk[i] else f"{chunk[i].split(',2024-')[0]} + {chunk[i + 1].split(',2024-')[0]}"
            decoded_chunk.append((decoded, raw))
            i += 2 if "2,1" in chunk[i] else 1
        except Exception as e:
            error_msg = f"Error decoding line {i + 1}: {chunk[i]} - {e}"
            print(error_msg)
            error_log.append(chunk[i])
            i += 1
    return decoded_chunk, multi_part_buffer, error_log

In [3]:
# Write to Parquet
def write_to_parquet(decoded_list, output_file):
    rows = []
    for decoded, raw in decoded_list:
        msg_type = decoded.msg_type
        mmsi = decoded.mmsi
        timestamp = raw.split(',2024-')[1] if ',2024-' in raw else 'N/A'
        row = {
            'msg_type': msg_type,
            'mmsi': mmsi,
            'lat': decoded.lat if msg_type in [1, 2, 3, 18] else None,
            'lon': decoded.lon if msg_type in [1, 2, 3, 18] else None,
            'speed': decoded.speed if msg_type in [1, 2, 3, 18] else None,
            'course': decoded.course if msg_type in [1, 2, 3, 18] else None,
            'vessel_name': getattr(decoded, 'name', 'N/A') if msg_type == 5 else None,
            'ship_type': getattr(decoded, 'ship_type', 'N/A') if msg_type == 5 else None,
            'timestamp': timestamp,
            'raw': raw
        }
        rows.append(row)
    
    # Convert to DataFrame
    df = pd.DataFrame(rows)
    
    # If file exists, append to it
    if os.path.exists(output_file):
        existing_df = pd.read_parquet(output_file)
        df = pd.concat([existing_df, df], ignore_index=True)
    
    # Write to parquet
    df.to_parquet(output_file, index=False)

# Summarize decoded messages
def summarize_decoded(decoded_list, section):
    print(f"\n=== {section} ===")
    msg_types = {}
    mmsi_set = set()
    for decoded, raw in decoded_list:
        msg_type = decoded.msg_type
        mmsi = decoded.mmsi
        msg_types[msg_type] = msg_types.get(msg_type, 0) + 1
        mmsi_set.add(mmsi)
        print(f"Type: {msg_type}, MMSI: {mmsi}, Raw: {raw.split(',2024-')[0]}")
        if msg_type in [1, 2, 3, 18]:
            print(f"  Lat: {decoded.lat}, Lon: {decoded.lon}, Speed: {decoded.speed}")
        elif msg_type == 5:
            print(f"  Vessel Name: {getattr(decoded, 'name', 'N/A')}, Ship Type: {getattr(decoded, 'ship_type', 'N/A')}")
    print(f"Message Types: {msg_types}")
    print(f"Unique MMSIs: {len(mmsi_set)}")

In [4]:
# Main processing
filename = "20240911_06053.txt"
output_file = "decoded_ais_data.parquet"
error_log_file = "ais_errors.txt"
chunk_size = 2000  # Increased to reduce split multi-part messages
multi_part_buffer = {}
error_log = []

# Read the file
with open(filename, 'r') as f:
    lines = [line.strip() for line in f.readlines() if line.strip()]
    total_lines = len(lines)
    print(f"Total lines in file: {total_lines}")

    all_decoded = []
    for start in range(0, total_lines, chunk_size):
        end = min(start + chunk_size, total_lines)
        chunk = lines[start:end]
        decoded_chunk, multi_part_buffer, error_log = process_chunk(chunk, multi_part_buffer, error_log)
        all_decoded.extend(decoded_chunk)
        write_to_parquet(decoded_chunk, output_file)
        
        if start == 0:
            summarize_decoded(decoded_chunk[:min(30, len(decoded_chunk))], "First 30 Lines")
        elif start <= total_lines // 2 < end:
            middle_start = max(0, (total_lines // 2) - start - 5)
            middle_end = min(middle_start + 10, len(decoded_chunk))
            summarize_decoded(decoded_chunk[middle_start:middle_end], "Middle Sample")
        if end == total_lines:
            summarize_decoded(decoded_chunk[-min(20, len(decoded_chunk)):], "Last 20 Lines")

# Write errors to log file
with open(error_log_file, 'w') as f:
    f.write("\n".join(error_log))

print(f"Decoding complete. Results saved to {output_file}. Errors logged to {error_log_file}")

Total lines in file: 364186
Error decoding line 1275: !AIVDM,1,1,,A,,0*26,2024-09-11 00:06:15 - Empty or malformed payload

=== First 30 Lines ===
Type: 1, MMSI: 538011058, Raw: !AIVDM,1,1,,B,1815UdP000Ld<0aj<F76cC5f0@Q6,0*49
  Lat: -24.12966, Lon: -46.271113, Speed: 0.0
Type: 1, MMSI: 710003185, Raw: !AIVDM,1,1,,A,1:U79tHP00LcesOjEcbWdOwf26sd,0*3C
  Lat: -23.874703, Lon: -46.373788, Speed: 0.0
Type: 1, MMSI: 538010074, Raw: !AIVDM,1,1,,A,1815QnP00pLd5bGjAP?ijA;R00RU,0*64
  Lat: -23.988802, Lon: -46.292782, Speed: 5.6
Type: 1, MMSI: 636018551, Raw: !AIVDM,1,1,,B,19NSOMh2i;Ld1LijBk>tq:Kj0<3C,0*19
  Lat: -23.953395, Lon: -46.30716, Speed: 7.5
Type: 1, MMSI: 710028580, Raw: !AIVDM,1,1,,B,1:U8e90P00Ld6EejB@R6hwwj0<2u,0*04
  Lat: -23.9682, Lon: -46.29047, Speed: 0.0
Type: 1, MMSI: 375293000, Raw: !AIVDM,1,1,,A,15Ur4B0000Ld0o5jCGgVlmsd0HQV,0*07
  Lat: -23.937817, Lon: -46.30917, Speed: 0.0
Type: 1, MMSI: 710002542, Raw: !AIVDM,1,1,,A,1:U77KdP00LcvVAjCQGTQOwh0L3G,0*2B
  Lat: -23.93371, Lon: -

In [5]:
# Load and display the first few rows of the Parquet file
df = pd.read_parquet("decoded_ais_data.parquet")
df.head()

Unnamed: 0,msg_type,mmsi,lat,lon,speed,course,vessel_name,ship_type,timestamp,raw
0,1,538011058,-24.12966,-46.271113,0.0,170.9,,,09-11 00:00:01,"!AIVDM,1,1,,B,1815UdP000Ld<0aj<F76cC5f0@Q6,0*4..."
1,1,710003185,-23.874703,-46.373788,0.0,196.9,,,09-11 00:00:01,"!AIVDM,1,1,,A,1:U79tHP00LcesOjEcbWdOwf26sd,0*3..."
2,1,538010074,-23.988802,-46.292782,5.6,45.7,,,09-11 00:00:01,"!AIVDM,1,1,,A,1815QnP00pLd5bGjAP?ijA;R00RU,0*6..."
3,1,636018551,-23.953395,-46.30716,7.5,330.0,,,09-11 00:00:01,"!AIVDM,1,1,,B,19NSOMh2i;Ld1LijBk>tq:Kj0<3C,0*1..."
4,1,710028580,-23.9682,-46.29047,0.0,173.1,,,09-11 00:00:01,"!AIVDM,1,1,,B,1:U8e90P00Ld6EejB@R6hwwj0<2u,0*0..."
