Skip to content

Commit

Permalink
Support multiple WebGPU message types
Browse files Browse the repository at this point in the history
Adds support for handling multiple WebGPU protocol messages. The old
behavior is still kept around until the Dawn code is updated to send
multiple messages.

Also adds documentation describing the message protocol.

Bug: 1340602
Change-Id: I717665b8d4a7a92caba72e9fe5c0ad6be53c07a3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3816869
Reviewed-by: Austin Eng <enga@chromium.org>
Auto-Submit: Brian Sheedy <bsheedy@chromium.org>
Commit-Queue: Brian Sheedy <bsheedy@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1033258}
  • Loading branch information
Brian Sheedy authored and Chromium LUCI CQ committed Aug 10, 2022
1 parent ed922ab commit 51f1a29
Show file tree
Hide file tree
Showing 2 changed files with 337 additions and 33 deletions.
234 changes: 201 additions & 33 deletions content/test/gpu/gpu_tests/webgpu_cts_integration_test.py
Expand Up @@ -11,7 +11,7 @@
import sys
import tempfile
import threading
from typing import Any, List
from typing import Any, Dict, List
import unittest

import websockets # pylint:disable=import-error
Expand Down Expand Up @@ -42,12 +42,24 @@
HTML_FILENAME = os.path.join('webgpu-cts', 'test_page.html')

JAVASCRIPT_DURATION = 'javascript_duration'
MESSAGE_TYPE_TEST_STARTED = 'TEST_STARTED'
MESSAGE_TYPE_TEST_HEARTBEAT = 'TEST_HEARTBEAT'
MESSAGE_TYPE_TEST_STATUS = 'TEST_STATUS'
MESSAGE_TYPE_TEST_LOG = 'TEST_LOG'
MESSAGE_TYPE_TEST_FINISHED = 'TEST_FINISHED'

# These are tests that, for whatever reason, don't like being run in parallel.
SERIAL_TESTS = {}


class WebGpuTestResult():
"""Struct-like object for holding a single test result."""

def __init__(self):
self.status = None
self.log_pieces = []


async def StartWebsocketServer() -> None:
async def HandleWebsocketConnection(
websocket: ws_server.WebSocketServerProtocol) -> None:
Expand Down Expand Up @@ -268,12 +280,14 @@ def GenerateGpuTests(cls, options: ct.ParsedCmdArgs) -> ct.TestGenerator:

def RunActualGpuTest(self, test_path: str, args: ct.TestArgs) -> None:
self._query, self._run_in_worker = args
timeout = self._GetTestTimeout()
# Only a single instance is used to run tests despite a number of instances
# (~2x the number of total tests) being initialized, so make sure to clear
# this state so we don't accidentally keep it around from a previous test.
if JAVASCRIPT_DURATION in self.additionalTags:
del self.additionalTags[JAVASCRIPT_DURATION]

timeout = self._GetTestTimeout()

try:
self._NavigateIfNecessary(test_path)
asyncio.run_coroutine_threadsafe(
Expand All @@ -282,38 +296,10 @@ def RunActualGpuTest(self, test_path: str, args: ct.TestArgs) -> None:
'q': self._query,
'w': self._run_in_worker
})), WebGpuCtsIntegrationTest.event_loop)
# Loop until we receive a message saying that the test is finished. This
# currently has no practical effect, but it is an intermediate step to
# supporting a heartbeat mechanism. See crbug.com/1340602.
while True:
future = asyncio.run_coroutine_threadsafe(
asyncio.wait_for(WebGpuCtsIntegrationTest.websocket.recv(),
timeout), WebGpuCtsIntegrationTest.event_loop)
response = future.result()
response = json.loads(response)
if response['type'] == MESSAGE_TYPE_TEST_FINISHED:
break
result = self.HandleMessageLoop(timeout)

status = response['s']
logs_pieces = [response['l']]
is_final_payload = response['final']
js_duration = response['js_duration_ms'] / 1000
# Specify the precision to avoid scientific notation. Nanoseconds should
# be more precision than we need anyways.
self.additionalTags[JAVASCRIPT_DURATION] = '%.9fs' % js_duration
# Get multiple log pieces if necessary, e.g. if a monolithic log would
# have gone over the max payload size.
while not is_final_payload:
future = asyncio.run_coroutine_threadsafe(
asyncio.wait_for(WebGpuCtsIntegrationTest.websocket.recv(),
MULTI_PAYLOAD_TIMEOUT),
WebGpuCtsIntegrationTest.event_loop)
response = future.result()
response = json.loads(response)
logs_pieces.append(response['l'])
is_final_payload = response['final']

log_str = ''.join(logs_pieces)
log_str = ''.join(result.log_pieces)
status = result.status
if status == 'skip':
self.skipTest('WebGPU CTS JavaScript reported test skip with logs ' +
log_str)
Expand All @@ -329,6 +315,96 @@ def RunActualGpuTest(self, test_path: str, args: ct.TestArgs) -> None:
finally:
WebGpuCtsIntegrationTest.total_tests_run += 1

def HandleMessageLoop(self, timeout: float) -> WebGpuTestResult:
"""Helper function to handle the loop for the message protocol.
See //docs/gpu/webgpu_cts_harness_message_protocol.md for more information
on the message format.
TODO(crbug.com/1340602): Update this to be the total test timeout once the
heartbeat mechanism is implemented.
Args:
timeout: A float denoting the number of seconds to the test is allowed
to wait between test messages before timing out.
Returns:
A filled WebGpuTestResult instance.
"""
result = WebGpuTestResult()
message_state = {
MESSAGE_TYPE_TEST_STARTED: False,
MESSAGE_TYPE_TEST_STATUS: False,
MESSAGE_TYPE_TEST_LOG: False,
}
# Loop until we receive a message saying that the test is finished. This
# currently has no practical effect, but it is an intermediate step to
# supporting a heartbeat mechanism. See crbug.com/1340602.
while True:
future = asyncio.run_coroutine_threadsafe(
asyncio.wait_for(WebGpuCtsIntegrationTest.websocket.recv(), timeout),
WebGpuCtsIntegrationTest.event_loop)
response = future.result()
response = json.loads(response)
response_type = response['type']

if response == MESSAGE_TYPE_TEST_STARTED:
# If we ever want the adapter information from WebGPU, we would
# retrieve it from the message here. However, to avoid pylint
# complaining about unused variables, don't grab it until we actually
# need it.
VerifyMessageOrderTestStarted(message_state)

elif response_type == MESSAGE_TYPE_TEST_HEARTBEAT:
VerifyMessageOrderTestHeartbeat(message_state)
continue

elif response_type == MESSAGE_TYPE_TEST_STATUS:
VerifyMessageOrderTestStatus(message_state)
result.status = response['status']
js_duration = response['js_duration_ms'] / 1000
# Specify the precision to avoid scientific notation. Nanoseconds
# should be more precision than we need anyways.
self.additionalTags[JAVASCRIPT_DURATION] = '%.9fs' % js_duration

elif response_type == MESSAGE_TYPE_TEST_LOG:
VerifyMessageOrderTestLog(message_state)
result.log_pieces.append(response['log'])

elif response_type == MESSAGE_TYPE_TEST_FINISHED:
VerifyMessageOrderTestFinished(message_state)
# TODO(crbug.com/1340602): Remove log, etc. once the Dawn code has
# been updated to send multiple message types.
if 's' in response:
result.status = response['s']
if 'l' in response:
result.log_pieces.append(response['l'])
if 'js_duration_ms' in response:
js_duration = response['js_duration_ms'] / 1000
# Specify the precision to avoid scientific notation. Nanoseconds
# should be more precision than we need anyways.
self.additionalTags[JAVASCRIPT_DURATION] = '%.9fs' % js_duration

if 'final' in response:
is_final_payload = response['final']
# Get multiple log pieces if necessary, e.g. if a monolithic log
# would have gone over the max payload size.
while not is_final_payload:
future = asyncio.run_coroutine_threadsafe(
asyncio.wait_for(WebGpuCtsIntegrationTest.websocket.recv(),
MULTI_PAYLOAD_TIMEOUT),
WebGpuCtsIntegrationTest.event_loop)
response = future.result()
response = json.loads(response)
result.log_pieces.append(response['l'])
is_final_payload = response['final']
break

else:
raise WebGpuMessageProtocolError('Received unknown message type %s' %
response_type)
return result

@classmethod
def CleanUpExistingWebsocket(cls) -> None:
if cls.connection_stopper:
Expand Down Expand Up @@ -404,6 +480,98 @@ def ExpectationsFiles(cls) -> List[str]:
return [EXPECTATIONS_FILE]


class WebGpuMessageProtocolError(RuntimeError):
pass


def VerifyMessageOrderTestStarted(message_state: Dict[str, bool]) -> None:
"""Helper function to verify that messages are ordered correctly.
Handles MESSAGE_TYPE_TEST_STARTED messages.
Split out to reduce the number of branches within a single function.
Args:
message_state: A map from message type to a boolean denoting whether a
message of that type has been received before.
"""
if message_state[MESSAGE_TYPE_TEST_STARTED]:
raise WebGpuMessageProtocolError(
'Received multiple start messages for one test')
message_state[MESSAGE_TYPE_TEST_STARTED] = True


def VerifyMessageOrderTestHeartbeat(message_state: Dict[str, bool]) -> None:
"""Helper function to verify that messages are ordered correctly.
Handles MESSAGE_TYPE_TEST_HEARTBEAT messages.
Split out to reduce the number of branches within a single function.
Args:
message_state: A map from message type to a boolean denoting whether a
message of that type has been received before.
"""
if not message_state[MESSAGE_TYPE_TEST_STARTED]:
raise WebGpuMessageProtocolError('Received heartbeat before test start')
if message_state[MESSAGE_TYPE_TEST_STATUS]:
raise WebGpuMessageProtocolError(
'Received heartbeat after test supposedly done')


def VerifyMessageOrderTestStatus(message_state: Dict[str, bool]) -> None:
"""Helper function to verify that messages are ordered correctly.
Handles MESSAGE_TYPE_TEST_STATUS messages.
Split out to reduce the number of branches within a single function.
Args:
message_state: A map from message type to a boolean denoting whether a
message of that type has been received before.
"""
if not message_state[MESSAGE_TYPE_TEST_STARTED]:
raise WebGpuMessageProtocolError(
'Received test status message before test start')
if message_state[MESSAGE_TYPE_TEST_STATUS]:
raise WebGpuMessageProtocolError(
'Received multiple status messages for one test')
message_state[MESSAGE_TYPE_TEST_STATUS] = True


def VerifyMessageOrderTestLog(message_state: Dict[str, bool]) -> None:
"""Helper function to verify that messages are ordered correctly.
Handles MESSAGE_TYPE_TEST_LOG messages.
Split out to reduce the number of branches within a single function.
Args:
message_state: A map from message type to a boolean denoting whether a
message of that type has been received before.
"""
if not message_state[MESSAGE_TYPE_TEST_STATUS]:
raise WebGpuMessageProtocolError(
'Received log message before status message')
message_state[MESSAGE_TYPE_TEST_LOG] = True


def VerifyMessageOrderTestFinished(message_state: Dict[str, bool]) -> None:
"""Helper function to verify that messages are ordered correctly.
Handles MESSAGE_TYPE_TEST_FINISHED messages.
Split out to reduce the number of branches within a single function.
Args:
message_state: A map from message type to a boolean denoting whether a
message of that type has been received before.
"""
# TODO(crbug.com/1340602): Add message state verification once the Dawn code
# has been updated to send multiple message types.
del message_state # currently unused


def TestNameFromInputs(query: str, worker: bool) -> str:
return 'worker_%s' % query if worker else query

Expand Down

0 comments on commit 51f1a29

Please sign in to comment.