Timestamp with timezone cannot be supported by GROUP BY #7604

Closed · Tracked by #3148
acking-you opened this issue Sep 20, 2023 · 7 comments

Labels: bug (Something isn't working)

acking-you (Contributor) commented Sep 20, 2023

Describe the bug

When I use a TIMESTAMP with a time zone, the GROUP BY operation reports an error:

Error: External(External(ArrowError(InvalidArgumentError("column types must match schema types, expected Timestamp(Millisecond, Some(\"+08:00\")) but found Timestamp(Millisecond, None) at column index 0"))))
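
The error is raised by Arrow's RecordBatch validation: the schema declares Timestamp(Millisecond, Some("+08:00")) while the array that reaches it has type Timestamp(Millisecond, None). The mismatch can be shown in isolation with a minimal sketch (illustrative only, outside the DataFusion code path; the field name and sample values are made up):

use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

fn main() {
    // The schema declares a timezone on the timestamp column...
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Millisecond, Some("+08:00".into())),
        true,
    )]));
    // ...but the array is built without one, so its data type is
    // Timestamp(Millisecond, None) and batch validation fails.
    let array: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2]));
    let err = RecordBatch::try_new(schema, vec![array]).unwrap_err();
    // Prints: Invalid argument error: column types must match schema types, ...
    println!("{err}");
}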

To Reproduce

schema:

Schema::new(vec![
    Field::new("a", DataType::Timestamp(TimeUnit::Millisecond, Some("+08:00".into())),true),
    Field::new("b", DataType::UInt32, true)
])

sql:

select min(a),b from test_table group by b
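
In case it helps, here is a self-contained reproducer along these lines. It is a sketch that substitutes a MemTable for the custom TableProvider used in the real code, so it may not hit exactly the same code path; the table name and sample values are made up:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, TimestampMillisecondArray, UInt32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Same schema as above: a timezone-aware millisecond timestamp plus a u32.
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, Some("+08:00".into())),
            true,
        ),
        Field::new("b", DataType::UInt32, true),
    ]));
    // The array must carry the same timezone as the schema field.
    let a: ArrayRef =
        Arc::new(TimestampMillisecondArray::from(vec![0, 1, 2]).with_timezone("+08:00"));
    let b: ArrayRef = Arc::new(UInt32Array::from(vec![1, 2, 2]));
    let batch = RecordBatch::try_new(schema.clone(), vec![a, b])?;

    let ctx = SessionContext::new();
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("test_table", Arc::new(table))?;
    ctx.sql("select min(a), b from test_table group by b")
        .await?
        .show()
        .await?;
    Ok(())
}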

Expected behavior

The query should just work.

Additional context

No response

acking-you added the bug (Something isn't working) label on Sep 20, 2023
alamb (Contributor) commented Sep 20, 2023

@acking-you - this looks like a real issue. Why did you close the issue?

acking-you (Contributor, Author) commented

> @acking-you - this looks like a real issue. Why did you close the issue?

I closed it because I thought I had found the cause of the problem. After further attempts I realized the problem is still not solved, so I need to reopen the issue 😂

acking-you reopened this on Sep 21, 2023
alamb (Contributor) commented Sep 21, 2023

Thank you @acking-you

Is it possible to post a reproducer for your issue?

I tried this:

DataFusion CLI v31.0.0
❯ create table t(a timestamp, b int) as values ('2023-01-01', 5), ('2023-01-02', 6);
0 rows in set. Query took 0.003 seconds.

❯ create table test_table as select arrow_cast(a, 'Timestamp(Microsecond, Some("+08:00"))') as a, b from t;
0 rows in set. Query took 0.002 seconds.

❯ select min(a),b from test_table group by b;
+---------------------------+---+
| MIN(test_table.a)         | b |
+---------------------------+---+
| 2023-01-02T00:00:00+08:00 | 6 |
| 2023-01-01T00:00:00+08:00 | 5 |
+---------------------------+---+
2 rows in set. Query took 0.003 seconds.

❯ select arrow_typeof(a), arrow_typeof(b) from test_table;
+----------------------------------------+----------------------------+
| arrow_typeof(test_table.a)             | arrow_typeof(test_table.b) |
+----------------------------------------+----------------------------+
| Timestamp(Microsecond, Some("+08:00")) | Int32                      |
| Timestamp(Microsecond, Some("+08:00")) | Int32                      |
+----------------------------------------+----------------------------+
2 rows in set. Query took 0.001 seconds.

acking-you (Contributor, Author) commented

> @acking-you - this looks like a real issue. Why did you close the issue?

When I run the following code with version 30.0.0 it gives me an error, but 31.0.0 behaves fine:

use arrow::array::{Array, ArrayRef, TimestampMillisecondArray};
use arrow_schema::TimeUnit;
use async_trait::async_trait;
use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
    SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::*;
use datafusion_expr::Expr;
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() -> Result<()> {
    // create our custom datasource and add some users
    let db = CustomDataSource::default();
    db.populate_users();
    let ctx = SessionContext::new();
    ctx.register_table("test_table", Arc::new(db))?;
    let df = ctx
        .sql("select min(date),b from test_table group by b")
        .await?;
    df.show().await?;
    Ok(())
}

/// A User, with an id and a bank account
#[derive(Clone, Debug)]
struct User {
    id: u8,
    bank_account: u64,
    date: i64,
}

/// A custom datasource, used to represent a datastore with a single index
#[derive(Clone)]
pub struct CustomDataSource {
    inner: Arc<Mutex<CustomDataSourceInner>>,
}

struct CustomDataSourceInner {
    data: HashMap<u8, User>,
    bank_account_index: BTreeMap<u64, u8>,
}

impl Debug for CustomDataSource {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("custom_db")
    }
}

impl CustomDataSource {
    pub(crate) async fn create_physical_plan(
        &self,
        projections: Option<&Vec<usize>>,
        schema: SchemaRef,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
    }

    pub(crate) fn populate_users(&self) {
        self.add_user(User {
            id: 1,
            bank_account: 9_000,
            date: 0,
        });
        self.add_user(User {
            id: 2,
            bank_account: 100,
            date: 1,
        });
        self.add_user(User {
            id: 3,
            bank_account: 100,
            date: 2,
        });
    }

    fn add_user(&self, user: User) {
        let mut inner = self.inner.lock().unwrap();
        inner.bank_account_index.insert(user.bank_account, user.id);
        inner.data.insert(user.id, user);
    }
}

impl Default for CustomDataSource {
    fn default() -> Self {
        CustomDataSource {
            inner: Arc::new(Mutex::new(CustomDataSourceInner {
                data: Default::default(),
                bank_account_index: Default::default(),
            })),
        }
    }
}

#[async_trait]
impl TableProvider for CustomDataSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        SchemaRef::new(Schema::new(vec![
            Field::new("a", DataType::UInt8, false),
            Field::new("b", DataType::UInt64, true),
            Field::new(
                "date",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+08:00".into())),
                false,
            ),
        ]))
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        // filters and limit can be used here to inject some push-down operations if needed
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        return self.create_physical_plan(projection, self.schema()).await;
    }
}

#[derive(Debug, Clone)]
struct CustomExec {
    db: CustomDataSource,
    projected_schema: SchemaRef,
}

impl CustomExec {
    fn new(
        projections: Option<&Vec<usize>>,
        schema: SchemaRef,
        db: CustomDataSource,
    ) -> Self {
        let projected_schema = project_schema(&schema, projections).unwrap();
        Self {
            db,
            projected_schema,
        }
    }
}

impl DisplayAs for CustomExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        // a fixed label is enough for this example
        write!(f, "CustomExec")
    }
}

impl ExecutionPlan for CustomExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.projected_schema.clone()
    }

    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let users: Vec<User> = {
            let db = self.db.inner.lock().unwrap();
            db.data.values().cloned().collect()
        };
        let mut id_array = UInt8Builder::new();
        let mut account_array = UInt64Builder::new();
        let mut columns: Vec<ArrayRef> = vec![];
        for field in &self.projected_schema.fields {
            match field.data_type() {
                DataType::UInt8 => {
                    for u in users.iter() {
                        id_array.append_value(u.id);
                    }
                    columns.push(Arc::new(id_array.finish()));
                }
                DataType::Timestamp(TimeUnit::Millisecond, _) => {
                    let mut values = Vec::new();
                    for u in users.iter() {
                        values.push(u.date);
                    }
                    // Tag the array with the schema's timezone so the column type
                    // matches Timestamp(Millisecond, Some("+08:00")).
                    columns.push(Arc::new(
                        TimestampMillisecondArray::from_iter_values(values)
                            .with_timezone("+08:00"),
                    ));
                }
                DataType::UInt64 => {
                    for u in users.iter() {
                        account_array.append_value(u.bank_account);
                    }
                    columns.push(Arc::new(account_array.finish()));
                }
                _ => unreachable!(),
            }
        }

        Ok(Box::pin(MemoryStream::try_new(
            vec![RecordBatch::try_new(
                self.projected_schema.clone(),
                columns,
            )?],
            self.schema(),
            None,
        )?))
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

alamb (Contributor) commented Sep 21, 2023

> When I run the following code with version 30.0.0 it gives me an error, but 31.0.0 behaves fine:

This makes sense -- @tustvold and others made significant changes in cleaning up timestamp handling and timezone support in the last release (and the underlying arrow-rs releases)

acking-you (Contributor, Author) commented

> > When I run the following code with version 30.0.0 it gives me an error, but 31.0.0 behaves fine:
>
> This makes sense -- @tustvold and others made significant changes in cleaning up timestamp handling and timezone support in the last release (and the underlying arrow-rs releases)

I tested locally with 28.0.0 and got no error, so I closed the issue. But the version running online is 30.0.0; when I tested 30.0.0 I got the error, so I had to re-open the issue. I have now tested 31.0.0 and it works fine, so I have closed the issue again 😂

alamb (Contributor) commented Sep 21, 2023

Thank you very much for checking @acking-you 🙏
