Skip to content
Browse files

Format and type integration test Python files.

I want these to be formatted and statically typed, but I don't want to mix formatting
changes in with my actual changes, so I'm doing them in a separate commit.

Reviewed By: arxanas

Differential Revision: D8811679

fbshipit-source-id: de7ff8b3f06a5d7c57bbc2ad4cb8fb16ba3d9072
  • Loading branch information
Will Pitts (they/them) authored and hhvm-bot committed Jul 11, 2018
1 parent 7a0e7cb commit d57de3d5af48f64e8c942f537ca49bd00d67ca11
@@ -0,0 +1 @@
Only here to get flake8 to stop using Python 2 syntax.
@@ -20,7 +20,7 @@ class CommonTestDriver(object):
# This needs to be overridden in child classes. The files in this
# directory will be used to set up the initial environment for each
# test.
template_repo = None
template_repo: str = None

def setUpClass(cls):
@@ -2,29 +2,41 @@
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from typing import Any, Iterator, Mapping, Optional, Sequence, Type, TypeVar
import contextlib
import subprocess
import uuid
import time
from hh_paths import hh_client
from jsonrpc_stream import JsonRpcStreamReader
from jsonrpc_stream import JsonRpcStreamWriter
from jsonrpc_stream import JsonRpcStreamReader, JsonRpcStreamWriter

Json = Mapping[str, Any]
Transcript = Mapping[str, Mapping[str, Optional[Json]]]

class LspCommandProcessor:
def __init__(self, proc, reader, writer):
U = TypeVar("U", bound="LspCommandProcessor")

def __init__(
proc: subprocess.Popen,
reader: JsonRpcStreamReader,
writer: JsonRpcStreamWriter,
) -> None:
self.proc = proc
self.reader = reader
self.writer = writer

def create(cls, env):
proc = subprocess.Popen([hh_client, 'lsp', '--enhanced-hover'],
def create(cls: Type[U], env: Mapping[str, str]) -> Iterator[U]:
proc = subprocess.Popen(
[hh_client, "lsp", "--enhanced-hover"],
reader = JsonRpcStreamReader(proc.stdout)
writer = JsonRpcStreamWriter(proc.stdin)
@@ -41,25 +53,25 @@ def create(cls, env):
# notify_timeout is the number of seconds to wait for responses
# from the server that aren't caused by a request. these could
# be errors or server notifications.
def communicate(self,

def communicate(
self, json_commands: Sequence[Json], request_timeout=30, notify_timeout=1
) -> Transcript:
transcript = self._send_commands({}, json_commands)

# we are expecting at least one response per request sent so
# we read these giving the server more time to respond with them.
transcript = self._read_request_responses(transcript,
transcript = self._read_request_responses(
transcript, json_commands, request_timeout

# because it's possible the server sent us notifications
# along with responses we need to try to keep reading
# from the stream to get anything that might be left.
return self._read_extra_responses(transcript, notify_timeout)

def _send_commands(self, transcript, commands):
def _send_commands(
self, transcript: Transcript, commands: Sequence[Json]
) -> Transcript:
for command in commands:
transcript = self._scribe(transcript, sent=command, received=None)
@@ -70,39 +82,40 @@ def _send_commands(self, transcript, commands):

return transcript

def _read_request_responses(self,
def _read_request_responses(
self, transcript: Transcript, commands: Sequence[Json], timeout_seconds: float
) -> Transcript:
for _ in self._requests_in(commands):
response = self._try_read_logged(timeout_seconds)
transcript = self._scribe(transcript, sent=None, received=response)
return transcript

def _read_extra_responses(self, transcript, timeout_seconds):
def _read_extra_responses(
self, transcript: Transcript, timeout_seconds: float
) -> Transcript:
while True:
response = self._try_read_logged(timeout_seconds)
if not response:
transcript = self._scribe(transcript, sent=None, received=response)
return transcript

def _scribe(self, transcript, sent, received):
def _scribe(
self, transcript: Transcript, sent: Optional[Json], received: Optional[Json]
) -> Transcript:
transcript = dict(transcript)

id = self._transcript_id(sent, received)

if sent and not received:
received = transcript[id]['received'] if id in transcript else None
received = transcript[id]["received"] if id in transcript else None

if received and not sent:
sent = transcript[id]['sent'] if id in transcript else None

transcript[id] = {'sent': sent, 'received': received}
sent = transcript[id]["sent"] if id in transcript else None

transcript[id] = {"sent": sent, "received": received}
return transcript

def _transcript_id(self, sent, received):
def _transcript_id(self, sent: Optional[Json], received: Optional[Json]) -> str:
assert sent is not None or received is not None

def make_id(json, idgen):
@@ -116,33 +129,33 @@ def make_id(json, idgen):
return make_id(received, LspCommandProcessor._server_notify_id)

def _requests_in(self, commands):
def _requests_in(self, commands: Sequence[Json]) -> Sequence[Json]:
return [c for c in commands if LspCommandProcessor._has_id(c)]

def _try_read_logged(self, timeout_seconds):
def _try_read_logged(self, timeout_seconds: float) -> Optional[Json]:
response = self.reader.try_read(timeout_seconds)
return response

def _has_id(json):
return 'id' in json
def _has_id(json: Json) -> bool:
return "id" in json

def _client_notify_id():
return LspCommandProcessor._notify_id('NOTIFY_CLIENT_TO_SERVER_')
def _client_notify_id() -> str:
return LspCommandProcessor._notify_id("NOTIFY_CLIENT_TO_SERVER_")

def _server_notify_id():
return LspCommandProcessor._notify_id('NOTIFY_SERVER_TO_CLIENT_')
def _server_notify_id() -> str:
return LspCommandProcessor._notify_id("NOTIFY_SERVER_TO_CLIENT_")

def _notify_id(prefix):
def _notify_id(prefix: str) -> str:
return prefix + str(uuid.uuid4())

def _request_id(json_command):
return LspCommandProcessor.request_id(json_command['id'])
def _request_id(json_command: Json) -> str:
return LspCommandProcessor.request_id(json_command["id"])

def request_id(id):
return 'REQUEST_' + str(id)
def request_id(id: str) -> str:
return "REQUEST_" + str(id)

0 comments on commit d57de3d

Please sign in to comment.