
Commit

First test builds successfully
Tom-Newton committed Oct 1, 2023
1 parent 8874fbe commit 7402412
Showing 2 changed files with 146 additions and 163 deletions.
150 changes: 57 additions & 93 deletions cpp/src/arrow/filesystem/azurefs.cc
@@ -19,19 +19,19 @@

#include <azure/identity/default_azure_credential.hpp>
#include <azure/storage/blobs.hpp>
#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/files/datalake.hpp>

#include "arrow/buffer.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/future.h"
#include "arrow/util/string.h"


namespace arrow {
namespace fs {

@@ -81,7 +81,8 @@ struct AzurePath {
// Expected input here => s = synapsemlfs/testdir/testfile.txt,
// http://127.0.0.1/accountName/pathToBlob
auto src = internal::RemoveTrailingSlash(s);
if (arrow::internal::StartsWith(src, "https://127.0.0.1") || arrow::internal::StartsWith(src, "http://127.0.0.1")) {
if (arrow::internal::StartsWith(src, "https://127.0.0.1") ||
arrow::internal::StartsWith(src, "http://127.0.0.1")) {
RETURN_NOT_OK(FromLocalHostString(&src));
}
auto input_path = std::string(src.data());
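
As a hedged illustration of the "Expected input" comment above, both accepted shapes end up in the AzurePath fields used elsewhere in this file (container, path_to_file, path_to_file_parts). The split shown below is an assumption based on those field names, not verified against the parser.

// Illustrative sketch only (assumed parsing behaviour; requires a Result/Status-returning scope).
ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString("synapsemlfs/testdir/testfile.txt"));
// Assumed result:
//   path.container          == "synapsemlfs"
//   path.path_to_file       == "testdir/testfile.txt"
//   path.path_to_file_parts == {"testdir", "testfile.txt"}
// For the http://127.0.0.1/... form, FromLocalHostString is assumed to strip the
// localhost prefix before the same container/path split is applied.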
@@ -174,11 +175,9 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re
class ObjectInputFile final : public io::RandomAccessFile {
public:
ObjectInputFile(
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient>& path_client,
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient>& file_client,
const io::IOContext& io_context, const AzurePath& path, int64_t size = kNoSize)
: path_client_(std::move(path_client)),
file_client_(std::move(file_client)),
: file_client_(std::move(file_client)),
io_context_(io_context),
path_(path),
content_length_(size) {}
@@ -189,7 +188,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
return Status::OK();
}
try {
auto properties = path_client_->GetProperties();
auto properties = file_client_->GetProperties();
if (properties.Value.IsDirectory) {
return ::arrow::fs::internal::NotAFile(path_.full_path);
}
@@ -225,12 +224,11 @@ class ObjectInputFile final : public io::RandomAccessFile {
}

Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
const io::IOContext& io_context) override {
const io::IOContext& io_context) override {
return metadata_;
}

Status Close() override {
path_client_ = nullptr;
file_client_ = nullptr;
closed_ = true;
return Status::OK();
@@ -312,7 +310,6 @@ class ObjectInputFile final : public io::RandomAccessFile {
}

protected:
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient> path_client_;
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client_;
const io::IOContext io_context_;
AzurePath path_;
@@ -331,108 +328,73 @@ class ObjectInputFile final : public io::RandomAccessFile {
class AzureFileSystem::Impl {
public:
io::IOContext io_context_;
bool is_hierarchical_namespace_enabled_;
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeServiceClient> service_client_;
AzureOptions options_;

explicit Impl(AzureOptions options, io::IOContext io_context)
: io_context_(io_context), options_(std::move(options)) {}

Status Init() {
// TODO: GH-18014 Delete this once we have a proper implementation. This just
// initializes a pointless Azure blob service client with a fake endpoint to ensure
// the build will fail if the Azure SDK build is broken.
auto default_credential = std::make_shared<Azure::Identity::DefaultAzureCredential>();
auto service_client = Azure::Storage::Blobs::BlobServiceClient(
"http://fake-blob-storage-endpoint", default_credential);
if (options_.backend == AzureBackend::Azurite) {
// gen1Client_->GetAccountInfo().Value.IsHierarchicalNamespaceEnabled
// throws error in azurite
is_hierarchical_namespace_enabled_ = false;
}
service_client_ =
std::make_shared<Azure::Storage::Files::DataLake::DataLakeServiceClient>(
options_.account_dfs_url, options_.storage_credentials_provider);
return Status::OK();
}

const AzureOptions& options() const { return options_; }

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
AzureBlobFileSystem* fs) {
AzureFileSystem* fs) {
ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s));

if (path.empty()) {
return ::arrow::fs::internal::PathNotFound(path.full_path);
}
if (!is_hierarchical_namespace_enabled_) {
if (path.path_to_file_parts.size() > 1) {
return Status::IOError(
"Invalid Azure Blob Storage path provided,"
" hierarchical namespace not enabled in storage account");
}
}
ARROW_ASSIGN_OR_RAISE(auto response, FileExists(dfs_endpoint_url_ + path.full_path));
if (!response) {
return ::arrow::fs::internal::PathNotFound(path.full_path);
}
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient> path_client;
ARROW_ASSIGN_OR_RAISE(
path_client, InitPathClient<Azure::Storage::Files::DataLake::DataLakePathClient>(
options_, dfs_endpoint_url_ + path.full_path, path.container,
path.path_to_file));

std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client;
ARROW_ASSIGN_OR_RAISE(
file_client, InitPathClient<Azure::Storage::Files::DataLake::DataLakeFileClient>(
options_, dfs_endpoint_url_ + path.full_path, path.container,
path.path_to_file));

auto ptr = std::make_shared<ObjectInputFile>(path_client, file_client,
fs->io_context(), path);

// TODO: Wrap this in a try catch and return
// `::arrow::fs::internal::NotAFile(blob_client.GetUrl());` if it fails because the
// file does not exist.
auto file_client =
std::make_shared<Azure::Storage::Files::DataLake::DataLakeFileClient>(
std::move(service_client_->GetFileSystemClient(path.container)
.GetFileClient(path.path_to_file)));
// auto file_client =
// std::make_shared<Azure::Storage::Files::DataLake::DataLakeFileClient>(options_.account_dfs_url,
// path.container, path.path_to_file, options_.storage_credentials_provider);

auto ptr = std::make_shared<ObjectInputFile>(file_client, fs->io_context(), path);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
};

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
AzureBlobFileSystem* fs) {
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
if (info.type() != FileType::File && info.type() != FileType::Unknown) {
return ::arrow::fs::internal::NotAFile(info.path());
}

ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path()));

if (!is_hierarchical_namespace_enabled_) {
if (path.path_to_file_parts.size() > 1) {
return Status::IOError(
"Invalid Azure Blob Storage path provided, hierarchical namespace"
" not enabled in storage account");
}
}
ARROW_ASSIGN_OR_RAISE(auto response, FileExists(dfs_endpoint_url_ + info.path()));
if (!response) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient> path_client;
ARROW_ASSIGN_OR_RAISE(
path_client,
InitPathClient<Azure::Storage::Files::DataLake::DataLakePathClient>(
options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file));

std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client;
ARROW_ASSIGN_OR_RAISE(
file_client,
InitPathClient<Azure::Storage::Files::DataLake::DataLakeFileClient>(
options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file));

auto ptr = std::make_shared<ObjectInputFile>(path_client, file_client, fs->io_context(),
path, info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
}

protected:
AzureOptions options_;
// Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
// AzureFileSystem* fs) {
// if (info.type() == FileType::NotFound) {
// return ::arrow::fs::internal::PathNotFound(info.path());
// }
// if (info.type() != FileType::File && info.type() != FileType::Unknown) {
// return ::arrow::fs::internal::NotAFile(info.path());
// }

// ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path()));

// std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient> path_client;
// ARROW_ASSIGN_OR_RAISE(
// path_client,
// InitPathClient<Azure::Storage::Files::DataLake::DataLakePathClient>(
// options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file));

// std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client;
// ARROW_ASSIGN_OR_RAISE(
// file_client,
// InitPathClient<Azure::Storage::Files::DataLake::DataLakeFileClient>(
// options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file));

// auto ptr = std::make_shared<ObjectInputFile>(path_client, file_client, fs->io_context(),
// path, info.size());
// RETURN_NOT_OK(ptr->Init());
// return ptr;
// }
};
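
The TODO in OpenInputFile above asks for the file access to be wrapped in a try/catch that maps a missing blob to NotAFile. A minimal sketch of one way to do that around the GetProperties() call, assuming the Azure SDK reports the failure as an Azure::Storage::StorageException (the specific error codes checked here are assumptions):

// Sketch only, not part of this commit.
try {
  file_client->GetProperties();
} catch (const Azure::Storage::StorageException& exception) {
  if (exception.ErrorCode == "PathNotFound" || exception.ErrorCode == "BlobNotFound") {
    return ::arrow::fs::internal::NotAFile(file_client->GetUrl());
  }
  return Status::IOError("Failed to get properties for '", file_client->GetUrl(),
                         "': ", exception.what());
}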

const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
Expand Down Expand Up @@ -491,7 +453,8 @@ Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(

Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
const FileInfo& info) {
return impl_->OpenInputFile(info, this);
// return impl_->OpenInputFile(info, this);
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
}

Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
@@ -501,7 +464,8 @@ Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(

Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
const FileInfo& info) {
return impl_->OpenInputFile(info, this);
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
// return impl_->OpenInputFile(info, this);
}
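
Since both FileInfo overloads currently return NotImplemented, only the string-path entry points are usable in this commit. A rough usage sketch, assuming an AzureOptions instance configured elsewhere and an AzureFileSystem::Make factory (the factory name and any option fields beyond those visible in this diff are assumptions):

// Rough usage sketch only; not part of this commit.
AzureOptions options;
// ... set options.account_dfs_url and options.storage_credentials_provider ...
ARROW_ASSIGN_OR_RAISE(auto fs, AzureFileSystem::Make(options));
ARROW_ASSIGN_OR_RAISE(auto file, fs->OpenInputFile("container/testdir/testfile.txt"));
ARROW_ASSIGN_OR_RAISE(auto buffer, file->ReadAt(/*position=*/0, /*nbytes=*/1024));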

Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(