Skip to content

Commit

Permalink
Upgrade to 0.10.6 (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng committed Mar 8, 2021
1 parent 2d8f465 commit 00d6b57
Show file tree
Hide file tree
Showing 23 changed files with 1,399 additions and 293 deletions.
54 changes: 45 additions & 9 deletions cupid/io/table/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import json
import logging
import time
Expand Down Expand Up @@ -41,7 +42,8 @@

class TableSplit(object):
__slots__ = '_handle', '_split_index', '_split_file_start', '_split_file_end', \
'_schema_file_start', '_schema_file_end'
'_schema_file_start', '_schema_file_end', '_meta_row_count', \
'_meta_raw_size'

def __init__(self, **kwargs):
if 'split_proto' in kwargs:
Expand All @@ -51,6 +53,12 @@ def __init__(self, **kwargs):
self._split_file_end = split_pb.splitFileEnd
self._schema_file_start = split_pb.schemaFileStart
self._schema_file_end = split_pb.schemaFileEnd

if kwargs.get('meta_proto'):
meta_pb = kwargs.pop('meta_proto')
self._meta_row_count = sum(bm.rowCount for bm in meta_pb.blockMeta)
self._meta_raw_size = sum(bm.rawSize for bm in meta_pb.blockMeta)

for k in self.__slots__:
if k in kwargs:
setattr(self, k, kwargs[k])
Expand Down Expand Up @@ -83,6 +91,14 @@ def schema_file_start(self):
def schema_file_end(self):
return self._schema_file_end

@property
def meta_row_count(self):
    """Return the split's total row count, or None when it is unknown.

    The value is the sum of ``rowCount`` over the block metadata handed to
    the constructor as ``meta_proto``. When no metadata was supplied the
    ``_meta_row_count`` slot is never assigned, so fall back to None
    instead of letting the unset slot raise AttributeError.
    """
    return getattr(self, '_meta_row_count', None)

@property
def meta_raw_size(self):
    """Return the split's total raw size, or None when it is unknown.

    The value is the sum of ``rawSize`` over the block metadata handed to
    the constructor as ``meta_proto``. When no metadata was supplied the
    ``_meta_raw_size`` slot is never assigned, so fall back to None
    instead of letting the unset slot raise AttributeError.
    """
    return getattr(self, '_meta_raw_size', None)

@property
def split_proto(self):
return subprocess_pb.InputSplit(
Expand All @@ -94,12 +110,12 @@ def split_proto(self):
)

def _register_reader(self):
controller = CupidRpcController()
channel = SandboxRpcChannel()
stub = subprocess_pb.CupidSubProcessService_Stub(channel)

req = subprocess_pb.RegisterTableReaderRequest(inputTableHandle=self._handle,
inputSplit=self.split_proto)
controller = CupidRpcController()
resp = stub.RegisterTableReader(controller, req, None)
if controller.Failed():
raise CupidError(controller.ErrorText())
Expand Down Expand Up @@ -343,7 +359,6 @@ def commit(self, overwrite=False):


def create_download_session(session, table_or_parts, split_size=None, split_count=None, columns=None):
controller = CupidRpcController()
channel = CupidTaskServiceRpcChannel(session)
stub = task_service_pb.CupidTaskService_Stub(channel)

Expand Down Expand Up @@ -383,32 +398,53 @@ def create_download_session(session, table_or_parts, split_size=None, split_coun
splitSize=split_size,
splitCount=split_count,
tableInputInfos=table_pbs,
allowNoColumns=True,
requireSplitMeta=True,
)

controller = CupidRpcController()
resp = stub.SplitTables(controller, request, None)
if controller.Failed():
raise CupidError(controller.ErrorText())
logger.info(
"[CupidTask] splitTables call, CurrentInstanceId: %s, "
"request: %s, response: %s" % (
session.lookup_name,
str(request), str(resp),
session.lookup_name, str(request), str(resp),
)
)
handle = resp.inputTableHandle

controller = CupidRpcController()
channel = SandboxRpcChannel()
stub = subprocess_pb.CupidSubProcessService_Stub(channel)

req = subprocess_pb.GetSplitsRequest(inputTableHandle=handle)
req = subprocess_pb.GetSplitsMetaRequest(
inputTableHandle=handle,
)
controller = CupidRpcController()
resp = stub.GetSplitsMeta(controller, req, None)
# Log the GetSplitsMetaRequest that was just sent (`req`); `request` still
# refers to the earlier SplitTables request and would mislabel this entry.
logger.info(
    "[CupidTask] getSplitsMeta call, CurrentInstanceId: %s, "
    "request: %s, response: %s" % (
        session.lookup_name, str(req), str(resp),
    )
)
if controller.Failed():
split_meta = itertools.repeat(None)
logger.warning('Failed to get results of getSplitsMeta, '
'may running on an old service')
else:
split_meta = resp.inputSplitsMeta

req = subprocess_pb.GetSplitsRequest(inputTableHandle=handle)
controller = CupidRpcController()
resp = stub.GetSplits(controller, req, None)
if controller.Failed():
raise CupidError(controller.ErrorText())

input_splits = []
for info in resp.inputSplits:
input_splits.append(TableSplit(split_proto=info, handle=handle, columns=columns))
for info, meta in zip(resp.inputSplits, split_meta):
input_splits.append(TableSplit(
split_proto=info, meta_proto=meta, handle=handle, columns=columns))
logger.info(
"[SubProcess] getSplits call, CurrentInstanceId: %s, "
"request: %s, response: %s" % (
Expand Down
43 changes: 27 additions & 16 deletions cupid/proto/cupid_subprocess_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ service CupidSubProcessService
// Moving the open&close TMP files into staging Directory for further ddl action
rpc CommitTableFiles(CommitTableFilesRequest) returns (CommitTableFilesResponse);

// Internal GetInformation Call
// Cupid Worker Directly talk to CupidTask
rpc InternalGetInformation(InternalGetInformationRequest) returns (InternalGetInformationResponse);
// Get Splits Meta
rpc GetSplitsMeta(GetSplitsMetaRequest) returns (GetSplitsMetaResponse);
}

message InputSplit
Expand All @@ -33,6 +32,31 @@ message InputSplit
optional uint64 splitFileEnd = 3;
optional uint64 schemaFileStart = 4;
optional uint64 schemaFileEnd = 5;
optional string project = 6;
optional string table = 7;
optional string partitionSpec = 8;
}

// Metadata describing a single block; carried as repeated entries inside
// InputSplitMeta.
message BlockMeta
{
// NOTE(review): presumably the name of the file backing this block -- confirm.
optional string fileName = 1;
// Row count of this block. The Python client sums this over all blocks of a
// split to obtain the split's row count.
optional uint64 rowCount = 2;
// Raw size of this block (unit not stated here -- presumably bytes, confirm).
// The Python client sums this over all blocks of a split.
optional uint64 rawSize = 3;
}

// Metadata for one input split, expressed as a list of per-block entries.
message InputSplitMeta
{
// One BlockMeta per block in the split.
repeated BlockMeta blockMeta = 1;
}

// Request message of the GetSplitsMeta rpc.
message GetSplitsMetaRequest
{
// Handle of the input table whose split metadata is requested; the Python
// client passes the inputTableHandle it received from SplitTables.
optional string inputTableHandle = 1;
}

// Response message of the GetSplitsMeta rpc.
message GetSplitsMetaResponse
{
// Metadata entries for the table's splits. The Python client pairs these
// positionally with the inputSplits returned by GetSplits, so the order is
// presumably the same -- confirm against the service implementation.
repeated InputSplitMeta inputSplitsMeta = 1;
}

message GetSplitsRequest
Expand Down Expand Up @@ -93,16 +117,3 @@ message CommitTableFilesResponse
{

}

message InternalGetInformationRequest
{
optional string requestProtoBufStr = 1;
optional string projectName = 2;
optional string instanceId = 3;
}

message InternalGetInformationResponse
{
optional string responseProtoBufStr = 1;
}

0 comments on commit 00d6b57

Please sign in to comment.