In [1]:
# gdb_file = r"D:\data\AIS\global AIS from NOAA\Zone10_2009_01\Zone10_2009_01.gdb"
# outpath = r"D:\data\AIS\global AIS from NOAA"

# Read NOAA collected AIS data from url to file

In [None]:
"""https://apps.dtic.mil/sti/trecms/pdf/AD1193822.pdf"""

In [7]:
import os
from IPython.display import clear_output


""" to make it run - make a req...txt and install later
conda install gdal
pip install h3 polars


"""
# import osgeo
# from osgeo import ogrimport os

import h3
import polars as pl
import zipfile
from io import BytesIO
import urllib.request as urllib2

import fiona
import geopandas as gpd
# thanks to https://stackoverflow.com/questions/11023530/python-to-list-http-files-and-directories
from bs4 import BeautifulSoup
import requests

In [8]:
url = 'https://coast.noaa.gov/htdata/CMSP/AISDataHandler/2022/'
ext = 'zip'

# get the url for each file
def listFD(url, ext=''):
    page = requests.get(url).text
    soup = BeautifulSoup(page, 'html.parser')
    return [url + '/' + node.get('href') for node in soup.find_all('a') if node.get('href').endswith(ext)]

In [9]:
# add H3 feature and put timestamps in datetime format
def pl_h3(pf, lat_col='LAT', lon_col='LON', new_col='H3', resolution=16):
    """
    ### TODO: write a preprocessor function that uses this and re-saves the files
    
    this converts lat lon to h3 in polars
    (function to apply a function to 2 columns in polars....)
    """
    return pf.with_columns(pl.struct([lon_col,lat_col]).apply(lambda x: h3.geo_to_h3(lat=x[lat_col], lng=x[lon_col], resolution=7)).alias(new_col))


## Data reading functions

In [10]:
    
# this needs to go inside sav_dat()
def read_gdb_from_zip(gdb_file):
    """
    # if its a geodatabase (gdb)
    ### Function to read very large gdb files into parquet

    """
    def get_gdb_size(gdb_file):
        """
        credit, modified from: 
            https://gis.stackexchange.com/questions/205861/get-row-counts-of-all-tables-in-file-geodatabase-ideally-from-metadata


        """
        #Opens filegdb using ogr driver
        ogdb= ogr.Open(gdb_file)                         

        #counts no. of feature classes in geodatabase
        noOfLyrs = ogdb.GetLayerCount()  

        layer = []
        rows = []
        #loop through feature classes
        for fcIdx in range(0, noOfLyrs):              

            #gets feature class
            fc = ogdb.GetLayer(fcIdx)    

            layer.append(fc.GetName())

            rows.append(fc.GetFeatureCount())


        return list(zip(layer, rows))

    # layers = fiona.listlayers(gdb_file)
    # print(layers)

    import warnings; warnings.filterwarnings('ignore', message='.*initial implementation of Parquet.*')

    step = 100000
    lat_col = 'LAT'
    lon_col = 'LON'

    layers = get_gdb_size(gdb_file)

    for layer in layers:

        start = 0
        stop = step

        rnd = 0

        while start < layer[-1]:
            rnd+=1

            # dont read past the end of the file
            if stop > layer[-1]:
                stop = layer[-1]

            print('Processing:', layer[0], '\n\t', start, '-', stop)

            filename = f"{egrps['gdb'][0].replace('.gdb', '')}_layer={layer[0]}_range={str(start)+'-'+str(stop)}_h3.parquet"


            print('\tOutfile:', filename)
            if filename in os.listdir(outpath):
                print('\t\tFile already exists')
            else:
                try:
                    gdf = gpd.read_file(gdb_file, 
                          rows=slice(start, stop-1), 
                          engine='fiona',
                          layer=layer[0])

                    # perform h3 conversion only where there is geometry to use
                    if 'geometry' in gdf.columns and not gdf['geometry'].is_null().all():

                        # keep only non-null geometries
                        gdf = gdf.filter(pl.col('geometry').is_not_null())

                        # extract coords from 
                        gdf[lon_col] = gdf['geometry'].apply(lambda x: x.x)
                        gdf[lat_col] = gdf['geometry'].apply(lambda y: y.y)

                        # now that we have read it in, ->polars->h3 encode->parquet file
                        gdf = pl_h3(gdf, 
                                    lat_col=lat_col, 
                                    lon_col=lon_col, 
                                    new_col='H3', 
                                    resolution=16)

                    else:
                        filename = filename.replace('h3', 'NO_COORDS')
                        print('\tNo geometry')
                except:
                    print('  \tFailed to process:', layer[0], '\n\t', start, stop)

                # even if they have no geometry, still save the file
                 # this is a crappy way to convert but all i could make work
                gdf.to_parquet('file.parquet')
                gdf = pl.read_parquet('file.parquet')

                gdf.write_parquet(f'{outpath}{os.sep}{filename}')
                print('\tSuccessfully added file')

            # set extents to iterate in the file
            stop+=step
            start+=step

            if stop >= layer[-1]:
                stop = layer[-1]

            clear_output(wait=True)
    print('Conversion Complete: ', gdb_file)
    
    return True

## File reading and parsing functions

In [9]:
# function to select process by filetype - to add in above

# Get gdb row counts so we can slice






In [11]:
# https://www.rebasedata.com/python-read-gdb
# https://pypi.org/project/poster3/
# https://github.com/dmorrison42/python-poster

# dont actually need these at the moment but can use to stream it in rather than download the file. maybe
from poster3.encode import multipart_encode
from poster3.streaminghttp import register_openers

def save_dat(url, outpath = r"D:\data\AIS\global AIS from NOAA"):

    # read a zip file from a url
    archive = urllib2.urlopen(url).read()
    
    # unzip file
    archive = zipfile.ZipFile(BytesIO(archive))
    
    # get the filetypes so we can account for reading them all
    # get any sub folders
    dirs = list(set([os.path.dirname(x) for x in archive.namelist()]))

    # if there are subfolders check their extensions
    extens = list(set([d.split('.')[-1] for d in dirs]))

    # group folders by extension
    egrps = {e:[d for d in dirs if e in d] for e in extens}

    # get folders without extensions....
    fgrps = [d for d in dirs if d.split('.')[-1] not in extens]
    
    # sort the namelist by folder
    
    # match extensions to folders (really just for gdb)
    
    # based on folder type, read the file(s) and write to folder
    
    print('/tOpened zip from remote source:', )
    
    def read_csv_from_zip():
        # if its a csv file
        """
        output
            h3 encoded parquet file from polars
        """
        
        try:
            # loop through names and save one at a time to save memory
            for f in range(len(archive.namelist())):
                print('\t\tReading file:', archive.namelist()[f])
                pl_h3(pl.read_csv(archive.open(archive.namelist()[f]), 
                                  encoding="utf8-lossy"), 
                      lat_col='LAT', 
                      lon_col='LON', 
                      new_col='H3', 
                      resolution=16).write_parquet(f"{outpath}{os.sep}{archive.namelist()[f].replace('.csv','_h3.parquet')}")
            print('\t\t\tRead csv', f)
        except:
            print('\t\t\tFailed csv', f)
            return False
        
        return True
    
    def read_to_file(name, ext='csv'):
        
        print('read_to_file() | NAME:', name)

        if ext == 'csv':
            print('\t\tCSV read')
            return print('!!!1', read_csv_from_zip())
        elif ext == 'gdb':
            print('\t\tGDB read')
            return print('!!!2', read_gdb_from_zip())

        return True
    
    
    try:
        print('egrps', egrps)
    except:
        pass
    try:
        print('fgrps', fgrps)
    except:
        pass
    
    for e in egrps:
        try:
            read_to_file(egrps[e], ext=e)
            print(e, 'Succeeded')
        except:
            print(e, 'Failed')
    for f in fgrps:
        try:
            read_to_file(fgrps[f], ext=f)
            print(f, 'Succeeded')
        except:
            print(f, 'Failed')
    return archive, egrps, fgrps, extens
    
    
    # what if its something else?!?!
    
        
#                 """ 
#                 limit the fields if you want. not too big though so who cares
#                 ['MMSI','BaseDateTime',
#                 'LAT', 'LON','SOG','COG',
#                 'Heading', 'VesselName', 
#                 'IMO', 'CallSign', 
#                 'VesselType', 'Status',
#                 'Length','Width','Draft',
#                 'Cargo', 'TransceiverClass']
#                 """
            
    return archive, egrps, fgrps

# here is the test run
archive, egrps, fgrps, extens = save_dat(listFD(url.replace('YEAR', str(2009)), ext='zip')[0])

ModuleNotFoundError: No module named 'poster3'

# Main loop

In [28]:
# download AIS data from https://coast.noaa.gov/htdata/CMSP/AISDataHandler/2022/

for i in range(2009, 2022): # these are the available years, 2009-2022
    
    # read file names
    url = f'https://coast.noaa.gov/htdata/CMSP/AISDataHandler/{i}/'
    
    print(f"Reading files from:", url)
    
    urls = listFD(url, ext='zip') # this gets the zip files

    # get the data
    for u in urls:
        try:
            df = pl_h3(pl.from_pandas(save_dat(u)), 
                       lat_col='LAT', 
                       lon_col='LON', 
                       new_col='H3', 
                       resolution=16)
            
            print('\t\t',i,'\n\t\t',u, 'Success')
            
            df.write_parquet(filename)
            
        except:
            print('\t\t',i,'\n\t\t', u, 'Failed')
        
        print('Test')
        break

    # log/store our own minimized and enriched
    break


Reading files from: https://coast.noaa.gov/htdata/CMSP/AISDataHandler/2009/
/tOpened zip from remote source:
egrps {'gdb': ['Zone10_2009_01.gdb']}
fgrps []
read_to_file() | NAME: gdb
		CSV read
		Reading file: Zone10_2009_01.gdb/a00000001.gdbindexes
			Failed csv 0
!!!1 False
gdb Succeeded
		 2009 
		 https://coast.noaa.gov/htdata/CMSP/AISDataHandler/2009//01_January_2009/Zone10_2009_01.zip Failed
Test


Polars found a filename. Ensure you pass a path to the file instead of a python file object when possible for best performance.

In [16]:
df = save_dat(u)
df

/tOpened zip from remote source:
		Reading file: Zone10_2009_01.gdb/a00000001.gdbindexes
			Failed csv 0
False
gdb Succeeded


Polars found a filename. Ensure you pass a path to the file instead of a python file object when possible for best performance.

(<zipfile.ZipFile file=<_io.BytesIO object at 0x7fef118efd10> mode='r'>,
 {'gdb': ['Zone10_2009_01.gdb']},
 [],
 ['gdb'])

In [None]:
# loop to read folders by extension
for ex in extens:
    if ex in egrps.keys():
        print(True)
        pass
    elif ex in fgrps.keys():
        print(True)
        pass

In [None]:
# # Register the streaming http handlers with urllib2
# register_openers()

# # Use multipart encoding for the input files
# datagen, headers = multipart_encode({ 'files[]': open('Zone10_2009_01.gdb', 'rb')})

# datagen, headers = multipart_encode({ 'files[]': open(u, 'rb')})

# # Create the request object
# request = urllib2.Request(u, datagen, headers)

# # Do the request and get the response
# # Here the GDB file gets converted to CSV
# response = urllib2.urlopen(request)

# # Check if an error came back
# if response.info().getheader('Content-Type') == 'application/json':
#     print response.read()
#     sys.exit(1)

# # Write the response to /tmp/output.zip
# with open('/tmp/output.zip', 'wb') as local_file:
#     local_file.write(response.read())

# print 'Conversion result successfully written to /tmp/output.zip!'

In [None]:
# gdf = gpd.read_file(gdb_file, 
#                   rows=slice(int(start), int(stop-1)), 
#                   engine='fiona',
#                   layer=layer)
layer

In [None]:
pl_h3(pl.from_numpy(gdf.to_numpy(), 
                    columns=list(gdf.columns)), # this is atrocious, triple conversion, but cant be helped til geopolars and geoparquet work
      lat_col='LAT', 
      lon_col='LON', 
      new_col='H3', 
      resolution=16).write_parquet(f"{outpath}{os.sep}{archive.namelist()[f].replace('.csv','_h3.parquet')}")


In [None]:
# import pyarrow as pa

# import geopolars as gpl

# reader = pa.ipc.open_file(gdb_file)
# table = reader.read_all()

# df = gpl.from_arrow(table)
# geom = df.get_column("geometry")
# out = geom.centroid()
# print(out)

In [11]:
gdb_file

'D:\\data\\AIS\\global AIS from NOAA\\Zone10_2009_01\\Zone10_2009_01.gdb'