Select query does not work as expected #2124

Closed
bgruenefeld opened this issue Nov 25, 2020 · 9 comments · Fixed by #2150

@bgruenefeld

I have created a data model with the following structure:

root.storagegroup.city.device0001
The time series measured by each device is the energy consumption in 15-minute intervals.
For example:
root.storagegroup.city.device0001.consumption

There are currently 1000 devices in my data model:

root.storagegroup.city.device0001
root.storagegroup.city.device0002
...
...
root.storagegroup.city.device1000

I would like to query the time series data in a time range (3 days) for all devices of a city with a SQL query.
For this I use the following SQL string:

select consumption from root.storagegroup.city where time > 1546300800000 and time < 1546560000000

The result of this query should contain 1000 time series with 288 (3 x 96) values each.
However, for some time series the result contains far too many values, up to 11,808.
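The expected figure of 288 can be checked with a few lines of arithmetic. The sketch below assumes readings are spaced exactly 15 minutes apart and that the first reading falls on the lower bound of the query window; both are assumptions for illustration, and `expected_points` is a hypothetical helper, not part of any IoTDB client. Under that alignment a strict `time >` comparison excludes the reading on the boundary, so the count depends on whether the lower bound is inclusive.

```python
STEP_MS = 15 * 60 * 1000  # one reading every 15 minutes


def expected_points(first_ts, start, end, include_start=False, step=STEP_MS):
    """Count timestamps first_ts + k * step (k >= 0) that fall inside the window.

    The upper bound is always exclusive (time < end). The lower bound is
    exclusive (time > start) unless include_start is True (time >= start).
    """
    count = 0
    ts = first_ts
    while ts < end:
        if ts > start or (include_start and ts == start):
            count += 1
        ts += step
    return count


start, end = 1546300800000, 1546560000000

print((end - start) // STEP_MS)                                  # 288 intervals in three days
print(expected_points(start, start, end, include_start=True))    # 288 with time >= start
print(expected_points(start, start, end, include_start=False))   # 287 with time > start
```

Either way, the expected count per device is in the high 200s, nowhere near 11,808.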

I modified the query a little and found that the number of data points is as expected for some time series and too large for others.

For example:

select count(comsumption) from root.storagegroup.city.device0222 where time > 1546300800000 and time < 1546560000000

The result is 288, which is the expected amount.

But the following query

select count(comsumption) from root.storagegroup.city.device0020 where time > 1546300800000 and time < 1546560000000
returns 11,808, which is far more than the expected 288 (interestingly, 41 * 288 = 11,808).
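A quick way to confirm that the surplus is an exact multiple of the expected count, which points to whole-series duplication rather than stray extra points, is to look at the quotient and remainder. `duplication_factor` is a hypothetical helper written only for this check.

```python
def duplication_factor(observed, expected):
    """Return (factor, remainder) such that observed == factor * expected + remainder."""
    return divmod(observed, expected)


factor, remainder = duplication_factor(11808, 288)
print(factor, remainder)  # 41 0
```

A remainder of zero means every expected point appears the same number of times.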

So my question is:
How can this happen?
Am I making a mistake in my SQL query, or is the error somewhere else?

  • OS: Windows 10
  • Version 0.11.0 RC3
@HTHou
Contributor

HTHou commented Nov 26, 2020

Hi, thanks for your feedback. This is a weird issue we haven't met before.
Could you please let us know how you inserted the data? JDBC, native API, or Python client?
It looks like a problem in the insertion process.

@bgruenefeld
Author

Hi,

thanks for your prompt reply.
I insert the data with the Python client, using

def insert_tablet(self, tablet):

from the Session class.

I write the data sequentially and use one session object for all writes.
In one write operation I write 35,040 data points per time series (consumption data for one year: 96 x 365).

@qiaojialin
Member

Hi, it is probably due to concurrency. The Session object is not thread-safe. If you want to share Sessions across multiple threads, please use the SessionPool.

If you still have a problem, please attach the Python file and we will help check it :)
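If the client library in use offers no pool, one generic way to rule out concurrency is to serialize every call to the shared session behind a lock. The sketch below is a minimal illustration of that idea, not the SessionPool mentioned above: `LockedSession` and `RecordingSession` are hypothetical names, and `RecordingSession` merely stands in for a real session so the example runs without a server. The only assumption about the real object is that it exposes `insert_tablet(tablet)`.

```python
import threading


class LockedSession:
    """Wrap a session that is not thread-safe so only one thread uses it at a time."""

    def __init__(self, session):
        self._session = session
        self._lock = threading.Lock()

    def insert_tablet(self, tablet):
        # Hold the lock for the whole call so writes cannot interleave.
        with self._lock:
            return self._session.insert_tablet(tablet)


class RecordingSession:
    """Hypothetical stand-in for a real session; it only records what it receives."""

    def __init__(self):
        self.tablets = []

    def insert_tablet(self, tablet):
        self.tablets.append(tablet)


backend = RecordingSession()
shared = LockedSession(backend)

threads = [
    threading.Thread(target=shared.insert_tablet, args=(f"tablet-{i}",))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(backend.tablets))  # 8
```

If the counts are still wrong with all writes serialized like this, or with a single-threaded writer, concurrency is unlikely to be the cause.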

@bgruenefeld
Author

bgruenefeld commented Nov 26, 2020

Hi,
I followed your advice and opened a separate session for each write operation (as I didn't find the SessionPool).
The result was the same.
Here is the relevant Python code excerpt:

import datetime
from datetime import datetime
from iotdb_session.utils.IoTDBConstants import *
from iotdb_session.utils.Tablet import Tablet
from iotdb_session.Session import Session

def open_connection():
    try:        
        connection = Session(host="127.0.0.1", port="6667", user='root', password='root')        
        connection.open(False)           
    except Exception as e:
        print( e )        
        connection.close()

    return connection

# param data: an array with the following structure
# [
#   [12345678, 1.3, 1.0],
#   [12345678, 1.3, 1.0]
# ]
# param device_id: e.g. root.storagegroup.city.device0020

def save_tablet(device_id, data):  
    measurements_ = []
    values      = []
    data_types_ = []
    timestamps  = []

    measurements_.append("comsumption")
    measurements_.append("status")
    data_types_.append(TSDataType.FLOAT)
    data_types_.append(TSDataType.FLOAT)

    for datapoint in data:
        values.append([datapoint[1], datapoint[2]])
        timestamps.append(datapoint[0])    

    tablet_ = Tablet(device_id, measurements_, data_types_, values, timestamps)
    try:  
        connection = open_connection() 
        connection.insert_tablet(tablet_)
        connection.close() 
    except Exception as e:
        print('an error occured')  
        print(e)  
        connection.close()
    
        
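def create_data(ts_prefix, amount_timeseries):  
    # NOTE: `amount` and create_timeseries_data are not shown in this excerpt;
    # presumably they are defined elsewhere in the original script.
    data = create_timeseries_data(1546300800000, amount)
      
    for index in range(1, amount_timeseries):        
        device_id = ts_prefix + str(index)  # str() avoids a TypeError on str + int
        save_tablet(device_id, data)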

@SteveYurongSu
Member

Hi @bgruenefeld ,

I tried to reproduce the case you ran into using the Java session API and the Python client, but both worked well on my MacBook.

Here is my code; you can test it on your computer. Let me know if you hit the issue again, and I will do a detailed check. :D

OS: macOS Big Sur
IoTDB Server: 0.11.0
IoTDB Python Client: 0.11.0 (https://pypi.org/project/apache-iotdb/)

Python client:

from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType
from iotdb.utils.Tablet import Tablet

connection = None

def open_connection():
    global connection
    try:        
        connection = Session(host="127.0.0.1", port="6667", user='root', password='root')        
        connection.open(False)           
    except Exception as e:
        print( e )        
        connection.close()
        

def create_timeseries_data(timestamp, amount):
    ret = []
    for i in range(amount):
        ret.append([timestamp + i * 900000, 1.3, 1.0])
    return ret


def save_tablet(device_id, data):  
    global connection

    measurements_ = []
    values      = []
    data_types_ = []
    timestamps  = []

    measurements_.append("comsumption")
    measurements_.append("status")
    data_types_.append(TSDataType.FLOAT)
    data_types_.append(TSDataType.FLOAT)

    for datapoint in data:
        values.append([datapoint[1], datapoint[2]])
        timestamps.append(datapoint[0])    

    tablet_ = Tablet(device_id, measurements_, data_types_, values, timestamps)
    try:  
        connection.insert_tablet(tablet_)
    except Exception as e:
        print('an error occured')  
        print(e)  
        connection.close()


def create_data(ts_prefix, amount_timeseries):  
    data = create_timeseries_data(1546300800000, 288)
      
    for index in range(1, amount_timeseries):        
        device_id = ts_prefix + str(index)
        save_tablet(device_id, data)


def query(ts_prefix, amount_timeseries):
    for index in range(1, amount_timeseries):   
        sql = "select count(*) from %s where time >= 1546300800000 and time < 1546560000000" % (ts_prefix + str(index))
        session_data_set = connection.execute_query_statement(sql)
        while session_data_set.has_next():
            fields = session_data_set.next().get_fields()
            print(fields[0].get_long_value(), fields[1].get_long_value())
            assert 288 == fields[0].get_long_value() and 288 == fields[1].get_long_value()
        session_data_set.close_operation_handle()


if __name__ == "__main__":
    ts_prefix = "root.storagegroup.city.device"

    open_connection()
    create_data(ts_prefix, 1000)
    query(ts_prefix, 1000)
    connection.close()

    print("All executions done!!")

Java session:

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;

public class SessionExample {

  private static Session session;

  public static void main(String[] args)
      throws IoTDBConnectionException, StatementExecutionException {
    session = new Session("127.0.0.1", 6667, "root", "root");
    session.open(false);

    try {
      session.setStorageGroup("root.storagegroup");
    } catch (StatementExecutionException e) {
      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
        throw e;
      }
    }

    createTimeseries();
    insertRecords();
    query();
    session.close();
  }

  private static void createTimeseries()
      throws IoTDBConnectionException, StatementExecutionException {
    for (int i = 1; i <= 1000; ++i) {
      String path = "root.storagegroup.city.device";
      if (i < 10) {
        path += "000";
      } else if (i < 100) {
        path += "00";
      } else if (i < 1000) {
        path += "0";
      }
      path += i;

      String comsumption = path + ".comsumption";
      if (!session.checkTimeseriesExists(comsumption)) {
        session.createTimeseries(comsumption, TSDataType.FLOAT, TSEncoding.GORILLA,
            CompressionType.SNAPPY);
      }
      System.out.println(comsumption);

      String status = path + ".status";
      if (!session.checkTimeseriesExists(status)) {
        session.createTimeseries(status, TSDataType.FLOAT, TSEncoding.GORILLA,
            CompressionType.SNAPPY);
      }
      System.out.println(status);
    }
  }

  private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
    for (int i = 1; i <= 1000; ++i) {
      String deviceId = "root.storagegroup.city.device";
      if (i < 10) {
        deviceId += "000";
      } else if (i < 100) {
        deviceId += "00";
      } else if (i < 1000) {
        deviceId += "0";
      }
      deviceId += i;

      List<String> measurements = new ArrayList<>();
      measurements.add("comsumption");
      measurements.add("status");

      List<String> deviceIds = new ArrayList<>();
      List<List<String>> measurementsList = new ArrayList<>();
      List<List<Object>> valuesList = new ArrayList<>();
      List<Long> timestamps = new ArrayList<>();
      List<List<TSDataType>> typesList = new ArrayList<>();

      for (long time = 1546300800000L; time < 1546560000000L; time += 900000L) {
        List<Object> values = new ArrayList<>();
        List<TSDataType> types = new ArrayList<>();
        values.add(1.3f);
        values.add(1.0f);
        types.add(TSDataType.FLOAT);
        types.add(TSDataType.FLOAT);

        deviceIds.add(deviceId);
        measurementsList.add(measurements);
        timestamps.add(time);
        typesList.add(types);
        valuesList.add(values);
      }

      session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
    }
  }

  private static void query() throws IoTDBConnectionException, StatementExecutionException {
    for (int i = 1; i <= 1000; ++i) {
      String deviceId = "root.storagegroup.city.device";
      if (i < 10) {
        deviceId += "000";
      } else if (i < 100) {
        deviceId += "00";
      } else if (i < 1000) {
        deviceId += "0";
      }
      deviceId += i;

      SessionDataSet dataSet = session.executeQueryStatement(String.format(
          "select count(comsumption), count(status) from %s where time >= 1546300800000 and time < 1546560000000",
          deviceId));
      System.out.println(dataSet.getColumnNames());
      while (dataSet.hasNext()) {
        System.out.println(dataSet.next());
      }
      dataSet.closeOperationHandle();
    }
  }
}

@SteveYurongSu
Member

BTW, it seems that you used iotdb-session (https://pypi.org/project/iotdb-session-0.10.2/ ?). I am not sure whether it is compatible with the v0.11 server.

@bgruenefeld
Author

Hi @SteveYurongSu

thanks for your work, but the issue still persists.
Unfortunately my description was a bit unclear.
I try to store 35,040 data points with each tablet.
Maybe you can do a quick test by changing the number in the create_data function:


def create_data(ts_prefix, amount_timeseries):  
    data = create_timeseries_data(1546300800000, 35040)
  
    for index in range(1, amount_timeseries):        
        device_id = ts_prefix + str(index)
        save_tablet(device_id, data)


When I query the data for a device, I get:

IoTDB> select count(comsumption) from root.storagegroup.city.device1
+-------------------------------------------------+
|count(root.storagegroup.city.device1.comsumption)|
+-------------------------------------------------+
|                                          1261440|
+-------------------------------------------------+
Total line number = 1
It costs 0,012s
IoTDB>

Is there perhaps a limit on the batch size when storing data with a tablet?
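One way to probe the batch-size hypothesis is to send the same 35,040 rows as several smaller tablets and compare the resulting counts. The sketch below only shows the splitting step; `chunked` is a hypothetical helper, the batch size of 5000 is an arbitrary value chosen for illustration, and each batch would then be passed to something like the `save_tablet` function posted earlier in this thread.

```python
def chunked(rows, batch_size):
    """Yield consecutive slices of rows, each holding at most batch_size items."""
    for offset in range(0, len(rows), batch_size):
        yield rows[offset:offset + batch_size]


# One year of 15-minute readings in the [timestamp, value, status] layout used above.
rows = [[1546300800000 + i * 900000, 1.3, 1.0] for i in range(35040)]

batches = list(chunked(rows, 5000))
print(len(batches), len(batches[0]), len(batches[-1]))  # 8 5000 40
```

If the duplicated counts appear regardless of the batch size, the tablet size itself is probably not the trigger.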

@SteveYurongSu
Member

Hi @bgruenefeld ,

I reproduced the case successfully! I'm sure it's a bug, and we will fix it ASAP.

Thanks for reporting this!

@SteveYurongSu
Member

I think the bug was introduced by the merge strategy. When I set compaction_strategy to NO_COMPACTION, everything works well.
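For anyone who wants to switch the setting for a test, the sketch below rewrites a single key in properties-style text. It is plain string handling, not an IoTDB API: `set_property` is a hypothetical helper, the sample text is invented for illustration, and the name and location of the server's configuration file are not stated in this thread, so they are left to the reader.

```python
def set_property(text, key, value):
    """Return properties-style text with key set to value (appended if absent)."""
    out, found = [], False
    for line in text.splitlines():
        name = line.split("=", 1)[0].strip()
        if "=" in line and name == key:
            out.append(f"{key}={value}")  # replace the existing assignment
            found = True
        else:
            out.append(line)  # keep comments and other keys untouched
    if not found:
        out.append(f"{key}={value}")
    return "\n".join(out) + "\n"


# Hypothetical excerpt of a configuration file.
sample = "# hypothetical excerpt\ncompaction_strategy=LEVEL_COMPACTION\n"
print(set_property(sample, "compaction_strategy", "NO_COMPACTION"))
```

A server restart is presumably needed for such a change to take effect; that is an assumption, not something confirmed in this thread.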

Here is the code to reproduce the case (with compaction_strategy set to LEVEL_COMPACTION):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb;

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;

public class SessionExample {

  private static Session session;

  public static void main(String[] args)
      throws IoTDBConnectionException, StatementExecutionException {
    session = new Session("127.0.0.1", 6667, "root", "root");
    session.open(false);

    try {
      session.setStorageGroup("root.storagegroup");
    } catch (StatementExecutionException e) {
      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
        throw e;
      }
    }

    createTimeseries();
    insertRecords();
    query();
    session.close();
  }

  private static void createTimeseries()
      throws IoTDBConnectionException, StatementExecutionException {
    for (int i = 1; i <= 10; ++i) {
      String path = "root.storagegroup.city.device";
      if (i < 10) {
        path += "000";
      } else if (i < 100) {
        path += "00";
      } else if (i < 1000) {
        path += "0";
      }
      path += i;

      String comsumption = path + ".comsumption";
      if (!session.checkTimeseriesExists(comsumption)) {
        session.createTimeseries(comsumption, TSDataType.FLOAT, TSEncoding.GORILLA,
            CompressionType.SNAPPY);
      }
      System.out.println(comsumption);

      String status = path + ".status";
      if (!session.checkTimeseriesExists(status)) {
        session.createTimeseries(status, TSDataType.FLOAT, TSEncoding.GORILLA,
            CompressionType.SNAPPY);
      }
      System.out.println(status);
    }
  }

  private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
    for (int i = 1; i <= 10; ++i) {
      String deviceId = "root.storagegroup.city.device";
      if (i < 10) {
        deviceId += "000";
      } else if (i < 100) {
        deviceId += "00";
      } else if (i < 1000) {
        deviceId += "0";
      }
      deviceId += i;

      List<String> measurements = new ArrayList<>();
      measurements.add("comsumption");
      measurements.add("status");

      List<String> deviceIds = new ArrayList<>();
      List<List<String>> measurementsList = new ArrayList<>();
      List<List<Object>> valuesList = new ArrayList<>();
      List<Long> timestamps = new ArrayList<>();
      List<List<TSDataType>> typesList = new ArrayList<>();

      for (long time = 1546300800000L; time < 1546300800000L + 900000L * 35040; time += 900000L) {
        List<Object> values = new ArrayList<>();
        List<TSDataType> types = new ArrayList<>();
        values.add(1.3f);
        values.add(1.0f);
        types.add(TSDataType.FLOAT);
        types.add(TSDataType.FLOAT);

        deviceIds.add(deviceId);
        measurementsList.add(measurements);
        timestamps.add(time);
        typesList.add(types);
        valuesList.add(values);
      }

      session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
    }
  }

  private static void query() throws IoTDBConnectionException, StatementExecutionException {
    for (int i = 1; i <= 10; ++i) {
      String deviceId = "root.storagegroup.city.device";
      if (i < 10) {
        deviceId += "000";
      } else if (i < 100) {
        deviceId += "00";
      } else if (i < 1000) {
        deviceId += "0";
      }
      deviceId += i;

      SessionDataSet dataSet = session.executeQueryStatement(String.format(
          "select count(comsumption), count(status) from %s where time >= 1546300800000 and time < 1546560000000",
          deviceId));
      System.out.println(dataSet.getColumnNames());
      while (dataSet.hasNext()) {
        System.out.println(dataSet.next());
      }
      dataSet.closeOperationHandle();
    }
  }
}

The bug was first introduced by commit db4318302.
