-
Notifications
You must be signed in to change notification settings - Fork 0
/
arrow.cpp
123 lines (92 loc) · 3.81 KB
/
arrow.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include "arrow.hpp"
#include <arrow/filesystem/api.h>
#include <arrow/result.h>
#include <arrow/status.h>
// Root directory used by ArrowFileSystem::ReadFile: on Linux, /proc/self/fd
// lists the calling process's open file descriptors.
static const std::string localname{"/proc/self/fd"};
std::shared_ptr<arrow::Buffer> ArrowFileSystem::ReadFile(
const std::string & name, const std::size_t datasize) {
std::shared_ptr<arrow::fs::LocalFileSystem> localfs =
std::make_shared<arrow::fs::LocalFileSystem>();
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(localname, localfs);
std::shared_ptr<arrow::io::InputStream> inputStream =
*fs->OpenInputStream(name);
std::shared_ptr<arrow::Buffer> buffer = *inputStream->Read(datasize);
return buffer;
}
void ArrowFileSystem::CreateDirectory(
const std::string & base, const std::string & name) {
std::shared_ptr<arrow::fs::LocalFileSystem> localfs =
std::make_shared<arrow::fs::LocalFileSystem>();
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(base, localfs);
arrow::Status status = fs->CreateDir(name);
// if (!status.ok()) {
// std::cout << "Error: " << status.message() << std::endl;
//}
}
void ArrowFileSystem::DeleteDirectory(
const std::string & base, const std::string & name) {
std::shared_ptr<arrow::fs::LocalFileSystem> localfs =
std::make_shared<arrow::fs::LocalFileSystem>();
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(base, localfs);
arrow::Status status = fs->DeleteDir(name);
}
void ArrowFileSystem::Move(const std::string & base,
const std::string & from,
const std::string & to) {
std::shared_ptr<arrow::fs::LocalFileSystem> localfs =
std::make_shared<arrow::fs::LocalFileSystem>();
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(base, localfs);
arrow::Status status = fs->Move(from, to);
}
// S3 connection settings shared by the ArrowS3 methods in this file: the
// ArrowS3 constructor fills them in, and every method passes them to
// S3FileSystem::Make.
arrow::fs::S3Options s3Options;
// Initializes Arrow's S3 support with S3 logging turned off, then points the
// file-level `s3Options` at a fixed HTTP endpoint using fixed credentials.
//
// The arrow::Status returned by InitializeS3 is stored but never inspected.
ArrowS3::ArrowS3() {
    const arrow::fs::S3GlobalOptions globalOptions{arrow::fs::S3LogLevel::Off};
    const arrow::Status initOutcome = arrow::fs::InitializeS3(globalOptions);

    // Static access key / secret key pair.
    s3Options.ConfigureAccessKey("randomstring", "randomstring");

    // Plain-HTTP endpoint on the loopback interface, port 9000.
    s3Options.endpoint_override = "127.0.0.1:9000";
    s3Options.scheme = "http";
}
std::shared_ptr<arrow::Buffer> ArrowS3::ReadFile(
const std::string & base, const std::size_t datasize) {
std::string::size_type index = base.find('/');
const std::string bucketname = base.substr(0, index);
const std::string name = base.substr(index + 1);
std::shared_ptr<arrow::fs::S3FileSystem> s3fs =
*arrow::fs::S3FileSystem::Make(s3Options);
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(bucketname, s3fs);
std::shared_ptr<arrow::io::InputStream> inputStream =
*fs->OpenInputStream(name);
std::shared_ptr<arrow::Buffer> buffer = *inputStream->Read(datasize);
return buffer;
}
void ArrowS3::CreateDirectory(
const std::string & base, const std::string & name) {
std::shared_ptr<arrow::fs::S3FileSystem> s3fs =
*arrow::fs::S3FileSystem::Make(s3Options);
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(base, s3fs);
arrow::Status status = fs->CreateDir(name);
}
void ArrowS3::DeleteDirectory(
const std::string & base, const std::string & name) {
std::shared_ptr<arrow::fs::S3FileSystem> s3fs =
*arrow::fs::S3FileSystem::Make(s3Options);
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(base, s3fs);
arrow::Status status =
fs->DeleteDir(name); // DeleteDir only delete content
}
void ArrowS3::Move(const std::string & base,
const std::string & src,
const std::string & dest) {
std::shared_ptr<arrow::fs::S3FileSystem> s3fs =
*arrow::fs::S3FileSystem::Make(s3Options);
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::SubTreeFileSystem>(base, s3fs);
arrow::Status status = fs->Move(src, dest);
}