diff --git a/docs/data/conference.schema.png b/docs/data/conference.schema.png index 07bb20de..c5f37c7d 100644 Binary files a/docs/data/conference.schema.png and b/docs/data/conference.schema.png differ diff --git a/docs/data/conference.schema.xml b/docs/data/conference.schema.xml index 8e5e0f8c..b6553483 100644 --- a/docs/data/conference.schema.xml +++ b/docs/data/conference.schema.xml @@ -1 +1 @@ -7V3bcts4Ev0aVc0+ZIpXmXqMFCeTWSfribMzybyoIBGWmFCkloRsy1+/IAlIlADKVATwMumUKyWCNxA4OGycbjYG9mT19C5B6+WH2MfhwDL8p4H9ZmBZnjui/2cF26LAHQ2LgkUS+EWRuS+4C54xKzRY6SbwcXpwIInjkATrw8J5HEV4Tg7KUJLEj4eH3cfh4V3XaIGFgrs5CsXSvwKfLPljGfvy33CwWPI7mwbbs0L8YFaQLpEfP5aK7OuBPUnimBS/Vk8THGZtx9ulOO9txd5dxRIckTonXE9uZ8Zfxp39Mfny/t0tip6tr69su7jMAwo37IlpjVasxmTLmyF9DFYhiujW+D6OyB3bY9Dt+TII/Ru0jTdZNVKC5t/51ngZJ8EzPR6FdJdJC+juhLBetobZ1YIwnMRhnOT3sec+nnmzgzPvsiuyeyU4pefe8mc2j4o+oKeDA29QSngt4zBE6zSY5fXOTlyhZBFE45gQ+rzFQfwp3x5W6j7/R/ejMFhEtGxO74UT3hbF05hO/nhJ/B2XTrWH9simHTAWe4t14ANOCH4qFbHee4fjFSbJlh7C9loeQxIbSp75q1sUPO6RaXH8Lcuo9NigQ2w0LHYX3wOG/mCYOQc/joCf7PjXtGhDR66IoyVaZz9pQ5AAhZ/okEXRIts7JvGadUOI73m3Jewhst8z3lVmqSuKY8dZKwZ01L5mxavA9/OrlvEVxTmA0zWaB9HipriL7eyLPrG7ZUUxveR9mA/YJb0YjrLaxAQRNNtBfx0HEcmb1B3TP9ryE4P2iUufdEK3zf02/csOT8gkjihIUJCjAFN8PuIMo2M/idefKSIxf9ryMHOPoGZJAXV6gL8MMwYra1gPVLahC1OugKnbf1+MpFl5oEtQURdQh+hRBp6X8JE9JoP+jyLBqUDC7uXJKnfwfjpASAkSdqOIGAqIIMEKFzxDUvKc0K7HesjGeBkb+cV6zjRNMItbk1ksbW+rKwFHbfNKgZ3+ksqwAgTKSEUfGDwBDGjjBziac2LBTwQ4peOc4lltc8oIOEUxp3gVIOg+pziGAIZ5ghHB/hQRxirUcKGz4NWaPAO5dJxcTMNrmV0cUwAUsMtF7LIbop1ml9Eq+OhcbcJbdHtn/jZG/5vYD69M8VUzQ/PvOGKKC442q1++oWiTZoPMMrL+/BeQTNdJhk/MmyAZKa4s8a0FJFOXZE6P1E6TjH3zfvq7895/vnv84+6j9eX69unNK/GFk25m6TwJZjhJp2GwCpghkzUbcItCbpGApSbdVHPL0GiOW6RwsoBazqKW2oxxCI7qodyhyZHoK0rIXIQDuBoHXXM1WiOzHo+YpqcLPaJXCDyNXfI0OlX+pe56Gh3RrwSeRgVTbLcCCd33NDqihyiLh5nqp5qf1erVwCut+xkd0bXUNqt02+p9GQRXFSDo1IRaWnVXnFGDT6C/5NK6w9GFKbVicnGr5kc9IBcxfhcYpOMMUturaOqycl1RiQEKuYxCqtw+yihEGxg80fuT+xCn3LfYlkB3fz+fj0YDEOgkAp1rHQp0nqjPmZ4MRVfaYCSauXzSjBb0Qaeg0bWv0e3G+iUanRRY2hQZbm+DRqf0heVpt3n1IUK0eZcoov2xU+loe3lgBjdqBp/PLBKVTsos2uZOHljBqklFuxWsAgxv/3xIXfcm+HvzlPwn8N78Mf14+0rU/VOcpkEcAan0iVQk6pw2UpHiCJT/H+eUkwOzf5QCuv8/iVlkql2z1AK6v2Jq6YXsL43sk1grEKTb2SBdrwI9Jz4A0MYsUjiB0dJekO5Va8wix6qGT+SBQDSbJpIof7n6r02SAwZRLaVon/foA4P4mWLhUEzIfEqbC6P28ouBT7G+T9EaXkkSjMktE0dB2L/URhe1foj675RHsepLx054FKWQEmV/cChePpnWLv1r8yeahqisgEOxdTP4bGJp3aFoGhqiMX9uM3g/Njst0kkZUTSDMwMYviTqEaW07k7kkzRgFFV2Si8yc0jhOxLNFBaiO20m0hKIRRmxNOpNlKMJbBXFtspufPaPWnarFpTQEKIZDiETan8opUk3YgWKIFuh8vlPL9IVyvlQzK6RZubJbAvGSr+Ypa5/UR+UIHG7amOlv4nbR6KzGVaD6CGreG3rtSNI3a6aVXqRul2qC4np5SBau7/kYpltK7ei8QvccpFwqz2pnL73jIYPDYFANBOIXVOg1RZGNxLfSMAgl1kn2lfA0wYG0xgKnY/9BeYRXLTdlvEijlB4vS+lDb6JfOyz5t4fcxPn4Mia8hsmZMvaEm1ITIuWZMXDKfFTQL6Ufn/NLvWry7be8GjIfGPLNyL6wLuTso3SWdnm/rR8i5+Xh26+zpbspQXXnxgrZYVZuOTuFH9/yApF26KsdISIBdbFabxJ5vgUSfD1hTnhVOKIDcSsA05ySYJDRIIHfFAPDdi46h02jJ5hg6e0LWPjlHu5eWjIwwdEQT7B8zjxKVuLLxNIvD5oOwb7OPG6IzFD5LniuPdHPYYsEUONxDFBGHb9yZFZH2ktJF+XV1kMQIA4bAUfNVsVSOhUILa86uI8OX+XgAbX7BRaEcM0mYZdXmWYQasmF+0zaH2LJ4pg2HsNs+87Cq9h0Y5ALd2mliaTsMvRBPK+6iUTe6Hvy6suScdCG3OTlpZlTTDyt8WyrKsgTWmHwMqsXWeZRpd/liML8iqoppn+5pPjn+f3SOY90wVwoOAWaGxcwOXzZ3UKbn4qfS60LR3AcL+/8m2c5+riQqBzJATaxqgMIOF417jseOdqdATQosZ7uO4evRaC76fR+ua/fvD18+dvf5K30af3f48lGRHzGGCR0kCJHrStRA8P8WGNzLrJQJRI0VL8iEo0qNAtqdAnx3cnc4FIawwa9IVG1slx2mkJWlpzyRLVsPxnU9M2NZzSZBoQaY1BdVZMJ70QnaU1F6fvu4/e4BO4HnFKk3lApDWGL1UUc0ovPlSRK1gCFOBDld4SS6N5QKRwEufUwCyXKcx9YBb5tE2c/RQGSstJeydvrsfeeAAy3csynePUTAJiOidEOna3/UDf3e44PNUyZKqgVQO1KKStFdH31jjzUKQCeBXIzZY4EwO9sEt6oXX22tmtC4aWGPgBiqECxVB79Ic+RMiWDiyluQHNsHEz/HxeaV00tCDSQzWl9CLSQ151Ue6BbHz9IpTWFUMOWCAUZYTSi9TBcjCAaPhPIpdGVUM5oEA2VMwudn91Q1tDKmmgEM0UUjdbsIqUJnLUaMiK9JNTiPY1mPSBYST0/bmx7eWY9RNpRhSlEpEEzUuD60+Gu7PQev67Tmi9CAVJBPypoLxyAPzJebPiAHghAt02D30JpulyXwK/SPFM7Lw9woRLWe5R8LtzdXyp4rGFS50dln90I2YHVVXs6HCX5TxQFWQvXVDYvXgkXfCVyDlQ/tFkQC8PzlpjXIryH/u4hL/KXswc5VSs9qw8G5AUGa2mEDsHGUrTPWnr4epYRfUdTDeTOCZlpkjQevkh9jNV8Pr/ \ No newline at end of file +7V1bc9s2E/01enRH4sWWHiPZTtM6qRvna+O+aCARluhQhEpCtuRf/4EkqIux1MUmCSLdTmZqQryAwMHh7uJg0bIHs+XHiMynn5lHg5bV9pYt+7JlWR2n64j/JSUrWdLuWVnJJPI9WbYpuPNfaH6iLF34Ho13TuSMBdyf7xaOWRjSMd8pI1HEnndPe2DB7lPnZEKVgrsxCdTSv32PT7PSrtvelP9K/ck0f3KnLX+ZkfxkWRBPiceet4rsq5Y9iBjj2V+z5YAGSevl7ZJdd13w67piEQ35MRdcDW5H7b/bd/aX6Punj7ckfLHuz2w7u80TCRbyjUWNZrLGfJU3Q/zszwISiqP+Awv5nfylLY7HUz/wbsiKLZJqxJyMf+RH/SmL/BdxPgnETx1RIH6OuOxl6zy5mx8EAxawKH2OPfboqDvaufIuuaN8VkRjce1t/s6dV0WfyXLnxBsS87yWLAjIPPZHab2TC2ckmvhhn3Eu3jc7KX/L691KPaT/id9J4E9CUTYWz6JR3hbZ23Sc9PUi9oNuXWqf2z1bdEBf7S3ZgU804nS5VSR77yNlM8qjlThF/mp1JZLkWHLav7hZwfMGmVaOv+kWKq1zOQyJHA2T9c03gBF/SMycgh9HwU9y/gdRtBAjV8XRlMyTP0VDcJ8EX8WQJeEk+bXP2Vx2Q0Af8m6L5Eskf4/yrupsdUV2bj9pRV+M2g+yeOZ7XnrXbXyFLAVwPCdjP5zcZE+xnU3RV/m0pIiJWz4E6YCdipvRMKkN44ST0Rr6c+aHPG1Sty/+iZYfJH3iijcdiOPO5lj8S06P+ICFAiTET1FABT6faYLRvhex+TeBSJq/7fYwc19BzQIBtX+AH4bZKgfLcaCy21VhylUwdfv7u5E02h7oACqOBdQuekoDzyF8JK8pof9WJDgFSFh/PGXldr5POwjZgoRdKyLOFURwf0YznuExf4lE19NqyKZ9GBvpzQxnmjqYxT2SWXIGKh9HFwqOdPNKhh1zSeW8AASlkUp1YOgqYCALz6fhOCcWuuTIKQ3nlK6lm1N6yCklc0q3AATN5xSnrYBhHFHCqTckXLKKMFyEFzyb8xckl4aTS6fd1cwuTkcBFLLLu9hlPUQbzS6P393J7x+f7n+7GfSj++cPy+Cze9ZRg3UjMv5BQxlxoeECiNwhp7ydUwCQHEkzxZyS++F1cAoMIzVmh5yyj1OOpopddOwZww3iFPUDE9GYRk/SCUpaCyml4ZRy3tZNKRZSij5KKTBidVGK6hBzMokzOnmMWYh80nQ+6dYYqIUhrfrRyCd18UlPG530Zv4X52IR3JLbu86vffLvwH4666h8Ir2eYT7VTCbijYZVTTf/V5mlirnl3FCog1lANFnILG8PqOwfn40OqNg3n4a/OZ+8l7vnP+++WN+vbpeXZ4Dvw8dD0f2RaPfhnAX+eIWhFXPYxa4xXAsCCt2gt5PL3hHaaG6B48xqlE2QiwoHFFa2miastN3ecTyynh8qHz2qBg51lU3SVTpFarrm6iodVUWHusoSJhTdAiQ0X1fpqHq4RP0/rJ5q0OotjVe0qyodVUinm1XMsXr3j0vzzF5X9alRAWUuuWiXV7roUpdMLq65PnVXDd4+knARD3MZlC7v+uFhPO710LuGvGvX2vWucwHeFog6XQhF+VxB+TBSv1H1zCShg308Ta3H+nscbBBYlblTXfVbhQ72+z9Y3co/WNUhQlXsTkko+mPtYov26qIVXKsVfDqzAC42/MmqzPBByW7ZpFI0u9goK/j6r6fYdW/8fxbL6A+/e/nn8MvtmRq0i2kc+yxEUjGJVADXujJSAXGEYbu3c8regWkepWDQ7mdiFmjZYr3UgkG7kqnFiJgdqOBRrZUxSdqcr3CBkTGEAqxZrIxQQBShrVKysM4IWwVcnqAqo0YkIOGYRkNklsYtNeoWoGfP0sXKmAWEkyqKQmapa6WR2yxmUU0VMveH4oWT4ArmhDKHVID1i/WSCpor+kjlQhupwFhVly9mmoV0qRGPKNGXbRllC8fLFuy2A6RbhmnFLmFZABgGUKcTcVVAo0QLRWshGyFaACGlziyiZuH98brKZxcrkyx02mrwFjUL2mN2JxOLds1Cp61+rXQTS7Ot4IMo2IzNRoftQEZUzeDEAMaVRgZRinbFAqYFKttOMSJ3BwjffB8pTAtkPrHUKliA0YS2Ssm2ynp8mkct6z3cttAQkBENcA7AHEqpU7JQgCLM3V66/2NE8naYD9WJ5jgxT0YrNFbMYpY6JQswlHAbq7KNFXO3seqpc824N56BrFKnZgEGEm5kVTarGLGRFRgXUkWWuCDEXHKB0i7XG7lFlWXJgdvKk85V6BefK31PvQnN5Rei2aZswkISXG1KRXsvQo96srU359ywFBtJSz5SzleyKcmCM1E05bNcC0WXPv++9fd9cqtfXHl0mUuZ0oNVfhCKF15flBxsXZUcbi5Lj/LrUt3VhyhKIXP1VXJSUphondaXeJtTZiRcZWVbZ6hQkF0cs0U0pvsoQp7Ic7op/ETJcZh0wF4miWhAuP9Ed+pRATYujMNG2zBs5Pkqt7Gxb26ofmjAc3/QRlVjFnmCrNVvCWZVbukWUOZ5nk5OqpxHbsuHkDplWI8GASWUxxs2neOBpiGxMlxldfIQNZQlrHmufAaxOkSoqtr0U4L+c73+c0kMU2eKZbjKajxGN78Y7j93ilLhNsqBhjdcUsEQ08mMpu2X67OzqH/WlkgvzaaXOpMsw4jC8FzZm6IZEZ+Dqw7klxONuYhxJzRjGAUSVNZMKbgEumxKMSJly8MwnN/8z/Pvv317/Itfh18//dMH0sulaicVEBi2a+kO253vhu06F2rYDpw6XMuiSkePGrXDiJ2miN3e0d3INc9gjTFe984P1N5x2uhwHVhzYKtO3AatLpO3HE6pc7kzWGOM0JVMJ0YE6MCaq67PWtyPUn+DOKXO9c5gjVGRWzKnGCHIBbOaqU4QCnKbkaGwmGuasd4ZhBMud9aXoNDSxixwXFGBAob7zbJTal3rDGJI/Tohnbwv2G+CoQJHgdRgSubvyIonqZR1xf0Hl1f9br+Fcf/DcX+nfWy207UxA0FJPm4z2tfP6756ngU+0DoCuiQQzRUKW7ifrAOIFQSv3/UdoFajO7suPc5ENGEmwjp5f3PtUxGWKr3BuYgS5iIq199UhwhVf6PmX0WTvFaT/HRi0T4fYaEAp2xOMVeAY+MGjz8TuWifmLDR4S+ZXGwjPH5wKzlX6foal2y3W9Uv2T68GvuoRd0qEo5dsp3H9w4u53fkJ6q8NdvppeLNyGrrBDk4Nne+ZalNmvv21sWucy/56vrI0x1pu2xAmVXgrS49iFqtSShOQa32hAEQ+vYpNswGn+WUCz5wCsjRAr430h/YqwehtNctPwilgjmy0lNNgL0DrIBJ8gQIg9lLNhll4YOPKScqimEfmGw/KeWE26kz5QQMJdULx5QTNYeND0BqDwWcEjauLOUEXD9VGoZh41pVHNnI1hI2hlUlas56+cFC2alGddhb2KXOdBMwljBzvUZ5WNOUp8DqGDqmwgofPvkeZRm5jBgLkFgaTix1JpqAsYSrZDQSi9MwYlEnEXJiIQvPR2Ixh1hqzTcBgwkV7RqZpSBcp41ZVFFVEnV5jnyOAbwmB/Beq0KPj+CVkX0CxhIcDMYIXsMjeBkHNDeCB+izMIJX71erwKbRFsFTY7pxsp0tetkNM4YPUIv28J2thoJ1E8t/yBbuNcsWBqSfKaugi20Wq2iP3aHmUyOr2AV+kzZWUcMtqZkyjOhstMlCjazScFbRH7iz1eklpJXaaMVuGK0A62wTOwVpxTBaATJS1EwrmNJeI624DaOVn35Xwez9TtVo79126fBik4JerkekbatfgYb26dv6Zq+cuOF9k+uw6+2bEts5n/Spvp0LFqW8ysay3jwxv0VWMXnVprdOXivz6jmuPC5cLKNkiWm/AstJi2XEYcQY3z49IvPpZ+bR5Iz/Aw== \ No newline at end of file diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 983017bd..be9a9c24 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -10,7 +10,7 @@ - [Delete](api/room/delete.md) - [Enter](api/room/enter.md) - [Leave](api/room/leave.md) - - [Message](api/message/md) + - [Message](api/message.md) - [Broadcast](api/message/broadcast.md) - [Unicast](api/message/unicast.md) - [Callback](api/message/callback.md) @@ -25,4 +25,10 @@ - [List](api/rtc_stream/list.md) - [Agent](api/agent.md) - [List](api/agent/list.md) + - [Agent Reader Config](api/agent_reader_config.md) + - [Update](api/agent_reader_config/update.md) + - [Read](api/agent_reader_config/read.md) + - [Agent Writer Config](api/agent_writer_config.md) + - [Update](api/agent_writer_config/update.md) + - [Read](api/agent_writer_config/read.md) - [Errors](api/errors.md) diff --git a/docs/src/api/agent_reader_config.md b/docs/src/api/agent_reader_config.md new file mode 100644 index 00000000..cea8ab50 --- /dev/null +++ b/docs/src/api/agent_reader_config.md @@ -0,0 +1,20 @@ +# Agent Reader Config + +Each agent sets his own config for each writer so he can mute writers selectively just for himself. + +## Properties + +Name | Type | Default | Description +------- | -------- | ---------- | ------------------------------------------ +room_id | uuid | _required_ | The **Room** identifier. +configs | [object] | [] | The list of **Agent Reader Config Items**. + +# Agent Reader Config Item + +## Properties + +Name | Type | Default | Description +------------- | -------- | ---------- | ---------------------------------------------- +agent_id | agent_id | _required_ | Writer identifier which the config applies to. +receive_video | bool | true | Whether to receive video from the writer. +receive_audio | bool | true | Whether to receive audio from the writer. diff --git a/docs/src/api/agent_reader_config/read.md b/docs/src/api/agent_reader_config/read.md new file mode 100644 index 00000000..5e046206 --- /dev/null +++ b/docs/src/api/agent_reader_config/read.md @@ -0,0 +1,25 @@ +# Read + +Retrieve own **Agent Reader Config** state. + + + +## Multicast request + +**Properties** + +Name | Type | Default | Description +---------------- | ------ | ---------- | ---------------------------------- +method | String | _required_ | Always `agent_reader_config.read`. + +**Payload** + +Name | Type | Default | Description +------- | ---- | ---------- | -------------------- +room_id | Uuid | _required_ | The room identifier. + + + +## Unicast response + +If successful, the response payload contains a requested **Agent Reader Config** state. diff --git a/docs/src/api/agent_reader_config/update.md b/docs/src/api/agent_reader_config/update.md new file mode 100644 index 00000000..fd902c7a --- /dev/null +++ b/docs/src/api/agent_reader_config/update.md @@ -0,0 +1,31 @@ +# Update + +Update or initialize own **Agent Reader Configs** in bulk. +Configs get merged into the current state so one may send only diffs. + +One must enter the room first and the room must be opened. + +The room must have `owned` RTC sharing policy. + +The writer for which we want to apply config for must have created an owned RTC in the room. + +## Multicast request + +**Properties** + +Name | Type | Default | Description +---------------- | ------ | ---------- | ---------------------------------------- +method | String | _required_ | Always `agent_reader_connection.update`. + +**Payload** + +Name | Type | Default | Description +------- | -------- | ---------- | ---------------------------------------------- +room_id | uuid | _required_ | The **Room** identifier. +configs | [object] | [] | Array of **[Agent Reader Config Item](../agent_reader_config.html#agent-reader-config-item)** objects. + +## Unicast response + +If successful, the response payload contains current +**[Agent Reader Config](../agent_reader_config.html#agent-reader-config)** state for all RTCs +in the room for the agent that made the request. diff --git a/docs/src/api/agent_writer_config.md b/docs/src/api/agent_writer_config.md new file mode 100644 index 00000000..30919f4c --- /dev/null +++ b/docs/src/api/agent_writer_config.md @@ -0,0 +1,24 @@ +# Agent Writer Config + +**Agent Writer Config** affects the writer and hence all of his readers. +Muting the writer on this level also prevents media to be recorded. +One can also set REMBs to control requested media bitrate. + +## Properties + +Name | Type | Default | Description +------- | -------- | ---------- | ------------------------------------------ +room_id | uuid | _required_ | The **Room** identifier. +configs | [object] | [] | The list of **Agent Writer Config Items**. + +# Agent Writer Config Item + +## Properties + +Name | Type | Default | Description +--------- | -------- | ---------- | ----------------------------------------------- +agent_id | agent_id | _required_ | Writer identifier which the config applies to. +send_video | bool | true | Whether the writer is allowed to publish video. +send_audio | bool | true | Whether the writer is allowed to publish audio. +video_remb | int | _required_ | Maximum video bitrate requested for the writer. +audio_remb | int | _required_ | Maximum audio bitrate requested for the writer. diff --git a/docs/src/api/agent_writer_config/read.md b/docs/src/api/agent_writer_config/read.md new file mode 100644 index 00000000..d95d3dae --- /dev/null +++ b/docs/src/api/agent_writer_config/read.md @@ -0,0 +1,25 @@ +# Read + +Retrieve **Agent Writer Config** state. + + + +## Multicast request + +**Properties** + +Name | Type | Default | Description +---------------- | ------ | ---------- | ---------------------------------- +method | String | _required_ | Always `agent_writer_config.read`. + +**Payload** + +Name | Type | Default | Description +------- | ---- | ---------- | -------------------- +room_id | Uuid | _required_ | The room identifier. + + + +## Unicast response + +If successful, the response payload contains a requested **Agent Writer Config** state. diff --git a/docs/src/api/agent_writer_config/update.md b/docs/src/api/agent_writer_config/update.md new file mode 100644 index 00000000..67f0dc94 --- /dev/null +++ b/docs/src/api/agent_writer_config/update.md @@ -0,0 +1,36 @@ +# Update + +Update or initialize own **Agent Writer Configs** in bulk. +Configs get merged into the current state so one may send only diffs. + +One must enter the room first and the room must be opened. + +## Multicast request + +**Properties** + +Name | Type | Default | Description +---------------- | ------ | ---------- | ---------------------------------------- +method | String | _required_ | Always `agent_writer_connection.update`. + +**Payload** + +Name | Type | Default | Description +------- | -------- | ---------- | ---------------------------------------------- +room_id | uuid | _required_ | The **Room** identifier. +configs | [object] | [] | Array of **[Agent Writer Config Item](../agent_writer_config.md#agent-writer-config-item)** objects. + +## Unicast response + +If successful, the response payload contains current +**[Agent Writer Config](../agent_writer_config.md#agent-writer-config)** state for all RTCs in the room. + +## Broadcast event + +A notification is being sent to the _audience_ topic. + +**URI:** `audiences/:audience/events` + +**Label:** `agent_writer_config.update`. + +**Payload:** current **Agent Writer Config** state for all RTCs in the room. diff --git a/docs/src/api/errors.md b/docs/src/api/errors.md index 3dc09f26..780811da 100644 --- a/docs/src/api/errors.md +++ b/docs/src/api/errors.md @@ -39,6 +39,7 @@ The following types are a part of the service's API and are guaranteed to mainta - `invalid_jsep_format` – Failed to determine whether the SDP is recvonly. - `invalid_sdp_type` – Failed to parse SDP type or an SDP answer is received. - `invalid_subscription_object` – An object for dynamic subscription is not of format `["rooms", UUID, "events"]`. +- `invalid_payload` – A validation on a request payload as failed. - `message_building_failed` – An error occurred while building a message to another service. - `message_handling_failed` – An incoming message is likely to have non-valid JSON payload or missing required properties. - `message_parsing_failed` – Failed to parse a message from another service. diff --git a/docs/src/authz.md b/docs/src/authz.md index de456adb..322fd869 100644 --- a/docs/src/authz.md +++ b/docs/src/authz.md @@ -22,13 +22,13 @@ Subject's namespace and account label are retrieved from `audience` and `account Possible values for `OBJECT` and `ACTION`: -object / action | create | read | update | delete | list | subscribe --------------------------------------- | ------ | ---- | ------ | ------ | ---- | --------- -["rooms"] | + | | | | + | -["rooms", ROOM_ID] | | + | + | + | | -["rooms", ROOM_ID, "agents"] | | | | | + | -["rooms", ROOM_ID, "agents", AGENT_ID] | | + | + | | | -["rooms", ROOM_ID, "rtcs"] | + | | | | + | -["rooms", ROOM_ID, "rtcs", RTC_ID] | | + | + | + | | -["rooms", ROOM_ID, "events"] | | | | | | + -["audiences", AUDIENCE, "events"] | | | | | | + \ No newline at end of file +object / action | create | read | update | delete | list | subscribe +----------------------------------------- | ------ | ---- | ------ | ------ | ---- | --------- +["rooms"] | + | | | | + | +["rooms", ROOM_ID] | | + | + | + | | +["rooms", ROOM_ID, "agents"] | | | + | | + | +["rooms", ROOM_ID, "agents", AGENT_ID] | | + | + | | | +["rooms", ROOM_ID, "rtcs"] | + | | | | + | +["rooms", ROOM_ID, "rtcs", RTC_ID] | | + | + | + | | +["rooms", ROOM_ID, "events"] | | | | | | + +["audiences", AUDIENCE, "events"] | | | | | | + diff --git a/migrations/2021-03-18-234004_create_rtc_rw_configs/down.sql b/migrations/2021-03-18-234004_create_rtc_rw_configs/down.sql new file mode 100644 index 00000000..5808d77b --- /dev/null +++ b/migrations/2021-03-18-234004_create_rtc_rw_configs/down.sql @@ -0,0 +1,2 @@ +DROP TABLE rtc_reader_config; +DROP TABLE rtc_writer_config; diff --git a/migrations/2021-03-18-234004_create_rtc_rw_configs/up.sql b/migrations/2021-03-18-234004_create_rtc_rw_configs/up.sql new file mode 100644 index 00000000..5ef30cd2 --- /dev/null +++ b/migrations/2021-03-18-234004_create_rtc_rw_configs/up.sql @@ -0,0 +1,20 @@ +CREATE TABLE rtc_reader_config ( + rtc_id UUID NOT NULL, + reader_id agent_id NOT NULL, + receive_video BOOLEAN NOT NULL, + receive_audio BOOLEAN NOT NULL, + + FOREIGN KEY (rtc_id) REFERENCES rtc (id) ON DELETE CASCADE, + PRIMARY KEY (rtc_id, reader_id) +); + +CREATE TABLE rtc_writer_config ( + rtc_id UUID NOT NULL, + send_video BOOLEAN NOT NULL, + send_audio BOOLEAN NOT NULL, + audio_remb BIGINT CHECK (video_remb IS NULL OR video_remb > 0), + video_remb BIGINT CHECK (audio_remb IS NULL OR audio_remb > 0), + + FOREIGN KEY (rtc_id) REFERENCES rtc (id) ON DELETE CASCADE, + PRIMARY KEY (rtc_id) +); diff --git a/src/app/endpoint/agent_reader_config.rs b/src/app/endpoint/agent_reader_config.rs new file mode 100644 index 00000000..2d19487b --- /dev/null +++ b/src/app/endpoint/agent_reader_config.rs @@ -0,0 +1,915 @@ +use std::collections::HashMap; + +use async_std::stream; +use async_trait::async_trait; +use serde_derive::{Deserialize, Serialize}; +use svc_agent::{ + mqtt::{IncomingRequestProperties, ResponseStatus}, + Addressable, AgentId, +}; +use uuid::Uuid; + +use crate::app::context::Context; +use crate::app::endpoint::prelude::*; +use crate::db; +use crate::db::rtc::Object as Rtc; +use crate::db::rtc_reader_config::Object as RtcReaderConfig; +use crate::diesel::Connection; + +const MAX_STATE_CONFIGS_LEN: usize = 20; + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct State { + room_id: Uuid, + configs: Vec, +} + +impl State { + fn new(room_id: Uuid, rtc_reader_configs: &[(RtcReaderConfig, Rtc)]) -> State { + let configs = rtc_reader_configs + .into_iter() + .map(|(rtc_reader_config, rtc)| { + StateConfigItem::new(rtc.created_by().to_owned()) + .receive_video(rtc_reader_config.receive_video()) + .receive_audio(rtc_reader_config.receive_audio()) + }) + .collect::>(); + + Self { room_id, configs } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct StateConfigItem { + agent_id: AgentId, + receive_video: Option, + receive_audio: Option, +} + +impl StateConfigItem { + fn new(agent_id: AgentId) -> Self { + Self { + agent_id, + receive_video: None, + receive_audio: None, + } + } + + fn receive_video(self, receive_video: bool) -> Self { + Self { + receive_video: Some(receive_video), + ..self + } + } + + fn receive_audio(self, receive_audio: bool) -> Self { + Self { + receive_audio: Some(receive_audio), + ..self + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +pub(crate) struct UpdateHandler; + +#[async_trait] +impl RequestHandler for UpdateHandler { + type Payload = State; + const ERROR_TITLE: &'static str = "Failed to update agent reader config"; + + async fn handle( + context: &mut C, + payload: Self::Payload, + reqp: &IncomingRequestProperties, + ) -> Result { + if payload.configs.len() > MAX_STATE_CONFIGS_LEN { + return Err(anyhow!("Too many items in `configs` list")) + .error(AppErrorKind::InvalidPayload)?; + } + + let room = + helpers::find_room_by_id(context, payload.room_id, helpers::RoomTimeRequirement::Open)?; + + if room.rtc_sharing_policy() != db::rtc::SharingPolicy::Owned { + return Err(anyhow!( + "Agent reader config is available only for rooms with owned RTC sharing policy" + )) + .error(AppErrorKind::InvalidPayload)?; + } + + let conn = context.get_conn()?; + helpers::check_room_presence(&room, reqp.as_agent_id(), &conn)?; + + let rtc_reader_configs_with_rtcs = conn.transaction::<_, AppError, _>(|| { + // Find RTCs owned by agents. + let agent_ids = payload + .configs + .iter() + .map(|c| &c.agent_id) + .collect::>(); + + let rtcs = db::rtc::ListQuery::new() + .room_id(room.id()) + .created_by(agent_ids.as_slice()) + .execute(&conn)?; + + let agents_to_rtcs = rtcs + .iter() + .map(|rtc| (rtc.created_by(), rtc.id())) + .collect::>(); + + // Create or update the config. + for state_config_item in payload.configs { + let rtc_id = agents_to_rtcs + .get(&state_config_item.agent_id) + .ok_or_else(|| anyhow!("{} has no owned RTC", state_config_item.agent_id)) + .error(AppErrorKind::InvalidPayload)?; + + let mut q = db::rtc_reader_config::UpsertQuery::new(*rtc_id, reqp.as_agent_id()); + + if let Some(receive_video) = state_config_item.receive_video { + q = q.receive_video(receive_video); + } + + if let Some(receive_audio) = state_config_item.receive_audio { + q = q.receive_audio(receive_audio); + } + + q.execute(&conn)?; + } + + // Retrieve state data. + let rtc_reader_configs_with_rtcs = + db::rtc_reader_config::ListWithRtcQuery::new(room.id(), reqp.as_agent_id()) + .execute(&conn)?; + + Ok(rtc_reader_configs_with_rtcs) + })?; + + // Respond to the agent. + let response = helpers::build_response( + ResponseStatus::OK, + State::new(room.id(), &rtc_reader_configs_with_rtcs), + reqp, + context.start_timestamp(), + None, + ); + + let mut messages = vec![response]; + + // Find backend and send updates to it if present. + let maybe_backend = match room.backend_id() { + None => None, + Some(backend_id) => db::janus_backend::FindQuery::new() + .id(backend_id) + .execute(&conn)?, + }; + + if let Some(backend) = maybe_backend { + let backend_request = context + .janus_client() + .update_agent_reader_config_request( + reqp.to_owned(), + &backend, + &rtc_reader_configs_with_rtcs, + context.start_timestamp(), + ) + .or_else(|err| Err(err).error(AppErrorKind::MessageBuildingFailed))?; + + messages.push(Box::new(backend_request)); + } + + Ok(Box::new(stream::from_iter(messages))) + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Deserialize)] +pub(crate) struct ReadRequest { + room_id: Uuid, +} + +pub(crate) struct ReadHandler; + +#[async_trait] +impl RequestHandler for ReadHandler { + type Payload = ReadRequest; + const ERROR_TITLE: &'static str = "Failed to read agent reader config"; + + async fn handle( + context: &mut C, + payload: Self::Payload, + reqp: &IncomingRequestProperties, + ) -> Result { + let room = + helpers::find_room_by_id(context, payload.room_id, helpers::RoomTimeRequirement::Open)?; + + if room.rtc_sharing_policy() != db::rtc::SharingPolicy::Owned { + return Err(anyhow!( + "Agent reader config is available only for rooms with owned RTC sharing policy" + )) + .error(AppErrorKind::InvalidPayload)?; + } + + let conn = context.get_conn()?; + helpers::check_room_presence(&room, reqp.as_agent_id(), &conn)?; + + let rtc_reader_configs_with_rtcs = + db::rtc_reader_config::ListWithRtcQuery::new(room.id(), reqp.as_agent_id()) + .execute(&conn)?; + + Ok(Box::new(stream::once(helpers::build_response( + ResponseStatus::OK, + State::new(room.id(), &rtc_reader_configs_with_rtcs), + reqp, + context.start_timestamp(), + None, + )))) + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + mod update { + use std::ops::Bound; + + use chrono::{Duration, Utc}; + use serde_derive::Deserialize; + use uuid::Uuid; + + use crate::backend::janus::{self, requests::UpdateReaderConfigRequestBody}; + use crate::db::rtc::SharingPolicy as RtcSharingPolicy; + use crate::test_helpers::prelude::*; + + use super::super::*; + + #[derive(Debug, Deserialize)] + struct UpdateReaderConfigJanusRequest { + janus: String, + session_id: i64, + handle_id: i64, + body: UpdateReaderConfigRequestBody, + } + + #[async_std::test] + async fn update_agent_reader_config() -> std::io::Result<()> { + let db = TestDb::new(); + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); + let agent3 = TestAgent::new("web", "user3", USR_AUDIENCE); + let agent4 = TestAgent::new("web", "user4", USR_AUDIENCE); + + // Insert a room with agents and RTCs. + let (room, backend, rtcs) = db + .connection_pool() + .get() + .map(|conn| { + let backend = shared_helpers::insert_janus_backend(&conn); + + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .backend_id(backend.id()) + .insert(&conn); + + for agent in &[&agent1, &agent2, &agent3, &agent4] { + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + } + + let rtcs = vec![&agent2, &agent3, &agent4] + .into_iter() + .map(|agent| { + factory::Rtc::new(room.id()) + .created_by(agent.agent_id().to_owned()) + .insert(&conn) + }) + .collect::>(); + + (room, backend, rtcs) + }) + .unwrap(); + + // Make agent_reader_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![ + StateConfigItem { + agent_id: agent2.agent_id().to_owned(), + receive_video: Some(true), + receive_audio: Some(false), + }, + StateConfigItem { + agent_id: agent3.agent_id().to_owned(), + receive_video: Some(false), + receive_audio: Some(false), + }, + ], + }; + + let messages = handle_request::(&mut context, &agent1, payload) + .await + .expect("Agent reader config update failed"); + + // Assert response. + let (state, respp, _) = find_response::(messages.as_slice()); + assert_eq!(respp.status(), ResponseStatus::OK); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 2); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.receive_video, Some(true)); + assert_eq!(agent2_config.receive_audio, Some(false)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.receive_video, Some(false)); + assert_eq!(agent3_config.receive_audio, Some(false)); + + // Assert backend request. + let (req, _reqp, topic) = + find_request::(messages.as_slice()); + + let expected_topic = format!( + "agents/{}/api/{}/in/{}", + backend.id(), + janus::JANUS_API_VERSION, + context.config().id, + ); + + assert_eq!(topic, expected_topic); + assert_eq!(req.janus, "message"); + assert_eq!(req.session_id, backend.session_id()); + assert_eq!(req.handle_id, backend.handle_id()); + assert_eq!(req.body.method(), "reader_config.update"); + + let configs = req.body.configs(); + assert_eq!(configs.len(), 2); + + let agent2_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[0].id()) + .expect("Config for agent2's RTC not found"); + + assert_eq!(agent2_config.reader_id(), agent1.agent_id()); + assert_eq!(agent2_config.receive_video(), true); + assert_eq!(agent2_config.receive_audio(), false); + + let agent3_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[1].id()) + .expect("Config for agent3's RTC not found"); + + assert_eq!(agent3_config.reader_id(), agent1.agent_id()); + assert_eq!(agent3_config.receive_video(), false); + assert_eq!(agent3_config.receive_audio(), false); + + // Make one more agent_reader_config.update request. + let payload = State { + room_id: room.id(), + configs: vec![ + StateConfigItem { + agent_id: agent4.agent_id().to_owned(), + receive_video: Some(true), + receive_audio: Some(true), + }, + StateConfigItem { + agent_id: agent3.agent_id().to_owned(), + receive_video: None, + receive_audio: Some(true), + }, + ], + }; + + let messages = handle_request::(&mut context, &agent1, payload) + .await + .expect("Agent reader config update failed"); + + // Assert response. + let (state, respp, _) = find_response::(messages.as_slice()); + assert_eq!(respp.status(), ResponseStatus::OK); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 3); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.receive_video, Some(true)); + assert_eq!(agent2_config.receive_audio, Some(false)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.receive_video, Some(false)); + assert_eq!(agent3_config.receive_audio, Some(true)); + + let agent4_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent4.agent_id()) + .expect("Config for agent4 not found"); + + assert_eq!(agent4_config.receive_video, Some(true)); + assert_eq!(agent4_config.receive_audio, Some(true)); + + // Assert backend request. + let (req, _reqp, topic) = + find_request::(messages.as_slice()); + + let expected_topic = format!( + "agents/{}/api/{}/in/{}", + backend.id(), + janus::JANUS_API_VERSION, + context.config().id, + ); + + assert_eq!(topic, expected_topic); + assert_eq!(req.janus, "message"); + assert_eq!(req.session_id, backend.session_id()); + assert_eq!(req.handle_id, backend.handle_id()); + assert_eq!(req.body.method(), "reader_config.update"); + + let configs = req.body.configs(); + assert_eq!(configs.len(), 3); + + let agent2_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[0].id()) + .expect("Config for agent2's RTC not found"); + + assert_eq!(agent2_config.reader_id(), agent1.agent_id()); + assert_eq!(agent2_config.receive_video(), true); + assert_eq!(agent2_config.receive_audio(), false); + + let agent3_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[1].id()) + .expect("Config for agent3's RTC not found"); + + assert_eq!(agent3_config.reader_id(), agent1.agent_id()); + assert_eq!(agent3_config.receive_video(), false); + assert_eq!(agent3_config.receive_audio(), true); + + let agent4_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[2].id()) + .expect("Config for agent4's RTC not found"); + + assert_eq!(agent4_config.reader_id(), agent1.agent_id()); + assert_eq!(agent4_config.receive_video(), true); + assert_eq!(agent4_config.receive_audio(), true); + Ok(()) + } + + #[async_std::test] + async fn too_many_config_items() -> std::io::Result<()> { + // Make agent_reader_config.update request. + let agent = TestAgent::new("web", "user", USR_AUDIENCE); + let mut context = TestContext::new(TestDb::new(), TestAuthz::new()); + + let configs = (0..(MAX_STATE_CONFIGS_LEN + 1)) + .map(|i| { + let agent = TestAgent::new("web", &format!("user{}", i), USR_AUDIENCE); + + StateConfigItem { + agent_id: agent.agent_id().to_owned(), + receive_video: Some(false), + receive_audio: Some(true), + } + }) + .collect::>(); + + let payload = State { + room_id: Uuid::new_v4(), + configs, + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config update success"); + + assert_eq!(err.status(), ResponseStatus::BAD_REQUEST); + assert_eq!(err.kind(), "invalid_payload"); + Ok(()) + } + + #[async_std::test] + async fn agent_without_rtc() -> std::io::Result<()> { + let db = TestDb::new(); + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); + + // Insert a room with agents. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent1.agent_id(), room.id()); + shared_helpers::insert_agent(&conn, agent2.agent_id(), room.id()); + + room + }) + .unwrap(); + + // Make agent_reader_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![StateConfigItem { + agent_id: agent2.agent_id().to_owned(), + receive_video: Some(false), + receive_audio: Some(true), + }], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent1, payload) + .await + .expect_err("Unexpected agent reader config update success"); + + assert_eq!(err.status(), ResponseStatus::BAD_REQUEST); + assert_eq!(err.kind(), "invalid_payload"); + Ok(()) + } + + #[async_std::test] + async fn not_entered() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room. + let room = db + .connection_pool() + .get() + .map(|conn| shared_helpers::insert_room_with_owned(&conn)) + .unwrap(); + + // Make agent_reader_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config update success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "agent_not_entered_the_room"); + Ok(()) + } + + #[async_std::test] + async fn closed_room() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(Utc::now() - Duration::hours(2)), + Bound::Excluded(Utc::now() - Duration::hours(1)), + )) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + + room + }) + .unwrap(); + + // Make agent_reader_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config update success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_closed"); + Ok(()) + } + + #[async_std::test] + async fn room_with_wrong_rtc_policy() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Shared) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + + room + }) + .unwrap(); + + // Make agent_reader_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config update success"); + + assert_eq!(err.status(), ResponseStatus::BAD_REQUEST); + assert_eq!(err.kind(), "invalid_payload"); + Ok(()) + } + + #[async_std::test] + async fn missing_room() -> std::io::Result<()> { + // Make agent_reader_config.update request. + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + let mut context = TestContext::new(TestDb::new(), TestAuthz::new()); + + let payload = State { + room_id: Uuid::new_v4(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config update success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_not_found"); + Ok(()) + } + } + + mod read { + use std::ops::Bound; + + use chrono::{Duration, Utc}; + use uuid::Uuid; + + use crate::db::rtc::SharingPolicy as RtcSharingPolicy; + use crate::test_helpers::prelude::*; + + use super::super::*; + + #[async_std::test] + async fn read_state() -> std::io::Result<()> { + let db = TestDb::new(); + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); + let agent3 = TestAgent::new("web", "user3", USR_AUDIENCE); + + // Insert a room with RTCs and agent reader configs. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent1.agent_id(), room.id()); + + let rtc2 = factory::Rtc::new(room.id()) + .created_by(agent2.agent_id().to_owned()) + .insert(&conn); + + factory::RtcReaderConfig::new(&rtc2, agent1.agent_id()) + .receive_video(true) + .receive_audio(true) + .insert(&conn); + + let rtc3 = factory::Rtc::new(room.id()) + .created_by(agent3.agent_id().to_owned()) + .insert(&conn); + + factory::RtcReaderConfig::new(&rtc3, agent1.agent_id()) + .receive_video(false) + .receive_audio(false) + .insert(&conn); + + room + }) + .unwrap(); + + // Make agent_reader_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + let messages = handle_request::(&mut context, &agent1, payload) + .await + .expect("Agent reader config read failed"); + + // Assert response. + let (state, respp, _) = find_response::(messages.as_slice()); + assert_eq!(respp.status(), ResponseStatus::OK); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 2); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.receive_video, Some(true)); + assert_eq!(agent2_config.receive_audio, Some(true)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.receive_video, Some(false)); + assert_eq!(agent3_config.receive_audio, Some(false)); + + Ok(()) + } + + #[async_std::test] + async fn not_entered() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room. + let room = db + .connection_pool() + .get() + .map(|conn| shared_helpers::insert_room_with_owned(&conn)) + .unwrap(); + + // Make agent_reader_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config read success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "agent_not_entered_the_room"); + Ok(()) + } + + #[async_std::test] + async fn closed_room() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(Utc::now() - Duration::hours(2)), + Bound::Excluded(Utc::now() - Duration::hours(1)), + )) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + + room + }) + .unwrap(); + + // Make agent_reader_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config read success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_closed"); + Ok(()) + } + + #[async_std::test] + async fn wrong_rtc_sharing_policy() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Shared) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + + room + }) + .unwrap(); + + // Make agent_reader_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config update success"); + + assert_eq!(err.status(), ResponseStatus::BAD_REQUEST); + assert_eq!(err.kind(), "invalid_payload"); + Ok(()) + } + + #[async_std::test] + async fn missing_room() -> std::io::Result<()> { + // Make agent_reader_config.read request. + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + let mut context = TestContext::new(TestDb::new(), TestAuthz::new()); + + let payload = ReadRequest { + room_id: Uuid::new_v4(), + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent reader config read success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_not_found"); + Ok(()) + } + } +} diff --git a/src/app/endpoint/agent_writer_config.rs b/src/app/endpoint/agent_writer_config.rs new file mode 100644 index 00000000..7b34d72e --- /dev/null +++ b/src/app/endpoint/agent_writer_config.rs @@ -0,0 +1,1069 @@ +use std::collections::HashMap; + +use async_std::stream; +use async_trait::async_trait; +use serde_derive::{Deserialize, Serialize}; +use svc_agent::{ + mqtt::{IncomingRequestProperties, ResponseStatus}, + Addressable, AgentId, +}; +use uuid::Uuid; + +use crate::app::context::Context; +use crate::app::endpoint::prelude::*; +use crate::db; +use crate::db::rtc::Object as Rtc; +use crate::db::rtc_writer_config::Object as RtcWriterConfig; +use crate::diesel::Connection; + +const MAX_STATE_CONFIGS_LEN: usize = 20; + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct State { + room_id: Uuid, + configs: Vec, +} + +impl State { + fn new(room_id: Uuid, rtc_writer_configs_with_rtcs: &[(RtcWriterConfig, Rtc)]) -> State { + let configs = rtc_writer_configs_with_rtcs + .into_iter() + .map(|(rtc_writer_config, rtc)| { + let mut config_item = StateConfigItem::new(rtc.created_by().to_owned()) + .send_video(rtc_writer_config.send_video()) + .send_audio(rtc_writer_config.send_audio()); + + if let Some(video_remb) = rtc_writer_config.video_remb() { + config_item = config_item.video_remb(video_remb as u32); + } + + if let Some(audio_remb) = rtc_writer_config.audio_remb() { + config_item = config_item.audio_remb(audio_remb as u32); + } + + config_item + }) + .collect::>(); + + Self { room_id, configs } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct StateConfigItem { + agent_id: AgentId, + send_video: Option, + send_audio: Option, + video_remb: Option, + audio_remb: Option, +} + +impl StateConfigItem { + fn new(agent_id: AgentId) -> Self { + Self { + agent_id, + send_video: None, + send_audio: None, + video_remb: None, + audio_remb: None, + } + } + + fn send_video(self, send_video: bool) -> Self { + Self { + send_video: Some(send_video), + ..self + } + } + + fn send_audio(self, send_audio: bool) -> Self { + Self { + send_audio: Some(send_audio), + ..self + } + } + + fn video_remb(self, video_remb: u32) -> Self { + Self { + video_remb: Some(video_remb), + ..self + } + } + + fn audio_remb(self, audio_remb: u32) -> Self { + Self { + audio_remb: Some(audio_remb), + ..self + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +pub(crate) struct UpdateHandler; + +#[async_trait] +impl RequestHandler for UpdateHandler { + type Payload = State; + const ERROR_TITLE: &'static str = "Failed to update agent writer config"; + + async fn handle( + context: &mut C, + payload: Self::Payload, + reqp: &IncomingRequestProperties, + ) -> Result { + if payload.configs.len() > MAX_STATE_CONFIGS_LEN { + return Err(anyhow!("Too many items in `configs` list")) + .error(AppErrorKind::InvalidPayload)?; + } + + let room = + helpers::find_room_by_id(context, payload.room_id, helpers::RoomTimeRequirement::Open)?; + + if room.rtc_sharing_policy() != db::rtc::SharingPolicy::Owned { + return Err(anyhow!( + "Agent writer config is available only for rooms with owned RTC sharing policy" + )) + .error(AppErrorKind::InvalidPayload)?; + } + + { + let conn = context.get_conn()?; + helpers::check_room_presence(&room, reqp.as_agent_id(), &conn)?; + } + + // Authorize agent writer config updating on the tenant. + let room_id = room.id().to_string(); + let object = vec!["rooms", &room_id, "agents"]; + + let authz_time = context + .authz() + .authorize(room.audience(), reqp, object, "update") + .await?; + + let conn = context.get_conn()?; + + let rtc_writer_configs_with_rtcs = conn.transaction::<_, AppError, _>(|| { + // Find RTCs owned by agents. + let agent_ids = payload + .configs + .iter() + .map(|c| &c.agent_id) + .collect::>(); + + let rtcs = db::rtc::ListQuery::new() + .room_id(room.id()) + .created_by(agent_ids.as_slice()) + .execute(&conn)?; + + let agents_to_rtcs = rtcs + .iter() + .map(|rtc| (rtc.created_by(), rtc.id())) + .collect::>(); + + // Create or update the config. + for state_config_item in payload.configs { + let rtc_id = agents_to_rtcs + .get(&state_config_item.agent_id) + .ok_or_else(|| anyhow!("{} has no owned RTC", state_config_item.agent_id)) + .error(AppErrorKind::InvalidPayload)?; + + let mut q = db::rtc_writer_config::UpsertQuery::new(*rtc_id); + + if let Some(send_video) = state_config_item.send_video { + q = q.send_video(send_video); + } + + if let Some(send_audio) = state_config_item.send_audio { + q = q.send_audio(send_audio); + } + + if let Some(video_remb) = state_config_item.video_remb { + q = q.video_remb(video_remb.into()); + } + + if let Some(audio_remb) = state_config_item.audio_remb { + q = q.audio_remb(audio_remb.into()); + } + + q.execute(&conn)?; + } + + // Retrieve state data. + let rtc_writer_configs_with_rtcs = + db::rtc_writer_config::ListWithRtcQuery::new(room.id()).execute(&conn)?; + + Ok(rtc_writer_configs_with_rtcs) + })?; + + // Respond to the agent and broadcast notification. + let state = State::new(room.id(), &rtc_writer_configs_with_rtcs); + + let response = helpers::build_response( + ResponseStatus::OK, + state.clone(), + reqp, + context.start_timestamp(), + Some(authz_time), + ); + + let notification = helpers::build_notification( + "agent_writer_config.update", + &format!("rooms/{}/events", room.id()), + state, + reqp, + context.start_timestamp(), + ); + + let mut messages = vec![response, notification]; + + // Find backend and send updates to it if present. + let maybe_backend = match room.backend_id() { + None => None, + Some(backend_id) => db::janus_backend::FindQuery::new() + .id(backend_id) + .execute(&conn)?, + }; + + if let Some(backend) = maybe_backend { + let backend_request = context + .janus_client() + .update_agent_writer_config_request( + reqp.to_owned(), + &backend, + &rtc_writer_configs_with_rtcs, + context.start_timestamp(), + authz_time, + ) + .or_else(|err| Err(err).error(AppErrorKind::MessageBuildingFailed))?; + + messages.push(Box::new(backend_request)); + } + + Ok(Box::new(stream::from_iter(messages))) + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Deserialize)] +pub(crate) struct ReadRequest { + room_id: Uuid, +} + +pub(crate) struct ReadHandler; + +#[async_trait] +impl RequestHandler for ReadHandler { + type Payload = ReadRequest; + const ERROR_TITLE: &'static str = "Failed to read agent writer config"; + + async fn handle( + context: &mut C, + payload: Self::Payload, + reqp: &IncomingRequestProperties, + ) -> Result { + let room = + helpers::find_room_by_id(context, payload.room_id, helpers::RoomTimeRequirement::Open)?; + + if room.rtc_sharing_policy() != db::rtc::SharingPolicy::Owned { + return Err(anyhow!( + "Agent writer config is available only for rooms with owned RTC sharing policy" + )) + .error(AppErrorKind::InvalidPayload)?; + } + + let conn = context.get_conn()?; + helpers::check_room_presence(&room, reqp.as_agent_id(), &conn)?; + + let rtc_writer_configs_with_rtcs = + db::rtc_writer_config::ListWithRtcQuery::new(room.id()).execute(&conn)?; + + Ok(Box::new(stream::once(helpers::build_response( + ResponseStatus::OK, + State::new(room.id(), &rtc_writer_configs_with_rtcs), + reqp, + context.start_timestamp(), + None, + )))) + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + mod update { + use std::ops::Bound; + + use chrono::{Duration, Utc}; + use serde_derive::Deserialize; + use uuid::Uuid; + + use crate::backend::janus::{self, requests::UpdateWriterConfigRequestBody}; + use crate::db::rtc::SharingPolicy as RtcSharingPolicy; + use crate::test_helpers::prelude::*; + + use super::super::*; + + #[derive(Debug, Deserialize)] + struct UpdateWriterConfigJanusRequest { + janus: String, + session_id: i64, + handle_id: i64, + body: UpdateWriterConfigRequestBody, + } + + #[async_std::test] + async fn update_agent_writer_config() -> std::io::Result<()> { + let db = TestDb::new(); + let mut authz = TestAuthz::new(); + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); + let agent3 = TestAgent::new("web", "user3", USR_AUDIENCE); + let agent4 = TestAgent::new("web", "user4", USR_AUDIENCE); + + // Insert a room with agents and RTCs. + let (room, backend, rtcs) = db + .connection_pool() + .get() + .map(|conn| { + let backend = shared_helpers::insert_janus_backend(&conn); + + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .backend_id(backend.id()) + .insert(&conn); + + for agent in &[&agent1, &agent2, &agent3, &agent4] { + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + } + + let rtcs = vec![&agent2, &agent3, &agent4] + .into_iter() + .map(|agent| { + factory::Rtc::new(room.id()) + .created_by(agent.agent_id().to_owned()) + .insert(&conn) + }) + .collect::>(); + + (room, backend, rtcs) + }) + .unwrap(); + + // Allow agent to update agent_writer_config. + let room_id = room.id().to_string(); + let object = vec!["rooms", &room_id, "agents"]; + authz.allow(agent1.account_id(), object, "update"); + + // Make agent_writer_config.update request. + let mut context = TestContext::new(db, authz); + + let payload = State { + room_id: room.id(), + configs: vec![ + StateConfigItem { + agent_id: agent2.agent_id().to_owned(), + send_video: Some(true), + send_audio: Some(false), + video_remb: Some(300_000), + audio_remb: Some(60_000), + }, + StateConfigItem { + agent_id: agent3.agent_id().to_owned(), + send_video: Some(false), + send_audio: Some(false), + video_remb: None, + audio_remb: None, + }, + ], + }; + + let messages = handle_request::(&mut context, &agent1, payload) + .await + .expect("Agent writer config update failed"); + + // Assert response. + let (state, respp, _) = find_response::(messages.as_slice()); + assert_eq!(respp.status(), ResponseStatus::OK); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 2); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.send_video, Some(true)); + assert_eq!(agent2_config.send_audio, Some(false)); + assert_eq!(agent2_config.video_remb, Some(300_000)); + assert_eq!(agent2_config.audio_remb, Some(60_000)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.send_video, Some(false)); + assert_eq!(agent3_config.send_audio, Some(false)); + assert_eq!(agent3_config.video_remb, None); + assert_eq!(agent3_config.audio_remb, None); + + // Assert notification. + let (state, evp, _) = find_event::(messages.as_slice()); + assert_eq!(evp.label(), "agent_writer_config.update"); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 2); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.send_video, Some(true)); + assert_eq!(agent2_config.send_audio, Some(false)); + assert_eq!(agent2_config.video_remb, Some(300_000)); + assert_eq!(agent2_config.audio_remb, Some(60_000)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.send_video, Some(false)); + assert_eq!(agent3_config.send_audio, Some(false)); + assert_eq!(agent3_config.video_remb, None); + assert_eq!(agent3_config.audio_remb, None); + + // Assert backend request. + let (req, _reqp, topic) = + find_request::(messages.as_slice()); + + let expected_topic = format!( + "agents/{}/api/{}/in/{}", + backend.id(), + janus::JANUS_API_VERSION, + context.config().id, + ); + + assert_eq!(topic, expected_topic); + assert_eq!(req.janus, "message"); + assert_eq!(req.session_id, backend.session_id()); + assert_eq!(req.handle_id, backend.handle_id()); + assert_eq!(req.body.method(), "writer_config.update"); + + let configs = req.body.configs(); + assert_eq!(configs.len(), 2); + + let agent2_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[0].id()) + .expect("Config for agent2's RTC not found"); + + assert_eq!(agent2_config.send_video(), true); + assert_eq!(agent2_config.send_audio(), false); + assert_eq!(agent2_config.video_remb(), Some(300_000)); + assert_eq!(agent2_config.audio_remb(), Some(60_000)); + + let agent3_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[1].id()) + .expect("Config for agent3's RTC not found"); + + assert_eq!(agent3_config.send_video(), false); + assert_eq!(agent3_config.send_audio(), false); + assert_eq!(agent3_config.video_remb(), None); + assert_eq!(agent3_config.audio_remb(), None); + + // Make one more agent_writer_config.update request. + let payload = State { + room_id: room.id(), + configs: vec![ + StateConfigItem { + agent_id: agent4.agent_id().to_owned(), + send_video: Some(true), + send_audio: Some(true), + video_remb: Some(1_000_000), + audio_remb: Some(60_000), + }, + StateConfigItem { + agent_id: agent3.agent_id().to_owned(), + send_video: None, + send_audio: Some(true), + video_remb: None, + audio_remb: Some(30_000), + }, + ], + }; + + let messages = handle_request::(&mut context, &agent1, payload) + .await + .expect("Agent writer config update failed"); + + // Assert response. + let (state, respp, _) = find_response::(messages.as_slice()); + assert_eq!(respp.status(), ResponseStatus::OK); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 3); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.send_video, Some(true)); + assert_eq!(agent2_config.send_audio, Some(false)); + assert_eq!(agent2_config.video_remb, Some(300_000)); + assert_eq!(agent2_config.audio_remb, Some(60_000)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.send_video, Some(false)); + assert_eq!(agent3_config.send_audio, Some(true)); + assert_eq!(agent3_config.video_remb, None); + assert_eq!(agent3_config.audio_remb, Some(30_000)); + + let agent4_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent4.agent_id()) + .expect("Config for agent4 not found"); + + assert_eq!(agent4_config.send_video, Some(true)); + assert_eq!(agent4_config.send_audio, Some(true)); + assert_eq!(agent4_config.video_remb, Some(1_000_000)); + assert_eq!(agent4_config.audio_remb, Some(60_000)); + + // Assert notification. + let (state, evp, _) = find_event::(messages.as_slice()); + assert_eq!(evp.label(), "agent_writer_config.update"); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 3); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.send_video, Some(true)); + assert_eq!(agent2_config.send_audio, Some(false)); + assert_eq!(agent2_config.video_remb, Some(300_000)); + assert_eq!(agent2_config.audio_remb, Some(60_000)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.send_video, Some(false)); + assert_eq!(agent3_config.send_audio, Some(true)); + assert_eq!(agent3_config.video_remb, None); + assert_eq!(agent3_config.audio_remb, Some(30_000)); + + let agent4_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent4.agent_id()) + .expect("Config for agent4 not found"); + + assert_eq!(agent4_config.send_video, Some(true)); + assert_eq!(agent4_config.send_audio, Some(true)); + assert_eq!(agent4_config.video_remb, Some(1_000_000)); + assert_eq!(agent4_config.audio_remb, Some(60_000)); + + // Assert backend request. + let (req, _reqp, topic) = + find_request::(messages.as_slice()); + + let expected_topic = format!( + "agents/{}/api/{}/in/{}", + backend.id(), + janus::JANUS_API_VERSION, + context.config().id, + ); + + assert_eq!(topic, expected_topic); + assert_eq!(req.janus, "message"); + assert_eq!(req.session_id, backend.session_id()); + assert_eq!(req.handle_id, backend.handle_id()); + assert_eq!(req.body.method(), "writer_config.update"); + + let configs = req.body.configs(); + assert_eq!(configs.len(), 3); + + let agent2_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[0].id()) + .expect("Config for agent2's RTC not found"); + + assert_eq!(agent2_config.send_video(), true); + assert_eq!(agent2_config.send_audio(), false); + assert_eq!(agent2_config.video_remb(), Some(300_000)); + assert_eq!(agent2_config.audio_remb(), Some(60_000)); + + let agent3_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[1].id()) + .expect("Config for agent3's RTC not found"); + + assert_eq!(agent3_config.send_video(), false); + assert_eq!(agent3_config.send_audio(), true); + assert_eq!(agent3_config.video_remb(), None); + assert_eq!(agent3_config.audio_remb(), Some(30_000)); + + let agent4_config = configs + .iter() + .find(|c| c.stream_id() == rtcs[2].id()) + .expect("Config for agent4's RTC not found"); + + assert_eq!(agent4_config.send_video(), true); + assert_eq!(agent4_config.send_audio(), true); + assert_eq!(agent4_config.video_remb(), Some(1_000_000)); + assert_eq!(agent4_config.audio_remb(), Some(60_000)); + Ok(()) + } + + #[async_std::test] + async fn not_authorized() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with agents. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + room + }) + .unwrap(); + + // Make agent_writer_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![], + }; + + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config update success"); + + assert_eq!(err.status(), ResponseStatus::FORBIDDEN); + assert_eq!(err.kind(), "access_denied"); + Ok(()) + } + + #[async_std::test] + async fn too_many_config_items() -> std::io::Result<()> { + // Make agent_writer_config.update request. + let agent = TestAgent::new("web", "user", USR_AUDIENCE); + let mut context = TestContext::new(TestDb::new(), TestAuthz::new()); + + let configs = (0..(MAX_STATE_CONFIGS_LEN + 1)) + .map(|i| { + let agent = TestAgent::new("web", &format!("user{}", i), USR_AUDIENCE); + + StateConfigItem { + agent_id: agent.agent_id().to_owned(), + send_video: Some(false), + send_audio: Some(true), + video_remb: Some(300_000), + audio_remb: Some(60_000), + } + }) + .collect::>(); + + let payload = State { + room_id: Uuid::new_v4(), + configs, + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config update success"); + + assert_eq!(err.status(), ResponseStatus::BAD_REQUEST); + assert_eq!(err.kind(), "invalid_payload"); + Ok(()) + } + + #[async_std::test] + async fn not_entered() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room. + let room = db + .connection_pool() + .get() + .map(|conn| shared_helpers::insert_room_with_owned(&conn)) + .unwrap(); + + // Make agent_writer_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config update success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "agent_not_entered_the_room"); + Ok(()) + } + + #[async_std::test] + async fn closed_room() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(Utc::now() - Duration::hours(2)), + Bound::Excluded(Utc::now() - Duration::hours(1)), + )) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + room + }) + .unwrap(); + + // Make agent_writer_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config update success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_closed"); + Ok(()) + } + + #[async_std::test] + async fn room_with_wrong_rtc_policy() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Shared) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + room + }) + .unwrap(); + + // Make agent_writer_config.update request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = State { + room_id: room.id(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config update success"); + + assert_eq!(err.status(), ResponseStatus::BAD_REQUEST); + assert_eq!(err.kind(), "invalid_payload"); + Ok(()) + } + + #[async_std::test] + async fn missing_room() -> std::io::Result<()> { + // Make agent_writer_config.update request. + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + let mut context = TestContext::new(TestDb::new(), TestAuthz::new()); + + let payload = State { + room_id: Uuid::new_v4(), + configs: vec![], + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config update success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_not_found"); + Ok(()) + } + } + + mod read { + use std::ops::Bound; + + use chrono::{Duration, Utc}; + use uuid::Uuid; + + use crate::db::rtc::SharingPolicy as RtcSharingPolicy; + use crate::test_helpers::prelude::*; + + use super::super::*; + + #[async_std::test] + async fn read_state() -> std::io::Result<()> { + let db = TestDb::new(); + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); + let agent3 = TestAgent::new("web", "user3", USR_AUDIENCE); + + // Insert a room with RTCs and agent writer configs. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent1.agent_id(), room.id()); + + let rtc2 = factory::Rtc::new(room.id()) + .created_by(agent2.agent_id().to_owned()) + .insert(&conn); + + factory::RtcWriterConfig::new(&rtc2) + .send_video(true) + .send_audio(true) + .video_remb(1_000_000) + .audio_remb(60_000) + .insert(&conn); + + let rtc3 = factory::Rtc::new(room.id()) + .created_by(agent3.agent_id().to_owned()) + .insert(&conn); + + factory::RtcWriterConfig::new(&rtc3) + .send_video(false) + .send_audio(false) + .video_remb(300_000) + .audio_remb(50_000) + .insert(&conn); + + room + }) + .unwrap(); + + // Make agent_writer_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + let messages = handle_request::(&mut context, &agent1, payload) + .await + .expect("Agent writer config read failed"); + + // Assert response. + let (state, respp, _) = find_response::(messages.as_slice()); + assert_eq!(respp.status(), ResponseStatus::OK); + assert_eq!(state.room_id, room.id()); + assert_eq!(state.configs.len(), 2); + + let agent2_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent2.agent_id()) + .expect("Config for agent2 not found"); + + assert_eq!(agent2_config.send_video, Some(true)); + assert_eq!(agent2_config.send_audio, Some(true)); + assert_eq!(agent2_config.video_remb, Some(1_000_000)); + assert_eq!(agent2_config.audio_remb, Some(60_000)); + + let agent3_config = state + .configs + .iter() + .find(|c| &c.agent_id == agent3.agent_id()) + .expect("Config for agent3 not found"); + + assert_eq!(agent3_config.send_video, Some(false)); + assert_eq!(agent3_config.send_audio, Some(false)); + assert_eq!(agent3_config.video_remb, Some(300_000)); + assert_eq!(agent3_config.audio_remb, Some(50_000)); + + Ok(()) + } + + #[async_std::test] + async fn not_entered() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room. + let room = db + .connection_pool() + .get() + .map(|conn| shared_helpers::insert_room_with_owned(&conn)) + .unwrap(); + + // Make agent_writer_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config read success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "agent_not_entered_the_room"); + Ok(()) + } + + #[async_std::test] + async fn closed_room() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(Utc::now() - Duration::hours(2)), + Bound::Excluded(Utc::now() - Duration::hours(1)), + )) + .rtc_sharing_policy(RtcSharingPolicy::Owned) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + + room + }) + .unwrap(); + + // Make agent_writer_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config read success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_closed"); + Ok(()) + } + + #[async_std::test] + async fn wrong_rtc_sharing_policy() -> std::io::Result<()> { + let db = TestDb::new(); + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + + // Insert a room with an agent. + let room = db + .connection_pool() + .get() + .map(|conn| { + let room = factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(Utc::now()), Bound::Unbounded)) + .rtc_sharing_policy(RtcSharingPolicy::Shared) + .insert(&conn); + + shared_helpers::insert_agent(&conn, agent.agent_id(), room.id()); + + room + }) + .unwrap(); + + // Make agent_writer_config.read request. + let mut context = TestContext::new(db, TestAuthz::new()); + + let payload = ReadRequest { room_id: room.id() }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config update success"); + + assert_eq!(err.status(), ResponseStatus::BAD_REQUEST); + assert_eq!(err.kind(), "invalid_payload"); + Ok(()) + } + + #[async_std::test] + async fn missing_room() -> std::io::Result<()> { + // Make agent_writer_config.read request. + let agent = TestAgent::new("web", "user1", USR_AUDIENCE); + let mut context = TestContext::new(TestDb::new(), TestAuthz::new()); + + let payload = ReadRequest { + room_id: Uuid::new_v4(), + }; + + // Assert error. + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected agent writer config read success"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_not_found"); + Ok(()) + } + } +} diff --git a/src/app/endpoint/helpers.rs b/src/app/endpoint/helpers.rs index 65d387f7..56f82c9c 100644 --- a/src/app/endpoint/helpers.rs +++ b/src/app/endpoint/helpers.rs @@ -2,10 +2,14 @@ use std::ops::Bound; use anyhow::anyhow; use chrono::{DateTime, Duration, Utc}; +use diesel::pg::PgConnection; use serde::Serialize; -use svc_agent::mqtt::{ - IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties, - OutgoingResponse, ResponseStatus, ShortTermTimingProperties, +use svc_agent::{ + mqtt::{ + IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties, + OutgoingResponse, ResponseStatus, ShortTermTimingProperties, + }, + AgentId, }; use uuid::Uuid; @@ -160,3 +164,20 @@ pub(crate) fn add_room_logger_tags(context: &mut C, room: &db::room: context.add_logger_tags(o!("scope" => scope.to_string())); } } + +pub(crate) fn check_room_presence( + room: &db::room::Object, + agent_id: &AgentId, + conn: &PgConnection, +) -> Result<(), AppError> { + let results = db::agent::ListQuery::new() + .room_id(room.id()) + .agent_id(agent_id) + .execute(conn)?; + + if results.is_empty() { + Err(anyhow!("Agent is not online in the room")).error(AppErrorKind::AgentNotEnteredTheRoom) + } else { + Ok(()) + } +} diff --git a/src/app/endpoint/message.rs b/src/app/endpoint/message.rs index aba060f3..a518333b 100644 --- a/src/app/endpoint/message.rs +++ b/src/app/endpoint/message.rs @@ -1,8 +1,5 @@ -use std::result::Result as StdResult; - use async_std::stream; use async_trait::async_trait; -use diesel::pg::PgConnection; use serde_derive::{Deserialize, Serialize}; use serde_json::{json, Value as JsonValue}; use svc_agent::mqtt::{ @@ -16,7 +13,6 @@ use uuid::Uuid; use crate::app::context::Context; use crate::app::endpoint::prelude::*; use crate::app::API_VERSION; -use crate::db::{self, room::Object as Room}; //////////////////////////////////////////////////////////////////////////////// @@ -58,8 +54,8 @@ impl RequestHandler for UnicastHandler { )?; let conn = context.get_conn()?; - check_room_presence(&room, reqp.as_agent_id(), &conn)?; - check_room_presence(&room, &payload.agent_id, &conn)?; + helpers::check_room_presence(&room, reqp.as_agent_id(), &conn)?; + helpers::check_room_presence(&room, &payload.agent_id, &conn)?; } let response_topic = @@ -122,7 +118,7 @@ impl RequestHandler for BroadcastHandler { )?; let conn = context.get_conn()?; - check_room_presence(&room, &reqp.as_agent_id(), &conn)?; + helpers::check_room_presence(&room, &reqp.as_agent_id(), &conn)?; room }; @@ -192,25 +188,6 @@ impl ResponseHandler for UnicastResponseHandler { /////////////////////////////////////////////////////////////////////////////// -fn check_room_presence( - room: &Room, - agent_id: &AgentId, - conn: &PgConnection, -) -> StdResult<(), AppError> { - let results = db::agent::ListQuery::new() - .room_id(room.id()) - .agent_id(agent_id) - .execute(conn)?; - - if results.is_empty() { - Err(anyhow!("Agent is not online in the room")).error(AppErrorKind::AgentNotEnteredTheRoom) - } else { - Ok(()) - } -} - -/////////////////////////////////////////////////////////////////////////////// - #[cfg(test)] mod test { mod unicast { diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index 110a7616..095bec7a 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -51,6 +51,10 @@ macro_rules! request_routes { // Request routes configuration: method => RequestHandler request_routes!( "agent.list" => agent::ListHandler, + "agent_reader_config.read" => agent_reader_config::ReadHandler, + "agent_reader_config.update" => agent_reader_config::UpdateHandler, + "agent_writer_config.read" => agent_writer_config::ReadHandler, + "agent_writer_config.update" => agent_writer_config::UpdateHandler, "message.broadcast" => message::BroadcastHandler, "message.unicast" => message::UnicastHandler, "room.create" => room::CreateHandler, @@ -177,6 +181,8 @@ event_routes!( /////////////////////////////////////////////////////////////////////////////// mod agent; +mod agent_reader_config; +mod agent_writer_config; pub(crate) mod helpers; mod message; mod metric; diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index d8a1b97c..c7c23e41 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -101,7 +101,7 @@ impl RequestHandler for CreateHandler { ); let notification = helpers::build_notification( - "room.create", + "rtc.create", &format!("rooms/{}/events", room.id()), rtc, reqp, @@ -470,7 +470,7 @@ mod test { // Assert notification. let (rtc, evp, topic) = find_event::(messages.as_slice()); assert!(topic.ends_with(&format!("/rooms/{}/events", room.id()))); - assert_eq!(evp.label(), "room.create"); + assert_eq!(evp.label(), "rtc.create"); assert_eq!(rtc.room_id(), room.id()); }); } diff --git a/src/app/error.rs b/src/app/error.rs index b85fa4fd..c0d4e03e 100644 --- a/src/app/error.rs +++ b/src/app/error.rs @@ -32,6 +32,7 @@ pub(crate) enum ErrorKind { InvalidRoomTime, InvalidSdpType, InvalidSubscriptionObject, + InvalidPayload, MessageBuildingFailed, MessageHandlingFailed, MessageParsingFailed, @@ -156,6 +157,12 @@ impl Into for ErrorKind { title: "Invalid JSEP format", is_notify_sentry: false, }, + Self::InvalidPayload => ErrorKindProperties { + status: ResponseStatus::BAD_REQUEST, + kind: "invalid_payload", + title: "Invalid payload", + is_notify_sentry: false, + }, Self::InvalidRoomTime => ErrorKindProperties { status: ResponseStatus::BAD_REQUEST, kind: "invalid_room_time", diff --git a/src/backend/janus/requests.rs b/src/backend/janus/requests.rs index 0568e310..236ddcad 100644 --- a/src/backend/janus/requests.rs +++ b/src/backend/janus/requests.rs @@ -192,3 +192,161 @@ impl AgentLeaveRequestBody { } } } + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct UpdateReaderConfigRequestBody { + method: String, + configs: Vec, +} + +#[cfg(test)] +impl UpdateReaderConfigRequestBody { + pub(crate) fn method(&self) -> &str { + &self.method + } + + pub(crate) fn configs(&self) -> &[UpdateReaderConfigRequestBodyConfigItem] { + &self.configs + } +} + +impl UpdateReaderConfigRequestBody { + pub(crate) fn new(configs: Vec) -> Self { + Self { + method: String::from("reader_config.update"), + configs, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct UpdateReaderConfigRequestBodyConfigItem { + reader_id: AgentId, + stream_id: Uuid, + receive_video: bool, + receive_audio: bool, +} + +impl UpdateReaderConfigRequestBodyConfigItem { + pub(crate) fn new( + reader_id: AgentId, + stream_id: Uuid, + receive_video: bool, + receive_audio: bool, + ) -> Self { + Self { + reader_id, + stream_id, + receive_video, + receive_audio, + } + } + + #[cfg(test)] + pub(crate) fn reader_id(&self) -> &AgentId { + &self.reader_id + } + + #[cfg(test)] + pub(crate) fn stream_id(&self) -> Uuid { + self.stream_id + } + + #[cfg(test)] + pub(crate) fn receive_video(&self) -> bool { + self.receive_video + } + + #[cfg(test)] + pub(crate) fn receive_audio(&self) -> bool { + self.receive_audio + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct UpdateWriterConfigRequestBody { + method: String, + configs: Vec, +} + +impl UpdateWriterConfigRequestBody { + pub(crate) fn new(configs: Vec) -> Self { + Self { + method: String::from("writer_config.update"), + configs, + } + } +} + +#[cfg(test)] +impl UpdateWriterConfigRequestBody { + pub(crate) fn method(&self) -> &str { + &self.method + } + + pub(crate) fn configs(&self) -> &[UpdateWriterConfigRequestBodyConfigItem] { + &self.configs + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct UpdateWriterConfigRequestBodyConfigItem { + stream_id: Uuid, + send_video: bool, + send_audio: bool, + #[serde(skip_serializing_if = "Option::is_none")] + video_remb: Option, + #[serde(skip_serializing_if = "Option::is_none")] + audio_remb: Option, +} + +impl UpdateWriterConfigRequestBodyConfigItem { + pub(crate) fn new(stream_id: Uuid, send_video: bool, send_audio: bool) -> Self { + Self { + stream_id, + send_video, + send_audio, + video_remb: None, + audio_remb: None, + } + } + + pub(crate) fn set_video_remb(&mut self, video_remb: u32) -> &mut Self { + self.video_remb = Some(video_remb); + self + } + + pub(crate) fn set_audio_remb(&mut self, audio_remb: u32) -> &mut Self { + self.audio_remb = Some(audio_remb); + self + } + + #[cfg(test)] + pub(crate) fn stream_id(&self) -> Uuid { + self.stream_id + } + + #[cfg(test)] + pub(crate) fn send_video(&self) -> bool { + self.send_video + } + + #[cfg(test)] + pub(crate) fn send_audio(&self) -> bool { + self.send_audio + } + + #[cfg(test)] + pub(crate) fn video_remb(&self) -> Option { + self.video_remb + } + + #[cfg(test)] + pub(crate) fn audio_remb(&self) -> Option { + self.audio_remb + } +} diff --git a/src/backend/janus/transactions/mod.rs b/src/backend/janus/transactions/mod.rs index 4cbdc60f..a24379d2 100644 --- a/src/backend/janus/transactions/mod.rs +++ b/src/backend/janus/transactions/mod.rs @@ -10,6 +10,8 @@ pub(crate) enum Transaction { CreateRtcHandle(create_rtc_handle::TransactionData), ReadStream(read_stream::TransactionData), Trickle(trickle::TransactionData), + UpdateReaderConfig, + UpdateWriterConfig, UploadStream(upload_stream::TransactionData), } @@ -20,4 +22,6 @@ mod create_session; mod create_stream; mod read_stream; mod trickle; +mod update_agent_reader_config; +mod update_agent_writer_config; mod upload_stream; diff --git a/src/backend/janus/transactions/update_agent_reader_config.rs b/src/backend/janus/transactions/update_agent_reader_config.rs new file mode 100644 index 00000000..b924407c --- /dev/null +++ b/src/backend/janus/transactions/update_agent_reader_config.rs @@ -0,0 +1,71 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use svc_agent::mqtt::{ + IncomingRequestProperties, OutgoingMessage, OutgoingRequest, ShortTermTimingProperties, +}; + +use crate::db::janus_backend::Object as JanusBackend; +use crate::db::rtc::Object as Rtc; +use crate::db::rtc_reader_config::Object as RtcReaderConfig; +use crate::util::{generate_correlation_data, to_base64}; + +use super::super::requests::{ + MessageRequest, UpdateReaderConfigRequestBody, UpdateReaderConfigRequestBodyConfigItem, +}; +use super::super::{Client, JANUS_API_VERSION}; +use super::Transaction; + +const METHOD: &str = "janus_conference_rtc_reader_config.update"; + +//////////////////////////////////////////////////////////////////////////////// + +impl Client { + pub(crate) fn update_agent_reader_config_request( + &self, + reqp: IncomingRequestProperties, + backend: &JanusBackend, + rtc_reader_configs_with_rtcs: &[(RtcReaderConfig, Rtc)], + start_timestamp: DateTime, + ) -> Result> { + let to = backend.id(); + let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp); + + let props = reqp.to_request( + METHOD, + &self.response_topic(to)?, + &generate_correlation_data(), + short_term_timing, + ); + + let items = rtc_reader_configs_with_rtcs + .iter() + .map(|(rtc_reader_config, rtc)| { + UpdateReaderConfigRequestBodyConfigItem::new( + rtc_reader_config.reader_id().to_owned(), + rtc.id(), + rtc_reader_config.receive_video(), + rtc_reader_config.receive_audio(), + ) + }) + .collect::>(); + + let body = UpdateReaderConfigRequestBody::new(items); + + let payload = MessageRequest::new( + &to_base64(&Transaction::UpdateReaderConfig)?, + backend.session_id(), + backend.handle_id(), + serde_json::to_value(&body)?, + None, + ); + + self.register_transaction(to, start_timestamp, &props, &payload, self.timeout(METHOD)); + + Ok(OutgoingRequest::unicast( + payload, + props, + to, + JANUS_API_VERSION, + )) + } +} diff --git a/src/backend/janus/transactions/update_agent_writer_config.rs b/src/backend/janus/transactions/update_agent_writer_config.rs new file mode 100644 index 00000000..6a006950 --- /dev/null +++ b/src/backend/janus/transactions/update_agent_writer_config.rs @@ -0,0 +1,82 @@ +use anyhow::Result; +use chrono::{DateTime, Duration, Utc}; +use svc_agent::mqtt::{ + IncomingRequestProperties, OutgoingMessage, OutgoingRequest, ShortTermTimingProperties, +}; + +use crate::db::janus_backend::Object as JanusBackend; +use crate::db::rtc::Object as Rtc; +use crate::db::rtc_writer_config::Object as RtcWriterConfig; +use crate::util::{generate_correlation_data, to_base64}; + +use super::super::requests::{ + MessageRequest, UpdateWriterConfigRequestBody, UpdateWriterConfigRequestBodyConfigItem, +}; +use super::super::{Client, JANUS_API_VERSION}; +use super::Transaction; + +const METHOD: &str = "janus_conference_rtc_writer_config.update"; + +//////////////////////////////////////////////////////////////////////////////// + +impl Client { + pub(crate) fn update_agent_writer_config_request( + &self, + reqp: IncomingRequestProperties, + backend: &JanusBackend, + rtc_writer_configs_with_rtcs: &[(RtcWriterConfig, Rtc)], + start_timestamp: DateTime, + authz_time: Duration, + ) -> Result> { + let to = backend.id(); + let mut short_term_timing = ShortTermTimingProperties::until_now(start_timestamp); + short_term_timing.set_authorization_time(authz_time); + + let props = reqp.to_request( + METHOD, + &self.response_topic(to)?, + &generate_correlation_data(), + short_term_timing, + ); + + let items = rtc_writer_configs_with_rtcs + .iter() + .map(|(rtc_writer_config, rtc)| { + let mut req = UpdateWriterConfigRequestBodyConfigItem::new( + rtc.id(), + rtc_writer_config.send_video(), + rtc_writer_config.send_audio(), + ); + + if let Some(video_remb) = rtc_writer_config.video_remb() { + req.set_video_remb(video_remb as u32); + } + + if let Some(audio_remb) = rtc_writer_config.audio_remb() { + req.set_audio_remb(audio_remb as u32); + } + + req + }) + .collect::>(); + + let body = UpdateWriterConfigRequestBody::new(items); + + let payload = MessageRequest::new( + &to_base64(&Transaction::UpdateWriterConfig)?, + backend.session_id(), + backend.handle_id(), + serde_json::to_value(&body)?, + None, + ); + + self.register_transaction(to, start_timestamp, &props, &payload, self.timeout(METHOD)); + + Ok(OutgoingRequest::unicast( + payload, + props, + to, + JANUS_API_VERSION, + )) + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 4f628293..b169ac0b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -39,3 +39,5 @@ pub(crate) mod janus_rtc_stream; pub(crate) mod recording; pub(crate) mod room; pub(crate) mod rtc; +pub(crate) mod rtc_reader_config; +pub(crate) mod rtc_writer_config; diff --git a/src/db/room.rs b/src/db/room.rs index 47bc3fbb..9496cf84 100644 --- a/src/db/room.rs +++ b/src/db/room.rs @@ -99,6 +99,7 @@ pub(crate) struct Object { #[serde(skip_serializing_if = "Option::is_none")] reserve: Option, tags: JsonValue, + #[serde(skip_serializing_if = "Option::is_none")] backend_id: Option, rtc_sharing_policy: RtcSharingPolicy, } diff --git a/src/db/rtc.rs b/src/db/rtc.rs index ea90b214..18d1f3ea 100644 --- a/src/db/rtc.rs +++ b/src/db/rtc.rs @@ -95,13 +95,14 @@ impl FindQuery { //////////////////////////////////////////////////////////////////////////////// #[derive(Default)] -pub(crate) struct ListQuery { +pub(crate) struct ListQuery<'a> { room_id: Option, + created_by: Option<&'a [&'a AgentId]>, offset: Option, limit: Option, } -impl ListQuery { +impl<'a> ListQuery<'a> { pub(crate) fn new() -> Self { Default::default() } @@ -113,6 +114,13 @@ impl ListQuery { } } + pub(crate) fn created_by(self, created_by: &'a [&'a AgentId]) -> Self { + Self { + created_by: Some(created_by), + ..self + } + } + pub(crate) fn offset(self, offset: i64) -> Self { Self { offset: Some(offset), @@ -136,6 +144,10 @@ impl ListQuery { q = q.filter(rtc::room_id.eq(room_id)); } + if let Some(created_by) = self.created_by { + q = q.filter(rtc::created_by.eq_any(created_by)) + } + if let Some(offset) = self.offset { q = q.offset(offset); } diff --git a/src/db/rtc_reader_config.rs b/src/db/rtc_reader_config.rs new file mode 100644 index 00000000..6d985d27 --- /dev/null +++ b/src/db/rtc_reader_config.rs @@ -0,0 +1,131 @@ +use diesel::{pg::PgConnection, result::Error}; +use svc_agent::AgentId; +use uuid::Uuid; + +use crate::db::rtc::Object as Rtc; +use crate::schema::{rtc, rtc_reader_config}; + +//////////////////////////////////////////////////////////////////////////////// + +type AllColumns = ( + rtc_reader_config::rtc_id, + rtc_reader_config::reader_id, + rtc_reader_config::receive_video, + rtc_reader_config::receive_audio, +); + +const ALL_COLUMNS: AllColumns = ( + rtc_reader_config::rtc_id, + rtc_reader_config::reader_id, + rtc_reader_config::receive_video, + rtc_reader_config::receive_audio, +); + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Identifiable, Queryable, QueryableByName, Associations)] +#[belongs_to(Rtc, foreign_key = "rtc_id")] +#[table_name = "rtc_reader_config"] +#[primary_key(rtc_id, reader_id)] +pub(crate) struct Object { + rtc_id: Uuid, + reader_id: AgentId, + receive_video: bool, + receive_audio: bool, +} + +impl Object { + pub(crate) fn reader_id(&self) -> &AgentId { + &self.reader_id + } + + pub(crate) fn receive_video(&self) -> bool { + self.receive_video + } + + pub(crate) fn receive_audio(&self) -> bool { + self.receive_audio + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub(crate) struct ListWithRtcQuery<'a> { + room_id: Uuid, + reader_id: &'a AgentId, +} + +impl<'a> ListWithRtcQuery<'a> { + pub(crate) fn new(room_id: Uuid, reader_id: &'a AgentId) -> Self { + Self { room_id, reader_id } + } + + pub(crate) fn execute(&self, conn: &PgConnection) -> Result, Error> { + use diesel::prelude::*; + + rtc_reader_config::table + .inner_join(rtc::table) + .filter(rtc::room_id.eq(self.room_id)) + .filter(rtc_reader_config::reader_id.eq(self.reader_id)) + .select((ALL_COLUMNS, crate::db::rtc::ALL_COLUMNS)) + .get_results(conn) + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Insertable, AsChangeset)] +#[table_name = "rtc_reader_config"] +pub(crate) struct UpsertQuery<'a> { + rtc_id: Uuid, + reader_id: &'a AgentId, + receive_video: Option, + receive_audio: Option, +} + +impl<'a> UpsertQuery<'a> { + pub(crate) fn new(rtc_id: Uuid, reader_id: &'a AgentId) -> Self { + Self { + rtc_id, + reader_id, + receive_video: None, + receive_audio: None, + } + } + + pub(crate) fn receive_video(self, receive_video: bool) -> Self { + Self { + receive_video: Some(receive_video), + ..self + } + } + + pub(crate) fn receive_audio(self, receive_audio: bool) -> Self { + Self { + receive_audio: Some(receive_audio), + ..self + } + } + + pub(crate) fn execute(&self, conn: &PgConnection) -> Result { + use diesel::prelude::*; + + let mut insert_values = self.clone(); + + if insert_values.receive_video.is_none() { + insert_values.receive_video = Some(true); + } + + if insert_values.receive_audio.is_none() { + insert_values.receive_audio = Some(true); + } + + diesel::insert_into(rtc_reader_config::table) + .values(insert_values) + .on_conflict((rtc_reader_config::rtc_id, rtc_reader_config::reader_id)) + .do_update() + .set(self) + .get_result(conn) + } +} diff --git a/src/db/rtc_writer_config.rs b/src/db/rtc_writer_config.rs new file mode 100644 index 00000000..41669bb6 --- /dev/null +++ b/src/db/rtc_writer_config.rs @@ -0,0 +1,151 @@ +use diesel::{pg::PgConnection, result::Error}; +use uuid::Uuid; + +use crate::db::rtc::Object as Rtc; +use crate::schema::{rtc, rtc_writer_config}; + +//////////////////////////////////////////////////////////////////////////////// + +type AllColumns = ( + rtc_writer_config::rtc_id, + rtc_writer_config::send_video, + rtc_writer_config::send_audio, + rtc_writer_config::video_remb, + rtc_writer_config::audio_remb, +); + +const ALL_COLUMNS: AllColumns = ( + rtc_writer_config::rtc_id, + rtc_writer_config::send_video, + rtc_writer_config::send_audio, + rtc_writer_config::video_remb, + rtc_writer_config::audio_remb, +); + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Identifiable, Queryable, QueryableByName, Associations)] +#[belongs_to(Rtc, foreign_key = "rtc_id")] +#[table_name = "rtc_writer_config"] +#[primary_key(rtc_id)] +pub(crate) struct Object { + rtc_id: Uuid, + send_video: bool, + send_audio: bool, + video_remb: Option, + audio_remb: Option, +} + +impl Object { + pub(crate) fn send_video(&self) -> bool { + self.send_video + } + + pub(crate) fn send_audio(&self) -> bool { + self.send_audio + } + + pub(crate) fn video_remb(&self) -> Option { + self.video_remb + } + + pub(crate) fn audio_remb(&self) -> Option { + self.audio_remb + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub(crate) struct ListWithRtcQuery { + room_id: Uuid, +} + +impl ListWithRtcQuery { + pub(crate) fn new(room_id: Uuid) -> Self { + Self { room_id } + } + + pub(crate) fn execute(&self, conn: &PgConnection) -> Result, Error> { + use diesel::prelude::*; + + rtc_writer_config::table + .inner_join(rtc::table) + .filter(rtc::room_id.eq(self.room_id)) + .select((ALL_COLUMNS, crate::db::rtc::ALL_COLUMNS)) + .get_results(conn) + } +} + +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Insertable, AsChangeset)] +#[table_name = "rtc_writer_config"] +pub(crate) struct UpsertQuery { + rtc_id: Uuid, + send_video: Option, + send_audio: Option, + video_remb: Option, + audio_remb: Option, +} + +impl UpsertQuery { + pub(crate) fn new(rtc_id: Uuid) -> Self { + Self { + rtc_id, + send_video: None, + send_audio: None, + video_remb: None, + audio_remb: None, + } + } + + pub(crate) fn send_video(self, send_video: bool) -> Self { + Self { + send_video: Some(send_video), + ..self + } + } + + pub(crate) fn send_audio(self, send_audio: bool) -> Self { + Self { + send_audio: Some(send_audio), + ..self + } + } + + pub(crate) fn video_remb(self, video_remb: i64) -> Self { + Self { + video_remb: Some(video_remb), + ..self + } + } + + pub(crate) fn audio_remb(self, audio_remb: i64) -> Self { + Self { + audio_remb: Some(audio_remb), + ..self + } + } + + pub(crate) fn execute(&self, conn: &PgConnection) -> Result { + use diesel::prelude::*; + + let mut insert_values = self.clone(); + + if insert_values.send_video.is_none() { + insert_values.send_video = Some(true); + } + + if insert_values.send_audio.is_none() { + insert_values.send_audio = Some(true); + } + + diesel::insert_into(rtc_writer_config::table) + .values(insert_values) + .on_conflict(rtc_writer_config::rtc_id) + .do_update() + .set(self) + .get_result(conn) + } +} diff --git a/src/schema.rs b/src/schema.rs index 8fd1cacb..7d4dfd0f 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -94,12 +94,39 @@ table! { } } +table! { + use diesel::sql_types::*; + use crate::db::sql::*; + + rtc_reader_config (rtc_id, reader_id) { + rtc_id -> Uuid, + reader_id -> Agent_id, + receive_video -> Bool, + receive_audio -> Bool, + } +} + +table! { + use diesel::sql_types::*; + use crate::db::sql::*; + + rtc_writer_config (rtc_id) { + rtc_id -> Uuid, + send_video -> Bool, + send_audio -> Bool, + audio_remb -> Nullable, + video_remb -> Nullable, + } +} + joinable!(agent -> room (room_id)); joinable!(agent_connection -> agent (agent_id)); joinable!(janus_rtc_stream -> janus_backend (backend_id)); joinable!(janus_rtc_stream -> rtc (rtc_id)); joinable!(recording -> rtc (rtc_id)); joinable!(rtc -> room (room_id)); +joinable!(rtc_reader_config -> rtc (rtc_id)); +joinable!(rtc_writer_config -> rtc (rtc_id)); allow_tables_to_appear_in_same_query!( agent, @@ -109,4 +136,6 @@ allow_tables_to_appear_in_same_query!( recording, room, rtc, + rtc_reader_config, + rtc_writer_config, ); diff --git a/src/test_helpers/factory.rs b/src/test_helpers/factory.rs index 8e1d5514..1b161263 100644 --- a/src/test_helpers/factory.rs +++ b/src/test_helpers/factory.rs @@ -336,3 +336,123 @@ impl<'a> Recording<'a> { .expect("Failed to insert recording") } } + +/////////////////////////////////////////////////////////////////////////////// + +pub(crate) struct RtcReaderConfig<'a> { + rtc: &'a db::rtc::Object, + reader_id: &'a AgentId, + receive_video: Option, + receive_audio: Option, +} + +impl<'a> RtcReaderConfig<'a> { + pub(crate) fn new(rtc: &'a db::rtc::Object, reader_id: &'a AgentId) -> Self { + Self { + rtc, + reader_id, + receive_video: None, + receive_audio: None, + } + } + + pub(crate) fn receive_video(self, receive_video: bool) -> Self { + Self { + receive_video: Some(receive_video), + ..self + } + } + + pub(crate) fn receive_audio(self, receive_audio: bool) -> Self { + Self { + receive_audio: Some(receive_audio), + ..self + } + } + + pub(crate) fn insert(&self, conn: &PgConnection) -> db::rtc_reader_config::Object { + let mut q = db::rtc_reader_config::UpsertQuery::new(self.rtc.id(), self.reader_id); + + if let Some(receive_video) = self.receive_video { + q = q.receive_video(receive_video); + } + + if let Some(receive_audio) = self.receive_audio { + q = q.receive_audio(receive_audio); + } + + q.execute(conn).expect("Failed to insert RTC reader config") + } +} + +/////////////////////////////////////////////////////////////////////////////// + +pub(crate) struct RtcWriterConfig<'a> { + rtc: &'a db::rtc::Object, + send_video: Option, + send_audio: Option, + video_remb: Option, + audio_remb: Option, +} + +impl<'a> RtcWriterConfig<'a> { + pub(crate) fn new(rtc: &'a db::rtc::Object) -> Self { + Self { + rtc, + send_video: None, + send_audio: None, + video_remb: None, + audio_remb: None, + } + } + + pub(crate) fn send_video(self, send_video: bool) -> Self { + Self { + send_video: Some(send_video), + ..self + } + } + + pub(crate) fn send_audio(self, send_audio: bool) -> Self { + Self { + send_audio: Some(send_audio), + ..self + } + } + + pub(crate) fn video_remb(self, video_remb: i64) -> Self { + Self { + video_remb: Some(video_remb), + ..self + } + } + + pub(crate) fn audio_remb(self, audio_remb: i64) -> Self { + Self { + audio_remb: Some(audio_remb), + ..self + } + } + + pub(crate) fn insert(&self, conn: &PgConnection) -> db::rtc_writer_config::Object { + let mut q = db::rtc_writer_config::UpsertQuery::new(self.rtc.id()); + + if let Some(send_video) = self.send_video { + q = q.send_video(send_video); + } + + if let Some(send_audio) = self.send_audio { + q = q.send_audio(send_audio); + } + + if let Some(video_remb) = self.video_remb { + q = q.video_remb(video_remb); + } + + if let Some(audio_remb) = self.audio_remb { + q = q.audio_remb(audio_remb); + } + + q.execute(conn).expect("Failed to insert RTC writer config") + } +}