Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions api/service/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,13 +14,15 @@ import (

client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/deneb"
m "github.com/base/blob-archiver/api/metrics"
"github.com/base/blob-archiver/api/version"
"github.com/base/blob-archiver/common/storage"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
Expand Down Expand Up @@ -107,6 +110,7 @@ func NewAPI(dataStoreClient storage.DataStoreReader, beaconClient client.BeaconB
})

r.Get("/eth/v1/beacon/blob_sidecars/{id}", result.blobSidecarHandler)
r.Get("/eth/v1/beacon/blobs/{id}", result.blobsHandler)
r.Get("/eth/v1/node/version", result.versionHandler)

return result
Expand Down Expand Up @@ -264,3 +268,112 @@ func filterBlobs(blobs []*deneb.BlobSidecar, _indices []string) ([]*deneb.BlobSi

return filteredBlobs, nil
}

// filterBlobsByVersionedHashes filters sidecars by versioned hashes query parameter.
// Returns the filtered sidecars in the order they were requested, or all sidecars if no hashes provided.
func filterBlobsByVersionedHashes(sidecars []*deneb.BlobSidecar, _versionedHashes []string) ([]*deneb.BlobSidecar, *httpError) {
var versionedHashes []string
if len(_versionedHashes) == 0 {
return sidecars, nil
} else if len(_versionedHashes) == 1 {
versionedHashes = strings.Split(_versionedHashes[0], ",")
} else {
versionedHashes = _versionedHashes
}

// Build map of commitment hash -> sidecar for quick lookup
// CalcBlobHashV1 requires a sha256 hasher instance
hasher := sha256.New()
hashToSidecar := make(map[[32]byte]*deneb.BlobSidecar)
for _, sidecar := range sidecars {
hasher.Reset()
commitment := kzg4844.Commitment(sidecar.KZGCommitment)
vh := kzg4844.CalcBlobHashV1(hasher, &commitment)
hashToSidecar[vh] = sidecar
}

// Return sidecars in the order of requested hashes
filteredBlobs := make([]*deneb.BlobSidecar, 0, len(versionedHashes))
for _, hashStr := range versionedHashes {
hash := common.HexToHash(hashStr)
var versionedHash [32]byte
copy(versionedHash[:], hash[:])

if sidecar, ok := hashToSidecar[versionedHash]; ok {
filteredBlobs = append(filteredBlobs, sidecar)
}
}

return filteredBlobs, nil
}

// sidecarsToBlobs converts blob sidecars to a Blobs response by extracting only the blob data
func sidecarsToBlobs(sidecars []*deneb.BlobSidecar) v1.Blobs {
blobs := make(v1.Blobs, len(sidecars))
for i, sidecar := range sidecars {
blobs[i] = &sidecar.Blob
}
return blobs
}

// blobsHandler implements the /eth/v1/beacon/blobs/{id} endpoint, using the underlying DataStoreReader
// to fetch blobs instead of the beacon node. This endpoint serves blobs without KZG proofs.
// Filtering by versioned_hashes query parameter is supported (per EIP-4844).
func (a *API) blobsHandler(w http.ResponseWriter, r *http.Request) {
param := chi.URLParam(r, "id")
beaconBlockHash, err := a.toBeaconBlockHash(param)
if err != nil {
err.write(w)
return
}

result, storageErr := a.dataStoreClient.ReadBlob(r.Context(), beaconBlockHash)
if storageErr != nil {
if errors.Is(storageErr, storage.ErrNotFound) {
errUnknownBlock.write(w)
} else {
a.logger.Info("unexpected error fetching blobs", "err", storageErr, "beaconBlockHash", beaconBlockHash.String(), "param", param)
errServerError.write(w)
}
return
}

sidecars := result.BlobSidecars.Data

// Filter by versioned_hashes query parameter (not indices)
filteredSidecars, err := filterBlobsByVersionedHashes(sidecars, r.URL.Query()["versioned_hashes"])
if err != nil {
err.write(w)
return
}

// Convert sidecars to blobs
blobs := sidecarsToBlobs(filteredSidecars)
responseType := r.Header.Get("Accept")

if responseType == sszAcceptType {
w.Header().Set("Content-Type", sszAcceptType)
res, err := blobs.MarshalSSZ()
if err != nil {
a.logger.Error("unable to marshal blobs to SSZ", "err", err)
errServerError.write(w)
return
}

_, err = w.Write(res)

if err != nil {
a.logger.Error("unable to write ssz response", "err", err)
errServerError.write(w)
return
}
} else {
w.Header().Set("Content-Type", jsonAcceptType)
err := json.NewEncoder(w).Encode(blobs)
if err != nil {
a.logger.Error("unable to encode blobs to JSON", "err", err)
errServerError.write(w)
return
}
}
}
135 changes: 132 additions & 3 deletions api/service/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"compress/gzip"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -80,7 +82,7 @@ func TestAPIService(t *testing.T) {
BeaconBlockHash: rootOne,
},
BlobSidecars: storage.BlobSidecars{
Data: blobtest.NewBlobSidecars(t, 2),
Data: blobtest.NewBlobSidecars(t, 2, nil),
},
}

Expand All @@ -89,7 +91,7 @@ func TestAPIService(t *testing.T) {
BeaconBlockHash: rootTwo,
},
BlobSidecars: storage.BlobSidecars{
Data: blobtest.NewBlobSidecars(t, 2),
Data: blobtest.NewBlobSidecars(t, 2, nil),
},
}

Expand All @@ -99,7 +101,7 @@ func TestAPIService(t *testing.T) {
BeaconBlockHash: rootThree,
},
BlobSidecars: storage.BlobSidecars{
Data: blobtest.NewBlobSidecars(t, 8), // More than 6 blobs
Data: blobtest.NewBlobSidecars(t, 8, nil), // More than 6 blobs
},
}

Expand Down Expand Up @@ -364,3 +366,130 @@ func TestHealthHandler(t *testing.T) {

require.Equal(t, 200, response.Code)
}

func TestBlobsHandlerJSON(t *testing.T) {
a, fs, _, cleanup := setup(t)
defer cleanup()

// Pre-populate storage with blob sidecars
testBlobs := blobtest.NewBlobSidecars(t, 3, nil)
data := storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Five,
},
BlobSidecars: storage.BlobSidecars{
Data: testBlobs,
},
}
err := fs.WriteBlob(context.Background(), data)
require.NoError(t, err)

// Request blobs endpoint with JSON encoding
request := httptest.NewRequest("GET", "/eth/v1/beacon/blobs/"+blobtest.Five.String(), nil)
request.Header.Set("Accept", "application/json")
response := httptest.NewRecorder()

a.router.ServeHTTP(response, request)

require.Equal(t, 200, response.Code)
require.Equal(t, "application/json", response.Header().Get("Content-Type"))

var blobs v1.Blobs
err = json.Unmarshal(response.Body.Bytes(), &blobs)
require.NoError(t, err)
require.Equal(t, len(testBlobs), len(blobs))

// Verify blob data matches
for i, blob := range blobs {
require.Equal(t, testBlobs[i].Blob, *blob)
}
}

func TestBlobsHandlerSSZ(t *testing.T) {
a, fs, _, cleanup := setup(t)
defer cleanup()

// Pre-populate storage with blob sidecars
testBlobs := blobtest.NewBlobSidecars(t, 2, nil)
data := storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.One,
},
BlobSidecars: storage.BlobSidecars{
Data: testBlobs,
},
}
err := fs.WriteBlob(context.Background(), data)
require.NoError(t, err)

// Request blobs endpoint with SSZ encoding
request := httptest.NewRequest("GET", "/eth/v1/beacon/blobs/"+blobtest.One.String(), nil)
request.Header.Set("Accept", "application/octet-stream")
response := httptest.NewRecorder()

a.router.ServeHTTP(response, request)

require.Equal(t, 200, response.Code)
require.Equal(t, "application/octet-stream", response.Header().Get("Content-Type"))

// Unmarshal SSZ response
blobs := v1.Blobs{}
err = blobs.UnmarshalSSZ(response.Body.Bytes())
require.NoError(t, err)
require.Equal(t, len(testBlobs), len(blobs))
}

func TestBlobsHandlerWithVersionedHashes(t *testing.T) {
a, fs, _, cleanup := setup(t)
defer cleanup()

// Pre-populate storage with blob sidecars
testBlobs := blobtest.NewBlobSidecars(t, 5, nil)
data := storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Four,
},
BlobSidecars: storage.BlobSidecars{
Data: testBlobs,
},
}
err := fs.WriteBlob(context.Background(), data)
require.NoError(t, err)

// Compute versioned hashes from commitments
hasher := sha256.New()
commitment0 := kzg4844.Commitment(testBlobs[0].KZGCommitment)
vh0 := kzg4844.CalcBlobHashV1(hasher, &commitment0)

hasher.Reset()
commitment2 := kzg4844.Commitment(testBlobs[2].KZGCommitment)
vh2 := kzg4844.CalcBlobHashV1(hasher, &commitment2)

// Request blobs endpoint with versioned_hashes filter
request := httptest.NewRequest("GET", fmt.Sprintf("/eth/v1/beacon/blobs/%s?versioned_hashes=%s&versioned_hashes=%s", blobtest.Four.String(), common.Hash(vh0).Hex(), common.Hash(vh2).Hex()), nil)
request.Header.Set("Accept", "application/json")
response := httptest.NewRecorder()

a.router.ServeHTTP(response, request)

require.Equal(t, 200, response.Code)

var blobs v1.Blobs
err = json.Unmarshal(response.Body.Bytes(), &blobs)
require.NoError(t, err)
require.Equal(t, 2, len(blobs))
}

func TestBlobsHandlerNotFound(t *testing.T) {
a, _, _, cleanup := setup(t)
defer cleanup()

// Request blobs endpoint for non-existent block
request := httptest.NewRequest("GET", "/eth/v1/beacon/blobs/"+blobtest.Seven.String(), nil)
request.Header.Set("Accept", "application/json")
response := httptest.NewRecorder()

a.router.ServeHTTP(response, request)

require.Equal(t, 404, response.Code)
}
Loading
Loading