In [5]:
import grpc
import Api_pb2 as api
import Api_pb2_grpc as api_grpc

import pandas as pd
import time
# import route_guide_resources    

### Receive stream by data and device code

Create request

In [11]:
timeFrom = pd.to_datetime("2020-11-12 21:00:00", format='%Y-%m-%d %H:%M:%S')
timeTo = pd.to_datetime("2020-11-13 21:00:00", format='%Y-%m-%d %H:%M:%S')

rangeStreamRequest = api.ObjectsDataRangeRequest(
    Filter= api.DataFilter(
        DateFrom= 			int(timeFrom.timestamp()),
        DateTo= 			int(timeTo.timestamp()),
        Subsystem= 			"kiutr", # для мусоровозов - garbade, на тестовом стенде данные по garbage отсутствуют 
        # ExcludeDeviceCode= 	["10033473", "404957","500459"], # пример исключения уже обработанных блоков
        DeviceCode=			["EEEEEEEEE121210"],  # дополнительные коды БНСО
        # StateNumber= 		["Н040РА195"],  # дополнительные госномера
    ),
    Fields = api.FieldsToggle(
        Position=True # запрашивает только навигационную информацию
    )
)



In [12]:
TEST_ENDPOINT_ADDRESS = 'rnis-tm.t1-group.ru:18082'
REAL_ENDPOINT_ADDRESS = 'portal.rnis.mosreg.ru:52096'

In [19]:

def get_stream_by_request(channel, request, verbose=True):
    if verbose: print("------------Create Stub--------------")
    stub = api_grpc.APIStub(channel)
    if verbose: print(stub)
    
    if verbose: print("------------Get DataStream-----------")
    stream = stub.GetObjectsDataStream(request)
    # stream = script.get_data_stream(stub)
    if verbose: print(stream)
    return stream


def pack_stream_to_df(stream, max_records=10_000):
    records_df = pd.DataFrame()
    it = 0
    for object_data in stream:
        it += 1
        if it == max_records:
            break

        # metadata
        print('object_data: ', object_data)
        device_code = object_data.DeviceCode
        # state_number = object_data.StateNumber
        
        # driver = object_data.Metadata.Driver

        data_point = object_data.Data

        device_time = data_point.DeviceTime
        accelerations = data_point.Accelerations
        endpoint = data_point.Endpoint            
        
        # gps data
        longitude = data_point.Position.Longitude
        latitude = data_point.Position.Latitude
        # altitude = data_point.Position.Altitude
        course = data_point.Position.Course
        speed = data_point.Position.Speed
        valid = data_point.Position.Valid

        # add to dataframe
        record = {
            'device_code': device_code,
            # 'state_number': state_number,
            'device_time': device_time,            
            'course': course,
            'longitude': longitude,
            'latitude': latitude,
            # 'altitude': altitude,
            'speed': speed,
            'valid': valid,
            'accelerations': accelerations,
            # 'driver': driver,
            'endpoint': endpoint
        }

        records_df = records_df.append(record, ignore_index=True)
        return records_df

In [20]:

with grpc.insecure_channel(TEST_ENDPOINT_ADDRESS) as channel:        
    stream = get_stream_by_request(channel, rangeStreamRequest)
    
    print("------ Retreive data from stream-----")
    records_df = pack_stream_to_df(stream, max_records=10_000)
    records_df.to_csv('data/records_{}.csv'.format(str(int(time.time()))), index=False)
    print('Done!')

------------Create Stub--------------
<Api_pb2_grpc.APIStub object at 0x7f00deaa4978>
------------Get DataStream-----------
<_MultiThreadedRendezvous object>
------ Retreive data from stream-----
object_data:  DeviceCode: "EEEEEEEEE121210"
DeviceTime: 1606041355
Position {
  Longitude: 43.29318166787112
  Latitude: 54.91758586254846
  Course: 100
}
State {
  Ignition: true
  DigitalPorts {
    key: 1
    value: false
  }
  DigitalPorts {
    key: 2
    value: false
  }
  DigitalPorts {
    key: 3
    value: false
  }
  DigitalPorts {
    key: 4
    value: false
  }
  DigitalPorts {
    key: 5
    value: false
  }
  DigitalPorts {
    key: 6
    value: false
  }
  DigitalPorts {
    key: 7
    value: false
  }
  has_Moving: true
  has_Ignition: true
}
ReceivedTime: 1606042350
Accelerations {
}



AttributeError: Data

### Получение инфо о ТС

In [22]:


def get_vehicles_request(subsystems, endpoint_address, verbose=True):
    vehicles_df = pd.DataFrame()

    vehicles_request = api.GetVehiclesRequest(Subsystems=subsystems)
    with grpc.insecure_channel(endpoint_address) as channel:

        if verbose: print("------------Create Stub--------------")
        stub = api_grpc.APIStub(channel)
        if verbose: print(stub)
        
        if verbose: print("------------Get objects info-----------")
        result = stub.GetVehicles(vehicles_request)
        # if verbose: print(result)

        for vehicle in result.Vehicles:
            vehicle_record = {"subsystem": vehicle.Subsystem,
                            'device_code': vehicle.DeviceCode,
                            "state_number": vehicle.StateNumber,
                            'vehicle_mark': vehicle.VehicleMark,
                            'vehicle_model': vehicle.VehicleModel,
                            'unit_inn': vehicle.UnitINN}

            vehicles_df.append(vehicle_record, ignore_index=True)

        return vehicles_df





Получение информации об объектах

In [37]:
timeFrom = pd.to_datetime("2020-09-13 21:00:00", format='%Y-%m-%d %H:%M:%S')
timeTo = pd.to_datetime("2020-09-14 21:00:00", format='%Y-%m-%d %H:%M:%S')

objects_info_request = api.ObjectsInfoRequest(Filter= api.DataFilter(
        DateFrom= 			94489479168,#int(timeFrom.timestamp()),
        DateTo= 			94789479168#int(timeTo.timestamp()),
        # Subsystem= 			"kiutr", # для мусоровозов - garbade, на тестовом стенде данные по garbage отсутствуют 
        # ExcludeDeviceCode= 	["10033473", "404957","500459"], # пример исключения уже обработанных блоков
        # DeviceCode=			["356850083601698"],  # дополнительные коды БНСО
        # StateNumber= 		["Н040РА195"],  # дополнительные госномера
    ))

In [38]:
endpoint_address = 'rnis-tm.t1-group.ru:18082'
with grpc.insecure_channel(endpoint_address) as channel:

    print("------------Create Stub--------------")
    stub = api_grpc.APIStub(channel)
    print(stub)
    
    print("------------Get objects info-----------")
    objects_info_responce = stub.GetObjectsInfo(objects_info_request)
    print(objects_info_responce)


------------Create Stub--------------
<Api_pb2_grpc.APIStub object at 0x7fe32c8a80f0>
------------Get objects info-----------


_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "grpc: error while marshaling: proto: Marshal called with nil"
	debug_error_string = "{"created":"@1605545275.778071972","description":"Error received from peer ipv4:185.98.83.83:18082","file":"src/core/lib/surface/call.cc","file_line":1061,"grpc_message":"grpc: error while marshaling: proto: Marshal called with nil","grpc_status":13}"
>

получение событий по устройству за период

Подписка на поток

In [45]:
timeFrom = pd.to_datetime("2020-09-13 21:00:00", format='%Y-%m-%d %H:%M:%S')
timeTo = pd.to_datetime("2020-09-15 21:00:00", format='%Y-%m-%d %H:%M:%S')

rangeStreamRequest = api.ObjectsDataRangeRequest(
    Filter= api.DataFilter(
        DateFrom= 			int(timeFrom.timestamp()),
        DateTo= 			int(timeTo.timestamp()),
        Subsystem= 			"kiutr", # для мусоровозов - garbade, на тестовом стенде данные по garbage отсутствуют 
        # ExcludeDeviceCode= 	["10033473", "404957","500459"], # пример исключения уже обработанных блоков
        # DeviceCode=			["10033473","404957"],  # дополнительные коды БНСО
        # StateNumber= 		["Н040РА195"],  # дополнительные госномера
    ),
    Fields = api.FieldsToggle(
        Position=True # запрашивает только навигационную информацию
    )
)



In [10]:
endpoint_address = 'rnis-tm.t1-group.ru:18082'
# endpoint_address = 'rnis-tm.t1-group.xu:000000'

# localhost = 'localhost:50051'
max_records = 250
with grpc.insecure_channel(endpoint_address) as channel:

    print("------------Create Stub--------------")
    stub = api_grpc.APIStub(channel)
    print(stub)
    
    print("------------Get DataStream-----------")
    stream = stub.GetObjectsDataStream(rangeStreamRequest)
    # stream = script.get_data_stream(stub)
    print(stream)

    print("------ Retreive data from stream-----")
    i = 0
    for data_point in stream:
        i += 1
        if i == max_records:
            break


        device_time = data_point.DeviceTime
        device_code = data_point.DeviceCode
        gps_data = data_point.Position
        accelerations = data_point.Accelerations

        longitude = gps_data.Longitude
        latitude = gps_data.Latitude
        altitude = gps_data.Altitude
        course = gps_data.Course
        speed = gps_data.Speed
        valid = gps_data.Valid
        hdop = gps_data.HDOP

        print('datapoint ', i)
        print('device_code: ', device_code)
        print('device_time: ', device_time)
        print('longitude: ', longitude)
        print('latitude: ', latitude)
        print('altitude: ', altitude)
        print('course: ', course)
        print('speed: ', speed)
        print('accelerations: ', accelerations)
        print('valid: ', valid)
        print('hdop: ', hdop)
        print()

        # add to dataframe
        record = {
            'device_code': device_code,
            # 'state_number': object_data.StateNumber,
            'device_time': device_time,            
            # 'state': data_point.ObjectState,
            # 'received_time': data_point.ReceivedTime,
            'accelerations': accelerations,
            # 'fuel_spent': data_point.FuelSpent,
            'longitude': longitude,
            'latitude': latitude,
            'altitude': altitude,
            'course': course,
            # 'satellites': satellites,
            'speed': speed,
            'valid': valid,
            'hdop': hdop,
        }

        records_df = records_df.append(record, ignore_index=True)

53607948834
altitude:  0.0
course:  129
speed:  0
accelerations:  
valid:  False
hdop:  0

datapoint  2
device_code:  46112611
device_time:  1605540467
longitude:  43.47680033265539
latitude:  56.23896540520688
altitude:  0.0
course:  67
speed:  20
accelerations:  
valid:  False
hdop:  0

datapoint  3
device_code:  46112611
device_time:  1605540468
longitude:  43.476902256597974
latitude:  56.23898149846098
altitude:  0.0
course:  78
speed:  21
accelerations:  
valid:  False
hdop:  0

datapoint  4
device_code:  46112611
device_time:  1605540469
longitude:  43.47701759158564
latitude:  56.23899759171507
altitude:  0.0
course:  83
speed:  22
accelerations:  
valid:  False
hdop:  0

datapoint  5
device_code:  46112611
device_time:  1605540489
longitude:  43.47887368022438
latitude:  56.23911024449372
altitude:  0.0
course:  80
speed:  0
accelerations:  
valid:  False
hdop:  0

datapoint  6
device_code:  40931529
device_time:  1605540409
longitude:  43.389019678204555
latitude:  56.2268901

Получение данных по фильтру

- чтобы сичтать что на остановке, брать радиус 50 метров
- можно брать другие маршруты через ту же остановку (есть такой метод)
- сравнить с расписанием для построения временного ряда
- предсказываем на полчаса + вперед, то есть опоздание предсказываем за полчаса
- требуемая точность - +- 4 минуты в 95% (сейчас норм -4+9 минут в городе, в пригороде больше) желательно -2+4 минуты
- посмотреть на типы рейсов, для разных рейсов нужна в идеале разная точность
- попадаем ли в точность на маршрутах разной нагруженности
- желательно одну модель на все / как можно меньше моделей
- желательно, чтобы предсказание строилось __постоянно__



In [5]:
endpoint_address = 'rnis-tm.t1-group.ru:18082'
# endpoint_address = 'rnis-tm.t1-group.xu:000000'

# localhost = 'localhost:50051'
with grpc.insecure_channel(endpoint_address) as channel:

    print("------------Create Stub--------------")
    stub = api_grpc.APIStub(channel)
    print(stub)
    
    print("------------Get DataStream-----------")
    stream = stub.GetObjectsDataRangeAsStream(rangeStreamRequest)
    # stream = script.get_data_stream(stub)
    print(stream)

    print("------ Retreive data from stream-----")
    i = 0
    for object_data in stream:
        i += 1
        if i == 5:
            break

        device_code = object_data.DeviceCode
        # state_number = object_data.StateNumber
        data_point = object_data.DataPoint

        device_time = data_point.DeviceTime
        gps_data = data_point.Position
        state = data_point.ObjectState
        # received_time = data_point.ReceivedTime
        accelerations = data_point.Accelerations
        # fuel_spent = data_point.FuelSpent

        longitude = gps_data.Longitude
        latitude = gps_data.Latitude
        altitude = gps_data.Altitude
        course = gps_data.Course
        # satellites = gps_data.Satellites
        speed = gps_data.Speed
        valid = gps_data.Valid
        hdop = gps_data.HDOP

        print('datapoint ', i)
        print('device_code: ', device_code)
        print('device_time: ', device_time)
        print('state: ', state)
        print('longitude: ', longitude)
        print('latitude: ', latitude)
        print('altitude: ', altitude)
        print('course: ', course)
        print('speed: ', speed)
        print('accelerations: ', accelerations)
        print('valid: ', valid)
        print('hdop: ', hdop)
        print()

------------Create Stub--------------
<Api_pb2_grpc.APIStub object at 0x7f7e72071320>
------------Get DataStream-----------
<_MultiThreadedRendezvous object>
------ Retreive data from stream-----


In [13]:
# with grpc.insecure_channel(endpoint_address) as channel:

#     print("------------Create Stub--------------")
#     stub = api_grpc.APIStub(channel)
#     print(stub)
    
#     print("------------Get DataStream-----------")
#     stream = stub.GetObjectsDataRangeAsStream(rangeStreamRequest)
#     # stream = script.get_data_stream(stub)
#     print(stream)

In [14]:
print("------------Open Channel--------------")
channel = grpc.insecure_channel(endpoint_address)
channel




------------Open Channel--------------


<grpc._channel.Channel at 0x7f8f67f575c0>

In [15]:
print("------------Create Stub--------------")
stub = api_grpc.APIStub(channel)
stub



------------Create Stub--------------


<Api_pb2_grpc.APIStub at 0x7f8f67f57940>

In [16]:
print("------------Get DataStream-----------")
stream = stub.GetObjectsDataRangeAsStream(rangeStreamRequest)
# stream = script.get_data_stream(stub)
stream

------------Get DataStream-----------


<_MultiThreadedRendezvous object>

In [17]:
for _ in stream:
    print(_)

_

<_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.OK
	details = ""
>

In [11]:
print("------------Close Channel--------------")
channel.close()

------------Close Channel--------------


In [16]:
# records_df = pd.DataFrame(columns=['device_code', 
#                             'state_number',
#                             'course',
#                             'device_time',
#                             'state',
#                             'longitude',
#                             'latitude',
#                             'altitude',
#                             'speed', 
#                             'accelerations',
#                             'fuel_spent',
#                             'satellites'
#                             'valid',
#                             'hdop'])

    
# with grpc.insecure_channel(endpoint_address) as channel:
#     print("------------Create Stub--------------")
#     stub = api_grpc.APIStub(channel)

#     print("------------Get DataStream-----------")
#     stream = stub.GetObjectsDataRangeAsStream(rangeStreamRequest)

#     print("------ Retreive data from stream-----")

#     i = 0
#     for object_data in stream:
#         i += 1
#         if i == max_records:
#             break

#         data_point = object_data.DataPoint
#         gps_data = data_point.Position
#         record = {
#             'device_code': object_data.DeviceCode,
#             'state_number': object_data.StateNumber,
#             'device_time': data_point.DeviceTime,            
#             'state': data_point.ObjectState,
#             'received_time': data_point.ReceivedTime,
#             'accelerations': data_point.Accelerations,
#             'fuel_spent': data_point.FuelSpent,
#             'longitude': gps_data.Longitude,
#             'latitude': gps_data.Latitude,
#             'altitude': gps_data.Altitude,
#             'course': gps_data.Course,
#             'satellites': gps_data.Satellites,
#             'speed': gps_data.Speed,
#             'valid': gps_data.Valid,
#             'hdop': gps_data.HDOP,
#         }

------------Create Stub--------------
------------Get DataStream-----------
------ Retreive data from stream-----
