Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Adding Parquet extension to node module #2818

Merged
merged 10 commits on Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions extension/parquet/include/parquet_file_metadata_cache.hpp
Expand Up @@ -31,5 +31,14 @@ class ParquetFileMetadataCache : public ObjectCacheEntry {

//! read time
time_t read_time;

public:
//! Type tag identifying Parquet metadata entries in the object cache;
//! ObjectCache::Get<T> compares this against GetObjectType() before casting.
static string ObjectType() {
	const string cache_entry_type = "parquet_metadata";
	return cache_entry_type;
}

//! Virtual type tag reported to the object cache; must return the same
//! value as the static ObjectType() so Get<ParquetFileMetadataCache> matches.
string GetObjectType() override {
	return ParquetFileMetadataCache::ObjectType();
}
};
} // namespace duckdb
12 changes: 8 additions & 4 deletions extension/parquet/parquet-extension.cpp
Expand Up @@ -139,11 +139,15 @@ class ParquetScanFunction {
FileSystem &fs = FileSystem::GetFileSystem(context);
for (idx_t file_idx = 1; file_idx < bind_data.files.size(); file_idx++) {
auto &file_name = bind_data.files[file_idx];
auto metadata = std::dynamic_pointer_cast<ParquetFileMetadataCache>(cache.Get(file_name));
auto metadata = cache.Get<ParquetFileMetadataCache>(file_name);
if (!metadata) {
// missing metadata entry in cache, no usable stats
return nullptr;
}
auto handle = fs.OpenFile(file_name, FileFlags::FILE_FLAGS_READ, FileSystem::DEFAULT_LOCK,
FileSystem::DEFAULT_COMPRESSION, FileSystem::GetFileOpener(context));
// but we need to check if the metadata cache entries are current
if (!metadata || (fs.GetLastModifiedTime(*handle) >= metadata->read_time)) {
// we need to check if the metadata cache entries are current
if (fs.GetLastModifiedTime(*handle) >= metadata->read_time) {
// missing or invalid metadata entry in cache, no usable stats overall
return nullptr;
}
Expand Down Expand Up @@ -543,4 +547,4 @@ std::string ParquetExtension::Name() {
return "parquet";
}

} // namespace duckdb
} // namespace duckdb
5 changes: 2 additions & 3 deletions extension/parquet/parquet_reader.cpp
Expand Up @@ -377,8 +377,7 @@ ParquetReader::ParquetReader(ClientContext &context_p, string file_name_p, const
if (!ObjectCache::ObjectCacheEnabled(context_p)) {
metadata = LoadMetadata(allocator, *file_handle);
} else {
metadata =
std::dynamic_pointer_cast<ParquetFileMetadataCache>(ObjectCache::GetObjectCache(context_p).Get(file_name));
metadata = ObjectCache::GetObjectCache(context_p).Get<ParquetFileMetadataCache>(file_name);
if (!metadata || (last_modify_time + 10 >= metadata->read_time)) {
metadata = LoadMetadata(allocator, *file_handle);
ObjectCache::GetObjectCache(context_p).Put(file_name, metadata);
Expand Down Expand Up @@ -765,4 +764,4 @@ bool ParquetReader::ScanInternal(ParquetReaderScanState &state, DataChunk &resul
return true;
}

} // namespace duckdb
} // namespace duckdb
13 changes: 12 additions & 1 deletion src/include/duckdb/storage/object_cache.hpp
Expand Up @@ -23,11 +23,13 @@ class ObjectCacheEntry {
public:
virtual ~ObjectCacheEntry() {
}

virtual string GetObjectType() = 0;
};

class ObjectCache {
public:
shared_ptr<ObjectCacheEntry> Get(string key) {
shared_ptr<ObjectCacheEntry> GetObject(const string &key) {
lock_guard<mutex> glock(lock);
auto entry = cache.find(key);
if (entry == cache.end()) {
Expand All @@ -36,6 +38,15 @@ class ObjectCache {
return entry->second;
}

//! Looks up "key" in the cache and returns the entry downcast to T, or
//! nullptr when the key is absent or the entry's runtime type tag does not
//! equal T::ObjectType(). The tag comparison is what makes the
//! static_pointer_cast safe here (no dynamic_cast/RTTI needed).
template <class T>
shared_ptr<T> Get(const string &key) {
	auto entry = GetObject(key);
	if (entry && entry->GetObjectType() == T::ObjectType()) {
		return std::static_pointer_cast<T, ObjectCacheEntry>(entry);
	}
	return nullptr;
}

void Put(string key, shared_ptr<ObjectCacheEntry> value) {
lock_guard<mutex> glock(lock);
cache[key] = move(value);
Expand Down
2 changes: 0 additions & 2 deletions third_party/thrift/thrift/config.h

This file was deleted.

2 changes: 1 addition & 1 deletion third_party/thrift/thrift/protocol/TCompactProtocol.h
Expand Up @@ -240,7 +240,7 @@ class TCompactProtocolFactoryT : public TProtocolFactory {
void setContainerSizeLimit(int32_t container_limit) { container_limit_ = container_limit; }

std::shared_ptr<TProtocol> getProtocol(std::shared_ptr<TTransport> trans) override {
std::shared_ptr<Transport_> specific_trans = std::dynamic_pointer_cast<Transport_>(trans);
std::shared_ptr<Transport_> specific_trans = std::static_pointer_cast<Transport_>(trans);
TProtocol* prot;
if (specific_trans) {
prot = new TCompactProtocolT<Transport_>(specific_trans, string_limit_, container_limit_);
Expand Down
2 changes: 0 additions & 2 deletions third_party/thrift/thrift/protocol/TCompactProtocol.tcc
Expand Up @@ -21,8 +21,6 @@

#include <limits>

#include "thrift/config.h"

/*
* TCompactProtocol::i*ToZigzag depend on the fact that the right shift
* operator on a signed integer is an arithmetic (sign-extending) shift.
Expand Down
14 changes: 12 additions & 2 deletions third_party/thrift/thrift/thrift-config.h
Expand Up @@ -17,8 +17,18 @@
* under the License.
*/

#ifndef THRIFT_CONFIG_H
#define THRIFT_CONFIG_H


#ifdef _WIN32
//#include <thrift/windows/config.h>
#if defined(_M_IX86) || defined(_M_X64)
#define ARITHMETIC_RIGHT_SHIFT 1
#define SIGNED_RIGHT_SHIFT_IS 1
#endif
#else
#include "thrift/config.h"
#define SIGNED_RIGHT_SHIFT_IS 1
#define ARITHMETIC_RIGHT_SHIFT 1
#endif

#endif
3 changes: 2 additions & 1 deletion tools/nodejs/.gitignore
Expand Up @@ -5,4 +5,5 @@ src/duckdb.hpp
lib/binding
package-lock.json
test/support/big.db*
test/tmp/*
test/tmp/*
src/parquet-amalgamation*
1 change: 1 addition & 0 deletions tools/nodejs/binding.gyp
Expand Up @@ -8,6 +8,7 @@
"src/connection.cpp",
"src/statement.cpp",
"src/utils.cpp",
"src/parquet-amalgamation.cpp",
"src/duckdb.cpp" # comment this out to build against existing lib
],
"include_dirs": [
Expand Down
9 changes: 3 additions & 6 deletions tools/nodejs/configure
Expand Up @@ -6,12 +6,9 @@ set -x
cd `dirname $0`

if [ ! -f "../../scripts/amalgamation.py" ]; then
echo "Could find neither the amalgamation build script"
echo "Could not find the amalgamation build script"
exit 1
fi

(cd ../.. && python scripts/amalgamation.py --source=tools/nodejs/src/duckdb.cpp --header=tools/nodejs/src/duckdb.hpp)
# (cd ../.. && python extension/parquet/parquet_amalgamation.py --source=tools/rpkg/src/parquet-extension.cpp --header=tools/rpkg/src/parquet-extension.h)
# cp src/parquet-extension.h src/parquet-extension.h.tmp
# sed 's/duckdb[.]hpp/duckdb.h/g' src/parquet-extension.h.tmp > src/parquet-extension.h
# rm src/parquet-extension.h.tmp
(cd ../.. && python scripts/amalgamation.py --extended --source=tools/nodejs/src/duckdb.cpp --header=tools/nodejs/src/duckdb.hpp)
(cd ../.. && python scripts/parquet_amalgamation.py && cp src/amalgamation/parquet-amalgamation* tools/nodejs/src/)
3 changes: 3 additions & 0 deletions tools/nodejs/src/database.cpp
@@ -1,4 +1,5 @@
#include "duckdb_node.hpp"
#include "parquet-amalgamation.hpp"

namespace node_duckdb {

Expand Down Expand Up @@ -28,6 +29,8 @@ struct OpenTask : public Task {
void DoWork() override {
try {
Get<Database>().database = duckdb::make_unique<duckdb::DuckDB>(filename);
duckdb::ParquetExtension extension;
extension.Load(*Get<Database>().database);
success = true;

} catch (std::exception &ex) {
Expand Down
16 changes: 16 additions & 0 deletions tools/nodejs/test/parquet.js
@@ -0,0 +1,16 @@
// Mocha suite checking that the node bindings can execute a query through
// the bundled Parquet extension (parquet_scan on a test fixture file).
var duckdb = require('..');
var assert = require('assert');
var helper = require('./support/helper');

describe('can query parquet', function() {
    var db;

    // Open an in-memory database before any test in this suite runs.
    before(function(done) {
        db = new duckdb.Database(':memory:', done);
    });

    // A successful run (done called without error) proves the Parquet
    // extension was loaded and can read the fixture file.
    it('should be able to read parquet files', function(done) {
        db.run("select * from parquet_scan('../../data/parquet-testing/userdata1.parquet')", done);
    });
});