postgres connection support #306

Open
jonathandune opened this issue Oct 16, 2023 · 11 comments
Labels: bug (Something isn't working), help wanted (Extra attention is needed)

Comments

@jonathandune

I've built from source with --features "database", but running:

./roapi --table "postgres://

returns:

{ source: Some(TableError { source: Extension { msg: "unsupported extension in uri: postgres://postgres:dune@localhost:5432/postgres" } })

I do see postgres in

postgres {},
though

Does ROAPI support connecting to a Postgres database?

@holicc
Contributor

holicc commented Oct 17, 2023

@jonathandune ROAPI supports Postgres. The error you encountered is because the URI scheme is not the one ROAPI expects: use postgresql:// rather than postgres://. Change the URL to postgresql://postgres:dune@localhost:5432/postgres.
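
To illustrate why one spelling is accepted and the other rejected, here is a minimal sketch of scheme-based dispatch. It is not ROAPI's actual code: the `TableFormat` enum and the `format_from_uri` function are hypothetical names, and the `mysql` arm is an assumption added for contrast. Only the `"postgresql"` arm mirrors the match arm quoted in this thread.

```rust
/// Hypothetical stand-in for the table formats a loader might recognise.
#[derive(Debug, PartialEq)]
enum TableFormat {
    Postgres,
    Mysql, // assumption: included only to show a second arm
}

/// Maps the scheme of a table URI to a format; `None` means "unsupported".
fn format_from_uri(uri: &str) -> Option<TableFormat> {
    // Everything before "://" is treated as the scheme.
    let (scheme, _rest) = uri.split_once("://")?;
    match scheme {
        // Only the long spelling is listed, so "postgres://" falls through.
        "postgresql" => Some(TableFormat::Postgres),
        "mysql" => Some(TableFormat::Mysql),
        _ => None,
    }
}
```

With a table like this, `format_from_uri("postgres://...")` yields `None`, which a caller would surface as an "unsupported" error, while the `postgresql://` form resolves to the Postgres format.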

@jonathandune
Author

Thanks, indeed just realised this by looking at

"postgresql" => Some(TableLoadOption::postgres {}),

I am now getting a panic:
./roapi --table "postgresql://postgres:dune@localhost:5432/postgres"
[2023-10-17T08:21:53Z INFO roapi::context] loading uri(postgresql://postgres:dune@localhost:5432/postgres) as table postgres
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /Users/jonathan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/postgres-0.19.7/src/connection.rs:66:22
stack backtrace:
0: 0x1060daef0 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h782e083efa4700b5
1: 0x104dfb7f8 - core::fmt::write::hdcd44492977669f9
2: 0x1060c8ba4 - std::io::Write::write_fmt::h7f32f27b72970ad5
3: 0x1060dad10 - std::sys_common::backtrace::print::h6868989d298a5d93
4: 0x1060c9a4c - std::panicking::default_hook::{{closure}}::he58f0ab435f26824
5: 0x1060c9718 - std::panicking::default_hook::h0f0ccb5a1a0c4466
6: 0x1060ca34c - std::panicking::rust_panic_with_hook::hf392a622566eb9b6
7: 0x1060db46c - std::panicking::begin_panic_handler::{{closure}}::h07b7cf65243520e5
8: 0x1060db400 - std::sys_common::backtrace::__rust_end_short_backtrace::h4972088065ba8a0d
9: 0x1060c9f48 - _rust_begin_unwind
10: 0x1064736f0 - core::panicking::panic_fmt::hc67fbbe16e2d8abd
11: 0x105bc96c8 - postgres::client::Client::simple_query::h60c2e9dc0ad79aad
12: 0x104d6c134 - r2d2::Pool::get::h187342ef99e68157
13: 0x104d1c0b0 - <connectorx::sources::postgres::PostgresSource<P,C> as connectorx::sources::Source>::fetch_metadata::h11cfb7f0932e2cee
14: 0x104da7b9c - connectorx::dispatcher::Dispatcher<S,D,TP>::run::hb7e68ff2f5a3d964
15: 0x104d99908 - connectorx::get_arrow::get_arrow::he4d30bdaeb7f87e2
16: 0x104cf8824 - columnq::table::database::imp::::to_mem_table::he6f7249e7cbcfa3d
17: 0x104686090 - columnq::table::load::{{closure}}::h551117c6361446a1
18: 0x104697c70 - columnq::columnq::ColumnQ::load_table::{{closure}}::h084c8f95586f07c2
19: 0x1045ea32c - roapi::startup::Application::build::{{closure}}::hd413c2f4aaff7301
20: 0x104709e2c - roapi::main::{{closure}}::h8001bfde547176a0
21: 0x10470608c - roapi::main::h88b9d870bf2f24d6
22: 0x10453372c - std::sys_common::backtrace::__rust_begin_short_backtrace::h87db7e2494fcaf82
23: 0x104535ea0 - std::rt::lang_start::{{closure}}::he3e2d9a6e41ef503
24: 0x10470ccc8 - _main

The DB is up and running, and other processes are connected to it, so I'm not sure what the issue is.
Is this due to Tokio?

@holicc
Contributor

holicc commented Oct 17, 2023

This error arises from the postgres crate:

/// Opens a connection to a PostgreSQL database.
    pub fn connect<T>(&self, tls: T) -> Result<Client, Error>
    where
        T: MakeTlsConnect<Socket> + 'static + Send,
        T::TlsConnect: Send,
        T::Stream: Send,
        <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
    {
        let runtime = runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap(); // FIXME don't unwrap

        let (client, connection) = runtime.block_on(self.config.connect(tls))?;

        let connection = Connection::new(runtime, connection, self.notice_callback.clone());
        Ok(Client::new(connection, client))
    }

We can see that connect() builds a new runtime and calls block_on on it, but our own program is already running inside a runtime, and that causes this panic.

None of this code is part of ROAPI, so I have no idea how to fix it for now. :(
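
The nesting problem described above can be modelled without Tokio. The sketch below is a toy, not Tokio's implementation: it assumes a runtime tracks "this thread is already driving a runtime" in a thread-local flag and refuses to block again on that thread. All names (`ENTERED`, `toy_block_on`, `blocking_connect`, `app_main`) are hypothetical, and where Tokio panics this version returns an `Err` so the outcome is easy to inspect.

```rust
use std::cell::Cell;

thread_local! {
    // True while the current thread is driving a (toy) runtime.
    static ENTERED: Cell<bool> = Cell::new(false);
}

/// Toy stand-in for a runtime's `block_on`: refuses to nest on one thread.
fn toy_block_on<T>(work: impl FnOnce() -> T) -> Result<T, String> {
    if ENTERED.with(|e| e.get()) {
        return Err("Cannot start a runtime from within a runtime".to_string());
    }
    ENTERED.with(|e| e.set(true));
    let out = work();
    ENTERED.with(|e| e.set(false));
    Ok(out)
}

/// Hypothetical blocking client call that starts its own runtime internally.
fn blocking_connect() -> Result<&'static str, String> {
    toy_block_on(|| "connected")
}

/// Hypothetical application entry point that already runs inside a runtime.
fn app_main() -> Result<&'static str, String> {
    // The outer runtime is entered first; the blocking call then tries to
    // enter a second one on the same thread and is rejected.
    toy_block_on(blocking_connect)?
}
```

Called on its own, `blocking_connect()` succeeds; called through `app_main()`, the inner `toy_block_on` sees the flag already set and fails, which is the same shape as the backtrace above (an async `main` reaching a blocking client call).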

@houqp
Member

houqp commented Oct 18, 2023

It looks like we are using the wrong connectorx API: we are effectively calling a blocking API from an async function. It would be better to call the async self.config.connect(tls) method directly from ROAPI.

@holicc
Contributor

holicc commented Oct 20, 2023

> It looks like we are using the wrong connectorx api, we are effectively calling a blocking api from an async function, would be better to call the async self.config.connect(tls) method directly from roapi.

Maybe we should call the connectorx api in an independent thread?

pub async fn to_mem_table(
    &self,
    t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
    debug!("loading database table data...");
    let queries = CXQuery::naked(format!("SELECT * FROM {}", t.name));
    let source = SourceConn::try_from(t.get_uri_str())
        .map_err(|e| ColumnQError::Database(e.to_string()))?;
    // The connectorx call blocks (the postgres client it uses starts its own
    // runtime), so run it on Tokio's blocking thread pool rather than on the
    // async worker thread.
    let hd = tokio::runtime::Handle::current();
    hd.spawn_blocking(move || {
        let destination = connectorx::get_arrow::get_arrow(&source, None, &[queries])
            .map_err(|e| ColumnQError::Database(e.to_string()))?;
        Ok(datafusion::datasource::MemTable::try_new(
            destination.arrow_schema(),
            vec![destination
                .arrow()
                .map_err(|e| ColumnQError::Database(e.to_string()))?],
        )?)
    })
    .await
    // Outer error: the blocking task itself failed to complete (JoinError).
    .map_err(|e| ColumnQError::Database(e.to_string()))?
}
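
The control flow of this proposal (run the blocking loader away from the calling thread, then unwrap two layers of `Result`) can be sketched with the standard library alone. Here `std::thread::spawn` is swapped in for Tokio's `spawn_blocking`, which is a deliberate simplification, and `load_rows_blocking` and `load_rows_offloaded` are hypothetical names standing in for the connectorx call and its wrapper.

```rust
use std::thread;

/// Hypothetical blocking loader; stands in for the connectorx call.
fn load_rows_blocking(table: &str) -> Result<Vec<String>, String> {
    if table.is_empty() {
        return Err("empty table name".to_string());
    }
    Ok(vec![format!("row from {table}")])
}

/// Runs the loader on its own thread and flattens the two error layers:
/// the outer one (the worker thread panicked) and the inner one (the
/// loader itself returned an error).
fn load_rows_offloaded(table: &str) -> Result<Vec<String>, String> {
    let table = table.to_owned(); // the closure must own its inputs
    let handle = thread::spawn(move || load_rows_blocking(&table));
    handle
        .join()
        .map_err(|_| "worker thread panicked".to_string())?
}
```

The trailing `?` strips only the outer layer, so the loader's own `Result` becomes the function's return value, which is the same trick the `.await ... ?` at the end of the proposal relies on.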

@houqp
Member

houqp commented Oct 31, 2023

@holicc that feels wrong. Do they not have an async API we can call directly from an async context?

@anvarknian

anvarknian commented Nov 11, 2023

Hi there,
Any news concerning the PostgreSQL connector?
Could this work?
I've added a runtime parameter to the connect function, and I've replaced runtime::Builder::new_current_thread() with runtime.clone() to clone the existing runtime handle.

use tokio::runtime::Runtime;
pub fn connect<T>(&self, tls: T, runtime: &Runtime) -> Result<Client, Error>
where
    T: MakeTlsConnect<Socket> + 'static + Send,
    T::TlsConnect: Send,
    T::Stream: Send,
    <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    let (client, connection) = runtime.block_on(self.config.connect(tls))?;

    let connection = Connection::new(runtime.clone(), connection, self.notice_callback.clone());
    Ok(Client::new(connection, client))
}

@houqp
Member

houqp commented Nov 12, 2023

@anvarknian I am not actively working on this right now. I am OK with merging a workaround to unblock this in the short run, but the proper fix is to invoke an async API on their end. Have you filed an upstream connectorx ticket to get some ideas on how best to achieve this?

houqp added the bug and help wanted labels on Nov 20, 2023

@chrisfw

chrisfw commented Mar 26, 2024

@houqp, hi. I just wanted to confirm: is using Postgres from ROAPI still not advisable? On a side note, to enable connecting to a database in the first place, do I need to compile from source with --features "database", as indicated in the first post of this thread?

Currently, when I try to connect to a postgres database with roapi version 0.11.1, I get the error below:

thread 'main' panicked at /private/tmp/roapi-20240325-5739-76oqyr/roapi-roapi-v0.11.1/roapi/src/startup.rs:63:14:
Failed to create Roapi context: Whatever { source: Some(TableError { source: InvalidUri { msg: "failed to register `uri(postgresql://roapi:sqlapi@10.1.5.39:5432/roapi)` as table `user`, unsupported table format `postgres`" } }), message: "Failed to load table", backtrace: Backtrace [{ fn: "std::backtrace::Backtrace::create" }, { fn: "roapi::startup::Application::build::{{closure}}" }, { fn: "roapi::main::{{closure}}" }, { fn: "roapi::main" }, { fn: "std::sys_common::backtrace::__rust_begin_short_backtrace" }, { fn: "_main" }] }

Thanks for any clarity you can provide.
Regards,
Chris

@houqp
Member

houqp commented Apr 3, 2024

@chrisfw yes, you need to compile from source for now. The official build doesn't have the mysql database feature enabled by default.
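
For readers unfamiliar with Cargo features, the sketch below shows how a compile-time feature can decide whether a backend exists in the binary at all. It is an illustration only, not ROAPI's code: it assumes the feature is named `database` as in the first post, and `database_loader` and both messages are hypothetical.

```rust
/// Reports whether a database-backed table could be loaded by this build.
/// `cfg!` is evaluated at compile time, so a binary built without the
/// (assumed) `database` feature always takes the `Err` branch.
fn database_loader(uri: &str) -> Result<String, String> {
    if cfg!(feature = "database") {
        Ok(format!("would load {uri} through the database backend"))
    } else {
        Err(format!(
            "unsupported table format for {uri}: this build has no database feature"
        ))
    }
}
```

A binary compiled without the feature cannot be switched over at run time, which is consistent with the advice above to rebuild from source.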

@chrisfw

chrisfw commented Apr 3, 2024

Thanks for your response @houqp. The same holds true for Postgres, correct? Can you please provide the exact flag and value that need to be passed (--features "postgres"?) Also, is the Postgres support stable? It seemed there were some connectorx threading issues.

Regards,
Chris
