# AMSDAL Glue: Multiple Postgres Connections Example

This notebook demonstrates how to use AMSDAL Glue to connect to two PostgreSQL databases: one with existing tables and records, and another one with no tables.

Specifically, it shows how to:

1. Initialize AMSDAL Glue
2. Register connections
3. Register a new schema/table in the second PostgreSQL database
4. Fetch all schemas/tables from both connections
5. Insert multiple records into the `shipping` table
6. Fetch data with a single query that joins tables from different databases


## Databases and data overview

The existing database will contain the following tables and data:


### Table `customers`

| customer_id | first_name | last_name | age | country |
|-------------|------------|-----------|-----|---------|
| 1           | John       | Doe       | 31  | USA     |
| 2           | Robert     | Luna      | 22  | USA     |
| 3           | David      | Robinson  | 22  | UK      |
| 4           | John       | Reinhardt | 25  | UK      |
| 5           | Betty      | Doe       | 28  | UAE     |

### Table `orders`

| order_id | item      | amount | customer_id |
|----------|-----------|--------|-------------|
| 1        | Keyboard  | 400    | 4           |
| 2        | Mouse     | 300    | 4           |
| 3        | Monitor   | 12000  | 3           |
| 4        | Keyboard  | 400    | 1           |
| 5        | Mousepad  | 250    | 2           |

In this example, we will also create a new `shipping` table in the second database and fill it with the following data:

### Table `shipping`

| shipping_id | status    | customer_id |
|-------------|-----------|-------------|
| 1           | Pending   | 2           |
| 2           | Pending   | 4           |
| 3           | Delivered | 3           |
| 4           | Pending   | 5           |
| 5           | Delivered | 1           |

## Prerequisites

We will run two PostgreSQL databases in Docker containers, so make sure Docker is installed locally.

In the same directory as this notebook, you can find a `docker-compose.yml` file that defines the two PostgreSQL databases.

In order to start the databases, run the following command:

```bash
docker compose up
```

This command runs the two PostgreSQL databases in Docker containers in the foreground. You can stop them by pressing `Ctrl+C`.

Alternatively, you can run the command in the background:

```bash
docker compose up -d
```

To stop the databases, run the following command:

```bash
docker compose down
```
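Before running the notebook cells, it can help to confirm that both containers accept TCP connections. A minimal sketch using only the Python standard library, assuming the databases listen on `localhost` ports 5432 and 5433 as in the DSNs used below (check your `docker-compose.yml` if you changed them):

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll host:port until a TCP connection succeeds; give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # An accepted TCP connection only shows that the port is open;
            # PostgreSQL itself may still be finishing its start-up.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("localhost", 5432)` and `wait_for_port("localhost", 5433)` should both return `True` once `docker compose up -d` has started the containers.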

## Install AMSDAL Glue

Now we can install AMSDAL Glue.

In [1]:
!pip install "amsdal-glue[postgres-binary]"


AMSDAL Glue also supports installing the `psycopg` package from source, which is recommended for production but requires system build tools and extra system libraries (details [here](https://www.psycopg.org/psycopg3/docs/basic/install.html#local-installation)). To install it, run the following command:

```bash
pip install "amsdal-glue[postgres]"
```
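To check which flavour ended up installed, psycopg 3 exposes `psycopg.pq.__impl__`, which names the libpq wrapper in use: `"binary"`, `"c"` (built from source) or `"python"`. A small helper, assuming psycopg 3 is the driver that either extra pulls in:

```python
def psycopg_impl():
    """Return the active psycopg 3 libpq wrapper ('binary', 'c' or 'python'), or None."""
    try:
        import psycopg  # raises ImportError if psycopg or a usable libpq wrapper is missing
    except ImportError:
        return None
    return psycopg.pq.__impl__


print("psycopg implementation:", psycopg_impl())
```

After `pip install "amsdal-glue[postgres-binary]"` you would expect `binary`; after a source install, `c`.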

## Initialize default AMSDAL Glue services and containers

AMSDAL Glue provides a set of ready-to-use services and containers for connecting to databases and executing queries.

Let's initialize the default AMSDAL Glue services and containers.

In [2]:
from amsdal_glue import init_default_containers

init_default_containers()
print("That's it! AMSDAL Glue is ready to use.")

That's it! AMSDAL Glue is ready to use.
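The cells below call `Container.services.get(...)` and `Container.managers.get(...)` to look up ready-made objects by their interface type. The sketch below illustrates that general service-locator pattern with a hypothetical `Registry` class; it is not AMSDAL Glue's implementation, and `GreetingService` is a made-up interface used only for the demo:

```python
from typing import Dict, Type, TypeVar

T = TypeVar("T")


class Registry:
    """Hypothetical type-keyed registry illustrating the service-locator pattern."""

    def __init__(self) -> None:
        self._items: Dict[type, object] = {}

    def register(self, interface: Type[T], instance: T) -> None:
        # The interface class itself is the lookup key.
        self._items[interface] = instance

    def get(self, interface: Type[T]) -> T:
        try:
            return self._items[interface]  # type: ignore[return-value]
        except KeyError:
            raise LookupError(f"No implementation registered for {interface.__name__}") from None


class GreetingService:  # hypothetical interface
    def greet(self) -> str:
        raise NotImplementedError


class EnglishGreetingService(GreetingService):  # hypothetical implementation
    def greet(self) -> str:
        return "hello"


services = Registry()
services.register(GreetingService, EnglishGreetingService())
print(services.get(GreetingService).greet())  # prints "hello"
```

Callers depend only on the interface type, so the concrete implementation can be swapped at initialization time without changing the code that uses it.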


## Register connections

Now it's time to register connections to the databases.

In [5]:
from amsdal_glue import Container, ConnectionManager, DefaultConnectionPool, PostgresConnection

existing_db_pool = DefaultConnectionPool(
    PostgresConnection,
    dsn="postgres://db_user:db_password@localhost:5432/db_name_1",
)
new_db_pool = DefaultConnectionPool(
    PostgresConnection,
    dsn="postgres://db_user:db_password@localhost:5433/db_name_2",
)

connection_mng = Container.managers.get(ConnectionManager)
connection_mng.register_connection_pool(existing_db_pool)
connection_mng.register_connection_pool(new_db_pool, schema_name="shipping")

print('Connections registered.')

Connections registered.


Note that we created two PostgreSQL connection pools above: `existing_db_pool` is registered as the default pool, while `new_db_pool` is linked only to the `shipping` schema name. This means that any query against the `shipping` schema is executed through `new_db_pool`, and all other schemas fall back to the default `existing_db_pool`.
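The routing rule described above (a default pool plus per-schema overrides) can be pictured with a hypothetical `SchemaRouter` class. This is a plain-Python illustration of the rule, not AMSDAL Glue's `ConnectionManager`, and pools are represented by plain strings:

```python
from typing import Dict, Optional


class SchemaRouter:
    """Hypothetical router: one default pool plus optional per-schema overrides."""

    def __init__(self) -> None:
        self._default: Optional[str] = None
        self._by_schema: Dict[str, str] = {}

    def register(self, pool: str, schema_name: Optional[str] = None) -> None:
        if schema_name is None:
            self._default = pool  # no schema name: this pool becomes the default
        else:
            self._by_schema[schema_name] = pool  # pinned to a single schema

    def resolve(self, schema_name: str) -> str:
        # Prefer an explicit override, otherwise fall back to the default pool.
        pool = self._by_schema.get(schema_name, self._default)
        if pool is None:
            raise LookupError(f"No connection pool for schema {schema_name!r}")
        return pool


router = SchemaRouter()
router.register("existing_db_pool")
router.register("new_db_pool", schema_name="shipping")

for name in ("customers", "orders", "shipping"):
    print(name, "->", router.resolve(name))
```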

## Fetch all schemas/tables from both connections

Now we can fetch all schemas/tables available through both connections:

In [12]:
from amsdal_glue import Container, SchemaQueryOperation
from amsdal_glue.interfaces import SchemaQueryService

query_service = Container.services.get(SchemaQueryService)
result = query_service.execute(SchemaQueryOperation(filters=None))

print('Success:', result.success)
print('Error details:', result.message)
print('Found schemas:', len(result.schemas or []))

for idx, schema in enumerate(result.schemas or [], start=1):
    print(f'Schema {idx}:', schema, end='\n\n')

Success: True
Error details: None
Found schemas: 2
Schema 1: Schema(name='customers', version=<Version.LATEST: 'LATEST'>, namespace='', extends=None, properties=[PropertySchema(name='customer_id', type=<class 'int'>, required=True, description=None, default="nextval('customers_customer_id_seq'::regclass)"), PropertySchema(name='age', type=<class 'int'>, required=False, description=None, default=None), PropertySchema(name='first_name', type=<class 'str'>, required=True, description=None, default=None), PropertySchema(name='last_name', type=<class 'str'>, required=True, description=None, default=None), PropertySchema(name='country', type=<class 'str'>, required=False, description=None, default=None)], constraints=[PrimaryKeyConstraint(name='customers_pkey', fields=['customer_id'])], indexes=[IndexSchema(name='idx_full_name', fields=['first_name', 'last_name'], condition=None)])

Schema 2: Schema(name='orders', version=<Version.LATEST: 'LATEST'>, namespace='', extends=None, properties=[Pr

As we can see, we got the two tables from the existing database. Now, let's create a new table in the second database.
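The full `Schema` repr is long. A small helper can print a compact summary instead. It assumes only the attributes visible in the output above (`name`, `properties`, and each property's `name`, `type` and `required`), and the demo uses `SimpleNamespace` stand-ins rather than real AMSDAL Glue objects:

```python
from types import SimpleNamespace


def summarize_schema(schema) -> str:
    """Render 'table(field:type, ...)'; a '?' after a field name marks it as optional."""
    fields = []
    for prop in schema.properties or []:
        type_name = getattr(prop.type, "__name__", str(prop.type))
        marker = "" if prop.required else "?"
        fields.append(f"{prop.name}{marker}:{type_name}")
    return f"{schema.name}({', '.join(fields)})"


# Stand-in mirroring part of the `customers` schema printed above.
customers = SimpleNamespace(
    name="customers",
    properties=[
        SimpleNamespace(name="customer_id", type=int, required=True),
        SimpleNamespace(name="age", type=int, required=False),
        SimpleNamespace(name="first_name", type=str, required=True),
    ],
)
print(summarize_schema(customers))  # prints customers(customer_id:int, age?:int, first_name:str)
```

With a real result, you would call it as `for schema in result.schemas or []: print(summarize_schema(schema))`.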

## Create the shipping table

In [14]:
from amsdal_glue import Container, SchemaCommand, RegisterSchema, Version
from amsdal_glue.interfaces import SchemaCommandService
from amsdal_glue import Schema, PropertySchema, PrimaryKeyConstraint

shipping_schema = Schema(
    name="shipping",
    version=Version.LATEST,
    properties=[
        PropertySchema(
            name="shipping_id",
            type=int,
            required=True,
        ),
        PropertySchema(
            name="status",
            type=str,
            required=True,
        ),
        PropertySchema(
            name="customer_id",
            type=int,
            required=True,
        ),
    ],
    constraints=[
        PrimaryKeyConstraint(name="pk_shipping", fields=["shipping_id"]),
    ],
)

service = Container.services.get(SchemaCommandService)
result = service.execute(
    SchemaCommand(
        mutations=[
            RegisterSchema(schema=shipping_schema),
        ]
    ),
)

print('Success:', result.success)
print('Error details:', result.message)

Success: True
Error details: None
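For readers who think in SQL, the `shipping_schema` above corresponds roughly to a `CREATE TABLE` statement. The generator below is a hypothetical sketch. The Python-to-SQL type mapping (`int` to `INTEGER`, `str` to `TEXT`) is an assumption, and the SQL that AMSDAL Glue actually emits for PostgreSQL may differ:

```python
# Assumed mapping from Python types to SQL column types, for illustration only.
SQL_TYPES = {int: "INTEGER", str: "TEXT"}


def create_table_sql(table, columns, pk_name, pk_fields):
    """Build a CREATE TABLE statement from (name, python_type, required) tuples."""
    lines = []
    for name, py_type, required in columns:
        null_sql = " NOT NULL" if required else ""  # required=True -> NOT NULL
        lines.append(f'    "{name}" {SQL_TYPES[py_type]}{null_sql}')
    pk_cols = ", ".join(f'"{field}"' for field in pk_fields)
    lines.append(f'    CONSTRAINT "{pk_name}" PRIMARY KEY ({pk_cols})')
    return f'CREATE TABLE "{table}" (\n' + ",\n".join(lines) + "\n)"


print(create_table_sql(
    "shipping",
    [("shipping_id", int, True), ("status", str, True), ("customer_id", int, True)],
    pk_name="pk_shipping",
    pk_fields=["shipping_id"],
))
```

The named primary-key constraint `pk_shipping` is what rejects a second row with the same `shipping_id`.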


Let's run the schema query again to check if the new table was created.

In [15]:
from amsdal_glue import Container, SchemaQueryOperation
from amsdal_glue.interfaces import SchemaQueryService

query_service = Container.services.get(SchemaQueryService)
result = query_service.execute(SchemaQueryOperation(filters=None))

print('Success:', result.success)
print('Error details:', result.message)
print('Found schemas:', len(result.schemas or []))

for idx, schema in enumerate(result.schemas or [], start=1):
    print(f'Schema {idx}:', schema, end='\n\n')

Success: True
Error details: None
Found schemas: 3
Schema 1: Schema(name='customers', version=<Version.LATEST: 'LATEST'>, namespace='', extends=None, properties=[PropertySchema(name='customer_id', type=<class 'int'>, required=True, description=None, default="nextval('customers_customer_id_seq'::regclass)"), PropertySchema(name='age', type=<class 'int'>, required=False, description=None, default=None), PropertySchema(name='first_name', type=<class 'str'>, required=True, description=None, default=None), PropertySchema(name='last_name', type=<class 'str'>, required=True, description=None, default=None), PropertySchema(name='country', type=<class 'str'>, required=False, description=None, default=None)], constraints=[PrimaryKeyConstraint(name='customers_pkey', fields=['customer_id'])], indexes=[IndexSchema(name='idx_full_name', fields=['first_name', 'last_name'], condition=None)])

Schema 2: Schema(name='orders', version=<Version.LATEST: 'LATEST'>, namespace='', extends=None, properties=[Pr

Perfect! Now we have two tables in the database behind `existing_db_pool` and one table in the database behind `new_db_pool`. It's time to insert some records into the `shipping` table.

## Insert multiple records into the `shipping` table

In [17]:
from amsdal_glue import Container, DataCommand, InsertData, Version, Data, SchemaReference
from amsdal_glue.interfaces import DataCommandService

service = Container.services.get(DataCommandService)
result = service.execute(
    DataCommand(
        mutations=[
            InsertData(
                schema=SchemaReference(name="shipping", version=Version.LATEST),
                data=[
                    Data(
                        data={
                            "shipping_id": 1,
                            "status": "Pending",
                            "customer_id": 2,
                        }
                    ),
                    Data(
                        data={
                            "shipping_id": 2,
                            "status": "Pending",
                            "customer_id": 4,
                        }
                    ),
                    Data(
                        data={
                            "shipping_id": 3,
                            "status": "Delivered",
                            "customer_id": 3,
                        }
                    ),
                    Data(
                        data={
                            "shipping_id": 4,
                            "status": "Pending",
                            "customer_id": 5,
                        }
                    ),
                    Data(
                        data={
                            "shipping_id": 5,
                            "status": "Delivered",
                            "customer_id": 1,
                        }
                    ),
                ],
            ),
        ],
    ),
)

print('Success:', result.success)
print('Error details:', result.message)

Error executing mutation: INSERT INTO "shipping" ("customer_id", "shipping_id", "status") VALUES (%s, %s, %s), (%s, %s, %s), (%s, %s, %s), (%s, %s, %s), (%s, %s, %s) with params: [2, 1, 'Pending', 4, 2, 'Pending', 3, 3, 'Delivered', 5, 4, 'Pending', 1, 5, 'Delivered']
Traceback (most recent call last):
  File "/Users/emiltemorov/work/amsdal_tools/amsdal-glue/libs/connections/src/amsdal_glue_connections/sql/connections/postgres_connection.py", line 385, in execute
    cursor = self.connection.execute(query, args)
  File "/Users/emiltemorov/work/amsdal_tools/.venv/glue/lib/python3.10/site-packages/psycopg/connection.py", line 251, in execute
    raise ex.with_traceback(None)
psycopg.errors.UniqueViolation: duplicate key value violates unique constraint "pk_shipping"
DETAIL:  Key (shipping_id)=(1) already exists.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/emiltemorov/work/amsdal_tools/amsdal-glue/libs/connection

Success: False
Error details: Error executing mutation: INSERT INTO "shipping" ("customer_id", "shipping_id", "status") VALUES (%s, %s, %s), (%s, %s, %s), (%s, %s, %s), (%s, %s, %s), (%s, %s, %s) with params: [2, 1, 'Pending', 4, 2, 'Pending', 3, 3, 'Delivered', 5, 4, 'Pending', 1, 5, 'Delivered']
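Re-running the insert cell fails on the primary key `pk_shipping`. As a minimal sketch, assuming you already know which `shipping_id` values are stored (for example, from a separate query that is not shown here), the payloads can be built from plain tuples and filtered first. `build_payload` and the constants are hypothetical helpers, not part of AMSDAL Glue:

```python
SHIPPING_FIELDS = ("shipping_id", "status", "customer_id")

# The same five rows as the `shipping` table in the overview.
SHIPPING_ROWS = [
    (1, "Pending", 2),
    (2, "Pending", 4),
    (3, "Delivered", 3),
    (4, "Pending", 5),
    (5, "Delivered", 1),
]


def build_payload(rows, existing_ids=frozenset()):
    """Turn row tuples into dicts, skipping rows whose shipping_id is already stored."""
    return [
        dict(zip(SHIPPING_FIELDS, row))
        for row in rows
        if row[0] not in existing_ids  # row[0] is shipping_id, per SHIPPING_FIELDS
    ]


payload = build_payload(SHIPPING_ROWS, existing_ids={1, 2})
print(payload)
```

Each resulting dict could then be wrapped as `Data(data=item)` inside `InsertData`, exactly as in the cell above.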


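The `UniqueViolation` above only means that this cell had already been run once, so the five records are already in the `shipping` table (the query results below confirm this). Now let's fetch data with a single query that joins tables from different databases.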

## Fetch customers with their shipping status

First, let's define the query that fetches the data:

In [18]:
from amsdal_glue import (
    QueryStatement,
    Version,
    SchemaReference,
    JoinQuery,
    JoinType,
    FieldReference,
    Field,
    Conditions,
    Condition,
    FieldLookup,
    OrderByQuery,
    OrderDirection
)

query = QueryStatement(
    only=[
        FieldReference(field=Field(name="customer_id"), table_name="c"),
        FieldReference(field=Field(name="first_name"), table_name="c"),
        FieldReference(field=Field(name="status"), table_name="s"),
    ],
    table=SchemaReference(name="customers", alias="c", version=Version.LATEST),
    joins=[
        JoinQuery(
            table=SchemaReference(
                name="shipping", alias="s", version=Version.LATEST
            ),
            on=Conditions(
                Condition(
                    field=FieldReference(
                        field=Field(name="customer_id"), table_name="s"
                    ),
                    lookup=FieldLookup.EQ,
                    value=FieldReference(
                        field=Field(name="customer_id"), table_name="c"
                    ),
                ),
            ),
            join_type=JoinType.INNER,
        ),
    ],
    order_by=[
        OrderByQuery(
            field=FieldReference(field=Field(name="customer_id"), table_name="c"),
            direction=OrderDirection.ASC,
        ),
        OrderByQuery(
            field=FieldReference(field=Field(name="shipping_id"), table_name="s"),
            direction=OrderDirection.ASC,
        ),
    ],
)

print("Query is defined:", query)

Query is defined: QueryStatement(table=SchemaReference(name='customers', version=<Version.LATEST: 'LATEST'>, alias='c', namespace=None), only=[c.customer_id, c.first_name, s.status], distinct=False, annotations=None, aggregations=None, joins=[JoinQuery(table=SchemaReference(name='shipping', version=<Version.LATEST: 'LATEST'>, alias='s', namespace=None), on=Condition(field=s.customer_id, lookup===, value=c.customer_id, negate=False), join_type=<JoinType.INNER: 'INNER'>)], where=None, group_by=None, order_by=[OrderByQuery(field=c.customer_id, direction=<OrderDirection.ASC: 'ASC'>), OrderByQuery(field=s.shipping_id, direction=<OrderDirection.ASC: 'ASC'>)], limit=None)
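For comparison, the `QueryStatement` above expresses the same logic as a plain SQL inner join. The sketch below puts both tables into one in-memory SQLite database purely to show the expected rows. In the notebook, the tables actually live in two separate PostgreSQL databases, and only the columns the query needs are created:

```python
import sqlite3

# Stand-in: both tables in ONE in-memory SQLite database (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, first_name TEXT)")
conn.execute("CREATE TABLE shipping (shipping_id INTEGER PRIMARY KEY, status TEXT, customer_id INTEGER)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?)",
    [(1, "John"), (2, "Robert"), (3, "David"), (4, "John"), (5, "Betty")],
)
conn.executemany(
    "INSERT INTO shipping VALUES (?, ?, ?)",
    [(1, "Pending", 2), (2, "Pending", 4), (3, "Delivered", 3), (4, "Pending", 5), (5, "Delivered", 1)],
)

# Mirrors the QueryStatement: selected columns, INNER JOIN on customer_id, two ORDER BY keys.
rows = conn.execute(
    """
    SELECT c.customer_id, c.first_name, s.status
    FROM customers AS c
    INNER JOIN shipping AS s ON s.customer_id = c.customer_id
    ORDER BY c.customer_id ASC, s.shipping_id ASC
    """
).fetchall()

for row in rows:
    print(row)
```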


Now we can execute the query:

In [19]:
from amsdal_glue import Container, DataQueryOperation
from amsdal_glue.interfaces import DataQueryService

service = Container.services.get(DataQueryService)
result = service.execute(DataQueryOperation(query=query))

print('Success:', result.success)
print('Error details:', result.message)
print('Found records:', len(result.data))
print('Customers report:')

for row in result.data:
    print(
        f'{row.data["first_name"]} (ID: {row.data["customer_id"]}) - Shipping status: {row.data["status"]}'
    )

Success: True
Error details: None
Found records: 5
Customers report:
John (ID: 1) - Shipping status: Delivered
Robert (ID: 2) - Shipping status: Pending
David (ID: 3) - Shipping status: Delivered
John (ID: 4) - Shipping status: Pending
Betty (ID: 5) - Shipping status: Pending


That's it! We have successfully fetched the data from two different databases using a single query.
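This notebook does not show how AMSDAL Glue executes a join across two databases internally. Conceptually, a federated query engine can fetch rows from each database and combine them in memory. The generic hash join below is a plain-Python sketch of that idea, not AMSDAL Glue's implementation, and `hash_join` is a hypothetical helper:

```python
from typing import Dict, List


def hash_join(left: List[Dict], right: List[Dict], key: str) -> List[Dict]:
    """INNER JOIN two lists of dicts on `key`, using a hash index built over `right`."""
    index: Dict[object, List[Dict]] = {}
    for right_row in right:
        index.setdefault(right_row[key], []).append(right_row)
    joined = []
    for left_row in left:
        # Left rows without a match are dropped, as in an INNER JOIN.
        for right_row in index.get(left_row[key], []):
            joined.append({**left_row, **right_row})  # right-hand values win on name clashes
    return joined


# Rows as they might come back from the two databases (subset of the sample data).
customers = [{"customer_id": 1, "first_name": "John"}, {"customer_id": 2, "first_name": "Robert"}]
shipping = [
    {"shipping_id": 5, "status": "Delivered", "customer_id": 1},
    {"shipping_id": 1, "status": "Pending", "customer_id": 2},
]

for row in hash_join(customers, shipping, "customer_id"):
    print(row["first_name"], row["status"])
```

Building the index over one side makes the join roughly linear in the total number of rows, instead of comparing every pair of rows.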