# 3. 数値をリアルタイムで取得し、関数処理を実行する

このケースでは、以下の方法を中心に解説します。

- 他のエッジからリアルタイムで送付されるデータを取得する
- 取得したデータに対して関数処理を実行する
- 関数処理済みのデータをリアルタイムにアップロードする

## シナリオ
iOSアプリケーション  **intdash Motion** を活用します。
iPhoneからストリーミングされたデータを受信し、`intdash-py` にてセンサー情報に関数処理を実行し、サーバーにアップロードします。本シナリオでは、関数処理のサンプルとして移動平均を使用します。

## 事前準備

本シナリオを実施する前に、以下を用意する必要があります。

- 計測用のエッジ
- intdash Motion アプリ
- 汎用センサーデータに紐づく信号定義

### 使用データ
本シナリオでは、事前に以下のデータをサーバー側に準備する必要があります。

|データ項目|本シナリオで登場するデータ名|
|:---|:---|
|データ取得用エッジ|sdk_edge1|
|データアップロード用エッジ|sdk_edge2|
|信号定義(※)| `sp_ACCX`, `sp_ACCY`, `sp_ACCZ`|

(※) **「1. 時系列データを取得し、CSVで保存する」** で使用した信号定義と同じものを使用します。   


### パッケージのimportとクライアントの生成
`intdash.Client` に与える `url` は intdashサーバーの環境情報から、 `edge_token` はログイン用エッジで発行したトークンを指定してください。  
(※ `username` と `password` でのログインも可能ですが、継続して動作する場合はエッジトークンの使用を推奨します)

In [43]:
import pandas as pd

import intdash
from intdash import timeutils

# Create client
client = intdash.Client(
    url = "https://example.intdash.jp",
    edge_token = "your_token",
)

### 信号定義の登録
「1.時系列データを取得し、CSVで保存する」 で使用した信号定義と同じ信号定義を使います。
汎用センサー型から数値に変換するための実行ファイルは、以下を確認してください。

[汎用センサー（General Sensor）型向け 信号定義サンプル](https://docs.intdash.jp/sdk/python/latest/ja/guide/signals/generalsensor.html) 

本シナリオでは、「汎用センサー型」のうち、「加速度」に対してのみ変換定義を登録します。  

### 信号定義が登録されていることを確認
このシナリオで使用する信号定義が登録されていることを確認します。

In [2]:
signals = client.signals.list(label='sp')

In [49]:
for s in signals:
    print(s.label,  end=', ')

sp_ACCX, sp_ACCY, sp_ACCZ, 

## 使用するエッジの取得

In [3]:
sdk_edge1 = client.edges.list(name='sdk_edge1')[0]
sdk_edge2 = client.edges.list(name='sdk_edge2')[0]

In [4]:
sdk_edge1.name, sdk_edge2.name

('sdk_edge1', 'sdk_edge2')

## Queueの作成
サーバーからデータを受け取ってからサーバーに返すまでの処理はQueueを使って行います。

In [5]:
import queue
q = queue.Queue(maxsize=5)

## データの取得 (Downstream) の準備
あるエッジが送信している時系列データをサーバーを介して受け取る処理を定義します

### リクエストを作成する
`src_edge_uuid` には、データの送り元のエッジを指定します。この例では、 **intdash Motion** を実行しているエッジ `sdk_edge1` です。
`intdash.DataFilter` の `data_id` に信号定義の `label` 名を指定します。

In [6]:
d_specs = [
        intdash.DownstreamSpec(
            src_edge_uuid = sdk_edge1.uuid, #edge_uuid
            filters = [
                 intdash.DataFilter(data_type=intdash.DataType.float.value, data_id='sp_ACCX',channel=1),  # Acceleration X
                 intdash.DataFilter(data_type=intdash.DataType.float.value, data_id='sp_ACCY',channel=1),  # Acceleration Y
                 intdash.DataFilter(data_type=intdash.DataType.float.value, data_id='sp_ACCZ',channel=1),  # Acceleration Z
            ],
        ),
    ]

### データ受け取り時の関数処理を定義する
本シナリオでは、データの受け取り側は、時系列データを受け取りそのままQueueに追加する処理を定義します。

In [7]:
# Put the received time-series data to the Queue.
def callback(unit):
    try:
        q.put_nowait(unit)
    except queue.Full:
        pass

## データのアップロード (Upstream) の準備
受け取ったデータを加工してサーバーに新しい時系列データとして送付する処理を定義します。

### リクエストを作成する
アップロード時に使用するエッジのUUIDを指定します。

In [9]:
u_specs = [
        intdash.UpstreamSpec(
            src_edge_uuid = sdk_edge2.uuid,
        )
    ]

### 加工処理を定義する
本シナリオでは、移動平均を算出してサーバー側に返す処理を定義します。

In [10]:
import numpy as np

# The function calculate moving average.
def calc_ave(score, array, ave_num):
    array.append(score)
    if len(array) > ave_num:
        array.popleft()
  
    return  np.sum(array)/ len(array)

### データの送付側の関数処理を定義する

以下の処理を定義します

- Queueからデータを取得する
- 取得した時系列データに対してデータ型ごとに処理を分岐する
- 加工処理にデータをいれる
- 新たに作成したデータを送付する( yield で intdash.Unit を返す)

In [11]:
AVE_NUM = 5

import struct
from collections import deque

acc_x_dq = deque([])
acc_y_dq = deque([])
acc_z_dq = deque([])


# Calculate moving average of the received time-series data,  convert it to Unit and upload. 
def upload_func():
    while True:
        try:
            unit = q.get_nowait()
            
            # Skip basetime.
            if unit.data.data_type.value == intdash.DataType.basetime.value:
                yield unit
                continue
            
            # Skip other data.
            if unit.data.data_type.value != intdash.DataType.float.value:
                yield unit
                continue
                
            # Get intdash.intdash.data.GeneralSensor.
            sensor_data = unit.data
                
            if unit.data.data_id == 'sp_ACCX':
                acc_x = unit.data.value
                ave_acc_x = calc_ave(acc_x, acc_x_dq, AVE_NUM)
                
                if ave_acc_x is None:
                    continue
               
                yield intdash.Unit(
                      elapsed_time = unit.elapsed_time,
                      channel = 1,
                      data =  intdash.data.Float(data_id = 'sp_ACCX', value =ave_acc_x ),
                    )
                continue
                
                 
            if unit.data.data_id == 'sp_ACCY':
                acc_y = unit.data.value
                ave_acc_y = calc_ave(acc_y, acc_y_dq, AVE_NUM)
                
                if ave_acc_y is None:
                    continue
               
                yield intdash.Unit(
                      elapsed_time = unit.elapsed_time,
                      channel = 1,
                      data =  intdash.data.Float(data_id = 'sp_ACCY', value =ave_acc_y ),
                    )
                continue
                
            if unit.data.data_id == 'sp_ACCZ':
                acc_z = unit.data.value
                ave_acc_z = calc_ave(acc_z, acc_z_dq, AVE_NUM)
                
                if ave_acc_z is None:
                    continue
                
                yield intdash.Unit(
                      elapsed_time = unit.elapsed_time,
                      channel = 1,
                      data = intdash.data.Float(data_id = 'sp_ACCZ', value =ave_acc_z ),
                    )
                
                continue
                
        except queue.Empty:
            yield

## ストリーム処理を開始する

In [12]:
wsconn = client.connect_websocket()

### Upstreamを開始する

In [13]:
wsconn.open_upstreams(
    specs = u_specs,
    iterators = [upload_func()],
)

### Downstreamを開始する

In [14]:
wsconn.open_downstreams(
    specs = d_specs,
    callbacks = [callback],
)

##  切断する
処理を終了したい場合、必ず以下を実行し切断処理をおこなってください。

In [15]:
wsconn.close()

## Visual M2M Data Visualizer でデータを確認する
**Visual M2M Data Visualizer** を使用すると、リアルタイムでデータの通信が行われていることが確認できます。本notebookと同じディレクトリに保存されている 「SCREENファイル(.scrn)」 と「DATファイル(.dat)」を **Visual M2M Data Visualizer** にインポートすると、以下のようにデータを確認することができます。  (詳細は **Visual M2M Data Visualizer** の操作マニュアルを確認してください。)
 
 
以下の画面では、`Acceleration raw` パネルに変換前のデータ(Motionアプリが送信しているデータ)が表示され、 `Acceleration Converted` パネルに `intdash-py` を使って計算した移動平均が表示されています。

<img src="https://github.com/aptpod/intdash-py-samples/blob/master/img/img3.png?raw=true">