Skip to content

Commit

Permalink
feat: allow specific snapshot chunk ratio (latticexyz#212)
Browse files Browse the repository at this point in the history
* feat: allow specific snapshot chunk ratio

* feat(network): add snapshotNumChunks config option to setupMUDNetwork and SyncWorker

Co-authored-by: alvrs <alvarius@lattice.xyz>
  • Loading branch information
authcall and alvrs committed Oct 19, 2022
1 parent 4ae41e3 commit aa6e0d4
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 116 deletions.
1 change: 1 addition & 0 deletions packages/network/src/types.ts
Expand Up @@ -126,6 +126,7 @@ export type SyncWorkerConfig = {
fetchSystemCalls?: boolean;
cacheInterval?: number;
cacheAgeThreshold?: number;
snapshotNumChunks?: number;
};

export enum ContractSchemaValue {
Expand Down
8 changes: 6 additions & 2 deletions packages/network/src/workers/SyncWorker.ts
Expand Up @@ -184,8 +184,12 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig

if (syncFromSnapshot) {
this.setLoadingState({ state: SyncState.INITIAL, msg: "Fetching initial state from snapshot", percentage: 0 });
initialState = await fetchSnapshotChunked(snapshotClient, worldContract.address, decode, (percentage: number) =>
this.setLoadingState({ percentage })
initialState = await fetchSnapshotChunked(
snapshotClient,
worldContract.address,
decode,
config.snapshotNumChunks,
(percentage: number) => this.setLoadingState({ percentage })
);
} else {
this.setLoadingState({ state: SyncState.INITIAL, msg: "Fetching initial state from cache", percentage: 0 });
Expand Down
8 changes: 5 additions & 3 deletions packages/network/src/workers/syncUtils.ts
Expand Up @@ -110,16 +110,18 @@ export async function fetchSnapshotChunked(
snapshotClient: ECSStateSnapshotServiceClient,
worldAddress: string,
decode: ReturnType<typeof createDecode>,
setPercentage: (percentage: number) => void
numChunks = 10,
setPercentage?: (percentage: number) => void
): Promise<CacheStore> {
const cacheStore = createCacheStore();
const chunkPercentage = 100 / numChunks;

try {
const response = snapshotClient.getStateLatestStream({ worldAddress });
const response = snapshotClient.getStateLatestStream({ worldAddress, chunkPercentage });
let i = 0;
for await (const responseChunk of response) {
await reduceFetchedState(responseChunk, cacheStore, decode);
setPercentage((i++ / 10) * 100);
setPercentage && setPercentage((i++ / numChunks) * 100);
}
} catch (e) {
console.error(e);
Expand Down
2 changes: 2 additions & 0 deletions packages/services/cmd/ecs-snapshot/main.go
Expand Up @@ -50,6 +50,7 @@ var (
initialSyncBlockBatchSize = flag.Int64("initial-sync-block-batch-size", 10, "Number of blocks to fetch data for when performing an initial sync")
initialSyncBlockBatchSyncTimeout = flag.Int64("initial-sync-block-batch-sync-timeout", 100, "Time in milliseconds to wait between calls to fetch batched log data when performing an initial sync")
initialSyncSnapshotInterval = flag.Int64("initial-sync-snapshot-interval", 5000, "Block number interval for how often to take intermediary snapshots when performing an initial sync")
defaultSnapshotChunkPercentage = flag.Int("default-snapshot-chunk-percentage", 10, "Default percentage for RPCs that request a snapshot in chunks. Default to 10, i.e. 10 percent chunks")
metricsPort = flag.Int("metrics-port", 6060, "Prometheus metrics http handler port. Defaults to port 6060")
)

Expand All @@ -68,6 +69,7 @@ func main() {
InitialSyncBlockBatchSize: *initialSyncBlockBatchSize,
InitialSyncBlockBatchSyncTimeout: time.Duration(*initialSyncBlockBatchSyncTimeout) * time.Millisecond,
InitialSyncSnapshotInterval: *initialSyncSnapshotInterval,
DefaultSnapshotChunkPercentage: *defaultSnapshotChunkPercentage,
}

// Parse world addresses to listen to.
Expand Down
15 changes: 10 additions & 5 deletions packages/services/pkg/grpc/snapshot.go
Expand Up @@ -50,16 +50,21 @@ func (server *ecsSnapshotServer) GetStateLatest(ctx context.Context, in *pb.ECSS
}

// GetStateLatestStream is a gRPC endpoint that returns the latest snapshot, if any, for a given
// WorldAddress provided via ECSStateRequestLatest. The snapshot is sent chunked via a stream over
// WorldAddress provided via ECSStateRequestLatestStream. The snapshot is sent chunked via a stream over
// the wire.
func (server *ecsSnapshotServer) GetStateLatestStream(in *pb.ECSStateRequestLatest, stream pb.ECSStateSnapshotService_GetStateLatestStreamServer) error {
func (server *ecsSnapshotServer) GetStateLatestStream(in *pb.ECSStateRequestLatestStream, stream pb.ECSStateSnapshotService_GetStateLatestStreamServer) error {
if !snapshot.IsSnaphotAvailableLatest(in.WorldAddress) {
return fmt.Errorf("no snapshot")
}
latestSnapshot := snapshot.RawReadStateSnapshotLatest(in.WorldAddress)
// Respond in fraction chunks, e.g. in chunks of 10%.
chunkFraction := 10
latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshot, chunkFraction)

// Respond in fraction chunks. If request has specified a chunk percentage, use that value.
chunkPercentage := server.config.DefaultSnapshotChunkPercentage
if in.ChunkPercentage != nil {
chunkPercentage = int(*in.ChunkPercentage)
}

latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshot, chunkPercentage)

for _, snapshotChunk := range latestSnapshotChunked {
stream.Send(&pb.ECSStateReply{
Expand Down
2 changes: 2 additions & 0 deletions packages/services/pkg/snapshot/core.go
Expand Up @@ -27,6 +27,8 @@ type SnapshotServerConfig struct {
// snapshots when performing an initial sync. This is useful in case the snapshot service
// disconnects or fails while perfoming a lengthy initial sync.
InitialSyncSnapshotInterval int64
// Default to use when chunking snapshot to send snapshot in chunks over the wire.
DefaultSnapshotChunkPercentage int
}

// Start starts the process of processing data from an Ethereum client, reducing the ECS state, and
Expand Down
8 changes: 7 additions & 1 deletion packages/services/proto/ecs-snapshot.proto
Expand Up @@ -29,7 +29,7 @@ service ECSStateSnapshotService {
rpc GetStateLatest (ECSStateRequestLatest) returns (ECSStateReply) {}

// Requests the latest ECS state in stream format, which will chunk the state.
rpc GetStateLatestStream (ECSStateRequestLatest) returns (stream ECSStateReply) {}
rpc GetStateLatestStream (ECSStateRequestLatestStream) returns (stream ECSStateReply) {}

// Requests the latest block number based on the latest ECS state.
rpc GetStateBlockLatest (ECSStateBlockRequestLatest) returns (ECSStateBlockReply) {}
Expand All @@ -46,6 +46,12 @@ message ECSStateRequestLatest {
string worldAddress = 1;
}

// The request message for the latest chunked ECS state.
message ECSStateRequestLatestStream {
string worldAddress = 1;
optional uint32 chunkPercentage = 2;
}

// The request message for the latest block based on latest ECS state.
message ECSStateBlockRequestLatest {
string worldAddress = 1;
Expand Down

0 comments on commit aa6e0d4

Please sign in to comment.