diff --git a/eth2/beacon/operations/attestation_pool.py b/eth2/beacon/operations/attestation_pool.py index 103dabfb48..2145842c36 100644 --- a/eth2/beacon/operations/attestation_pool.py +++ b/eth2/beacon/operations/attestation_pool.py @@ -1,7 +1,57 @@ +from typing import Tuple + +from eth_utils import ValidationError + +from eth2.beacon.state_machines.forks.serenity.block_validation import ( + validate_attestation_slot, +) from eth2.beacon.types.attestations import Attestation +from eth2.beacon.typing import CommitteeIndex, SigningRoot, Slot +from eth2.configs import Eth2Config from .pool import OperationPool +def is_valid_slot( + attestation: Attestation, current_slot: Slot, config: Eth2Config +) -> bool: + try: + validate_attestation_slot( + attestation.data.slot, + current_slot, + config.SLOTS_PER_EPOCH, + config.MIN_ATTESTATION_INCLUSION_DELAY, + ) + except ValidationError: + return False + else: + return True + + class AttestationPool(OperationPool[Attestation]): - pass + def get_valid_attestation_by_current_slot( + self, slot: Slot, config: Eth2Config + ) -> Tuple[Attestation, ...]: + return tuple( + filter( + lambda attestation: is_valid_slot(attestation, slot, config), + self._pool_storage.values(), + ) + ) + + def get_acceptable_attestations( + self, + slot: Slot, + committee_index: CommitteeIndex, + beacon_block_root: SigningRoot, + ) -> Tuple[Attestation, ...]: + return tuple( + filter( + lambda attestation: ( + beacon_block_root == attestation.data.beacon_block_root + and slot == attestation.data.slot + and committee_index == attestation.data.index + ), + self._pool_storage.values(), + ) + ) diff --git a/eth2/beacon/scripts/quickstart_state/keygen_100_validators.yaml b/eth2/beacon/scripts/quickstart_state/keygen_100_validators.yaml new file mode 100644 index 0000000000..d24cded37e --- /dev/null +++ b/eth2/beacon/scripts/quickstart_state/keygen_100_validators.yaml @@ -0,0 +1,199 @@ +- {privkey: '0x25295f0d1d592a90b333e26e85149708208e9f8e8bc18f6c77bd62f8ad7a6866', + pubkey: '0xa99a76ed7796f7be22d5b7e85deeb7c5677e88e511e0b337618f8c4eb61349b4bf2d153f649f7b53359fe8b94a38e44c'} +- {privkey: '0x51d0b65185db6989ab0b560d6deed19c7ead0e24b9b6372cbecb1f26bdfad000', + pubkey: '0xb89bebc699769726a318c8e9971bd3171297c61aea4a6578a7a4f94b547dcba5bac16a89108b6b6a1fe3695d1a874a0b'} +- {privkey: '0x315ed405fafe339603932eebe8dbfd650ce5dafa561f6928664c75db85f97857', + pubkey: '0xa3a32b0f8b4ddb83f1a0a853d81dd725dfe577d4f4c3db8ece52ce2b026eca84815c1a7e8e92a4de3d755733bf7e4a9b'} +- {privkey: '0x25b1166a43c109cb330af8945d364722757c65ed2bfed5444b5a2f057f82d391', + pubkey: '0x88c141df77cd9d8d7a71a75c826c41a9c9f03c6ee1b180f3e7852f6a280099ded351b58d66e653af8e42816a4d8f532e'} +- {privkey: '0x3f5615898238c4c4f906b507ee917e9ea1bb69b93f1dbd11a34d229c3b06784b', + pubkey: '0x81283b7a20e1ca460ebd9bbd77005d557370cabb1f9a44f530c4c4c66230f675f8df8b4c2818851aa7d77a80ca5a4a5e'} +- {privkey: '0x055794614bc85ed5436c1f5cab586aab6ca84835788621091f4f3b813761e7a8', + pubkey: '0xab0bdda0f85f842f431beaccf1250bf1fd7ba51b4100fd64364b6401fda85bb0069b3e715b58819684e7fc0b10a72a34'} +- {privkey: '0x1023c68852075965e0f7352dee3f76a84a83e7582c181c10179936c6d6348893', + pubkey: '0x9977f1c8b731a8d5558146bfb86caea26434f3c5878b589bf280a42c9159e700e9df0e4086296c20b011d2e78c27d373'} +- {privkey: '0x3a941600dc41e5d20e818473b817a28507c23cdfdb4b659c15461ee5c71e41f5', + pubkey: '0xa8d4c7c27795a725961317ef5953a7032ed6d83739db8b0e8a72353d1b8b4439427f7efa2c89caa03cc9f28f8cbab8ac'} +- {privkey: '0x066e3bdc0415530e5c7fed6382d5c822c192b620203cf669903e1810a8c67d06', + pubkey: '0xa6d310dbbfab9a22450f59993f87a4ce5db6223f3b5f1f30d2c4ec718922d400e0b3c7741de8e59960f72411a0ee10a7'} +- {privkey: '0x2b3b88a041168a1c4cd04bdd8de7964fd35238f95442dc678514f9dadb81ec34', + pubkey: '0x9893413c00283a3f9ed9fd9845dda1cea38228d22567f9541dccc357e54a2d6a6e204103c92564cbc05f4905ac7c493a'} +- {privkey: '0x2e62dbea7fe3127c3b236a92795dd633be51ee7cdfe5424882a2f355df497117', + pubkey: '0x876dd4705157eb66dc71bc2e07fb151ea53e1a62a0bb980a7ce72d15f58944a8a3752d754f52f4a60dbfc7b18169f268'} +- {privkey: '0x2042dc809c130e91906c9cb0be2fec0d6afaa8f22635efc7a3c2dbf833c1851a', + pubkey: '0xaec922bd7a9b7b1dc21993133b586b0c3041c1e2e04b513e862227b9d7aecaf9444222f7e78282a449622ffc6278915d'} +- {privkey: '0x15283c540041cd85c4533ee47517c8bb101c6207e9acbba2935287405a78502c', + pubkey: '0x9314c6de0386635e2799af798884c2ea09c63b9f079e572acc00b06a7faccce501ea4dfc0b1a23b8603680a5e3481327'} +- {privkey: '0x03c85e538e1bb30235a87a3758c5571753ca1308b7dee321b74c19f78423999b', + pubkey: '0x903e2989e7442ee0a8958d020507a8bd985d3974f5e8273093be00db3935f0500e141b252bd09e3728892c7a8443863c'} +- {privkey: '0x45a577d5cab31ac5cfff381500e09655f0799f29b130e6ad61c1eec4b15bf8dd', + pubkey: '0x84398f539a64cbe01cfcd8c485ea51cd6657b94df93ee9b5dc61e1f18f69da6ca9d4dba63c956a81c68d5d4d4277a60f'} +- {privkey: '0x03cffafa1cbaa7e585eaee07a9d35ae57f6dfe19a9ea53af9c37e9f3dfac617c', + pubkey: '0x872c61b4a7f8510ec809e5b023f5fdda2105d024c470ddbbeca4bc74e8280af0d178d749853e8f6a841083ac1b4db98f'} +- {privkey: '0x67496f1d63498dc62da0bf641f55125f6fc971ed1f08f7e9649e75709525fd55', + pubkey: '0x8f467e5723deac7659e1ca273e28410cbaa6d495ab66ae77014f4cd21c64b6b5ab9987c9b5537fe0279bd063fe609be7'} +- {privkey: '0x1e892380d153a5032cd54041b76de0a5f0f26dee3f189f829d5d33e720ba3934', + pubkey: '0x8dde8306920812b32def3b663f7c540b49180345d3bcb8d3770790b7dc80030ebc06497feebd1bcf017d918f00bfa88f'} +- {privkey: '0x5a6ca99e594d26a4c8268441dcdb261f00c63e653991bf77f3e6d661dd1d7a0c', + pubkey: '0xab8d3a9bcc160e518fac0756d3e192c74789588ed4a2b1debf0c78f78479ca8edb05b12ce21103076df6af4eb8756ff9'} +- {privkey: '0x31b5d11b313d1736237139f0c56c5503b9786ce425fbf514446e44152c794d26', + pubkey: '0x8d5d3672a233db513df7ad1e8beafeae99a9f0199ed4d949bbedbb6f394030c0416bd99b910e14f73c65b6a11fe6b62e'} +- {privkey: '0x46fbedc2776c0d5db0da0d74b0a6ca45940596db7754dd87f1dbeeac396bd707', + pubkey: '0xa1c76af1545d7901214bb6be06be5d9e458f8e989c19373a920f0018327c83982f6a2ac138260b8def732cb366411ddc'} +- {privkey: '0x2abf4b942eaef1bd2e92e98228890e50c408e54e0c7972c1ce67f60a5ae6fdc1', + pubkey: '0x8dd74e1bb5228fc1fca274fda02b971c1003a4f409bbdfbcfec6426bf2f52addcbbebccdbf45eee6ae11eb5b5ee7244d'} +- {privkey: '0x6327b1e58c41d60dd7c3c8b9634204255707c2d12e2513c345001d8926745eea', + pubkey: '0x954eb88ed1207f891dc3c28fa6cfdf8f53bf0ed3d838f3476c0900a61314d22d4f0a300da3cd010444dd5183e35a593c'} +- {privkey: '0x02a07f22259210b143686ee70a8dea2399ce18165fab780beaccdd486ddf12f4', + pubkey: '0xaf344fce60dbd5fb850070e6e76a065e1a32485245ef4f413135a86ae703da88407c5d01c71f6bb06a151ff96cca7191'} +- {privkey: '0x45113325259c7fad43feca5a07d1182e80d27dec21b069b7aee357965b07b947', + pubkey: '0xae241af60691fda1cf8ca44d49573c55818c53b6141800cca2d488b9a3fba71c0f869179fff50c084657831fbeb42bf4'} +- {privkey: '0x4894c61db725b9210c3acd58136797e7295d59b3a1497735fb59d5c5264bd89e', + pubkey: '0x96746aaba64dc87835ba709332f4d5d7837ada092b439c49d251aecf92aab5dc132e917bf6f59799bc093f976a7bc021'} +- {privkey: '0x392414fca0757c30af12c4a63afaeee64cf8a92254bd097ecd9c7696b333305a', + pubkey: '0xb9d1d914df3d4565465c3fd52b5b96e637f9980570cabf5b5d4aadf5a329ac36ad672819d997e735f5052e28b1f0c104'} +- {privkey: '0x3e1c4fb1a25381ad757a5a2c98a522d89c796e9ad009ef00c632efbc859a9623', + pubkey: '0x963528adb5322c2e2c54dc296ffddd2861bb103cbf64646781dfa8a3c2d8a8eda7079d2b3e95600028c44365afbf8879'} +- {privkey: '0x2799ceccbdaf1e36679b413193a363bfe6d2d35c8cf6ff6151165707461eaed7', + pubkey: '0xb245d63d3f9d8ea1807a629fcb1b328cb4d542f35a3d5bc478be0df389dddd712fc4c816ba3fede9a96320ae6b24a7d8'} +- {privkey: '0x27cf8e217d8481db8bc343bb6f5eb2993dc43743a2653e221df6db97be2cf004', + pubkey: '0xa98ed496c2f464226500a6ce04602ff9ef133ed6316f372f6c744aee165149f7e578b12780e0eacec307ae6907351d99'} +- {privkey: '0x16782f17ec7cdbc9973e86b179ba8d779afb8e6c28cd5b9caab657fe183f64c1', + pubkey: '0xae00fc3de831b09661a0ac02873c45c84cb2b58cffb6430a3f607e4c3fa1e0932397f11307cd169cdc6f79c463527260'} +- {privkey: '0x575ace3c2bf7a175b526d296e8b022357c7ceb8e799d1029d5b267d8598f449f', + pubkey: '0xa4855c83d868f772a579133d9f23818008417b743e8447e235d8eb78b1d8f8a9f63f98c551beb7de254400f89592314d'} +- {privkey: '0x2c083f2c8fc923fa2bd32a70ab72b4b46247e8c1f347adc30b2f8036a355086c', + pubkey: '0xa9cf360aa15fb1d1d30ee2b578dc5884823c19661886ae8b892775ccb3bd96b7d7345569a2aa0b14e4d015c54a6a0c54'} +- {privkey: '0x178676a9cb7a49e14facd1460d15cd18219c926df8491b118bceccbac3103328', + pubkey: '0xaef9162ee6f29ee82fbfe387756d84f9ac472eb8709217aaf28f5ef0ea273f6210e531496470b30d2b7747216e3672d5'} +- {privkey: '0x271625ab9d8ff3d0edddea855033c1d7921842bf39a424b99a6978c2a86b1d65', + pubkey: '0xb7e6e187ed813d950a9a17d1e70c03e4de2903596c4c5ff326848515c985deee38198efebc265300cd4f1d6bd7b5d264'} +- {privkey: '0x03549ec8a97668607b66b4d211345c1deb974ee4c6c55e26f98ee70059381bcb', + pubkey: '0x81054bd51ce57a8415f0c8e0f2fbf94f5a8464552baa33263c20a4da062e5ed994a4d32c171106d2008cd063f48f6fe2'} +- {privkey: '0x4f4c9809b9e51b656ce9083ac5cf551986a9891395156d3e8effc125b3e2b015', + pubkey: '0xaecc56f2b1c4011d450214d3e1254479d583a6a5c2c06fbc049512731f76227d140df9f36a3f76b4ccb4df1342403573'} +- {privkey: '0x6b0a7df36e52c9f48f814305c4d66849862e73f69bef9e7b78ddd873905de882', + pubkey: '0x9243ef5ed3bd28892d1ef4f7aaf29faeb9c0e725673cd38e308bd756f20a9ee09de5cd9822e5e77bd03b734ef8a92695'} +- {privkey: '0x5cc36b430ddf7f0f7ed6f97460e1aa2e5e5ff14f47d8f205b8fd50a2bab8ea0b', + pubkey: '0x925b1fb57c06b5668567bd5aa196531032d6f8918dd4f702017c11b59288e3bdb98e3820ac22780f73580a4119de4bbc'} +- {privkey: '0x0a5c398398c00865a46863d12f31d61a2ffbf4e22155741fce6959c815e9abd4', + pubkey: '0x9648b83a4f09b4ca2021f0c193c5c41df1465715761bca52671ca790a3e92d67686b97b3d54c6110409779df887bd9c6'} +- {privkey: '0x37e63244dc9308ed78074cf7f2a2172c05ee5a9959b87b18b6716de78675d2a3', + pubkey: '0xa34febc12af07316580b480364f90a76313ccce7927bbe263e27ea270853b02ad4d1428caf55363f3ebebac622cb9fd6'} +- {privkey: '0x27a4c9d59b290cab5451cb80ffee9cf80240d55d18d9ebb4fbca13a1a9814db7', + pubkey: '0xb8cd1cef89aa1567a6058957442a698cf1b267130606f749451152959a5dfb50d243890d4adc2c3309f7696d54af1260'} +- {privkey: '0x4157396c007870224940ee87f1bd0f30ea79e3f9593aab1fa9ad76504cbbf312', + pubkey: '0x92a93728c252a45ef587ca53a037593912599d82e2b8aa1b734b99d500a0ac8c142092ea8b3c2c34a28dc8ddf337a249'} +- {privkey: '0x5ad466a0839a7281813355ace68ee2f429a4ab845b0bd3ffb09532428a24fa6b', + pubkey: '0xb7ee0ef26144de04d9cc80864b869b7ecafbf1b7c0050403cc3c3b514368713b8bb708c464568a18c837e1fd21d09063'} +- {privkey: '0x0ee9b0ad9813c67fc03c881f6d57d90277d32ad16b5bd291062022fa3c516965', + pubkey: '0xafc0fa2ed6a270de6122a19d4600380b7f9b5e974d16f095f1702f55792ecab0128b155a69f17ad64a6de0a7063642ec'} +- {privkey: '0x52425f5eb9fb138f0dc65e7b1109fda1e650169f3bda1a0dfe99b4d6c0d95260', + pubkey: '0xa5869ba554d1432b09ee677c117511291b9901f169e870831f457caa6ccfab376cb1fe33813bdb495cf4afec9ea35fdf'} +- {privkey: '0x46163dcef9972c338569240f57d78b084ef236164888f4b71d2ba69f5eed158c', + pubkey: '0x92f43d79d9f488010b310a54f3fc2e7f4be191ca06d93e588c30c8abf59a52190e060b285ac626eb13cd95bbcc3a0a2a'} +- {privkey: '0x015df385e001203ead487b94a70ec856e4d99daff72dcace6935fca9611b21be', + pubkey: '0x9698d9519a02b64f230e5a2520401799c2ca7d69ab23a6d9817943147264bf00d409264b928718245efff4f7ee97dd5c'} +- {privkey: '0x6dd12d588d1c05ba40e80880ac7e894aa20babdbf16da52eae26b3f267d68032', + pubkey: '0xa852816b8e463178eea5acebb4b86d0acb6d8c6812cf313296bd271ea4d2fd89d281e5fc296df4df49019169bdf96922'} +- {privkey: '0x48f140a19ff1a607157319b01518ef2e2f1e123ad0b04568b501ece30abe5c25', + pubkey: '0x8a298ee1ac0466ecaa04d5798048c6e192409af63217f32fd7e07794cfcdcd8deca055b9782dd1ad45a578a9ec10606c'} +- {privkey: '0x39ac35d1ec70fc7a972a9971257f105a4621d92ee67d28a90559e9f34deeb5ba', + pubkey: '0xae4d49364e4a36760cc74a675500055b9aed99bc19d31abb953ea156bb5a76dcf36769d15341b850114a30ffc8057780'} +- {privkey: '0x05b1b035d3cdcb6daae3559e8f4b642eb0295271ec5e21815797e522669d9c42', + pubkey: '0xb397692ccbf442bfe078174c85dbad7fd605e4ff1caf2904b31e4a4c79d6444813ad9b2093ac8fbd4dd59ec7a4c8c006'} +- {privkey: '0x05a4ab58f5631cc428686ec91c0c032753a1aa5231f509b7e405396d53b2178d', + pubkey: '0x87c9f7605d07550b46c79add5ea4e39de5014c03833669257bd6666b7ec838f53800104779940d8cdd884275a0f6a3ef'} +- {privkey: '0x4d943d10c772529ec625643a4af1730d669a323a393d7887680bd4e79d33c2d7', + pubkey: '0xb08f7feb86786c37661afb9951a959c9b465fd11ca98fcbc908fcf49144084051f6c363e2eb4459da2c2d03d84175692'} +- {privkey: '0x12a9900709b897ef39a073e15eda05ff7d6a5b1194fb98f4dabcbc515f9211a4', + pubkey: '0xa48cc260df1df875176cb17493a5b53d669c091da74d5075acb8952a641b1b7ef68d01f009c1a365d2fa80937c79dd6b'} +- {privkey: '0x20b4f56dfc0a8d1583331501baeb1905d5db9a7cbff7448daf988d916110b70c', + pubkey: '0xac9f4df3f20a16a9fefad08817fcbc9a6ee17f7512db006414b4aa6f234c2313585ef72c5776df55fa6284af4bc3f631'} +- {privkey: '0x17e5bf33a71ca12d6bf386334a05e5455ef7d0ff8bcbde8a34fdb89423cbeeb7', + pubkey: '0x94f0c8535601596eb2165adb28ebe495891a3e4ea77ef501e7790cccb281827d377a5a8d4c200e3595d3f38f8633b480'} +- {privkey: '0x2e1e774ca15f348968c0672b179b43bc50b98dca17c2215eac33513c7d970bbf', + pubkey: '0xb5bb0162a4f27d1bab4c7dc3d20f5a75d6ee98c56bcd309a1f0f307685ad47ffb8a35bfdf8431b9b954b59662a74c478'} +- {privkey: '0x2847a370e2c5fa8d377d5a2394b469109e937c0dbe50143ead86a5c2254aa873', + pubkey: '0x8826e820179fd321819e78ffee16f50ac528db2da71ad8c269f60b878bc4887c79c0545b3d750e86e490d5ba9083cb70'} +- {privkey: '0x45662b78ca207899aba931a3cde6e8b09d5b5fee8eba985fb4a93c0d05e03ec2', + pubkey: '0x92977e71396633d442f61e16a0cfcf8ffad0af93c9f1b7fdf4f7ccb816de052925fc192922d6252d325ef9fa2e0595d2'} +- {privkey: '0x363649c297fee725b9f601aa4f1b4c8df2ca728f0629313e48f471aaeb11ee1a', + pubkey: '0x91ae4686b0d20470409f020eaca826c3efc6c1926ed25d05e6f0f7916391ec89c2341917277c437ac8fffffe94b68111'} +- {privkey: '0x6eeb32dd0fe010051825e3ef409b1a7c66ff6daa9c61eb351c5d760684de8e6a', + pubkey: '0x8a0d241955104bedacb3b829162f2b457915c2beb9018ede8ef8ea80f401b471c42354358da9e62b51c38d54263a78a9'} +- {privkey: '0x2660f3d479c1708e620a11c91b101e87df87551a6bbf88635fc7d6cfa130f173', + pubkey: '0x80a2be2c7dbce8ddc2eba03522697587c375a5a9e92d4b31ed9e3c34bee047095d93e3c70b1662b3faa301f5b19978e5'} +- {privkey: '0x6edac0cf64bfd91bc691c4165efe1eb5cf80672ac06d2096f72a48a5dad4bd', pubkey: '0x86a73886aa0114bbdbba346cb7c07376c81b549a4802c24d98ebbc54a6a1b5d2ac874ef657cfb27c3644fcb85f97a2b5'} +- {privkey: '0x64e58b50d9d846d40a650337f0f38e36811af7c2bc4868c752edaa74793fce93', + pubkey: '0xa98c264dfc3bc3ed635df5dbfd54909e77600cd68480ec201d9f5c416580591daaa9735b04743e10e7fc6370a8189775'} +- {privkey: '0x55318f8f4b4c6455b9988c880b8698e6fb49702cfc34813fc2a2c862bf717303', + pubkey: '0x8bb7aa61aa8bbd2b7825d28c340da89b625381232dcf2742276b4e3a2e4a0f42ef68794fdf005d94014636732fba2f40'} +- {privkey: '0x5754d71500f0fe33cbc9d5b322b2a9f84101869d2d984696e93c335508744064', + pubkey: '0x8bb9e1693eab1496d7583bf22fb1f2a475934c63b4d94118940617aa187bc277f738223e0ec1ce8a5566035d9bcc5470'} +- {privkey: '0x31dbe97b2f57d7309c06540909b9c77aa52dc1795130e4b1ad34b66e6202eff6', + pubkey: '0xafe6eface52fb6de91055a81abf9aa6e42ce2ef36fd8ae0d09aec6e5d8bd40a065dfccda6104af94df3f7a5854559ef4'} +- {privkey: '0x2cc88587046359f41649b4e3bd10b236c5acde1202e547f2db56c47dca103008', + pubkey: '0xaa241b2afbb33f92a5d281aec9c8bac8997c1dddc051455fc0f334de48320f160b5029b552495aed21ed9ce252aab499'} +- {privkey: '0x02011606373959d9a2242d30aa53f8954db1960fc87ebee3475ccaa5f87c94f6', + pubkey: '0x974b2aed17665e51c1c091998ca9649875330947de3d2733a5bd2eda69b0c593cdac2e416993a87f9a17aec1ccdc2368'} +- {privkey: '0x09b2ff66b54529a7e98ce54db6dd36a465992727ea797a0e148ef43c6562d138', + pubkey: '0xa3177a98f653cea646f525f0f13348efb27e0d3d0cd824704c91d8d959096d259c9e577298f444acc629920c9619be50'} +- {privkey: '0x17874d3f4d7c1bb5dd1893dd73233cd222dbdca0e381557cdd0f38b88f4f6afd', + pubkey: '0xa8a18565733e70663c77bc0c80e08f50de908cc048152f1e7dae85d8cc218afbdd337d7d33a44e25400be2f06907c64a'} +- {privkey: '0x13dfa8f7b9de6932f65ec6b48e47d4f070997e456c0519be24eec69cd33ff69d', + pubkey: '0x902ff56a7a4c5b6cc57708ea7b0b72cb54e4b821c95373f503648185f15208f6ca6281677fa0ecc14f911d7b7ca04f4e'} +- {privkey: '0x011c96ea072abedcb063f9098823d3cf3141443625d5d6088e9f0cea9867cda2', + pubkey: '0x98f011f9a4dff94eb0352ff6e21b7df45e2a112bd5d789b5729111b89b368e7ed554e4d1c16b72f4d105090173cafed2'} +- {privkey: '0x33c10e17c93e4498b21ccdf965f4c32c136cbab58478398d658974ea3214f8b1', + pubkey: '0xabef42538a17a55804b634aac9d211b92b5768c4cc1263342ca287323bb3d5c768080451d1b5d652e9f8646fbb35f57c'} +- {privkey: '0x6f3c66f6b767ef8179cdee041e4b8bbf275bb536e33405867c6a3feda42d8163', + pubkey: '0xa8e3c2d3ac4e0e3c83380577ff7b7b5b2a98571e0d04ddebc0a6c472ce3bc5cc6a6733be728a0ee17da74b7691d2679d'} +- {privkey: '0x1565241e323a6ea930a89f9a7439cda5dd2f0dde9e0e954f5275a7e282c95e78', + pubkey: '0x98f620aadc4e58392b5b583fed96c452b54c39ba3a9fe8c277f625fae7e1317d034f732995fd88c1461463edd0f2b86d'} +- {privkey: '0x060bf14fd937e1879965f0112e51f23e6d0e0c9754e54365ee7dc5469f20cff8', + pubkey: '0xa7f5d408af436d71ec7acfe9a4592679649d326c00ac92c6f3332423be30c3601d232f265078f1f2a5d6d6cde08de7d7'} +- {privkey: '0x47bdd693031adba4aea3f633ed2feca9481dcba764584cd2d4c1a6891c275007', + pubkey: '0xa8be337b3d0e6be415dcb037b246831f9966aacef62b69d6b609e4ff8208bc536c6473bc9fe9e3bec9a8665c8caa05c5'} +- {privkey: '0x667b5769b27672cacd415ad77b9ba3ba1d022c893c6e2f0ab8e24205bf6cf5d0', + pubkey: '0x93bb1c86717fa7303f65cb8c45c9fcc8fecb88428b7cd1dd59967a132109c25ab5c97888e46c5d471ff911c573f45a34'} +- {privkey: '0x466623e36f41384901c45b651c92acbfcce03b3f4cf7f897749d4bf83ea910c3', + pubkey: '0x815042c33c1a43c1ee58a58ee074bc93a13c23a035dedee6879730220379d0c03ff4a3829240b6c34e56feb55cd322df'} +- {privkey: '0x3b44858957059c54511242fdf54643d7893220c572606e53d24ab929fc384b34', + pubkey: '0x8be11e9ead2e1bb5be7e2ec066ff83589558a5d9373666b3fc518a6a6639b3baecb87f8f34895f63e8d09d270d93ce04'} +- {privkey: '0x1dc3883e9afe3e0fe97bfd6071d1963cbf9f445605008ad769464529c9fc02ef', + pubkey: '0x8bf2630491d2a480ec243b00d65d76e69615e67d3df5d8c14ca7506edd8e896a9083e8ee9e4129af0f6d896a3225c08c'} +- {privkey: '0x0805ed1790e3dd052b756bdb5eac7a965be25a597d14bdc668aa7c6d6f1fab5c', + pubkey: '0x914b56f41c411fbfca9dc9763f44daf253c103b162457d07954fd0af768b5e74692b4639c22455fb81d71f7ed6144514'} +- {privkey: '0x4ddd7598823c542c2feacfea3e716ce0e36bfaaf3c6c71580517cbae66257aaf', + pubkey: '0x8794388915e86e4988363cdd4289ad19182209c873cbbbf5a80ff5c99f93acb839807787a77ad2b603f074405d7ed08b'} +- {privkey: '0x5470813f7deef638dc531188ca89e36976d536f680e89849cd9077fd096e20bc', + pubkey: '0xa3862121db5914d7272b0b705e6e3c5336b79e316735661873566245207329c30f9a33d4fb5f5857fc6fd0a368186972'} +- {privkey: '0x4f7bca4e548132f40dd4586ae016897603b5871b6f4d01ca4d395e194f5a9f84', + pubkey: '0x96ef954b331a534199f4f113d993a50ec7a781fc5aa2a181ea0bdbfd4c5c557abfebfcc02604d5aef52ba64afbe0ff18'} +- {privkey: '0x535d4992cc0d0de81761dd9b9e14578a55e6d259c3b512c2a3fe2d2853d6005f', + pubkey: '0x96c8d3dd08724624017f178393d176b425dab9dfa1cc3f62c7669337446baa601e0aa261c00c76bde07ba9a1a3582c0a'} +- {privkey: '0x15aea41481c6fdea498c16ada8c2c258a3f1673a1207daebce4e4593811fc639', + pubkey: '0x92bd81b8e9099b9ca87a2033fdd84475752dc34a0fae0a8e50aabf4d3baff9cd45ed56508c837023944350f53dbc4ac7'} +- {privkey: '0x46f045b904466b25a95612a5343cf8185239a9ed2cd8954b033cad3f70b0ea9c', + pubkey: '0x83802cd575a3cea7e3e38fc1a73d94a9e4fdb999b8494e7929309c009d79a23edb1ba091ac02588f130e0585fb106540'} +- {privkey: '0x73158319c625231bc6a0ba2ba4c0df7a27f9230ed22b58a69de0d81fb0d404ad', + pubkey: '0xb451eb0ff4990917aba6e3d80c34aee91ea1ce49053f38ae174cef107cb9acc595d0ca3fefcb804c9dd04510c630cabe'} +- {privkey: '0x1c9cb815d122bf7160af9bc86743e27f9caee9a52bb0fb5a552f03319207f303', + pubkey: '0xa7f711233af57440e9ea700113fc4dbaef97e7da7741dd2e38ae668a7f2685d4585d54a9e6712ff1b87c69dbb181abf7'} +- {privkey: '0x5f2613ef4d3da2b3ee86567ce526a30d235a1d02199a8f17a285ab865e292d75', + pubkey: '0xaca5e4979f281b5ab0ea0f549d6dcc34989607c335e94efedeffc7e73b393f42c7b11d76144a750f82600b21d10b6777'} +- {privkey: '0x1562a8a05beafc2b20925aad11db32708ea3a6e89c2bdf18cac691caf88f4f40', + pubkey: '0x984620db3658a19769475080998db9e7f5bcd4255a89a70b5ecf7db01226f213836d091a3b37eb96e4937966b094a291'} +- {privkey: '0x5c66d8cb61df373d8798150bd5e933492b45afa4cb8f655ffbc25ccc4e1ca70a', + pubkey: '0x8f1ef3639aea57fef705847e251b785bb608a848f42d9107c494cbc696be35642f6552fb83174ca2e73632568a5667f4'} +- {privkey: '0x4d0aaa64c8692e125bb1437e2b28ca97ae8724e8415a767e3affd9c3e8c72dcb', + pubkey: '0x8967da3c8071ba2bf632cd40ae08fbbf0a203c47c02af1948fc232a7a743c0c0cfbe51606b89f102f2f6de7f039fb155'} +- {privkey: '0x4708d70df024dd5263bf45c5ade217924755190486625a2c222bd9903686cec7', + pubkey: '0x8d58f7e2e58471b46d20a66a61f4cde3c78ab6c0505517c615e08d8ef5adf59b65fa2b01ea2395c84584a6f10d6cee2f'} +- {privkey: '0x289fd7f7e6d7ded306e85f88fd22216212faf047b2c94d5ac829966d58fe0cc9', + pubkey: '0x8db9f236d3483af79703244c7034b5267a0546c3c840d4e91fdcdd466373d62d960553982225ca5f7666dd7375a29c19'} +- {privkey: '0x3d55976d72d9f61b3d08620f1bdbd9b9d219cdd84d2fae829cbd7dfafdc91e8d', + pubkey: '0xb7721412ae5a793f34ac8866698b221c67ef8272eba44d3030512ec3f7ed8ffcb620b58f17809690d5276423e849827f'} +- {privkey: '0x2404bc074be939b1d0c3b0a86cc7497d930e5f7c1077cb3a9762840faad60eb3', + pubkey: '0x99f6e5b80dc52407f0436d3474bd5da5ff23a19cb188b933af6312d9793cbfd54f9e72596c5d481a1ed8d705b81c1f0e'} diff --git a/eth2/beacon/tools/builder/aggregator.py b/eth2/beacon/tools/builder/aggregator.py index 389942da82..c281ab7cb0 100644 --- a/eth2/beacon/tools/builder/aggregator.py +++ b/eth2/beacon/tools/builder/aggregator.py @@ -1,20 +1,32 @@ +from typing import Sequence + from eth_typing import BLSSignature +from eth_utils import ValidationError from ssz import get_hash_tree_root, uint64 from eth2._utils.bls import bls from eth2._utils.hash import hash_eth2 +from eth2.beacon.attestation_helpers import ( + validate_indexed_attestation_aggregate_signature, +) from eth2.beacon.committee_helpers import get_beacon_committee +from eth2.beacon.epoch_processing_helpers import ( + get_attesting_indices, + get_indexed_attestation, +) from eth2.beacon.helpers import compute_epoch_at_slot, get_domain from eth2.beacon.signature_domain import SignatureDomain +from eth2.beacon.types.aggregate_and_proof import AggregateAndProof +from eth2.beacon.types.attestations import Attestation from eth2.beacon.types.states import BeaconState -from eth2.beacon.typing import CommitteeIndex, Slot +from eth2.beacon.typing import Bitfield, CommitteeIndex, Slot from eth2.configs import CommitteeConfig # TODO: TARGET_AGGREGATORS_PER_COMMITTEE is not in Eth2Config now. TARGET_AGGREGATORS_PER_COMMITTEE = 16 -def slot_signature( +def get_slot_signature( state: BeaconState, slot: Slot, privkey: int, config: CommitteeConfig ) -> BLSSignature: """ @@ -54,3 +66,123 @@ def is_aggregator( committee = get_beacon_committee(state, slot, index, config) modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE) return int.from_bytes(hash_eth2(signature)[0:8], byteorder="little") % modulo == 0 + + +def get_aggregate_from_valid_committee_attestations( + attestations: Sequence[Attestation] +) -> Attestation: + """ + Return the aggregate attestation. + + The given attestations SHOULD have the same `data: AttestationData` and are valid. + """ + signatures = [attestation.signature for attestation in attestations] + aggregate_signature = bls.aggregate_signatures(signatures) + + all_aggregation_bits = [ + attestation.aggregation_bits for attestation in attestations + ] + aggregation_bits = tuple(map(any, zip(*all_aggregation_bits))) + + assert len(attestations) > 0 + + return Attestation.create( + data=attestations[0].data, + aggregation_bits=Bitfield(aggregation_bits), + signature=aggregate_signature, + ) + + +# +# Validation +# + + +def validate_aggregate_and_proof( + state: BeaconState, + aggregate_and_proof: AggregateAndProof, + attestation_propagation_slot_range: int, + config: CommitteeConfig, +) -> None: + """ + Validate aggregate_and_proof + + Reference: https://github.com/ethereum/eth2.0-specs/blob/master/specs/networking/p2p-interface.md#global-topics # noqa: E501 + """ + attestation = aggregate_and_proof.aggregate + + validate_attestation_propagation_slot_range( + state, attestation, attestation_propagation_slot_range + ) + + attesting_indices = get_attesting_indices( + state, attestation.data, attestation.aggregation_bits, config + ) + if aggregate_and_proof.aggregator_index not in attesting_indices: + raise ValidationError( + f"The aggregator index ({aggregate_and_proof.aggregator_index}) is not within" + f" the aggregate's committee {attesting_indices}" + ) + + if not is_aggregator( + state, + attestation.data.slot, + attestation.data.index, + aggregate_and_proof.selection_proof, + config, + ): + raise ValidationError( + f"The given validator {aggregate_and_proof.aggregator_index}" + " is not a selected aggregator" + ) + + validate_aggregator_proof(state, aggregate_and_proof, config) + + validate_attestation_signature(state, attestation, config) + + +def validate_attestation_propagation_slot_range( + state: BeaconState, + attestation: Attestation, + attestation_propagation_slot_range: int, +) -> None: + if ( + attestation.data.slot + attestation_propagation_slot_range < state.slot + or attestation.data.slot > state.slot + ): + raise ValidationError( + "attestation.data.slot should be within the last" + " {attestation_propagation_slot_range} slots. Got" + f" attestationdata.slot={attestation.data.slot}," + f" current slot={state.slot}" + ) + + +def validate_aggregator_proof( + state: BeaconState, aggregate_and_proof: AggregateAndProof, config: CommitteeConfig +) -> None: + slot = aggregate_and_proof.aggregate.data.slot + pubkey = state.validators[aggregate_and_proof.aggregator_index].pubkey + domain = get_domain( + state, + SignatureDomain.DOMAIN_BEACON_ATTESTER, + config.SLOTS_PER_EPOCH, + message_epoch=compute_epoch_at_slot(slot, config.SLOTS_PER_EPOCH), + ) + message_hash = get_hash_tree_root(slot, sedes=uint64) + + bls.validate( + message_hash=message_hash, + pubkey=pubkey, + signature=aggregate_and_proof.selection_proof, + domain=domain, + ) + + +def validate_attestation_signature( + state: BeaconState, attestation: Attestation, config: CommitteeConfig +) -> None: + indexed_attestation = get_indexed_attestation(state, attestation, config) + validate_indexed_attestation_aggregate_signature( + state, indexed_attestation, config.SLOTS_PER_EPOCH + ) diff --git a/eth2/beacon/tools/builder/proposer.py b/eth2/beacon/tools/builder/proposer.py index 35ad8f75a0..2eea434ba9 100644 --- a/eth2/beacon/tools/builder/proposer.py +++ b/eth2/beacon/tools/builder/proposer.py @@ -82,6 +82,9 @@ def create_block_on_state( parent_block=parent_block, block_params=FromBlockParams(slot=slot) ) + # MAX_ATTESTATIONS + attestations = attestations[: config.MAX_ATTESTATIONS] + # TODO: Add more operations randao_reveal = _generate_randao_reveal(privkey, slot, state, config) eth1_data = state.eth1_data diff --git a/eth2/beacon/tools/factories.py b/eth2/beacon/tools/factories.py index 1988b3a876..3c251a16f8 100644 --- a/eth2/beacon/tools/factories.py +++ b/eth2/beacon/tools/factories.py @@ -46,9 +46,13 @@ def _create( is disabled. """ override_lengths(cls.config) + if "num_validators" in kwargs: + num_validators = kwargs["num_validators"] + else: + num_validators = cls.num_validators if kwargs["genesis_state"] is None: - keymap = mk_keymap_of_size(cls.num_validators) + keymap = mk_keymap_of_size(num_validators) genesis_state, genesis_block = create_mock_genesis( config=cls.config, pubkeys=tuple(keymap.keys()), diff --git a/eth2/beacon/types/aggregate_and_proof.py b/eth2/beacon/types/aggregate_and_proof.py index 857ea23f7d..539b2e9866 100644 --- a/eth2/beacon/types/aggregate_and_proof.py +++ b/eth2/beacon/types/aggregate_and_proof.py @@ -16,27 +16,29 @@ class AggregateAndProof(HashableContainer): fields = [ - ("index", uint64), - ("selection_proof", bytes96), + ("aggregator_index", uint64), ("aggregate", Attestation), + ("selection_proof", bytes96), ] @classmethod def create( cls: Type[TAggregateAndProof], - index: ValidatorIndex = default_validator_index, - selection_proof: BLSSignature = EMPTY_SIGNATURE, + aggregator_index: ValidatorIndex = default_validator_index, aggregate: Attestation = default_attestation, + selection_proof: BLSSignature = EMPTY_SIGNATURE, ) -> TAggregateAndProof: return super().create( - index=index, selection_proof=selection_proof, aggregate=aggregate + aggregator_index=aggregator_index, + aggregate=aggregate, + selection_proof=selection_proof, ) def __str__(self) -> str: return ( - f"index={self.index}," - f" selection_proof={humanize_hash(self.selection_proof)}," + f"aggregator_index={self.aggregator_index}," f" aggregate={self.aggregate}," + f" selection_proof={humanize_hash(self.selection_proof)}," ) diff --git a/eth2/beacon/typing.py b/eth2/beacon/typing.py index cc0b7e87e8..39e524d8dc 100644 --- a/eth2/beacon/typing.py +++ b/eth2/beacon/typing.py @@ -37,11 +37,27 @@ def __str__(self) -> str: SigningRoot = NewType("SigningRoot", Hash32) +# +# Networkinig +# + +# CommitteeIndex % ATTESTATION_SUBNET_COUNT +SubnetId = NewType("SubnetId", int) + + +# +# Helpers +# + + class FromBlockParams(NamedTuple): slot: Slot = None -# defaults to emulate "zero types" +# +# Defaults to emulate "zero types" +# + default_slot = Slot(0) default_epoch = Epoch(0) default_committee_index = CommitteeIndex(0) diff --git a/tests/components/eth2/beacon/test_validator.py b/tests/components/eth2/beacon/test_validator.py index b42dd7023a..9aafc41d08 100644 --- a/tests/components/eth2/beacon/test_validator.py +++ b/tests/components/eth2/beacon/test_validator.py @@ -62,22 +62,47 @@ async def broadcast_beacon_block(self, block): async def broadcast_attestation(self, attestation): pass + async def broadcast_attestation_to_subnet(self, attestation, subnet_id): + pass + + async def broadcast_beacon_aggregate_and_proof(self, aggregate_and_proof): + pass + + +async def get_validator(event_loop, event_bus, indices, num_validators=None) -> Validator: + if num_validators is not None: + chain = BeaconChainFactory(num_validators=num_validators) + else: + chain = BeaconChainFactory() -async def get_validator(event_loop, event_bus, indices) -> Validator: - chain = BeaconChainFactory() validator_privkeys = { index: mk_key_pair_from_seed_index(index)[1] for index in indices } - def get_ready_attestations_fn(slot): - return () + # Mock attestation pool + unaggregated_attestation_pool = set() + aggregated_attestation_pool = set() + + def get_ready_attestations_fn(slot, is_aggregated): + return tuple(unaggregated_attestation_pool) + + def get_aggregatable_attestations_fn(slot, committee_index): + return tuple(unaggregated_attestation_pool) + + def import_attestation_fn(attestation, is_aggregated): + if is_aggregated: + aggregated_attestation_pool.add(attestation) + else: + unaggregated_attestation_pool.add(attestation) v = Validator( chain=chain, p2p_node=FakeNode(), validator_privkeys=validator_privkeys, get_ready_attestations_fn=get_ready_attestations_fn, + get_aggregatable_attestations_fn=get_aggregatable_attestations_fn, + import_attestation_fn=import_attestation_fn, event_bus=event_bus, ) asyncio.ensure_future(v.run(), loop=event_loop) @@ -335,13 +360,13 @@ async def test_validator_get_committee_assigment(event_loop, event_bus): state = alice.chain.get_head_state() epoch = compute_epoch_at_slot(state.slot, state_machine.config.SLOTS_PER_EPOCH) - assert alice.this_epoch_assignment[alice_indices[0]][0] == -1 - alice._get_this_epoch_assignment(alice_indices[0], epoch) - assert alice.this_epoch_assignment[alice_indices[0]][0] == epoch + assert alice.local_validator_epoch_assignment[alice_indices[0]][0] == -1 + alice._get_local_current_epoch_assignment(alice_indices[0], epoch) + assert alice.local_validator_epoch_assignment[alice_indices[0]][0] == epoch @pytest.mark.asyncio -async def test_validator_attest(event_loop, event_bus, monkeypatch): +async def test_validator_attest(event_loop, event_bus): alice_indices = [i for i in range(NUM_VALIDATORS)] alice = await get_validator(event_loop=event_loop, event_bus=event_bus, indices=alice_indices) head = alice.chain.get_canonical_head() @@ -349,7 +374,7 @@ async def test_validator_attest(event_loop, event_bus, monkeypatch): state = alice.chain.get_head_state() epoch = compute_epoch_at_slot(state.slot, state_machine.config.SLOTS_PER_EPOCH) - assignment = alice._get_this_epoch_assignment(alice_indices[0], epoch) + assignment = alice._get_local_current_epoch_assignment(alice_indices[0], epoch) attestations = await alice.attest(assignment.slot) assert len(attestations) == 1 @@ -371,6 +396,53 @@ async def test_validator_attest(event_loop, event_bus, monkeypatch): ) +@pytest.mark.asyncio +async def test_validator_aggregate(event_loop, event_bus): + num_validators = 50 + alice_indices = [i for i in range(num_validators)] + alice = await get_validator( + event_loop=event_loop, + event_bus=event_bus, + indices=alice_indices, + num_validators=num_validators, + ) + alice.skip_block( + slot=alice.chain.get_canonical_head().slot + 100, + state=alice.chain.get_head_state(), + state_machine=alice.chain.get_state_machine(), + ) + state_machine = alice.chain.get_state_machine() + state = alice.chain.get_head_state() + head = alice.chain.get_canonical_head() + + epoch = compute_epoch_at_slot(state.slot, state_machine.config.SLOTS_PER_EPOCH) + assignment = alice._get_local_current_epoch_assignment(alice_indices[0], epoch) + + attested_attsetation = await alice.attest(assignment.slot) + assert len(attested_attsetation) >= 1 + + aggregate_and_proofs = await alice.aggregate(assignment.slot) + assert len(aggregate_and_proofs) >= 1 + for aggregate_and_proof in aggregate_and_proofs: + attestation = aggregate_and_proof.aggregate + assert attestation.data.slot == assignment.slot + assert attestation.data.beacon_block_root == head.signing_root + assert attestation.data.index == assignment.committee_index + + # Advance the state and validate the attestation + config = state_machine.config + future_state = state_machine.state_transition.apply_state_transition( + state, + future_slot=assignment.slot + config.MIN_ATTESTATION_INCLUSION_DELAY, + ) + validate_attestation( + future_state, + attestation, + config, + ) + # break + + @pytest.mark.asyncio async def test_validator_include_ready_attestations(event_loop, event_bus, monkeypatch): # Alice controls all validators @@ -386,7 +458,7 @@ async def test_validator_include_ready_attestations(event_loop, event_bus, monke # Mock `get_ready_attestations_fn` so it returns the attestation alice # attested to. - def get_ready_attestations_fn(slot): + def get_ready_attestations_fn(slot, is_aggregated): return attestations monkeypatch.setattr(alice, 'get_ready_attestations', get_ready_attestations_fn) diff --git a/tests/eth2/core/beacon/conftest.py b/tests/eth2/core/beacon/conftest.py index 42047de793..8b110368f8 100644 --- a/tests/eth2/core/beacon/conftest.py +++ b/tests/eth2/core/beacon/conftest.py @@ -575,9 +575,9 @@ def sample_state(sample_beacon_state_params): @pytest.fixture def sample_aggregate_and_proof_params(sample_attestation_params): return { - "index": 5, - "selection_proof": bytes([1] * 96), + "aggregator_index": 5, "aggregate": Attestation.create(**sample_attestation_params), + "selection_proof": bytes([1] * 96), } diff --git a/tests/eth2/core/beacon/tools/builder/test_aggregator.py b/tests/eth2/core/beacon/tools/builder/test_aggregator.py index 1de33eee11..18417accd6 100644 --- a/tests/eth2/core/beacon/tools/builder/test_aggregator.py +++ b/tests/eth2/core/beacon/tools/builder/test_aggregator.py @@ -1,14 +1,19 @@ import pytest +from eth2._utils.bitfield import get_empty_bitfield, set_voted from eth2.beacon.committee_helpers import ( compute_epoch_at_slot, iterate_committees_at_epoch, ) +from eth2.beacon.signature_domain import SignatureDomain from eth2.beacon.tools.builder.aggregator import ( TARGET_AGGREGATORS_PER_COMMITTEE, + get_aggregate_from_valid_committee_attestations, + get_slot_signature, is_aggregator, - slot_signature, ) +from eth2.beacon.tools.builder.validator import sign_transaction +from eth2.beacon.types.attestations import Attestation from eth2.configs import CommitteeConfig @@ -16,7 +21,7 @@ @pytest.mark.parametrize( ("validator_count", "target_committee_size", "slots_per_epoch"), [(1000, 100, 10)] ) -def test_aggregate_votes(validator_count, privkeys, genesis_state, config): +def test_aggregator_selection(validator_count, privkeys, genesis_state, config): config = CommitteeConfig(config) state = genesis_state epoch = compute_epoch_at_slot(state.slot, config.SLOTS_PER_EPOCH) @@ -29,7 +34,9 @@ def test_aggregate_votes(validator_count, privkeys, genesis_state, config): aggregator_count = 0 for index in range(validator_count): if index in committee: - signature = slot_signature(genesis_state, slot, privkeys[index], config) + signature = get_slot_signature( + genesis_state, slot, privkeys[index], config + ) attester_is_aggregator = is_aggregator( state, slot, committee_index, signature, config ) @@ -45,3 +52,38 @@ def test_aggregate_votes(validator_count, privkeys, genesis_state, config): < average_aggregator_count < TARGET_AGGREGATORS_PER_COMMITTEE + 3 ) + + +def test_get_aggregate_from_valid_committee_attestations( + sample_attestation_params, privkeys, genesis_state, config +): + committee_size = 16 + empty_bitfield = get_empty_bitfield(committee_size) + base_attestation = Attestation.create(**sample_attestation_params) + message_hash = base_attestation.data.hash_tree_root + attestations = [] + expected_bitfield = empty_bitfield + + for i in range(4, 16, 2): + attestations.append( + base_attestation.mset( + "aggregation_bits", + set_voted(empty_bitfield, i), + "signature", + sign_transaction( + message_hash=message_hash, + privkey=privkeys[i], + state=genesis_state, + slot=genesis_state.slot, + signature_domain=SignatureDomain.DOMAIN_BEACON_ATTESTER, + slots_per_epoch=config.SLOTS_PER_EPOCH, + ), + ) + ) + expected_bitfield = set_voted(expected_bitfield, i) + + aggregate_attestation = get_aggregate_from_valid_committee_attestations( + attestations + ) + + assert aggregate_attestation.aggregation_bits == expected_bitfield diff --git a/tests/eth2/core/beacon/types/test_aggregate_and_proof.py b/tests/eth2/core/beacon/types/test_aggregate_and_proof.py index 062ab8abe8..e2a29ef064 100644 --- a/tests/eth2/core/beacon/types/test_aggregate_and_proof.py +++ b/tests/eth2/core/beacon/types/test_aggregate_and_proof.py @@ -4,11 +4,13 @@ def test_defaults(sample_aggregate_and_proof_params): aggregate_and_proof = AggregateAndProof.create(**sample_aggregate_and_proof_params) - assert aggregate_and_proof.index == sample_aggregate_and_proof_params["index"] - assert ( - aggregate_and_proof.selection_proof - == sample_aggregate_and_proof_params["selection_proof"] + assert aggregate_and_proof.aggregator_index == ( + sample_aggregate_and_proof_params["aggregator_index"] ) assert ( aggregate_and_proof.aggregate == sample_aggregate_and_proof_params["aggregate"] ) + assert ( + aggregate_and_proof.selection_proof + == sample_aggregate_and_proof_params["selection_proof"] + ) diff --git a/tests/libp2p/bcc/test_receive_server.py b/tests/libp2p/bcc/test_receive_server.py index a2212cd5fe..570f8fdc38 100644 --- a/tests/libp2p/bcc/test_receive_server.py +++ b/tests/libp2p/bcc/test_receive_server.py @@ -21,8 +21,11 @@ from eth2.configs import Eth2GenesisConfig from trinity.db.beacon.chain import AsyncBeaconChainDB from trinity.protocol.bcc_libp2p.configs import ( + ATTESTATION_SUBNET_COUNT, + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, PUBSUB_TOPIC_BEACON_ATTESTATION, PUBSUB_TOPIC_BEACON_BLOCK, + PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION, ) from trinity.protocol.bcc_libp2p.servers import AttestationPool, OrphanBlockPool from trinity.tools.async_method import wait_until_true @@ -86,9 +89,18 @@ async def receive_server(): topic_msg_queues = { PUBSUB_TOPIC_BEACON_BLOCK: asyncio.Queue(), PUBSUB_TOPIC_BEACON_ATTESTATION: asyncio.Queue(), + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF: asyncio.Queue(), } + subnets = set(subnet_id for subnet_id in range(ATTESTATION_SUBNET_COUNT)) + for subnet_id in range(ATTESTATION_SUBNET_COUNT): + topic = ( + PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=subnet_id), + ) + topic_msg_queues[topic] = asyncio.Queue() chain = await get_fake_chain() - server = ReceiveServerFactory(chain=chain, topic_msg_queues=topic_msg_queues) + server = ReceiveServerFactory( + chain=chain, topic_msg_queues=topic_msg_queues, subnets=subnets + ) asyncio.ensure_future(server.run()) await server.ready.wait() try: @@ -104,6 +116,7 @@ async def receive_server_with_mock_process_orphan_blocks_period( topic_msg_queues = { PUBSUB_TOPIC_BEACON_BLOCK: asyncio.Queue(), PUBSUB_TOPIC_BEACON_ATTESTATION: asyncio.Queue(), + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF: asyncio.Queue(), } chain = await get_fake_chain() server = ReceiveServerFactory(chain=chain, topic_msg_queues=topic_msg_queues) @@ -295,7 +308,7 @@ async def test_bcc_receive_server_handle_beacon_attestations(receive_server): topicIDs=[PUBSUB_TOPIC_BEACON_ATTESTATION], ) - assert attestation not in receive_server.attestation_pool + assert attestation not in receive_server.unaggregated_attestation_pool beacon_attestation_queue = receive_server.topic_msg_queues[ PUBSUB_TOPIC_BEACON_ATTESTATION @@ -304,7 +317,7 @@ async def test_bcc_receive_server_handle_beacon_attestations(receive_server): await beacon_attestation_queue.put(msg) await wait_all_messages_processed(beacon_attestation_queue) # Check that attestation is put to attestation pool - assert attestation in receive_server.attestation_pool + assert attestation in receive_server.unaggregated_attestation_pool # Put the attestation in the next block block = get_blocks(receive_server.chain, num_blocks=1)[0] @@ -322,7 +335,7 @@ async def test_bcc_receive_server_handle_beacon_attestations(receive_server): await beacon_block_queue.put(msg) await wait_all_messages_processed(beacon_block_queue) # Check that attestation is removed from attestation pool - assert attestation not in receive_server.attestation_pool + assert attestation not in receive_server.unaggregated_attestation_pool @pytest.mark.asyncio @@ -371,6 +384,11 @@ async def request_beacon_blocks_by_root(peer_id, block_roots): return requested_blocks with monkeypatch.context() as m: + for orphan_block in (blocks[4],) + fork_blocks: + receive_server.orphan_block_pool.add(orphan_block) + await wait_until_true( + lambda: len(receive_server.orphan_block_pool) != 0, timeout=4 + ) for peer in (peer1, peer2): receive_server.p2p_node.handshaked_peers.add(peer) m.setattr( @@ -378,12 +396,9 @@ async def request_beacon_blocks_by_root(peer_id, block_roots): "request_beacon_blocks_by_root", request_beacon_blocks_by_root, ) - - for orphan_block in (blocks[4],) + fork_blocks: - receive_server.orphan_block_pool.add(orphan_block) # Wait for receive server to process the orphan blocks await wait_until_true( - lambda: len(receive_server.orphan_block_pool) == 0, timeout=2 + lambda: len(receive_server.orphan_block_pool) == 0, timeout=4 ) # Check that both peers were requested for blocks assert peer_1_called_event.is_set() @@ -413,7 +428,7 @@ def mock_get_head_state(): a3 = Attestation.create( signature=b"\x78" * 96, data=AttestationData.create(slot=attesting_slot + 1) ) - receive_server.attestation_pool.batch_add([a1, a2, a3]) + receive_server.unaggregated_attestation_pool.batch_add([a1, a2, a3]) # Workaround: add a fake head state slot # so `get_state_machine` won't trigger `HeadStateSlotNotFound` exception @@ -424,21 +439,29 @@ def mock_get_head_state(): state.slot = ( attesting_slot + MINIMAL_SERENITY_CONFIG.MIN_ATTESTATION_INCLUSION_DELAY - 1 ) - ready_attestations = receive_server.get_ready_attestations(state.slot) + ready_attestations = receive_server.get_ready_attestations( + state.slot, is_aggregated=False + ) assert len(ready_attestations) == 0 state.slot = ( attesting_slot + MINIMAL_SERENITY_CONFIG.MIN_ATTESTATION_INCLUSION_DELAY ) - ready_attestations = receive_server.get_ready_attestations(state.slot) + ready_attestations = receive_server.get_ready_attestations( + state.slot, is_aggregated=False + ) assert set([a1, a2]) == set(ready_attestations) state.slot = ( attesting_slot + MINIMAL_SERENITY_CONFIG.MIN_ATTESTATION_INCLUSION_DELAY + 1 ) - ready_attestations = receive_server.get_ready_attestations(state.slot) + ready_attestations = receive_server.get_ready_attestations( + state.slot, is_aggregated=False + ) assert set([a1, a2, a3]) == set(ready_attestations) state.slot = attesting_slot + MINIMAL_SERENITY_CONFIG.SLOTS_PER_EPOCH + 1 - ready_attestations = receive_server.get_ready_attestations(state.slot) + ready_attestations = receive_server.get_ready_attestations( + state.slot, is_aggregated=False + ) assert set([a3]) == set(ready_attestations) diff --git a/tests/libp2p/bcc/test_topic_validator.py b/tests/libp2p/bcc/test_topic_validator.py index fb8ca79a31..52a63a0800 100644 --- a/tests/libp2p/bcc/test_topic_validator.py +++ b/tests/libp2p/bcc/test_topic_validator.py @@ -1,6 +1,7 @@ import pytest from trinity.protocol.bcc_libp2p.configs import ( + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, PUBSUB_TOPIC_BEACON_ATTESTATION, PUBSUB_TOPIC_BEACON_BLOCK, ) @@ -12,5 +13,7 @@ async def test_setup_topic_validators(nodes): node = nodes[0] topic_1 = PUBSUB_TOPIC_BEACON_BLOCK topic_2 = PUBSUB_TOPIC_BEACON_ATTESTATION + topic_3 = PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF assert topic_1 in node.pubsub.topic_validators assert topic_2 in node.pubsub.topic_validators + assert topic_3 in node.pubsub.topic_validators diff --git a/trinity/components/eth2/beacon/component.py b/trinity/components/eth2/beacon/component.py index 0add98f5ca..79b0a7ad36 100644 --- a/trinity/components/eth2/beacon/component.py +++ b/trinity/components/eth2/beacon/component.py @@ -5,7 +5,7 @@ import asyncio import logging import os -from typing import cast +from typing import Set, cast from async_exit_stack import AsyncExitStack from lahja import EndpointAPI @@ -15,7 +15,10 @@ from eth_utils import decode_hex -from eth2.beacon.typing import ValidatorIndex +from eth2.beacon.typing import ( + SubnetId, + ValidatorIndex, +) from p2p.service import run_service @@ -23,6 +26,7 @@ from trinity.config import BeaconAppConfig from trinity.db.manager import DBClient from trinity.extensibility import AsyncioIsolatedComponent +from trinity.protocol.bcc_libp2p.configs import ATTESTATION_SUBNET_COUNT from trinity.protocol.bcc_libp2p.node import Node from trinity.protocol.bcc_libp2p.servers import BCCReceiveServer @@ -103,6 +107,10 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: base_db, chain_config.genesis_config ) + # TODO: To simplify, subsribe all subnets + subnets: Set[SubnetId] = set( + SubnetId(subnet_id) for subnet_id in range(ATTESTATION_SUBNET_COUNT) + ) # TODO: Handle `bootstrap_nodes`. libp2p_node = Node( @@ -111,12 +119,14 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: listen_port=boot_info.args.port, preferred_nodes=trinity_config.preferred_nodes, chain=chain, + subnets=subnets, ) receive_server = BCCReceiveServer( chain=chain, p2p_node=libp2p_node, topic_msg_queues=libp2p_node.pubsub.my_topics, + subnets=subnets, cancel_token=libp2p_node.cancel_token, ) @@ -140,6 +150,8 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: event_bus=event_bus, token=libp2p_node.cancel_token, get_ready_attestations_fn=receive_server.get_ready_attestations, + get_aggregatable_attestations_fn=receive_server.get_aggregatable_attestations, + import_attestation_fn=receive_server.import_attestation, ) slot_ticker = SlotTicker( @@ -159,7 +171,6 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: block_importer=SyncBlockImporter(chain), genesis_config=chain_config.genesis_config, token=libp2p_node.cancel_token, - ) services = (libp2p_node, receive_server, slot_ticker, validator, syncer) diff --git a/trinity/components/eth2/beacon/validator.py b/trinity/components/eth2/beacon/validator.py index 5b0f125189..5251101a14 100644 --- a/trinity/components/eth2/beacon/validator.py +++ b/trinity/components/eth2/beacon/validator.py @@ -1,14 +1,11 @@ from itertools import ( groupby, ) -from operator import ( - itemgetter, -) from typing import ( Callable, Dict, Iterable, - Sequence, + Set, Tuple, ) @@ -17,6 +14,7 @@ from cancel_token import ( CancelToken, ) +from eth_typing import BLSSignature from eth_utils import ( humanize_hash, to_tuple, @@ -35,6 +33,11 @@ from eth2.beacon.state_machines.forks.serenity.blocks import ( SerenityBeaconBlock, ) +from eth2.beacon.tools.builder.aggregator import ( + get_aggregate_from_valid_committee_attestations, + get_slot_signature, + is_aggregator, +) from eth2.beacon.tools.builder.committee_assignment import ( CommitteeAssignment, ) @@ -48,6 +51,9 @@ from eth2.beacon.tools.builder.validator import ( create_signed_attestation_at_slot, ) +from eth2.beacon.types.aggregate_and_proof import ( + AggregateAndProof, +) from eth2.beacon.types.attestations import ( Attestation, ) @@ -62,6 +68,7 @@ CommitteeValidatorIndex, Epoch, Slot, + SubnetId, ValidatorIndex, ) from eth2.configs import CommitteeConfig @@ -75,9 +82,12 @@ SlotTickEvent, ) from trinity.protocol.bcc_libp2p.node import Node +from trinity.protocol.bcc_libp2p.configs import ATTESTATION_SUBNET_COUNT -GetReadyAttestationsFn = Callable[[Slot], Sequence[Attestation]] +GetReadyAttestationsFn = Callable[[Slot, bool], Tuple[Attestation, ...]] +GetAggregatableAttestationsFn = Callable[[Slot, CommitteeIndex], Tuple[Attestation, ...]] +ImportAttestationFn = Callable[[Attestation, bool], None] # FIXME: Read this from validator config @@ -92,7 +102,7 @@ class Validator(BaseService): slots_per_epoch: int latest_proposed_epoch: Dict[ValidatorIndex, Epoch] latest_attested_epoch: Dict[ValidatorIndex, Epoch] - this_epoch_assignment: Dict[ValidatorIndex, Tuple[Epoch, CommitteeAssignment]] + local_validator_epoch_assignment: Dict[ValidatorIndex, Tuple[Epoch, CommitteeAssignment]] def __init__( self, @@ -101,6 +111,8 @@ def __init__( validator_privkeys: Dict[ValidatorIndex, int], event_bus: EndpointAPI, get_ready_attestations_fn: GetReadyAttestationsFn, + get_aggregatable_attestations_fn: GetAggregatableAttestationsFn, + import_attestation_fn: ImportAttestationFn, token: CancelToken = None) -> None: super().__init__(token) self.chain = chain @@ -113,15 +125,17 @@ def __init__( # into/read from validator's own db. self.latest_proposed_epoch = {} self.latest_attested_epoch = {} - self.this_epoch_assignment = {} + self.local_validator_epoch_assignment = {} for validator_index in validator_privkeys: self.latest_proposed_epoch[validator_index] = Epoch(-1) self.latest_attested_epoch[validator_index] = Epoch(-1) - self.this_epoch_assignment[validator_index] = ( + self.local_validator_epoch_assignment[validator_index] = ( Epoch(-1), CommitteeAssignment((), CommitteeIndex(-1), Slot(-1)), ) self.get_ready_attestations: GetReadyAttestationsFn = get_ready_attestations_fn + self.get_aggregatable_attestations: GetAggregatableAttestationsFn = get_aggregatable_attestations_fn # noqa: E501 + self.import_attestation: ImportAttestationFn = import_attestation_fn async def _run(self) -> None: self.logger.info( @@ -150,24 +164,6 @@ async def handle_slot_tick(self) -> None: " HERE AS IT IS INTERNAL TO OUR OWN CODE" ) - def _get_this_epoch_assignment(self, - validator_index: ValidatorIndex, - this_epoch: Epoch) -> CommitteeAssignment: - # update `this_epoch_assignment` if it's outdated - if this_epoch > self.this_epoch_assignment[validator_index][0]: - state_machine = self.chain.get_state_machine() - state = self.chain.get_head_state() - self.this_epoch_assignment[validator_index] = ( - this_epoch, - get_committee_assignment( - state, - state_machine.config, - this_epoch, - validator_index, - ) - ) - return self.this_epoch_assignment[validator_index][1] - async def handle_first_tick(self, slot: Slot) -> None: head = self.chain.get_canonical_head() state_machine = self.chain.get_state_machine() @@ -243,23 +239,45 @@ async def handle_second_tick(self, slot: Slot) -> None: await self.attest(slot) async def handle_third_tick(self, slot: Slot) -> None: - # TODO: Add aggregator strategy - pass + state_machine = self.chain.get_state_machine() + state = self.chain.get_head_state() + if state.slot < slot: + self.skip_block( + slot=slot, + state=state, + state_machine=state_machine, + ) + + await self.aggregate(slot) + # + # Proposing block + # async def propose_block(self, proposer_index: ValidatorIndex, slot: Slot, state: BeaconState, state_machine: BaseBeaconStateMachine, head_block: BaseBeaconBlock) -> BaseBeaconBlock: - ready_attestations = self.get_ready_attestations(slot) - block = self._make_proposing_block( - proposer_index=proposer_index, - slot=slot, + """ + Propose a block and broadcast it. + """ + # TODO(hwwhww): Check if need to aggregate and if they are overlapping. + aggregated_attestations = self.get_ready_attestations(slot, True) + unaggregated_attestations = self.get_ready_attestations(slot, False) + ready_attestations = aggregated_attestations + unaggregated_attestations + + block = create_block_on_state( state=state, + config=state_machine.config, state_machine=state_machine, + block_class=SerenityBeaconBlock, parent_block=head_block, + slot=slot, + validator_index=proposer_index, + privkey=self.validator_privkeys[proposer_index], attestations=ready_attestations, + check_proposer_index=False, ) self.logger.debug( bold_green("validator %s is proposing a block %s with attestations %s"), @@ -272,30 +290,13 @@ async def propose_block(self, await self.p2p_node.broadcast_beacon_block(block) return block - def _make_proposing_block(self, - proposer_index: ValidatorIndex, - slot: Slot, - state: BeaconState, - state_machine: BaseBeaconStateMachine, - parent_block: BaseBeaconBlock, - attestations: Sequence[Attestation]) -> BaseBeaconBlock: - return create_block_on_state( - state=state, - config=state_machine.config, - state_machine=state_machine, - block_class=SerenityBeaconBlock, - parent_block=parent_block, - slot=slot, - validator_index=proposer_index, - privkey=self.validator_privkeys[proposer_index], - attestations=attestations, - check_proposer_index=False, - ) - def skip_block(self, slot: Slot, state: BeaconState, state_machine: BaseBeaconStateMachine) -> BeaconState: + """ + Forward state to the target ``slot`` and persist the state. + """ post_state = state_machine.state_transition.apply_state_transition( state, future_slot=slot, @@ -310,69 +311,111 @@ def skip_block(self, self.chain.chaindb.persist_state(post_state) return post_state - def _is_attesting(self, - validator_index: ValidatorIndex, - assignment: CommitteeAssignment, - slot: Slot, - epoch: Epoch) -> bool: - has_attested = epoch <= self.latest_attested_epoch[validator_index] - return not has_attested and slot == assignment.slot + # + # Attesting attestation + # + def _get_local_current_epoch_assignment( + self, + validator_index: ValidatorIndex, + epoch: Epoch) -> CommitteeAssignment: + """ + Return the validator's epoch assignment at the given epoch. + + Note that ``epoch`` <= next_epoch. + """ + is_new_local_validator = validator_index not in self.local_validator_epoch_assignment + should_update = ( + is_new_local_validator or ( + not is_new_local_validator and ( + epoch > self.local_validator_epoch_assignment[validator_index][0] + ) + ) + ) + if should_update: + state_machine = self.chain.get_state_machine() + state = self.chain.get_head_state() + self.local_validator_epoch_assignment[validator_index] = ( + epoch, + get_committee_assignment( + state, + state_machine.config, + epoch, + validator_index, + ) + ) + return self.local_validator_epoch_assignment[validator_index][1] + + def _get_local_current_epoch_assignments( + self, epoch: Epoch + ) -> Dict[ValidatorIndex, CommitteeAssignment]: + """ + Return the validator assignments of all the local validators. + """ + validator_assignments = { + validator_index: self._get_local_current_epoch_assignment( + validator_index, + epoch, + ) + for validator_index in self.validator_privkeys + } + return validator_assignments + + def _get_attesting_assignments_at_slot(self, slot: Slot) -> Set[CommitteeAssignment]: + """ + Return the set of ``CommitteeAssignment``s of the given ``slot`` + """ + epoch = compute_epoch_at_slot(slot, self.slots_per_epoch) + validator_assignments = self._get_local_current_epoch_assignments(epoch) + committee_assignments = set(validator_assignments.values()) + committee_assignments_at_slot = set( + filter( + lambda committee_assignment: committee_assignment.slot == slot, + committee_assignments + ) + ) + return committee_assignments_at_slot @to_tuple - def _get_attesting_validator_and_committee_index( - self, - assignments: Dict[ValidatorIndex, CommitteeAssignment], - slot: Slot, - epoch: Epoch - ) -> Iterable[Tuple[ValidatorIndex, CommitteeIndex]]: - for validator_index, assignment in assignments.items(): - if self._is_attesting(validator_index, assignment, slot, epoch): - yield (validator_index, assignment.committee_index) + def _get_local_attesters_at_assignment( + self, target_assignment: CommitteeAssignment + ) -> Iterable[ValidatorIndex]: + """ + Return the local attesters that in the committee of the given assignment + """ + for validator_index, (_, assignment) in self.local_validator_epoch_assignment.items(): + if ( + assignment.slot == target_assignment.slot and + assignment.committee_index == target_assignment.committee_index + ): + yield validator_index async def attest(self, slot: Slot) -> Tuple[Attestation, ...]: + """ + Attest the block at the given ``slot`` and broadcast them. + """ attestations: Tuple[Attestation, ...] = () head = self.chain.get_canonical_head() state_machine = self.chain.get_state_machine() state = self.chain.get_head_state() epoch = compute_epoch_at_slot(slot, self.slots_per_epoch) - validator_assignments = { - validator_index: self._get_this_epoch_assignment( - validator_index, - epoch, - ) - for validator_index in self.validator_privkeys - } - attesting_validators = self._get_attesting_validator_and_committee_index( - validator_assignments, - slot, - epoch, - ) - if len(attesting_validators) == 0: - return () + attesting_committee_assignments_at_slot = self._get_attesting_assignments_at_slot(slot) - # Sort the attesting validators by committee index - sorted_attesting_validators = sorted( - attesting_validators, - key=itemgetter(1), - ) - # Group the attesting validators by committee index - attesting_validators_groups = groupby( - sorted_attesting_validators, - key=itemgetter(1), - ) - for _, group in attesting_validators_groups: + for committee_assignment in attesting_committee_assignments_at_slot: + committee_index = committee_assignment.committee_index + committee = committee_assignment.committee + + attesting_validators_indices = tuple( + filter( + lambda attester: self.latest_attested_epoch[attester] < epoch, + self._get_local_attesters_at_assignment(committee_assignment), + ) + ) # Get the validator_index -> privkey map of the attesting validators attesting_validator_privkeys = { - attesting_data[0]: self.validator_privkeys[attesting_data[0]] - for attesting_data in group + index: self.validator_privkeys[index] + for index in attesting_validators_indices } - attesting_validators_indices = tuple(attesting_validator_privkeys.keys()) - # Get one of the attesting validator's assignment in order to get the committee info - assignment = self._get_this_epoch_assignment( - attesting_validators_indices[0], - epoch, - ) attestation = create_signed_attestation_at_slot( state, state_machine.config, @@ -380,10 +423,10 @@ async def attest(self, slot: Slot) -> Tuple[Attestation, ...]: slot, head.signing_root, attesting_validator_privkeys, - assignment.committee, - assignment.committee_index, + committee, + committee_index, tuple( - CommitteeValidatorIndex(assignment.committee.index(index)) + CommitteeValidatorIndex(committee.index(index)) for index in attesting_validators_indices ), ) @@ -393,13 +436,101 @@ async def attest(self, slot: Slot) -> Tuple[Attestation, ...]: head, attestation, ) - for validator_index in attesting_validators_indices: - self.latest_attested_epoch[validator_index] = epoch - self.logger.debug("broadcasting attestation %s", attestation) - await self.p2p_node.broadcast_attestation(attestation) + # await self.p2p_node.broadcast_attestation(attestation) + subnet_id = SubnetId(committee_index % ATTESTATION_SUBNET_COUNT) + + # Import attestation to pool and then broadcast it + self.import_attestation(attestation, False) + await self.p2p_node.broadcast_attestation_to_subnet(attestation, subnet_id) + + # Log the last epoch that the validator attested + for index in attesting_validators_indices: + self.latest_attested_epoch[index] = epoch attestations = attestations + (attestation,) # TODO: Aggregate attestations return attestations + + # + # Aggregating attestation + # + @to_tuple + def _get_aggregates( + self, slot: Slot, committee_index: CommitteeIndex, config: CommitteeConfig + ) -> Iterable[Attestation]: + """ + Return the aggregate attestation of the given committee. + """ + # TODO: The aggregator should aggregate the late attestations? + aggregatable_attestations = self.get_aggregatable_attestations(slot, committee_index) + attestation_groups = groupby( + aggregatable_attestations, + key=lambda attestation: attestation.data, + ) + for _, group in attestation_groups: + yield get_aggregate_from_valid_committee_attestations(tuple(group)) + + async def aggregate( + self, + slot: Slot + ) -> Tuple[AggregateAndProof, ...]: + """ + Aggregate the attestations at ``slot`` and broadcast them. + """ + # Check the aggregators selection + aggregate_and_proofs: Tuple[AggregateAndProof, ...] = () + state_machine = self.chain.get_state_machine() + state = self.chain.get_head_state() + config = state_machine.config + + attesting_committee_assignments_at_slot = self._get_attesting_assignments_at_slot(slot) + # 1. For each committee_assignment at the given slot + for committee_assignment in attesting_committee_assignments_at_slot: + committee_index = committee_assignment.committee_index + + local_attesters = self._get_local_attesters_at_assignment(committee_assignment) + # Get the validator_index -> privkey map of the attesting validators + attesting_validator_privkeys = { + index: self.validator_privkeys[index] + for index in local_attesters + } + + selected_proofs: Dict[ValidatorIndex, BLSSignature] = {} + # 2. For each attester + for validator_index, privkey in attesting_validator_privkeys.items(): + # Check if the vallidator is one of the aggregators + signature = get_slot_signature( + state, slot, privkey, CommitteeConfig(config), + ) + is_aggregator_result = is_aggregator( + state, + slot, + committee_index, + signature, + CommitteeConfig(config), + ) + if is_aggregator_result: + self.logger.debug( + f"validator ({validator_index}) is aggregator of" + f" committee_index={committee_index} at slot={slot}" + ) + selected_proofs[validator_index] = signature + else: + continue + + aggregates = self._get_aggregates(slot, committee_index, config) + # 3. For each aggregate + # (it's possible with same CommitteeIndex and different AttesatationData) + for aggregate in aggregates: + aggregate_and_proof = AggregateAndProof.create( + aggregator_index=validator_index, + aggregate=aggregate, + selection_proof=selected_proofs[validator_index], + ) + self.import_attestation(aggregate_and_proof.aggregate, True) + await self.p2p_node.broadcast_beacon_aggregate_and_proof(aggregate_and_proof) + aggregate_and_proofs += (aggregate_and_proof,) + + return aggregate_and_proofs diff --git a/trinity/protocol/bcc_libp2p/configs.py b/trinity/protocol/bcc_libp2p/configs.py index 51c4bd256c..2cf19b6526 100644 --- a/trinity/protocol/bcc_libp2p/configs.py +++ b/trinity/protocol/bcc_libp2p/configs.py @@ -1,4 +1,5 @@ import enum +from string import Template from typing import ( NamedTuple, ) @@ -11,18 +12,18 @@ # Network configuration # -# TODO: TBD # The max size of uncompressed gossip messages. GOSSIP_MAX_SIZE = 2 ** 20 # 1 MiB # The maximum allowed size of uncompressed req/resp chunked responses. MAX_CHUNK_SIZE = 2**20 # 1 MiB -# TODO: TBD -# The number of shard subnets used in the gossipsub protocol. -SHARD_SUBNET_COUNT = None +# The number of attestation subnets used in the gossipsub protocol. +ATTESTATION_SUBNET_COUNT = 64 # Maximum time to wait for first byte of request response (time-to-first-byte). TTFB_TIMEOUT = 5 # seconds # Maximum time for complete response transfer. RESP_TIMEOUT = 10 # seconds +# The maximum number of slots during which an attestation can be propagated. +ATTESTATION_PROPAGATION_SLOT_RANGE = 32 # # Gossip domain @@ -56,8 +57,11 @@ class GossipsubParams(NamedTuple): # Topics PUBSUB_TOPIC_BEACON_BLOCK = "beacon_block" +PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF = "beacon_aggregate_and_proof" PUBSUB_TOPIC_BEACON_ATTESTATION = "beacon_attestation" -PUBSUB_TOPIC_SHARD_ATTESTATION_FMT = "shard{}_attestation" +PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION = Template( + "committee_index${subnet_id}_beacon_attestation" +) PUBSUB_TOPIC_VOLUNTARY_EXIT = "voluntary_exit" PUBSUB_TOPIC_PROPOSER_SLASHING = "proposer_slashing" PUBSUB_TOPIC_ATTESTER_SLASHING = "attester_slashing" diff --git a/trinity/protocol/bcc_libp2p/exceptions.py b/trinity/protocol/bcc_libp2p/exceptions.py index 50cd9bbc4a..85617f862b 100644 --- a/trinity/protocol/bcc_libp2p/exceptions.py +++ b/trinity/protocol/bcc_libp2p/exceptions.py @@ -56,3 +56,7 @@ class ServerError(IShouldRespondAnError): class UnhandshakedPeer(BaseLibp2pError): ... + + +class InvalidGossipMessage(BaseLibp2pError): + ... diff --git a/trinity/protocol/bcc_libp2p/node.py b/trinity/protocol/bcc_libp2p/node.py index 4c1f6f1f9a..4158c60e3a 100644 --- a/trinity/protocol/bcc_libp2p/node.py +++ b/trinity/protocol/bcc_libp2p/node.py @@ -8,6 +8,7 @@ Dict, Optional, Sequence, + Set, Tuple, ) @@ -21,6 +22,9 @@ from eth2.beacon.chains.base import ( BaseBeaconChain, ) +from eth2.beacon.types.aggregate_and_proof import ( + AggregateAndProof, +) from eth2.beacon.types.attestations import ( Attestation, ) @@ -33,6 +37,7 @@ Slot, Version, SigningRoot, + SubnetId, ) from libp2p import ( @@ -96,8 +101,10 @@ GOSSIPSUB_PROTOCOL_ID, GoodbyeReasonCode, GossipsubParams, + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, PUBSUB_TOPIC_BEACON_BLOCK, PUBSUB_TOPIC_BEACON_ATTESTATION, + PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION, REQ_RESP_BEACON_BLOCKS_BY_RANGE, REQ_RESP_GOODBYE, REQ_RESP_STATUS, @@ -123,8 +130,10 @@ BeaconBlocksByRootRequest, ) from .topic_validators import ( + get_beacon_aggregate_and_proof_validator, get_beacon_attestation_validator, get_beacon_block_validator, + get_committee_index_beacon_attestation_validator, ) from .utils import ( make_rpc_v1_ssz_protocol_id, @@ -247,6 +256,7 @@ class Node(BaseService): bootstrap_nodes: Tuple[Multiaddr, ...] preferred_nodes: Tuple[Multiaddr, ...] chain: BaseBeaconChain + subnets: Set[SubnetId] handshaked_peers: PeerPool = None @@ -261,13 +271,15 @@ def __init__( gossipsub_params: Optional[GossipsubParams] = None, cancel_token: CancelToken = None, bootstrap_nodes: Tuple[Multiaddr, ...] = (), - preferred_nodes: Tuple[Multiaddr, ...] = ()) -> None: + preferred_nodes: Tuple[Multiaddr, ...] = (), + subnets: Optional[Set[SubnetId]] = None) -> None: super().__init__(cancel_token) self.listen_ip = listen_ip self.listen_port = listen_port self.key_pair = key_pair self.bootstrap_nodes = bootstrap_nodes self.preferred_nodes = preferred_nodes + self.subnets = subnets if subnets is not None else set() # TODO: Add key and peer_id to the peerstore if security_protocol_ops is None: security_protocol_ops = { @@ -328,14 +340,22 @@ async def start(self) -> None: await self.connect_preferred_nodes() # TODO: Connect bootstrap nodes? - # pubsub + # Pubsub + # Global channel await self.pubsub.subscribe(PUBSUB_TOPIC_BEACON_BLOCK) await self.pubsub.subscribe(PUBSUB_TOPIC_BEACON_ATTESTATION) + await self.pubsub.subscribe(PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF) + # Attestation subnets + for subnet_id in self.subnets: + topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id)) + await self.pubsub.subscribe(topic) + self._setup_topic_validators() self._is_started = True def _setup_topic_validators(self) -> None: + # Global channel self.pubsub.set_topic_validator( PUBSUB_TOPIC_BEACON_BLOCK, get_beacon_block_validator(self.chain), @@ -346,6 +366,19 @@ def _setup_topic_validators(self) -> None: get_beacon_attestation_validator(self.chain), False, ) + # Attestation subnets + for subnet_id in self.subnets: + self.pubsub.set_topic_validator( + PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id)), + get_committee_index_beacon_attestation_validator(self.chain, subnet_id), + False, + ) + + self.pubsub.set_topic_validator( + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, + get_beacon_aggregate_and_proof_validator(self.chain), + False, + ) async def dial_peer_maddr(self, maddr: Multiaddr, peer_id: ID) -> None: """ @@ -423,6 +456,22 @@ async def broadcast_beacon_block(self, block: BaseBeaconBlock) -> None: async def broadcast_attestation(self, attestation: Attestation) -> None: await self._broadcast_data(PUBSUB_TOPIC_BEACON_ATTESTATION, ssz.encode(attestation)) + async def broadcast_attestation_to_subnet( + self, attestation: Attestation, subnet_id: SubnetId + ) -> None: + await self._broadcast_data( + PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id)), + ssz.encode(attestation) + ) + + async def broadcast_beacon_aggregate_and_proof( + self, aggregate_and_proof: AggregateAndProof + ) -> None: + await self._broadcast_data( + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, + ssz.encode(aggregate_and_proof), + ) + async def _broadcast_data(self, topic: str, data: bytes) -> None: await self.pubsub.publish(topic, data) diff --git a/trinity/protocol/bcc_libp2p/servers.py b/trinity/protocol/bcc_libp2p/servers.py index 2b18e3ae81..ec3fd3e766 100644 --- a/trinity/protocol/bcc_libp2p/servers.py +++ b/trinity/protocol/bcc_libp2p/servers.py @@ -5,6 +5,7 @@ Dict, Iterable, List, + Optional, Set, Tuple, Union, @@ -32,6 +33,7 @@ BaseBeaconChain, ) from eth2.beacon.operations.attestation_pool import AttestationPool +from eth2.beacon.types.aggregate_and_proof import AggregateAndProof from eth2.beacon.types.attestations import ( Attestation, ) @@ -41,17 +43,17 @@ ) from eth2.beacon.typing import ( SigningRoot, + SubnetId, ) -from eth2.beacon.state_machines.forks.serenity.block_validation import ( - validate_attestation_slot, -) -from eth2.beacon.typing import Slot +from eth2.beacon.typing import CommitteeIndex, Slot from trinity.protocol.bcc_libp2p.node import Node from .configs import ( + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, PUBSUB_TOPIC_BEACON_BLOCK, PUBSUB_TOPIC_BEACON_ATTESTATION, + PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION, ) PROCESS_ORPHAN_BLOCKS_PERIOD = 10.0 @@ -113,20 +115,25 @@ class BCCReceiveServer(BaseService): chain: BaseBeaconChain p2p_node: Node topic_msg_queues: Dict[str, 'asyncio.Queue[rpc_pb2.Message]'] - attestation_pool: AttestationPool + unaggregated_attestation_pool: AttestationPool + aggregated_attestation_pool: AttestationPool orphan_block_pool: OrphanBlockPool + subnets: Set[SubnetId] def __init__( self, chain: BaseBeaconChain, p2p_node: Node, topic_msg_queues: Dict[str, 'asyncio.Queue[rpc_pb2.Message]'], + subnets: Optional[Set[SubnetId]] = None, cancel_token: CancelToken = None) -> None: super().__init__(cancel_token) self.chain = chain - self.topic_msg_queues = topic_msg_queues self.p2p_node = p2p_node - self.attestation_pool = AttestationPool() + self.topic_msg_queues = topic_msg_queues + self.subnets = subnets if subnets is not None else set() + self.unaggregated_attestation_pool = AttestationPool() + self.aggregated_attestation_pool = AttestationPool() self.orphan_block_pool = OrphanBlockPool() self.ready = asyncio.Event() @@ -134,12 +141,19 @@ async def _run(self) -> None: while not self.p2p_node.is_started: await self.sleep(0.5) self.logger.info("BCCReceiveServer up") + + # Handle gossipsub messages self.run_daemon_task(self._handle_beacon_attestation_loop()) self.run_daemon_task(self._handle_beacon_block_loop()) + self.run_daemon_task(self._handle_aggregate_and_proof_loop()) + self.run_daemon_task(self._handle_committee_beacon_attestation_loop()) self.run_daemon_task(self._process_orphan_blocks_loop()) self.ready.set() await self.cancellation() + # + # Daemon tasks + # async def _handle_message( self, topic: str, @@ -158,7 +172,28 @@ async def _handle_message( async def _handle_beacon_attestation_loop(self) -> None: await self._handle_message( PUBSUB_TOPIC_BEACON_ATTESTATION, - self._handle_beacon_attestations + self._handle_beacon_attestation + ) + + async def _handle_committee_beacon_attestation_loop(self) -> None: + while True: + await asyncio.sleep(0.5) + for subnet_id in self.subnets: + topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute( + subnet_id=str(subnet_id) + ) + try: + queue = self.topic_msg_queues[topic] + message = queue.get_nowait() + except asyncio.QueueEmpty: + continue + else: + await self._handle_committee_beacon_attestation(message) + + async def _handle_aggregate_and_proof_loop(self) -> None: + await self._handle_message( + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, + self._handle_beacon_aggregate_and_proof, ) async def _handle_beacon_block_loop(self) -> None: @@ -204,29 +239,48 @@ async def _process_orphan_blocks_loop(self) -> None: else: self._process_received_block(block) - async def _handle_beacon_attestations(self, msg: rpc_pb2.Message) -> None: + # + # Message handlers + # + async def _handle_committee_beacon_attestation(self, msg: rpc_pb2.Message) -> None: + await self._handle_beacon_attestation(msg) + + async def _handle_beacon_aggregate_and_proof(self, msg: rpc_pb2.Message) -> None: + aggregate_and_proof = ssz.decode(msg.data, sedes=AggregateAndProof) + + self.logger.debug("Received aggregate_and_proof=%s", aggregate_and_proof) + + self._add_attestation(self.aggregated_attestation_pool, aggregate_and_proof.aggregate) + + async def _handle_beacon_attestation(self, msg: rpc_pb2.Message) -> None: attestation = ssz.decode(msg.data, sedes=Attestation) self.logger.debug("Received attestation=%s", attestation) - # Check if attestation has been seen already. - if not self._is_attestation_new(attestation): - return - # Add new attestation to attestation pool. - self.attestation_pool.add(attestation) + self._add_attestation(self.unaggregated_attestation_pool, attestation) async def _handle_beacon_block(self, msg: rpc_pb2.Message) -> None: block = ssz.decode(msg.data, BeaconBlock) self._process_received_block(block) - def _is_attestation_new(self, attestation: Attestation) -> bool: + def _is_attestation_new( + self, attestation_pool: AttestationPool, attestation: Attestation + ) -> bool: """ Check if the attestation is already in the database or the attestion pool. """ - if attestation.hash_tree_root in self.attestation_pool: + if attestation.hash_tree_root in attestation_pool: return False return not self.chain.attestation_exists(attestation.hash_tree_root) + def _add_attestation(self, attestation_pool: AttestationPool, attestation: Attestation) -> None: + # Check if attestation has been seen already. + if not self._is_attestation_new(attestation_pool, attestation): + return + + # Add new attestation to attestation pool. + attestation_pool.add(attestation) + def _process_received_block(self, block: BaseBeaconBlock) -> None: # If the block is an orphan, put it to the orphan pool self.logger.debug( @@ -255,7 +309,7 @@ def _process_received_block(self, block: BaseBeaconBlock) -> None: # TODO: should be done asynchronously? self._try_import_orphan_blocks(block.signing_root) # Remove attestations in block that are also in the attestation pool. - self.attestation_pool.batch_remove(block.body.attestations) + self.unaggregated_attestation_pool.batch_remove(block.body.attestations) def _try_import_orphan_blocks(self, parent_root: SigningRoot) -> None: """ @@ -308,18 +362,48 @@ def _is_block_root_seen(self, block_root: SigningRoot) -> bool: def _is_block_seen(self, block: BaseBeaconBlock) -> bool: return self._is_block_root_seen(block_root=block.signing_root) + # + # Exposed APIs for Validator + # @to_tuple - def get_ready_attestations(self, current_slot: Slot) -> Iterable[Attestation]: + def get_ready_attestations( + self, current_slot: Slot, is_aggregated: bool + ) -> Iterable[Attestation]: + """ + Get the attestations that are ready to be included in ``current_slot`` block. + """ config = self.chain.get_state_machine().config - for attestation in self.attestation_pool.get_all(): - try: - validate_attestation_slot( - attestation.data.slot, - current_slot, - config.SLOTS_PER_EPOCH, - config.MIN_ATTESTATION_INCLUSION_DELAY, - ) - except ValidationError: - continue - else: - yield attestation + + if is_aggregated: + attestation_pool = self.aggregated_attestation_pool + else: + attestation_pool = self.unaggregated_attestation_pool + + return attestation_pool.get_valid_attestation_by_current_slot( + current_slot, + config, + ) + + def get_aggregatable_attestations( + self, + slot: Slot, + committee_index: CommitteeIndex + ) -> Tuple[Attestation, ...]: + """ + Get the attestations of ``slot`` and ``committee_index``. + """ + try: + block = self.chain.get_canonical_block_by_slot(slot) + except BlockNotFound: + return () + + beacon_block_root = block.signing_root + return self.unaggregated_attestation_pool.get_acceptable_attestations( + slot, committee_index, beacon_block_root + ) + + def import_attestation(self, attestation: Attestation, is_aggregated: bool) -> None: + if is_aggregated: + self.aggregated_attestation_pool.add(attestation) + else: + self.unaggregated_attestation_pool.add(attestation) diff --git a/trinity/protocol/bcc_libp2p/topic_validators.py b/trinity/protocol/bcc_libp2p/topic_validators.py index 110354cf70..d8fcbcb0c9 100644 --- a/trinity/protocol/bcc_libp2p/topic_validators.py +++ b/trinity/protocol/bcc_libp2p/topic_validators.py @@ -12,20 +12,33 @@ from eth.exceptions import BlockNotFound +from eth2.beacon.types.aggregate_and_proof import AggregateAndProof from eth2.beacon.types.attestations import Attestation from eth2.beacon.chains.base import BaseBeaconChain -from eth2.beacon.types.blocks import BeaconBlock +from eth2.beacon.types.blocks import BaseBeaconBlock, BeaconBlock +from eth2.beacon.types.states import BeaconState +from eth2.beacon.state_machines.base import BaseBeaconStateMachine from eth2.beacon.state_machines.forks.serenity.block_validation import ( - validate_attestation, validate_proposer_signature, ) -from eth2.beacon.typing import Slot +from eth2.beacon.tools.builder.aggregator import ( + validate_aggregate_and_proof, + validate_attestation_propagation_slot_range, + validate_attestation_signature, +) +from eth2.beacon.typing import SubnetId from eth2.configs import CommitteeConfig from libp2p.peer.id import ID from libp2p.pubsub.pb import rpc_pb2 from trinity._utils.shellart import bold_red +from trinity.protocol.bcc_libp2p.configs import ( + ATTESTATION_PROPAGATION_SLOT_RANGE, + ATTESTATION_SUBNET_COUNT, +) +from trinity.protocol.bcc_libp2p.exceptions import InvalidGossipMessage + logger = logging.getLogger('trinity.components.eth2.beacon.TopicValidator') @@ -36,36 +49,28 @@ def beacon_block_validator(msg_forwarder: ID, msg: rpc_pb2.Message) -> bool: block = ssz.decode(msg.data, BeaconBlock) except (TypeError, ssz.DeserializationError) as error: logger.debug( - bold_red("Failed to validate block=%s, error=%s"), - encode_hex(block.signing_root), + bold_red("Failed to decode block=%s, error=%s"), + encode_hex(msg.data), str(error), ) return False state_machine = chain.get_state_machine(block.slot - 1) - state_transition = state_machine.state_transition state = chain.get_head_state() - # Fast forward to state in future slot in order to pass - # block.slot validity check - state = state_transition.apply_state_transition( - state, - future_slot=block.slot, - ) + try: - validate_proposer_signature(state, block, CommitteeConfig(state_machine.config)) - except ValidationError as error: - logger.debug( - bold_red("Failed to validate block=%s, error=%s"), - encode_hex(block.signing_root), - str(error), - ) + run_validate_block_proposer_signature(state, state_machine, block) + except InvalidGossipMessage as error: + logger.debug("%s", str(error)) return False else: return True + return beacon_block_validator def get_beacon_attestation_validator(chain: BaseBeaconChain) -> Callable[..., bool]: + # TODO: The beacon_attestation topic is only for interop and will be removed prior to mainnet. def beacon_attestation_validator(msg_forwarder: ID, msg: rpc_pb2.Message) -> bool: try: attestation = ssz.decode(msg.data, sedes=Attestation) @@ -73,47 +78,184 @@ def beacon_attestation_validator(msg_forwarder: ID, msg: rpc_pb2.Message) -> boo # Not correctly encoded logger.debug( bold_red("Failed to validate attestation=%s, error=%s"), - attestation, + encode_hex(msg.data), str(error), ) return False - state_machine = chain.get_state_machine() - config = state_machine.config state = chain.get_head_state() + state_machine = chain.get_state_machine() - # Check that beacon blocks attested to by the attestation are validated try: - chain.get_block_by_root(attestation.data.beacon_block_root) - except BlockNotFound: - logger.debug( - bold_red( - "Failed to validate attestation=%s, attested block=%s is not validated yet" - ), + validate_voting_beacon_block(chain, attestation) + validate_attestation_signature( + state, attestation, - encode_hex(attestation.data.beacon_block_root), + CommitteeConfig(state_machine.config), ) + except InvalidGossipMessage as error: + logger.debug("%s", str(error)) return False + else: + return True - # Fast forward to state in future slot in order to pass - # attestation.data.slot validity check - future_state = state_machine.state_transition.apply_state_transition( - state, - future_slot=Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY), - ) + return beacon_attestation_validator + + +def get_committee_index_beacon_attestation_validator( + chain: BaseBeaconChain, subnet_id: SubnetId +) -> Callable[..., bool]: + def committee_index_beacon_attestation_validator( + msg_forwarder: ID, msg: rpc_pb2.Message + ) -> bool: try: - validate_attestation( - future_state, - attestation, - config, - ) - except ValidationError as error: + attestation = ssz.decode(msg.data, sedes=Attestation) + except (TypeError, ssz.DeserializationError) as error: + # Not correctly encoded logger.debug( bold_red("Failed to validate attestation=%s, error=%s"), + encode_hex(msg.data), + str(error), + ) + return False + + state_machine = chain.get_state_machine() + state = chain.get_head_state() + + try: + validate_subnet_id(attestation, subnet_id) + validate_is_unaggregated(attestation) + validate_voting_beacon_block(chain, attestation) + validate_attestation_propagation_slot_range( + state, attestation, + ATTESTATION_PROPAGATION_SLOT_RANGE, + ) + validate_attestation_signature( + state, + attestation, + CommitteeConfig(state_machine.config), + ) + except InvalidGossipMessage as error: + logger.debug("%s", str(error)) + return False + else: + return True + + return committee_index_beacon_attestation_validator + + +def get_beacon_aggregate_and_proof_validator(chain: BaseBeaconChain) -> Callable[..., bool]: + def beacon_aggregate_and_proof_validator(msg_forwarder: ID, msg: rpc_pb2.Message) -> bool: + try: + aggregate_and_proof = ssz.decode(msg.data, sedes=AggregateAndProof) + except (TypeError, ssz.DeserializationError) as error: + # Not correctly encoded + logger.debug( + bold_red("Failed to validate aggregate_and_proof=%s, error=%s"), + encode_hex(msg.data), str(error), ) return False + state = chain.get_head_state() + state_machine = chain.get_state_machine() + attestation = aggregate_and_proof.aggregate + + try: + validate_voting_beacon_block(chain, attestation) + run_validate_aggregate_and_proof( + state, + aggregate_and_proof, + CommitteeConfig(state_machine.config), + ) + except InvalidGossipMessage as error: + logger.debug("%s", str(error)) + return False + return True - return beacon_attestation_validator + return beacon_aggregate_and_proof_validator + + +# +# Validation +# + + +def run_validate_block_proposer_signature( + state: BeaconState, state_machine: BaseBeaconStateMachine, block: BaseBeaconBlock +) -> None: + # Fast forward to state in future slot in order to pass + # block.slot validity check + try: + future_state = state_machine.state_transition.apply_state_transition( + state, + future_slot=block.slot, + ) + except ValidationError as error: + raise InvalidGossipMessage( + "Failed to fast forward to state at slot=%d, error=%s", + block.slot, + str(error), + ) + + try: + validate_proposer_signature(future_state, block, CommitteeConfig(state_machine.config)) + except ValidationError as error: + logger.debug( + "Failed to validate block=%s, error=%s", + encode_hex(block.signing_root), + str(error), + ) + + +def validate_subnet_id(attestation: Attestation, subnet_id: SubnetId) -> None: + if attestation.data.index % ATTESTATION_SUBNET_COUNT != subnet_id: + raise InvalidGossipMessage( + "Wrong attestation subnet_id=%d, topic subnet_id=%d. Attestation: %s", + attestation.data.index % ATTESTATION_SUBNET_COUNT, + subnet_id, + attestation, + ) + + +def validate_is_unaggregated(attestation: Attestation) -> None: + # Check if the attestation is unaggregated + if len([bit for bit in attestation.aggregation_bits if bit is True]) != 1: + raise InvalidGossipMessage( + "The attestation is aggregated. Attestation: %s", + attestation, + ) + return False + + +def validate_voting_beacon_block(chain: BaseBeaconChain, attestation: Attestation) -> None: + # Check that beacon blocks attested to by the attestation are validated + try: + chain.get_block_by_root(attestation.data.beacon_block_root) + except BlockNotFound: + raise InvalidGossipMessage( + "Failed to validate attestation=%s, attested block=%s is not validated yet", + attestation, + encode_hex(attestation.data.beacon_block_root), + ) + + +def run_validate_aggregate_and_proof( + state: BeaconState, + aggregate_and_proof: AggregateAndProof, + config: CommitteeConfig +) -> None: + try: + validate_aggregate_and_proof( + state, + aggregate_and_proof, + ATTESTATION_PROPAGATION_SLOT_RANGE, + config, + ) + except ValidationError as error: + InvalidGossipMessage( + "Failed to validate aggregate_and_proof=%s, error=%s", + aggregate_and_proof, + str(error), + ) diff --git a/trinity/tools/bcc_factories.py b/trinity/tools/bcc_factories.py index b5ce039287..573218102c 100644 --- a/trinity/tools/bcc_factories.py +++ b/trinity/tools/bcc_factories.py @@ -87,6 +87,7 @@ class Meta: cancel_token = None bootstrap_nodes = None preferred_nodes: Tuple[Multiaddr, ...] = tuple() + subnets: None chain = factory.SubFactory(BeaconChainFactory) @classmethod @@ -227,6 +228,7 @@ class Meta: chain = None p2p_node = factory.SubFactory(NodeFactory) topic_msg_queues = None + subnets = None cancel_token = None @classmethod