In [1]:
import orjson

from sqlalchemy import create_engine, Column, String, Integer, func, event, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import sessionmaker
from geoalchemy2 import Geometry 
from tqdm import tqdm
from shapely.wkt import dumps
import shapely
from shapely.wkb import loads as load_wkb
import random

In [2]:
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database

In [3]:
import ray

In [4]:
ray.init()

2024-11-06 16:08:42,279	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.12
Ray version:,2.37.0
Dashboard:,http://127.0.0.1:8265




In [5]:
ray.cluster_resources()

{'accelerator_type:G': 1.0,
 'node:__internal_head__': 1.0,
 'CPU': 16.0,
 'object_store_memory': 7858042060.0,
 'memory': 15716084123.0,
 'node:172.17.185.211': 1.0,
 'GPU': 1.0}

In [6]:
from sqlalchemy_utils import database_exists, create_database
engine = create_engine('postgresql://postgres@localhost:5333/test')#,echo=True)

In [7]:
# Initialize Spatialite extension
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    with dbapi_connection.cursor() as cursor:
        cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')

In [8]:
# Create a base class for our declarative mapping
Base = declarative_base()

# Define your SQLAlchemy model
class GeometryModel(Base):
    __tablename__ = 'geometries'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    geom = Column(Geometry('POLYGON'))

    @property
    def shapely_geom(self):
        return load_wkb(self.geom.desc) if self.geom else None

  Base = declarative_base()


In [9]:
# Create the table
Base.metadata.create_all(engine)

In [10]:
%%time
# -- orm approach
from sqlalchemy.orm import Session

# Getting the total number of rows
with Session(engine) as session:
    total_rows = session.query(GeometryModel).count()
print(total_rows)    

1063260
CPU times: user 17.7 ms, sys: 5.45 ms, total: 23.2 ms
Wall time: 1.52 s


In [11]:
@ray.remote
def query_id_orm(random_index):
    from sqlalchemy import create_engine, Column, String, Integer, func, event, text
    from geoalchemy2 import Geometry 

    engine = create_engine('postgresql://postgres@localhost:5333/test')#,echo=True)

    # Initialize Spatialite extension-
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        with dbapi_connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    
    try:

       # Create a base class for our declarative mapping
        Base = declarative_base()
        
        # Define your SQLAlchemy model
        class GeometryModel(Base):
            __tablename__ = 'geometries'
            id = Column(Integer, primary_key=True)
            name = Column(String)
            geom = Column(Geometry('POLYGON'))
        
            @property
            def shapely_geom(self):
                return load_wkb(self.geom.desc) if self.geom else None


        with Session(engine) as session:
            #random_row = session.query(GeometryModel).filter_by(id=random_index).first()
            random_row = session.query(GeometryModel).first()
            #--- approach 1 - return only id
            return random_row.id
            #--- approach 2 - inline convert to shapely
            # geom = random_row.shapely_geom #force convert
            # return random_row.id
            #--- approach 3 - inline convert to shapely and compute area
            #return random_row.shapely_geom.area
                

    except Exception as inst:
        print(inst)
        pass
    finally:
        engine.dispose() ##might be needed? --- yes needed

In [13]:
%%time
futures = [] 
for _ in range(10_000):
    random_index = random.randint(1, total_rows)
    futures.append(query_id_orm.remote(random_index))
    
for f in tqdm(futures):
    ray.get(f)

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [00:44<00:00, 222.70it/s]

CPU times: user 13.4 s, sys: 3.08 s, total: 16.4 s
Wall time: 47.5 s





### test multiple

In [None]:
@ray.remote
def query_id_orm_bulk(total_rows,num):
    from sqlalchemy import create_engine, Column, String, Integer, func, event, text
    from geoalchemy2 import Geometry 

    engine = create_engine('postgresql://postgres@localhost:5333/test')#,echo=True)

    # Initialize Spatialite extension-
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        with dbapi_connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    
    try:

       # Create a base class for our declarative mapping
        Base = declarative_base()
        
        # Define your SQLAlchemy model
        class GeometryModel(Base):
            __tablename__ = 'geometries'
            id = Column(Integer, primary_key=True)
            name = Column(String)
            geom = Column(Geometry('POLYGON'))
        
            @property
            def shapely_geom(self):
                return load_wkb(self.geom.desc) if self.geom else None

        retval=[]
        with Session(engine) as session:
            for _ in range(num):
                random_index = random.randint(1, total_rows)
                random_row = session.query(GeometryModel).filter_by(id=random_index).first()
                retval.append(random_row.id)
        #     #--- approach 1 - return only id
        #     return random_row.id
        #     #--- approach 2 - inline convert to shapely
        #     # geom = random_row.shapely_geom #force convert
        #     # return random_row.id
        #     #--- approach 3 - inline convert to shapely and compute area
        #     #return random_row.shapely_geom.area
        return retval

    except Exception as inst:
        print(inst)
        pass
    finally:
        engine.dispose() ##might be needed? --- yes needed

In [None]:
%%time
futures = [] 
num=80
for _ in range(128):
    futures.append(query_id_orm_bulk.remote(total_rows,num))
    
for f in tqdm(futures):
    ray.get(f)

In [None]:
len(sum(ray.get(futures),[]))

In [None]:
ray.get(futures)

In [None]:
@ray.remote
def query_id_wkb(random_index):
    from sqlalchemy import create_engine, Column, String, Integer, func, event, text
    from geoalchemy2 import Geometry 

    engine = create_engine('postgresql://postgres@localhost:5333/test')#,echo=True)

    # Initialize Spatialite extension-
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        with dbapi_connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    
    try:

       # Create a base class for our declarative mapping
        Base = declarative_base()
        
        # Define your SQLAlchemy model
        class GeometryModel(Base):
            __tablename__ = 'geometries'
            id = Column(Integer, primary_key=True)
            name = Column(String)
            geom = Column(Geometry('POLYGON'))
        
            @property
            def shapely_geom(self):
                return load_wkb(self.geom.desc) if self.geom else None

        with engine.connect() as conn:
            random_row = conn.execute(
                text(f'''
                SELECT id,geom
                FROM geometries 
                WHERE id = {random_index}
                ''')
            ).fetchone()
            obj=shapely.wkb.loads(random_row[1])

            return random_row[0]
            #return obj.area

    except Exception as inst:
        print(inst)
        pass
    finally:
        engine.dispose() ##might be needed? --- yes needed

In [None]:
%%time
futures = [] 
for _ in range(10_000):
    random_index = random.randint(1, total_rows)
    futures.append(query_id_wkb.remote(random_index))
    
for f in tqdm(futures):
    ray.get(f)

In [None]:
@ray.remote
def query_id_geojson(random_index):
    from sqlalchemy import create_engine, Column, String, Integer, func, event, text
    from geoalchemy2 import Geometry 

    engine = create_engine('postgresql://postgres@localhost:5333/test')#,echo=True)

    # Initialize Spatialite extension-
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        with dbapi_connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    
    try:

       # Create a base class for our declarative mapping
        Base = declarative_base()
        
        # Define your SQLAlchemy model
        class GeometryModel(Base):
            __tablename__ = 'geometries'
            id = Column(Integer, primary_key=True)
            name = Column(String)
            geom = Column(Geometry('POLYGON'))
        
            @property
            def shapely_geom(self):
                return load_wkb(self.geom.desc) if self.geom else None

        with engine.connect() as conn:
            random_row = conn.execute(
                text(f'''
                SELECT id, ST_AsGeoJSON(geom) as geom 
                FROM geometries 
                WHERE id = {random_index}
                ''')
            ).fetchone()
            return len(random_row[1])
            


    except Exception as inst:
        print(inst)
        pass
    finally:
        engine.dispose() ##might be needed? --- yes needed

In [None]:
%%time
futures = [] 
for _ in range(10_000):
    random_index = random.randint(1, total_rows)
    futures.append(query_id_geojson.remote(random_index))
    
for f in tqdm(futures):
    ray.get(f)

In [None]:
ray.get(futures)

In [None]:
@ray.remote
def query_id_geojson_area(random_index):
    from sqlalchemy import create_engine, Column, String, Integer, func, event, text
    from geoalchemy2 import Geometry 

    engine = create_engine('postgresql://postgres@localhost:5333/test')#,echo=True)

    # Initialize Spatialite extension-
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        with dbapi_connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    
    try:

       # Create a base class for our declarative mapping
        Base = declarative_base()
        
        # Define your SQLAlchemy model
        class GeometryModel(Base):
            __tablename__ = 'geometries'
            id = Column(Integer, primary_key=True)
            name = Column(String)
            geom = Column(Geometry('POLYGON'))
        
            @property
            def shapely_geom(self):
                return load_wkb(self.geom.desc) if self.geom else None
        
        with engine.connect() as conn:
            random_row = conn.execute(
                text(f'''
                SELECT id, ST_AsGeoJSON(geom) as geom
                FROM geometries
                WHERE id = {random_index}
                ''')
            ).fetchone()
        obj=shapely.from_geojson(random_row[1])
        return obj.area
            


    except Exception as inst:
        print(inst)
        pass
    finally:
        engine.dispose() ##might be needed? --- yes needed

In [None]:
%%time
futures = [] 
for _ in range(10_000):
    random_index = random.randint(1, total_rows)
    futures.append(query_id_geojson_area.remote(random_index))
    
for f in tqdm(futures):
    ray.get(f)

In [None]:
@ray.remote
def query_id_geojson_dynamic_centroid(random_index):
    from sqlalchemy import create_engine, Column, String, Integer, func, event, text
    from geoalchemy2 import Geometry 

    engine = create_engine('postgresql://postgres@localhost:5333/test')#,echo=True)

    # Initialize Spatialite extension-
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        with dbapi_connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
    
    try:

       # Create a base class for our declarative mapping
        Base = declarative_base()
        
        # Define your SQLAlchemy model
        class GeometryModel(Base):
            __tablename__ = 'geometries'
            id = Column(Integer, primary_key=True)
            name = Column(String)
            geom = Column(Geometry('POLYGON'))
        
            @property
            def shapely_geom(self):
                return load_wkb(self.geom.desc) if self.geom else None
        
        with engine.connect() as conn:
            random_row = conn.execute(
                text(f'''
                SELECT id, ST_AsGeoJSON(geom) as geom, ST_AsGeoJSON(ST_Centroid(geom)) as centroid 
                FROM geometries 
                WHERE id = {random_index}
                ''')
            ).fetchone()
        return len(random_row[1])
            


    except Exception as inst:
        print(inst)
        pass
    finally:
        engine.dispose() ##might be needed? --- yes needed

In [None]:
%%time
futures = [] 
for _ in range(10_000):
    random_index = random.randint(1, total_rows)
    futures.append(query_id_geojson_dynamic_centroid.remote(random_index))
    
for f in tqdm(futures):
    ray.get(f)

### below are spatial query tests

In [None]:
%%timeit
random_index = random.randint(1, total_rows)
    
half_bbox_size= 500
# Step 3: Query for the specific row based on the random index
with engine.connect() as conn:
    random_row = conn.execute(
        text(f'''
        SELECT id, ST_AsGeoJSON(ST_Centroid(geom)) as centroid 
        FROM geometries 
        WHERE id = {random_index}
        ''')
    ).fetchone()
    
    centroid_x,centroid_y=orjson.loads(random_row[1])['coordinates']

    bounding_box_polygons = conn.execute(
        text(f'''
        SELECT id, ST_AsGeoJSON(geom) as geom 
        FROM geometries 
        WHERE ST_Intersects(
            geom,
            ST_MakeEnvelope(
                {centroid_x - half_bbox_size}, {centroid_y - half_bbox_size},
                {centroid_x + half_bbox_size}, {centroid_y + half_bbox_size},
                4326
            )
        )
        ''')
     ).fetchall()



print(len(bounding_box_polygons), end= " " )

In [None]:
%%timeit
random_index = random.randint(1, total_rows)
    
half_bbox_size= 6_000
# Step 3: Query for the specific row based on the random index
with engine.connect() as conn:
    random_row = conn.execute(
        text(f'''
        SELECT id, ST_AsGeoJSON(centroid) as centroid 
        FROM geometries 
        WHERE id = {random_index}
        ''')
    ).fetchone()
    
    centroid_x,centroid_y=orjson.loads(random_row[1])['coordinates']

    bounding_box_polygons = conn.execute(
        text(f'''
        SELECT id, ST_AsGeoJSON(geom) as geom 
        FROM geometries 
        WHERE ST_Intersects(
            centroid,
            ST_MakeEnvelope(
                {centroid_x - half_bbox_size}, {centroid_y - half_bbox_size},
                {centroid_x + half_bbox_size}, {centroid_y + half_bbox_size},
                4326
            )
        )
        ''')
     ).fetchall()



print(len(bounding_box_polygons), end= " " )