# 5. スマートフォンの伝送データをリアルタイムに加工して送信してみよう

このチュートリアルでは **intdash Motion** で取得したデータに、**intdash SDK for Python** （以下、intdash SDKと呼びます）にて関数処理をかけ、intdashにアップロードします。 関数処理のサンプルとして、加速度データに対して移動平均を適用します。

このケースでは、以下の方法を中心に解説します。

- 他のエッジからリアルタイムで伝送されるデータを取得する
- 取得したデータに対して関数処理を適用する
- 関数処理を適用したデータをリアルタイムにアップロードする


## 5.0 事前準備

本シナリオを実施する前に、以下を用意する必要があります。

- 計測用のエッジ
- intdash Motionアプリ
- 汎用センサーデータに紐づく信号定義

### 使用データ
本シナリオでは、事前に以下のデータをサーバー側に準備する必要があります。

|データ項目|本シナリオで登場するデータ名|
|:---|:---|
|データ取得用エッジ|edge1|
|データアップロード用エッジ|edge2|
|信号定義(※)| `sp_ACCX`, `sp_ACCY`, `sp_ACCZ`|

(※) SDK チュートリアル [3. スマートフォンの伝送データをCSVで保存してみよう](../3_save-data-as-csv/3_save-data-as-csv.ipynb) で使用した信号定義と同じものを使用します。   


### パッケージのimportとクライアントの生成
`intdash.Client` に与える `url` は intdashサーバーの環境情報から、`username` と `password` はログイン用エッジで発行したアクセス情報を指定してください。  

In [4]:
import pandas as pd

import intdash
from intdash import timeutils

# Create client
client = intdash.Client(
    url = "https://example.intdash.jp",
    username = "edge1",
    password="password_here"
)

### 信号定義が登録されていることを確認する
このシナリオで使用する信号定義が登録されていることを確認します。  
登録されていない場合、次手順の **「(Option) 信号定義を登録する」** を参照してください。

In [48]:
signals = client.signals.list(label='sp')

In [49]:
for s in signals:
    print(s.label,  end=', ')

sp_ACCX, sp_ACCY, sp_ACCZ, 

### (Option) 信号定義の登録する
```
Warning:
    既にサーバー側に対象の信号定義が登録されている場合、本手順はスキップしてください。
```

SDKチュートリアル [3. スマートフォンの伝送データをCSVで保存してみよう](../3_save-data-as-csv/3_save-data-as-csv.ipynb) で使用した信号定義と同じ信号定義を使います。  
信号定義を登録するためには、以下のファイルを実行してください。

[0_create-signal-general-sensor.ipynb](../0_create-signal-general-sensor/0_create-signal-general-sensor.ipynb)  

今回は、「汎用センサー型」のうち、「加速度」に対してのみ変換定義を登録します。  

## 5.1 使用するエッジの取得

In [5]:
edges = client.edges.list(name='edge')
edge1 = edges[0]
edge2 = edges[1]

In [6]:
edge1.name, edge2.name

('edge1', 'edge2')

## 5.2 Queueの作成
サーバーからデータを受け取ってからサーバーに返すまでの処理はQueueを使って行います。

In [8]:
import queue

q = queue.Queue(maxsize=5)

## 5.3 データの取得 (Downstream) の準備
あるエッジが送信している時系列データを、サーバーを介して受け取る処理を定義します。

### 5.3.1 リクエストを作成する
`src_edge_uuid` には、データの送り元のエッジを指定します。この例では、 **intdash Motion** を実行しているエッジ `edge1` です。
`intdash.DataFilter` の `data_id` に信号定義の `label` 名を指定します。

In [9]:
d_specs = [
        intdash.DownstreamSpec(
            src_edge_uuid = edge1.uuid, 
            filters = [
                 intdash.DataFilter(data_type=intdash.DataType.float.value, data_id='sp_ACCX',channel=1),  # Acceleration
                 intdash.DataFilter(data_type=intdash.DataType.float.value, data_id='sp_ACCY',channel=1),  # Acceleration
                 intdash.DataFilter(data_type=intdash.DataType.float.value, data_id='sp_ACCZ',channel=1),  # Acceleration
            ],
        ),
    ]

### 5.3.2 データ受け取り時の関数処理を定義する
本シナリオでは、データの受け取り側は、時系列データを受け取りそのままQueueに追加します。

In [10]:
# Put the received time-series data to the Queue.
def callback(unit):
    try:
        q.put_nowait(unit)
    except queue.Full:
        pass

## 5.4 データのアップロード (Upstream) の準備
受け取ったデータを加工してサーバーに新しい時系列データとして送付する処理を定義します。

### 5.4.1 リクエストを作成する
アップロード時に使用するエッジのUUIDを指定します。

In [11]:
u_specs = [
        intdash.UpstreamSpec(
            src_edge_uuid = edge2.uuid,
        ),
    ]

### 5.4.2 加工処理を定義する
本シナリオでは、移動平均を算出してサーバー側に返す処理を定義します。

In [12]:
import numpy as np

# The function to calculate moving average.
def calc_ave(score, array, ave_num):
    array.append(score)
    if len(array) > ave_num:
        array.popleft()
  
    return  np.sum(array)/ len(array)

### 5.4.3 データの送付側の関数処理を定義する

今回は以下の処理を定義します。

- Queueからデータを取得する
- 取得した時系列データに対してデータ型ごとに処理を分岐する
- 加工処理にデータを入れる
- 新たに作成したデータを送付する( yield で intdash.Unit を返す)

In [13]:
AVE_NUM = 5

import struct
from collections import deque

acc_x_dq = deque([])
acc_y_dq = deque([])
acc_z_dq = deque([])


# Calculate moving average of the received time-series data, convert it to Unit and upload. 
def upload_func():
    while True:
        try:
            unit = q.get_nowait()
            
            # Skip basetime.
            if unit.data.data_type.value == intdash.DataType.basetime.value:
                yield unit
                continue
                
            if unit.data.data_type.value != intdash.DataType.float.value:
                yield unit
                continue
                
            # Get intdash.intdash.data.GeneralSensor.
            sensor_data = unit.data
                
            if unit.data.data_id == 'sp_ACCX':
                acc_x = unit.data.value
                ave_acc_x = calc_ave(acc_x, acc_x_dq, AVE_NUM)
                
                if ave_acc_x is None:
                    continue
               
                yield intdash.Unit(
                      elapsed_time = unit.elapsed_time,
                      channel = 1,
                      data =  intdash.data.Float(data_id='sp_ACCX', value=ave_acc_x ),
                    )
                continue
                
                 
            if unit.data.data_id == 'sp_ACCY':
                acc_y = unit.data.value
                ave_acc_y = calc_ave(acc_y, acc_y_dq, AVE_NUM)
                
                if ave_acc_y is None:
                    continue
               
                yield intdash.Unit(
                      elapsed_time = unit.elapsed_time,
                      channel = 1,
                      data =  intdash.data.Float(data_id='sp_ACCY', value=ave_acc_y ),
                    )
                continue
                
            if unit.data.data_id == 'sp_ACCZ':
                acc_z = unit.data.value
                ave_acc_z = calc_ave(acc_z, acc_z_dq, AVE_NUM)
                
                if ave_acc_z is None:
                    continue
                
                yield intdash.Unit(
                      elapsed_time = unit.elapsed_time,
                      channel = 1,
                      data = intdash.data.Float(data_id='sp_ACCZ', value=ave_acc_z ),
                    )
                
                continue
                
        except queue.Empty:
            yield

## 5.5 ストリーム処理を開始する

In [18]:
wsconn = client.connect_websocket()

### 5.5.1 Upstreamを開始する

In [19]:
wsconn.open_upstreams(
    specs = u_specs,
    iterators = [upload_func()],
)

### 5.5.2 Downstreamを開始する

In [20]:
wsconn.open_downstreams(
    specs = d_specs,
    callbacks = [callback],
)

## 5.6 Visual M2M Data Visualizer でデータを確認する
**Visual M2M Data Visualizer** （以下、Data Visualizer と呼びます）を使用すると、リアルタイムでデータの通信が行われていることが確認できます。本notebookと同じディレクトリに保存されている 「Screenファイル(.scrn)」 と「DATファイル(.dat)」をData Visualizerにインポートすると、以下のようにデータを確認することができます。 **intdashチュートリアル2の「2.2 Data Visualizerの設定を行う」** を確認してください。
 
 
以下の画面では、`Acceleration raw` パネルに変換前のデータ（Motionアプリが送信しているデータ）が表示され、 `Acceleration Converted` パネルに `intdash-py` を使って計算した移動平均が表示されています。

<img src="https://user-images.githubusercontent.com/70192465/94385047-10d80900-017f-11eb-9c00-aae106b49aaa.png">

##  5.7 リアルタイム処理の接続を切断する
処理を終了したい場合、必ず以下を実行し切断処理をおこなってください。

In [21]:
wsconn.close()