Skip to content

Commit

Permalink
Convert Datastore to GAX
Browse files Browse the repository at this point in the history
[closes #850]
  • Loading branch information
quartzmo committed Oct 7, 2016
1 parent 1aa0a15 commit 6aa2102
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 574 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ module Datastore
#
class Dataset
##
# @private The gRPC Service object.
# @private The Service object.
attr_accessor :service

##
Expand Down
114 changes: 46 additions & 68 deletions google-cloud-datastore/lib/google/cloud/datastore/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
# limitations under the License.


require "google/cloud/errors"
require "google/cloud/datastore/credentials"
require "google/datastore/v1/datastore_pb"
require "google/cloud/core/grpc_backoff"
require "google/cloud/datastore/version"
require "google/datastore/v1/datastore_services_pb"
require "google/cloud/datastore/v1/datastore_api"

module Google
module Cloud
module Datastore
##
# @private Represents the gRPC Datastore service, including all the API
# @private Represents the GAX Datastore service, including all the API
# methods.
class Service
attr_accessor :project, :credentials, :host, :retries, :timeout
Expand All @@ -37,20 +39,25 @@ def initialize project, credentials, host: nil, retries: nil,
@timeout = timeout
end

def creds
def channel
GRPC::Core::Channel.new host, nil, chan_creds
end

def chan_creds
return credentials if insecure?
GRPC::Core::ChannelCredentials.new.compose \
GRPC::Core::CallCredentials.new credentials.client.updater_proc
end

def service
return mocked_service if mocked_service
@datastore ||= begin
require "google/datastore/v1/datastore_services_pb"

Google::Datastore::V1::Datastore::Stub.new(
host, creds, timeout: timeout)
end
@service ||= \
V1::DatastoreApi.new(
service_path: host,
channel: channel,
timeout: timeout,
app_name: "google-cloud-datastore",
app_version: Google::Cloud::Datastore::VERSION)
end
attr_accessor :mocked_service

Expand All @@ -62,99 +69,63 @@ def insecure?
# Allocate IDs for incomplete keys.
# (This is useful for referencing an entity before it is inserted.)
def allocate_ids *incomplete_keys
allocate_req = Google::Datastore::V1::AllocateIdsRequest.new(
project_id: project,
keys: incomplete_keys
)

execute { service.allocate_ids allocate_req }
execute { service.allocate_ids project, incomplete_keys }
end

##
# Look up entities by keys.
def lookup *keys, consistency: nil, transaction: nil
lookup_req = Google::Datastore::V1::LookupRequest.new(
project_id: project,
keys: keys
)
lookup_req.read_options = generate_read_options consistency,
transaction

execute { service.lookup lookup_req }
read_options = generate_read_options consistency, transaction

execute { service.lookup project, read_options, keys }
end

# Query for entities.
def run_query query, namespace = nil, consistency: nil, transaction: nil
run_req = Google::Datastore::V1::RunQueryRequest.new(
project_id: project)
if query.is_a? Google::Datastore::V1::Query
run_req["query"] = query
elsif query.is_a? Google::Datastore::V1::GqlQuery
run_req["gql_query"] = query
else
fail ArgumentError, "Unable to query with a #{query.class} object."
gql_query = nil
if query.is_a? Google::Datastore::V1::GqlQuery
gql_query = query
query = nil
end
run_req.read_options = generate_read_options consistency, transaction

run_req.partition_id = Google::Datastore::V1::PartitionId.new(
read_options = generate_read_options consistency, transaction
partition_id = Google::Datastore::V1::PartitionId.new(
namespace_id: namespace) if namespace

execute { service.run_query run_req }
execute do
service.run_query project,
partition_id,
read_options,
query: query,
gql_query: gql_query
end
end

##
# Begin a new transaction.
def begin_transaction
tx_req = Google::Datastore::V1::BeginTransactionRequest.new(
project_id: project
)

execute { service.begin_transaction tx_req }
execute { service.begin_transaction project }
end

##
# Commit a transaction, optionally creating, deleting or modifying
# some entities.
def commit mutations, transaction: nil
commit_req = Google::Datastore::V1::CommitRequest.new(
project_id: project,
mode: :NON_TRANSACTIONAL,
mutations: mutations
)
if transaction
commit_req.mode = :TRANSACTIONAL
commit_req.transaction = transaction
mode = transaction.nil? ? :NON_TRANSACTIONAL : :TRANSACTIONAL
execute do
service.commit project, mode, mutations, transaction: transaction
end

execute { service.commit commit_req }
end

##
# Roll back a transaction.
def rollback transaction
rb_req = Google::Datastore::V1::RollbackRequest.new(
project_id: project,
transaction: transaction
)

execute { service.rollback rb_req }
execute { service.rollback project, transaction }
end

def inspect
"#{self.class}(#{@project})"
end

##
# Performs backoff and error handling
def execute
require "grpc" # Ensure GRPC is loaded before rescuing exception
Google::Cloud::Core::GrpcBackoff.new(retries: retries).execute do
yield
end
rescue GRPC::BadStatus => e
raise Google::Cloud::Error.from_error(e)
end

protected

def generate_read_options consistency, transaction
Expand All @@ -170,6 +141,13 @@ def generate_read_options consistency, transaction
end
nil
end

def execute
require "grpc" # Ensure GRPC is loaded before rescuing exception
yield
rescue GRPC::BadStatus => e
raise Google::Cloud::Error.from_error(e)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Transaction < Dataset

##
# @private Creates a new Transaction instance.
# Takes a Connection and Service instead of project and Credentials.
# Takes a Service instead of project and Credentials.
def initialize service
@service = service
reset!
Expand Down
Loading

0 comments on commit 6aa2102

Please sign in to comment.