Skip to content

Commit

Permalink
feat: v2 endpoint for pruned snapshot that returns entities as raw by…
Browse files Browse the repository at this point in the history
…tes (latticexyz#215)

* feat: v2 endpoint for pruned snapshot that returns entities as raw bytes

* feat(network): integrate getStateLatestStreamPrunedV2

* fix(utils): handle hex array with length 0

* fix: use hexutil.Decode

* fix: convert entity id uint8 array to string once

* chore: debug invalid entity index

* fix: use decodebig for parsing hex encoded entity id uint256

* feat(snapshot): add GetStateLatestStreamV2 endpoint

* fix(network): call getStateLatestStreamV2 if no prune options are provided

Co-authored-by: alvrs <alvarius@lattice.xyz>
  • Loading branch information
authcall and alvrs committed Oct 28, 2022
1 parent 445da7f commit 759f495
Show file tree
Hide file tree
Showing 8 changed files with 601 additions and 73 deletions.
48 changes: 40 additions & 8 deletions packages/network/src/workers/syncUtils.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { JsonRpcProvider } from "@ethersproject/providers";
import { EntityID, ComponentValue, Components } from "@latticexyz/recs";
import { to256BitString, awaitPromise, range } from "@latticexyz/utils";
import { to256BitString, awaitPromise, range, Uint8ArrayToHexString } from "@latticexyz/utils";
import { BytesLike, Contract, BigNumber } from "ethers";
import { Observable, map, concatMap, of, from } from "rxjs";
import { createDecoder } from "../createDecoder";
import { createTopics } from "../createTopics";
import { fetchEventsInBlockRange } from "../networkUtils";
import {
ECSStateReply,
ECSStateReplyV2,
ECSStateSnapshotServiceClient,
ECSStateSnapshotServiceDefinition,
} from "@latticexyz/services/protobuf/ts/ecs-snapshot/ecs-snapshot";
Expand All @@ -23,7 +24,6 @@ import { CacheStore, createCacheStore, storeEvent, storeEvents } from "./CacheSt
import { abi as ComponentAbi } from "@latticexyz/solecs/abi/Component.json";
import { abi as WorldAbi } from "@latticexyz/solecs/abi/World.json";
import { Component, World } from "@latticexyz/solecs/types/ethers-contracts";
import { SyncState } from "./constants";
import {
ECSStreamBlockBundleReply,
ECSStreamServiceClient,
Expand Down Expand Up @@ -80,6 +80,7 @@ export async function getSnapshotBlockNumber(
* @param worldAddress Address of the World contract to get the snapshot for.
* @param decode Function to decode raw component values ({@link createDecode}).
* @returns Promise resolving with {@link CacheStore} containing the snapshot state.
* @deprecated this util will be removed in a future version, use fetchSnapshotChunked instead
*/
export async function fetchSnapshot(
snapshotClient: ECSStateSnapshotServiceClient,
Expand Down Expand Up @@ -115,20 +116,24 @@ export async function fetchSnapshotChunked(
pruneOptions?: { playerAddress: string; hashedComponentId: string }
): Promise<CacheStore> {
const cacheStore = createCacheStore();
const chunkPercentage = 100 / numChunks;
const chunkPercentage = Math.ceil(100 / numChunks);

try {
const response = pruneOptions
? snapshotClient.getStateLatestStreamPruned({
? snapshotClient.getStateLatestStreamPrunedV2({
worldAddress,
chunkPercentage,
pruneAddress: pruneOptions.playerAddress,
pruneComponentId: pruneOptions.hashedComponentId,
pruneAddress: pruneOptions?.playerAddress,
pruneComponentId: pruneOptions?.hashedComponentId,
})
: snapshotClient.getStateLatestStream({ worldAddress, chunkPercentage });
: snapshotClient.getStateLatestStreamV2({
worldAddress,
chunkPercentage,
});

let i = 0;
for await (const responseChunk of response) {
await reduceFetchedState(responseChunk, cacheStore, decode);
await reduceFetchedStateV2(responseChunk, cacheStore, decode);
setPercentage && setPercentage((i++ / numChunks) * 100);
}
} catch (e) {
Expand All @@ -145,6 +150,7 @@ export async function fetchSnapshotChunked(
* @param cacheStore {@link CacheStore} to store snapshot state into.
* @param decode Function to decode raw component values ({@link createDecode}).
* @returns Promise resolving once state is reduced into {@link CacheStore}.
* @deprecated this util will be removed in a future version, use reduceFetchedStateV2 instead
*/
export async function reduceFetchedState(
response: ECSStateReply,
Expand All @@ -161,6 +167,32 @@ export async function reduceFetchedState(
}
}

/**
* Reduces a snapshot response by storing corresponding ECS events into the cache store.
*
* @param response ECSStateReplyV2
* @param cacheStore {@link CacheStore} to store snapshot state into.
* @param decode Function to decode raw component values ({@link createDecode}).
* @returns Promise resolving once state is reduced into {@link CacheStore}.
*/
export async function reduceFetchedStateV2(
response: ECSStateReplyV2,
cacheStore: CacheStore,
decode: ReturnType<typeof createDecode>
): Promise<void> {
const { state, blockNumber, stateComponents, stateEntities } = response;
const stateEntitiesHex = stateEntities.map((e) => Uint8ArrayToHexString(e) as EntityID);
const stateComponentsHex = stateComponents.map((e) => to256BitString(e));

for (const { componentIdIdx, entityIdIdx, value: rawValue } of state) {
const component = stateComponentsHex[componentIdIdx];
const entity = stateEntitiesHex[entityIdIdx];
if (entity == undefined) console.log("invalid entity index", stateEntities.length, entityIdIdx);
const value = await decode(component, rawValue);
storeEvent(cacheStore, { type: NetworkEvents.NetworkComponentUpdate, component, entity, value, blockNumber });
}
}

/**
* Create a RxJS stream of {@link NetworkComponentUpdate}s by subscribing to a
* gRPC streaming service.
Expand Down
61 changes: 61 additions & 0 deletions packages/services/pkg/grpc/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"latticexyz/mud/packages/services/pkg/snapshot"
"latticexyz/mud/packages/services/pkg/utils"
pb "latticexyz/mud/packages/services/protobuf/go/ecs-snapshot"
)

Expand Down Expand Up @@ -124,3 +125,63 @@ func (server *ecsSnapshotServer) GetStateBlockLatest(ctx context.Context, in *pb
func (server *ecsSnapshotServer) GetStateAtBlock(ctx context.Context, in *pb.ECSStateRequestAtBlock) (*pb.ECSStateReply, error) {
return nil, fmt.Errorf("not implemented")
}

//
// V2 endpoints
//

func (server *ecsSnapshotServer) GetStateLatestStreamV2(in *pb.ECSStateRequestLatestStream, stream pb.ECSStateSnapshotService_GetStateLatestStreamV2Server) error {
if !snapshot.IsSnaphotAvailableLatest(in.WorldAddress) {
return fmt.Errorf("no snapshot")
}
latestSnapshot := snapshot.RawReadStateSnapshotLatest(in.WorldAddress)

// Respond in fraction chunks. If request has specified a chunk percentage, use that value.
chunkPercentage := server.config.DefaultSnapshotChunkPercentage
if in.ChunkPercentage != nil {
chunkPercentage = int(*in.ChunkPercentage)
}

latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshot, chunkPercentage)

for _, snapshotChunk := range latestSnapshotChunked {
stream.Send(&pb.ECSStateReplyV2{
State: snapshotChunk.State,
StateComponents: snapshotChunk.StateComponents,
StateEntities: utils.HexStringArrayToBytesArray(snapshotChunk.StateEntities),
StateHash: snapshotChunk.StateHash,
BlockNumber: snapshotChunk.EndBlockNumber,
})
}
return nil
}

func (server *ecsSnapshotServer) GetStateLatestStreamPrunedV2(request *pb.ECSStateRequestLatestStreamPruned, stream pb.ECSStateSnapshotService_GetStateLatestStreamPrunedV2Server) error {
if !snapshot.IsSnaphotAvailableLatest(request.WorldAddress) {
return fmt.Errorf("no snapshot")
}
if len(request.PruneAddress) == 0 {
return fmt.Errorf("address for which to prune for required")
}
latestSnapshot := snapshot.RawReadStateSnapshotLatest(request.WorldAddress)
latestSnapshotPruned := snapshot.PruneSnapshotOwnedByComponent(latestSnapshot, request.PruneAddress)

// Respond in fraction chunks. If request has specified a chunk percentage, use that value.
chunkPercentage := server.config.DefaultSnapshotChunkPercentage
if request.ChunkPercentage != nil {
chunkPercentage = int(*request.ChunkPercentage)
}

latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshotPruned, chunkPercentage)

for _, snapshotChunk := range latestSnapshotChunked {
stream.Send(&pb.ECSStateReplyV2{
State: snapshotChunk.State,
StateComponents: snapshotChunk.StateComponents,
StateEntities: utils.HexStringArrayToBytesArray(snapshotChunk.StateEntities),
StateHash: snapshotChunk.StateHash,
BlockNumber: snapshotChunk.EndBlockNumber,
})
}
return nil
}
19 changes: 19 additions & 0 deletions packages/services/pkg/utils/general.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
package utils

import (
"latticexyz/mud/packages/services/pkg/logger"

"github.com/ethereum/go-ethereum/common/hexutil"
"go.uber.org/zap"
)

func Min(a, b int) int {
if a < b {
return a
}
return b
}

func HexStringArrayToBytesArray(strArray []string) [][]byte {
bytesArray := [][]byte{}
for _, str := range strArray {
entityIdBigInt, err := hexutil.DecodeBig(str)
if err != nil {
logger.GetLogger().Error("can't parse entity ID", zap.String("string", str), zap.Error(err))
}
bytesArray = append(bytesArray, entityIdBigInt.Bytes())
}
return bytesArray
}
18 changes: 18 additions & 0 deletions packages/services/proto/ecs-snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,19 @@ service ECSStateSnapshotService {
// Requests the latest ECS state in stream format, which will chunk the state.
rpc GetStateLatestStream (ECSStateRequestLatestStream) returns (stream ECSStateReply) {}

// Requests the latest ECS state in stream format, which will chunk the state.
//
// V2 version optimized to return entities as raw bytes.
rpc GetStateLatestStreamV2 (ECSStateRequestLatestStream) returns (stream ECSStateReplyV2) {}

// Requests the latest ECS state, with aditional pruning.
rpc GetStateLatestStreamPruned (ECSStateRequestLatestStreamPruned) returns (stream ECSStateReply) {}

// Requests the latest ECS state, with aditional pruning.
//
// V2 version optimized to return entities as raw bytes.
rpc GetStateLatestStreamPrunedV2 (ECSStateRequestLatestStreamPruned) returns (stream ECSStateReplyV2) {}

// Requests the latest block number based on the latest ECS state.
rpc GetStateBlockLatest (ECSStateBlockRequestLatest) returns (ECSStateBlockReply) {}

Expand Down Expand Up @@ -87,6 +97,14 @@ message ECSStateReply {
uint32 blockNumber = 5;
}

message ECSStateReplyV2 {
repeated ECSState state = 1;
repeated string stateComponents = 2;
repeated bytes stateEntities = 3;
string stateHash = 4;
uint32 blockNumber = 5;
}

message ECSStateBlockReply {
uint32 blockNumber = 1;
}
Loading

0 comments on commit 759f495

Please sign in to comment.