In [None]:
from s3path import S3Path
from getpass import getpass

# S3 path to the root folder of the RDS export inside the backup bucket: the
# export metadata JSON and the per-database folders are looked up below it.
# Change to your bucket.
base_path = S3Path("/bucket/some_prefix/your_export/")
# Connection string to the Postgres database, change to your database.
# `{password}` is a str.format placeholder; it is filled in with the value
# typed at the getpass() prompt when the engine is created, so the secret is
# never stored in the notebook. The field name must not contain spaces: a
# "{ password }" field looks up the key " password " and raises KeyError.
postgres_con_string = "postgresql+psycopg2://postgres:{password}@localhost/postgres"

In [None]:
import sqlalchemy as sql

# Build the SQLAlchemy engine. getpass() prompts for the database password at
# run time and the value is substituted into the connection string template.
# NOTE(review): the password is inserted verbatim; characters that are special
# in URLs (e.g. "@" or "/") presumably need URL-encoding first — confirm.
engine = sql.engine.create_engine(postgres_con_string.format(password=getpass()))

In [None]:
import json

# An RDS export to S3 ships with a JSON file holding metadata about the export.
# It is read here to obtain the list of exported tables. Change the file name
# to match your export.
export_metadata_path = base_path / "export_tables_info_your_export_from_1_to_XXX.json"
with export_metadata_path.open() as metadata_file:
    export_metadata = json.load(metadata_file)

In [None]:
# Display the loaded export metadata to check its contents.
export_metadata

In [None]:
from dataclasses import dataclass
from typing import Optional

@dataclass
class Table:
    """One exported table, identified by database, schema and table name.

    Attributes:
        database: Name of the database the table belongs to.
        schema: Name of the schema the table belongs to.
        name: Name of the table itself.
        output_schema: Schema name to use for the output, if any.
        output_name: Table name to use for the output, if any.
    """
    database: str
    schema: str
    name: str
    output_schema: Optional[str] = None
    output_name: Optional[str] = None

    def get_S3_path(self, base_path: S3Path) -> S3Path:
        """Return ``<base_path>/<database>/<schema>.<name>`` for this table."""
        database_folder = base_path / self.database
        table_folder = f"{self.schema}.{self.name}"
        return database_folder / table_folder
       

In [None]:
# Build one Table per entry of the export's "perTableStatus" list. Each
# "target" is expected to look like "database.schema.table"; splitting at most
# twice keeps any further dots inside the table name instead of letting the
# extra parts spill into Table's optional output_schema / output_name fields.
tables = [
    Table(*table_status["target"].split(".", 2))
    for table_status in export_metadata["perTableStatus"]
]

In [None]:
# Display the parsed tables to verify the list before copying anything.
tables

In [None]:
def copy_table_to_postgres(engine: sql.engine.Engine, table: Table) -> None:
    """Copy one exported table from S3 to a postgres instance.

    The target schema is created when it does not exist yet. Every parquet
    file found below the table's S3 folder is then appended to the target
    table, so running this twice for the same table duplicates its rows.

    params:
        engine: SQLAlchemy Engine for the postgres instance
        table: Table instance from the RDS-to-S3 back up catalog. Its
            output_schema / output_name are used as the target when set,
            otherwise the original schema / name.
    """
    import pandas as pd

    name = table.output_name or table.name
    schema = table.output_schema or table.schema

    with engine.connect() as con:
        if not engine.dialect.has_schema(con, schema):
            con.execute(sql.schema.CreateSchema(schema, if_not_exists=True))
            con.commit()

    # The S3 location is resolved against the notebook-level `base_path`.
    for path in list(table.get_S3_path(base_path).rglob("*.parquet")):
        pd.read_parquet(path.as_uri()).to_sql(
            name,
            engine,
            schema=schema,
            if_exists="append",
            # Without index=False pandas also writes the DataFrame index as an
            # extra "index" column that is not part of the exported table.
            index=False,
        )

In [None]:
from typing import List

def table_filter(tables: List[Table], matcher: str) -> List[Table]:
    """Filter a list of tables by a database/schema/table name.

        params:
            tables: List of Table instances from the RDS-to-S3 back up catalog
            matcher: A string in the format of "database.schema.table" where
                any of the parts can be replaced with "*" to match any value.
                Every part is checked on its own, so "*.public.*" selects
                the "public" schema of every database.

        returns:
            List of Table instances that match the matcher string, in their
            original order.

        raises:
            ValueError: If matcher does not consist of exactly three
                dot-separated parts.
    """
    parts = matcher.split(".")
    if len(parts) != 3:
        raise ValueError(
            f'matcher must look like "database.schema.table", got {matcher!r}'
        )
    database, schema, name = parts

    def _matches(pattern: str, value: str) -> bool:
        # "*" is a wildcard for the whole part; anything else must be equal.
        return pattern == "*" or pattern == value

    return [
        table
        for table in tables
        if _matches(database, table.database)
        and _matches(schema, table.schema)
        and _matches(name, table.name)
    ]
    

In [None]:
from dataclasses import asdict

def rename_mapper(table: Table, schema: Optional[str] = None, name: Optional[str] = None) -> Table:
    """Create a new Table instance with a new schema and/or name.

        The table passed in is left untouched; the output names are set on a
        copy. This keeps a rename from leaking into later uses of the same
        Table instance.

        params:
            table: Table instance from the RDS-to-S3 back up catalog
            schema: New schema name, can include {database}, {schema}, and {name}
            name: New table name, can include {database}, {schema}, and {name}

        returns:
            Table instance with the new schema and/or name stored in
            output_schema / output_name. Arguments that are None or empty
            leave the corresponding output field as it is on `table`.
    """
    from dataclasses import replace

    # Values the templates may reference, e.g. "{database}_{schema}".
    available_datafields = asdict(table)

    overrides = {}
    if schema:
        overrides["output_schema"] = schema.format(**available_datafields)
    if name:
        overrides["output_name"] = name.format(**available_datafields)

    return replace(table, **overrides)

In [None]:
from typing import Dict
from tqdm.autonotebook import tqdm

def copy_tables(tables: List[Table], matcher: str, engine: sql.engine.Engine, rename_map: Optional[Dict[str, str]] = None):
    """Copy every table selected by ``matcher`` from S3 to a postgres instance.

        A progress bar is shown while the selected tables are copied one
        after another.

        params:
            tables: List of Table instances from the RDS-to-S3 back up catalog
            matcher: A string in the format of "database.schema.table" where
                any of the parts can be replaced with "*" to match any value.
            engine: SQLAlchemy Engine for the postgres instance
            rename_map: Optional keyword arguments for rename_mapper, i.e. a
                dictionary with a "schema" and/or "name" entry. The values are
                templates such as "some_new_prefix_{schema}" and may reference
                "{database}", "{schema}", and/or "{name}" to use the original
                values from the table.
    """
    selected = table_filter(tables, matcher)
    with tqdm(selected) as progress:
        for source_table in selected:
            progress.set_description(f"Copying {source_table.schema}.{source_table.name}")
            if rename_map:
                target_table = rename_mapper(source_table, **rename_map)
            else:
                target_table = source_table
            copy_table_to_postgres(engine, target_table)
            progress.update(1)

In [None]:
# Copy every table of the "twitter" schema in the "notorious" database, using
# "<database>_<schema>" as the output schema name.
copy_tables(
    tables,
    "notorious.twitter.*",
    engine,
    rename_map={"schema": "{database}_{schema}"},
)