11import os
2+ import select
23import socket
34import struct
45import subprocess
@@ -17,27 +18,47 @@ def stream_monitor(stream, callback=None):
1718
1819def socket_monitor (socket , callback = None ):
1920 try :
20- while True :
21- msg_len = struct .unpack (">I" , socket .recv (4 ))[0 ]
22- msg = socket .recv (msg_len )
23- print (f"Received message of length { msg_len } " )
24- callback (msg )
25- except OSError :
21+ while socket :
22+ ret = select .select ([socket ], [], [], 1 )
23+ if len (ret [0 ]):
24+ msg_len = struct .unpack (">I" , socket .recv (4 ))[0 ]
25+ buf = None
26+ while socket and msg_len :
27+ msg = socket .recv (msg_len )
28+ if len (msg ) == 0 : break
29+ buf = buf + msg if buf else msg
30+ msg_len -= len (msg )
31+
32+ callback (buf )
33+ except (ValueError , OSError ):
34+ # terminate whenever either recv() or select() fails.
35+ # - if the socket is closed, then recv() will raise OSError as it attempts to read
36+ # from a closed file descriptor.
37+ # - if the socket is closed, then select() will receive an invalid (i.e. negative) file descriptor
38+ # and will raise a ValueError
2639 print (f"Closing listener for socket { socket } " )
2740
2841
2942class BackendContext :
30- def __init__ (self , skyline_bin , entry_point ):
43+ def __init__ (self , entry_point , stdout_fd = None , stderr_fd = None ):
3144 self .process = None
32- self .skyline_bin = skyline_bin
3345 self .entry_point = entry_point
3446 self .state = 0
47+ self .stdout_fd = stdout_fd
48+ self .stderr_fd = stderr_fd
3549
3650 def on_message_stdout (self , message ):
37- message = message .decode ("ascii" ).rstrip ()
51+ # message = message.decode("ascii").rstrip()
52+ message = message .decode ("ascii" )
53+ if self .stdout_fd :
54+ self .stdout_fd .write (message )
3855
3956 def on_message_stderr (self , message ):
40- message = message .decode ("ascii" ).rstrip ()
57+ message = message .decode ("ascii" )
58+ if self .stderr_fd :
59+ self .stderr_fd .write (message )
60+
61+ message = message .rstrip ()
4162 print ("stderr" , message )
4263 if "DeepView interactive profiling session started!" in message :
4364 self .state = 1
@@ -46,8 +67,7 @@ def spawn_process(self):
4667 # DeepView expects the entry_point filename to be relative
4768 working_dir = os .path .dirname (self .entry_point )
4869 entry_filename = os .path .basename (self .entry_point )
49- launch_command = [self .skyline_bin , "interactive" , entry_filename ]
50-
70+ launch_command = ["python" , "-m" , "deepview_profile" , "interactive" , "--debug" ]
5171 # Launch backend + listener threads for stdout and stderr
5272 self .process = subprocess .Popen (
5373 launch_command ,
@@ -64,6 +84,9 @@ def spawn_process(self):
6484 )
6585 self .stderr_thread .start ()
6686
87+ def alive (self ):
88+ return self .process and self .process .poll () is None
89+
6790 def join (self ):
6891 self .process .wait ()
6992
@@ -73,7 +96,7 @@ def terminate(self):
7396 self .stderr_thread .join ()
7497
7598
76- class SkylineSession :
99+ class DeepviewSession :
77100 def __init__ (self ):
78101 self .seq_num = 0
79102 self .received_messages = []
@@ -103,9 +126,11 @@ def send_message(self, message):
103126 self .socket .sendall (length_buffer )
104127 self .socket .sendall (buf )
105128
106- def send_initialize_request (self ):
129+ def send_initialize_request (self , project_root ):
107130 request = innpv_pb2 .InitializeRequest ()
108131 request .protocol_version = 5
132+ request .project_root = project_root
133+ request .entry_point = "entry_point.py"
109134 self .send_message (request )
110135
111136 def send_analysis_request (self ):
@@ -116,12 +141,17 @@ def send_analysis_request(self):
116141 def handle_message (self , message ):
117142 from_server = innpv_pb2 .FromServer ()
118143 from_server .ParseFromString (message )
119- # print("From Server:")
120- # print(from_server)
144+ print ("From Server:" )
145+ print (from_server )
121146
122147 self .received_messages .append (from_server )
123148 print (f"new message. total: { len (self .received_messages )} " )
124149
150+ def alive (self ):
151+ # makes sure that the last message is not error
152+ return (not self .received_messages ) or \
153+ not self .received_messages [- 1 ].HasField ("analysis_error" )
154+
125155 def cleanup (self ):
126156 # Closing the socket should cause the listener thread to die
127157 self .socket .close ()
0 commit comments