In [1]:
import pandas as pd
import geopandas as gpd
import sqlalchemy as sa

from sqlalchemy import event
from sqlalchemy.sql.functions import GenericFunction
from sqlalchemy.sql import quoted_name, text
from sqlalchemy.types import BLOB, NullType

from geoalchemy2.types import Geometry, _GISType
from geoalchemy2.elements import WKBElement

from pprint import pprint
from oracledb import init_oracle_client

## Vamos brincar de Reflection

In [2]:
src_schema_name = "litoestratigrafia"
src_table_name = "ue_layer_25000"

dst_schema_name = "public"
dst_table_name = f"{src_schema_name}_25000"

In [None]:
# Thin client
init_oracle_client()

with open("oracle-dsn.txt") as f:
    src_engine = sa.create_engine(f.read())

### SDE Version

In [None]:
with src_engine.connect() as conn:
    _table = 'sde.sde_version'

    if conn.dialect.name == 'oracle':
        _table = 'sde.version'        

    major, minor, bugfix = conn.execute(
        text(f"SELECT major, minor, bugfix FROM {_table}"),
        
    ).fetchone()

major, minor, bugfix

### Feature Class props

In [6]:
table_props = {}

# https://desktop.arcgis.com/en/arcmap/latest/manage-data/using-sql-with-gdbs/
with src_engine.connect() as conn:
    # Primary key (RowID)
    table_props['pk_column'] = conn.execute(
        text("SELECT sde.gdb_util.rowid_name(:schema, :table) FROM DUAL"),
        {"table": src_table_name, "schema": src_schema_name}
    ).scalar()

    # table_props['is_archive'] = conn.execute(
    #     text("SELECT sde.gdb_util.IS_ARCHIVE_ENABLED(:schema, :table) FROM DUAL"),
    #     {"table": src_table_name.lower(), "schema": src_schema_name.lower()}
    # ).scalar()

    # If a table is not simple, it should not be edited outside ArcGIS.
    # table_props['is_simple'] = conn.execute(
    #     text("SELECT sde.gdb_util.is_simple(:schema, :table) FROM DUAL"),
    #     {"table": src_table_name, "schema": src_schema_name}
    # ).scalar()

    table_props['is_versioned'] = conn.execute(
        text("SELECT sde.gdb_util.IS_VERSIONED(:schema, :table) FROM DUAL"),
        {"table": src_table_name, "schema": src_schema_name}
    ).scalar()

    table_props['is_replicated'] = conn.execute(
        text("SELECT sde.gdb_util.is_replicated(:schema, :table) FROM DUAL"),
        {"table": src_table_name, "schema": src_schema_name}
    ).scalar()



In [None]:
_table_name = f"{src_schema_name}.{src_table_name}"
geometry_props = []

with src_engine.connect() as conn:
    geometries = [row[0] for row in conn.execute(
        text("SELECT sde.gdb_util.geometry_columns(:schema, :table) FROM DUAL"),
        {"table": src_table_name, "schema": src_schema_name}
    ).fetchall()]

    geometry_prop = {}
    
    for geometry in geometries:
        # column_name
        geometry_prop["name"] = geometry 

        # SRID
        geometry_prop['srid'] = conn.execute(
            text(f"SELECT DISTINCT sde.st_srid({geometry}) FROM {_table_name}")
        ).scalar()

        # Geometry type
        _type = [row[0] for row in conn.execute(
            text(f"SELECT DISTINCT replace(replace(upper(sde.st_geometrytype({geometry})), 'ST_', ''), 'MULTI', '') FROM {_table_name}")
        ).fetchall()]

        assert len(_type) > 0

        geometry_prop['type'] = _type[0] if len(_type) == 1 else 'GEOMETRYCOLLECTION'

        # Simple or multipart
        geometry_prop['is_multi'] = bool(conn.execute(
            text(f"SELECT max(sde.st_numgeometries({geometry})) FROM {_table_name}")
        ).scalar() == 1)

        # Empty geometry
        geometry_prop['has_empty'] = any([row[0] for row in conn.execute(
            text(f"SELECT DISTINCT sde.st_isempty({geometry}) FROM {_table_name}")
        ).fetchall()])

        # 3d
        geometry_prop['is_3d'] = any([row[0] for row in conn.execute(
            text(f"SELECT DISTINCT sde.st_is3d({geometry}) FROM {_table_name}")
        ).fetchall()])

        # Measured
        geometry_prop['is_measured'] = any([row[0] for row in conn.execute(
            text(f"SELECT DISTINCT sde.st_ismeasured({geometry}) FROM {_table_name}")
        ).fetchall()])
        
        # OGC Simple Feature
        geometry_prop['is_simple'] = all([row[0] for row in conn.execute(
            text(f"SELECT DISTINCT sde.st_issimple({geometry}) FROM {_table_name}")
        ).fetchall()])

        geometry_props.append(geometry_prop)

table_props["geometry_columns"] = geometry_props

pprint(table_props)

In [8]:
_geom_col = table_props["geometry_columns"][0]["name"].lower()
_srid = table_props["geometry_columns"][0]["srid"]
_type = table_props["geometry_columns"][0]["type"]
_pk = table_props["pk_column"].lower()

In [None]:
class SDEAsBinary(GenericFunction):
    type = BLOB
    name = quoted_name("SDE.ST_ASBINARY", False)
    identifier = "sde_asbinary"
    inherit_cache=True
    
class SDEFromWKB(GenericFunction):
    type = Geometry
    name = quoted_name("SDE.ST_GEOMFROMWKB", False)
    identifier = "sde_fromwkb"
    inherit_cache=True

class STGeometryOracle(_GISType):
    name = "st_geometry"
    as_binary = SDEAsBinary.identifier
    from_text = SDEFromWKB.identifier
    ElementType = WKBElement
    cache_ok = False

src_metadata = sa.MetaData(schema=src_schema_name)

@event.listens_for(src_metadata, "column_reflect")
def genericize_datatypes(inspector, tablename, column_dict):
    # Checar Numeric com scale=0 e as_decimal=False: < 5 smallint 5 <= x <= 9, int; > 9 bigint
    if not type(column_dict["type"]) is NullType and column_dict["name"] != _geom_col: 
        column_dict["type"] = column_dict["type"].as_generic()
    else: 
        column_dict["type"] = STGeometryOracle(_type, srid=_srid, spatial_index=False)

src_table = sa.Table(
    src_table_name, 
    src_metadata, 
    autoload_with=src_engine, 
)

# Ignore SRC indexes
src_table.indexes = set()

### Show columns

In [None]:
pprint([column for column in src_table.columns])

### Select statement from sources

In [None]:
pprint(str(src_table.select()))

## Load select table in GeoPandas

## Destiny DB

In [None]:
dst_engine = sa.create_engine("postgresql://airflow:airflow@lrnpdbmsd05/airflow_gold")

with dst_engine.connect() as conn:
    print(
        conn.execute(text("SELECT postgis_version()")).scalar()
    )

In [None]:
_dst_table_name = f"{dst_table_name}_sqlalchemy"

dst_metadata = sa.MetaData()
dst_table = src_table.to_metadata(
    dst_metadata, 
    schema=dst_schema_name, 
    name=_dst_table_name
)

sa.Column()

# Add geometry Index
sa.Index(f"{_dst_table_name}_geometry_gist", dst_table.columns[_geom_col], postgresql_using="gist")

# Primary key
dst_table.primary_key = sa.PrimaryKeyConstraint(dst_table.columns[_pk], name=f"{_dst_table_name}_pk")
dst_table.columns[_pk].name = 'fid'
dst_table.columns[_pk].autoincrement = False

# Geometry
dst_table.columns[_geom_col].name = 'geometry'
dst_table.columns[_geom_col].type = Geometry(_type, srid=_srid)
dst_table.columns[_geom_col].key = 'geometry'

pprint([column for column in dst_table.columns])

In [None]:
pprint(str(dst_table.select()))

In [30]:
dst_table.create(bind=dst_engine, checkfirst=True)

### Read data from source

In [None]:
geodata = (
    gpd.read_postgis(
        src_table.select(), 
        src_engine,
        geom_col=_geom_col,
        crs=_srid
    )
    .rename(columns={_pk: "fid"})
    .rename_geometry("geometry")
    .set_index("fid")
)

geodata.info()

### Write to DestinyDB

In [32]:
dst_table_write_to = _dst_table_name

_dst_table_args = {
    "name": dst_table_write_to,
    "con": dst_engine,
    "schema": dst_schema_name,
    "index": True, 
    "chunksize": 2000
}

try:
    geodata.to_postgis(**_dst_table_args)

except ValueError:
    geodata.to_postgis(if_exists='append', **_dst_table_args)