Skip to content

Commit

Permalink
Merge pull request #2818 from hannesmuehleisen/nodeparquet
Browse files Browse the repository at this point in the history
Adding Parquet extension to node module
  • Loading branch information
Mytherin committed Dec 22, 2021
2 parents f9e6e86 + 57309ee commit 20a3c6a
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 22 deletions.
9 changes: 9 additions & 0 deletions extension/parquet/include/parquet_file_metadata_cache.hpp
Expand Up @@ -31,5 +31,14 @@ class ParquetFileMetadataCache : public ObjectCacheEntry {

//! read time
time_t read_time;

public:
//! Static type tag for this cache-entry class; ObjectCache::Get<T> compares
//! this against a stored entry's GetObjectType() to validate the cast.
static string ObjectType() {
	return "parquet_metadata";
}

//! Virtual override (declared in ObjectCacheEntry) reporting the dynamic
//! type tag; forwards to the static ObjectType() so the two always agree.
string GetObjectType() override {
	return ObjectType();
}
};
} // namespace duckdb
12 changes: 8 additions & 4 deletions extension/parquet/parquet-extension.cpp
Expand Up @@ -139,11 +139,15 @@ class ParquetScanFunction {
FileSystem &fs = FileSystem::GetFileSystem(context);
for (idx_t file_idx = 1; file_idx < bind_data.files.size(); file_idx++) {
auto &file_name = bind_data.files[file_idx];
auto metadata = std::dynamic_pointer_cast<ParquetFileMetadataCache>(cache.Get(file_name));
auto metadata = cache.Get<ParquetFileMetadataCache>(file_name);
if (!metadata) {
// missing metadata entry in cache, no usable stats
return nullptr;
}
auto handle = fs.OpenFile(file_name, FileFlags::FILE_FLAGS_READ, FileSystem::DEFAULT_LOCK,
FileSystem::DEFAULT_COMPRESSION, FileSystem::GetFileOpener(context));
// but we need to check if the metadata cache entries are current
if (!metadata || (fs.GetLastModifiedTime(*handle) >= metadata->read_time)) {
// we need to check if the metadata cache entries are current
if (fs.GetLastModifiedTime(*handle) >= metadata->read_time) {
// missing or invalid metadata entry in cache, no usable stats overall
return nullptr;
}
Expand Down Expand Up @@ -543,4 +547,4 @@ std::string ParquetExtension::Name() {
return "parquet";
}

} // namespace duckdb
} // namespace duckdb
5 changes: 2 additions & 3 deletions extension/parquet/parquet_reader.cpp
Expand Up @@ -377,8 +377,7 @@ ParquetReader::ParquetReader(ClientContext &context_p, string file_name_p, const
if (!ObjectCache::ObjectCacheEnabled(context_p)) {
metadata = LoadMetadata(allocator, *file_handle);
} else {
metadata =
std::dynamic_pointer_cast<ParquetFileMetadataCache>(ObjectCache::GetObjectCache(context_p).Get(file_name));
metadata = ObjectCache::GetObjectCache(context_p).Get<ParquetFileMetadataCache>(file_name);
if (!metadata || (last_modify_time + 10 >= metadata->read_time)) {
metadata = LoadMetadata(allocator, *file_handle);
ObjectCache::GetObjectCache(context_p).Put(file_name, metadata);
Expand Down Expand Up @@ -765,4 +764,4 @@ bool ParquetReader::ScanInternal(ParquetReaderScanState &state, DataChunk &resul
return true;
}

} // namespace duckdb
} // namespace duckdb
13 changes: 12 additions & 1 deletion src/include/duckdb/storage/object_cache.hpp
Expand Up @@ -23,11 +23,13 @@ class ObjectCacheEntry {
public:
virtual ~ObjectCacheEntry() {
}

virtual string GetObjectType() = 0;
};

class ObjectCache {
public:
shared_ptr<ObjectCacheEntry> Get(string key) {
shared_ptr<ObjectCacheEntry> GetObject(const string &key) {
lock_guard<mutex> glock(lock);
auto entry = cache.find(key);
if (entry == cache.end()) {
Expand All @@ -36,6 +38,15 @@ class ObjectCache {
return entry->second;
}

//! Typed cache lookup: fetch the entry stored under "key" and return it as
//! a shared_ptr<T>. Returns nullptr when there is no entry, or when the
//! entry's type tag does not match T::ObjectType() (the tag check stands in
//! for RTTI, so a plain static_pointer_cast is safe afterwards).
template <class T>
shared_ptr<T> Get(const string &key) {
	auto entry = GetObject(key);
	if (entry && entry->GetObjectType() == T::ObjectType()) {
		return std::static_pointer_cast<T, ObjectCacheEntry>(entry);
	}
	return nullptr;
}

void Put(string key, shared_ptr<ObjectCacheEntry> value) {
lock_guard<mutex> glock(lock);
cache[key] = move(value);
Expand Down
2 changes: 0 additions & 2 deletions third_party/thrift/thrift/config.h

This file was deleted.

2 changes: 1 addition & 1 deletion third_party/thrift/thrift/protocol/TCompactProtocol.h
Expand Up @@ -240,7 +240,7 @@ class TCompactProtocolFactoryT : public TProtocolFactory {
void setContainerSizeLimit(int32_t container_limit) { container_limit_ = container_limit; }

std::shared_ptr<TProtocol> getProtocol(std::shared_ptr<TTransport> trans) override {
std::shared_ptr<Transport_> specific_trans = std::dynamic_pointer_cast<Transport_>(trans);
std::shared_ptr<Transport_> specific_trans = std::static_pointer_cast<Transport_>(trans);
TProtocol* prot;
if (specific_trans) {
prot = new TCompactProtocolT<Transport_>(specific_trans, string_limit_, container_limit_);
Expand Down
2 changes: 0 additions & 2 deletions third_party/thrift/thrift/protocol/TCompactProtocol.tcc
Expand Up @@ -21,8 +21,6 @@

#include <limits>

#include "thrift/config.h"

/*
* TCompactProtocol::i*ToZigzag depend on the fact that the right shift
* operator on a signed integer is an arithmetic (sign-extending) shift.
Expand Down
14 changes: 12 additions & 2 deletions third_party/thrift/thrift/thrift-config.h
Expand Up @@ -17,8 +17,18 @@
* under the License.
*/

#ifndef THRIFT_CONFIG_H
#define THRIFT_CONFIG_H

// Both macros tell TCompactProtocol.tcc that the right-shift operator on a
// signed integer is an arithmetic (sign-extending) shift, which its zigzag
// decoding relies on.
// NOTE(review): on Windows these are only defined for x86/x64; on other
// Windows architectures (e.g. ARM64) neither macro is set -- confirm whether
// that is intentional or an unsupported configuration.
#ifdef _WIN32
//#include <thrift/windows/config.h>
#if defined(_M_IX86) || defined(_M_X64)
#define ARITHMETIC_RIGHT_SHIFT 1
#define SIGNED_RIGHT_SHIFT_IS 1
#endif
#else
// Stale reference removed: this commit deletes third_party/thrift/thrift/config.h,
// so including it here would fail to resolve.
#define SIGNED_RIGHT_SHIFT_IS 1
#define ARITHMETIC_RIGHT_SHIFT 1
#endif

#endif
3 changes: 2 additions & 1 deletion tools/nodejs/.gitignore
Expand Up @@ -5,4 +5,5 @@ src/duckdb.hpp
lib/binding
package-lock.json
test/support/big.db*
test/tmp/*
test/tmp/*
src/parquet-amalgamation*
1 change: 1 addition & 0 deletions tools/nodejs/binding.gyp
Expand Up @@ -8,6 +8,7 @@
"src/connection.cpp",
"src/statement.cpp",
"src/utils.cpp",
"src/parquet-amalgamation.cpp",
"src/duckdb.cpp" # comment this out to build against existing lib
],
"include_dirs": [
Expand Down
9 changes: 3 additions & 6 deletions tools/nodejs/configure
Expand Up @@ -6,12 +6,9 @@ set -x
cd `dirname $0`

if [ ! -f "../../scripts/amalgamation.py" ]; then
echo "Could find neither the amalgamation build script"
echo "Could not find the amalgamation build script"
exit 1
fi

(cd ../.. && python scripts/amalgamation.py --source=tools/nodejs/src/duckdb.cpp --header=tools/nodejs/src/duckdb.hpp)
# (cd ../.. && python extension/parquet/parquet_amalgamation.py --source=tools/rpkg/src/parquet-extension.cpp --header=tools/rpkg/src/parquet-extension.h)
# cp src/parquet-extension.h src/parquet-extension.h.tmp
# sed 's/duckdb[.]hpp/duckdb.h/g' src/parquet-extension.h.tmp > src/parquet-extension.h
# rm src/parquet-extension.h.tmp
(cd ../.. && python scripts/amalgamation.py --extended --source=tools/nodejs/src/duckdb.cpp --header=tools/nodejs/src/duckdb.hpp)
(cd ../.. && python scripts/parquet_amalgamation.py && cp src/amalgamation/parquet-amalgamation* tools/nodejs/src/)
3 changes: 3 additions & 0 deletions tools/nodejs/src/database.cpp
@@ -1,4 +1,5 @@
#include "duckdb_node.hpp"
#include "parquet-amalgamation.hpp"

namespace node_duckdb {

Expand Down Expand Up @@ -28,6 +29,8 @@ struct OpenTask : public Task {
void DoWork() override {
try {
Get<Database>().database = duckdb::make_unique<duckdb::DuckDB>(filename);
duckdb::ParquetExtension extension;
extension.Load(*Get<Database>().database);
success = true;

} catch (std::exception &ex) {
Expand Down
16 changes: 16 additions & 0 deletions tools/nodejs/test/parquet.js
@@ -0,0 +1,16 @@
// Mocha smoke test: the node module should be able to scan a Parquet file
// through SQL, which only works if the bundled Parquet extension was loaded.
var sqlite3 = require('..'); // package root; presumably the duckdb binding (node-sqlite3-style API, hence the name) -- confirm
var assert = require('assert');
var helper = require('./support/helper'); // NOTE(review): unused in this file -- possibly kept for parity with sibling tests; verify

describe('can query parquet', function() {
    var db; // shared database handle for the suite

    // Open an in-memory database before any test runs; `done` reports
    // success/failure of the open back to mocha.
    before(function(done) {
        db = new sqlite3.Database(':memory:', done);
    });

    it('should be able to read parquet files', function(done) {
        // Passes only if parquet_scan() is registered, i.e. the extension loaded.
        db.run("select * from parquet_scan('../../data/parquet-testing/userdata1.parquet')", done);
    });

});

0 comments on commit 20a3c6a

Please sign in to comment.