# Script Table Operator (基本的な仕組み)

- Script Table Operator はデータベースのあるノード上で直接スクリプト (Linuxシェルコマンド) を実行する機能です
- このノートブックでは、この仕組みと実装方法を基礎から紹介します

In [2]:
%pip install pandas "sqlalchemy<2" ipython-sql teradataml

import warnings
warnings.simplefilter("ignore", (FutureWarning, DeprecationWarning))
# pandas, teradataml の警告を非表示にして見やすくするため設定
# 実行結果には影響しない

Note: you may need to restart the kernel to use updated packages.


In [3]:
from getpass import getpass
from urllib.parse import quote_plus

# 接続情報
host = "host.docker.internal"
user = "demo_user"
database = "demo_user"
password = getpass("Password > ")
dbs_port = 1025
encryptdata = "true"

connstr = (
  f"teradatasql://{user}:{quote_plus(password)}@{host}/?"
  f"&database={database}"
  f"&dbs_port={dbs_port}"
  f"&encryptdata={encryptdata}"
)

%load_ext sql
%config SqlMagic.autopandas=True
%sql {connstr}

# 接続確認
%sql SELECT database, current_timestamp

Password >  ········


 * teradatasql://demo_user:***@host.docker.internal/?database=demo_user&dbs_port=1025&encryptdata=true
1 rows affected.


Unnamed: 0,Database,Current TimeStamp(6)
0,DEMO_USER,2024-04-02 09:49:11.490000-04:00


In [4]:
# teradataml のコンテキストを開始
from sqlalchemy import create_engine
from teradataml import create_context, remove_context, DataFrame
engine = create_engine(connstr)
context = create_context(tdsqlengine=engine, temp_database_name=user)

# 接続確認
DataFrame('"dbc"."dbcInfoV"')

InfoKey,InfoData
RELEASE,17.20.03.23
VERSION,17.20.03.23
LANGUAGE SUPPORT MODE,Standard


- Script Table Operator はデータベースのあるノード上で直接スクリプト (Linuxシェルコマンド) を実行する機能です
- 下記はその最も単純な例で、`echo "Hello STO!"` というメッセージを表示させるコマンドを実行します
- 結果が複数行得られる（ここでは4つ）理由はこのコマンドがAMP（プロセス）ごとに同時に実行されるためです
- AMPはテラデータの分散単位で、システムごとに決まっています

In [20]:
q = r"""
SELECT * FROM
  SCRIPT(
    SCRIPT_COMMAND('echo "Hello STO!"')    /* command */
    RETURNS ('answer varchar(50)')         /* output type */
  )
"""

DataFrame(query=q)

answer
Hello STO!
Hello STO!
Hello STO!
Hello STO!


- 複数列の結果を出力するには、区切り字で分割された行を出力します。
- この例では、1行の出力を各AMPで実行するため、同じ結果がAMPの数だけ結果に現れます。

In [21]:
q = r"""
SELECT * FROM

  SCRIPT(
    SCRIPT_COMMAND('echo "pi,3.14"')
    RETURNS ('variable varchar(10), "value" float')
    DELIMITER (',')
  )
"""

DataFrame(query=q)

variable,value
pi,3.14
pi,3.14
pi,3.14
pi,3.14


- 複数行の結果を出力するには、改行区切り字で分割された行を出力します。
- この例では、2行の出力を各AMPで実行するため、結果は AMP数 × 2 行になります。

In [22]:
q = r"""
SELECT * FROM

  SCRIPT(
    SCRIPT_COMMAND('echo "hello"; echo "world"')
    RETURNS ('x varchar(10) CHARACTER SET UNICODE')
    DELIMITER (',')
  )
"""

DataFrame(query=q)

x
hello
hello
world
hello
world
world
world
hello


## STOでPythonを利用する下調べ：ノード上のPythonプログラムの探索

- STOの仕組みを利用して、各データベースノードにインストールされたPythonを実行することができます
- が、その前にこのシステムにインストールされたPythonを探してみましょう
- ひとつのシステムに複数のPythonがインストールされていることがよくあるためです

In [23]:
# Pythonプログラムの確認
# DISTINCT を入れることで同じ結果の重複を排除
q = r"""
SELECT DISTINCT * FROM

  SCRIPT(
    SCRIPT_COMMAND('which python; which python3; which tdpython; which tdpython3')
    RETURNS ('answer varchar(20)')
  )
"""

DataFrame(query=q)

answer
/usr/bin/tdpython3
/usr/bin/python
/usr/bin/python3


- このシステムには 少なくとも3つのPythonがインストールされていることがわかりました
- それぞれのバージョンを調べます

In [25]:
q = r"""
SELECT DISTINCT * FROM

  SCRIPT(
    SCRIPT_COMMAND('echo "python: $(python -V)";
                    echo "python3: $(python3 -V)";
                    echo "tdpython3: $(tdpython3 -V)"')
    RETURNS ('answer VARCHAR(100)')
  )
"""

DataFrame(query=q)

answer
python3: Python 3.4.10
tdpython3: Python 3.8.17
python:


- 結果、`tdpython3` が `3.8` と最も新しいことがわかりました。以降はこれを利用します
- 実際のシステムでは、どのPythonを使うべきかはシステム管理者に確認してください

## STOでPythonプログラムを実行

- まずは Helloプログラムで動作を確認します
- 再現可能性のため、ノートブック上でPythonファイルを作成して保存する形にしていますが、実際にはエディタなどでPythonスクリプトを作成するほうが書きやすいだろうと思います

In [28]:
# "Hello Python on Vantage!"  というメッセージを出力するPythonスクリプトを実行してみます

# スクリプトを作成して保存
script = r"""
print("Hello Python on Vantage!")
"""

with open("hello.py", "w") as f:
    f.write(script)


from teradataml import install_file
# 初回は `replace=False`, 2回目以降は `replace=True` を指定
# 自動で切り替えるオプションがないので例外処理で対応
try:
  install_file("hello", file_path="hello.py", file_on_client=True, replace=False)
except Exception as e:
  install_file("hello", file_path="hello.py", file_on_client=True, replace=True)

# ファイルの検索場所を指定
# 各セッションで一度実行
from teradataml import get_connection
q = f"SET SESSION SEARCHUIFDBPATH = {database}" 
conn = get_connection()
conn.execute(q)

q = f"""
SELECT * FROM

  SCRIPT(
    SCRIPT_COMMAND('tdpython3 {database}/hello.py')
    RETURNS ('message varchar(50)')
  )
"""
DataFrame(query=q)

File hello.py replaced in Vantage


message
Hello Python on Vantage!
Hello Python on Vantage!
Hello Python on Vantage!
Hello Python on Vantage!


- STOはスクリプトの標準出力を結果としてテーブルに保持します
- 標準出力は、Pythonでは `print` や `sys.stdout.write` などで書き出す内容に当たります
- 上のプログラムは `print` 関数で標準出力にメッセージを出力しています

### データの利用

- STOにおけるデータはスクリプトに対する標準入力より受け取ります
- 下記の例により、その動きを確認します

In [33]:
import pandas as pd

# 日本の都市の気温データをロード
filename = "data/temperature.csv"
df = pd.read_csv(filename)
df["date"] = pd.to_datetime(df["date"], format="%Y/%m/%d")
print(df.shape)
display(df)

# Teradata にロード
from teradataml import copy_to_sql
from teradatasqlalchemy import DATE, VARCHAR
copy_to_sql(df, "temperature", if_exists="replace",
            primary_index=["date", "location"],
            types={"location": VARCHAR(20), "date": DATE()})

# ロード結果の確認
DataFrame("temperature")

(10661, 4)


Unnamed: 0,date,location,avg_temp,max_temp
0,2020-01-01,Tokyo,5.5,10.2
1,2020-01-02,Tokyo,6.2,11.3
2,2020-01-03,Tokyo,6.1,12.0
3,2020-01-04,Tokyo,7.2,12.2
4,2020-01-05,Tokyo,5.4,10.2
...,...,...,...,...
10656,2024-02-27,Sendai,5.1,7.8
10657,2024-02-28,Sendai,5.1,8.3
10658,2024-02-29,Sendai,5.8,10.0
10659,2024-03-01,Sendai,5.4,10.5


date,location,avg_temp,max_temp
20/05/04,Sapporo,16.7,22.5
22/04/10,Fukuoka,18.7,24.6
21/11/12,Naha,20.5,22.8
24/01/03,Sendai,6.7,9.8
20/08/18,Fukuoka,30.0,34.9
23/01/31,Fukuoka,5.6,11.7
23/11/03,Nagoya,18.8,25.7
20/05/24,Nagoya,22.4,27.2
23/12/20,Sapporo,-4.1,-1.2
22/04/22,Osaka,18.5,23.1


In [35]:
# 気温を摂氏から華氏に変換するスクリプト

script = r"""
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys

# STOが取得する数値データは科学計算表記で　 '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["date", "location", "avg_temp", "max_temp"],
                converters={"avg_temp": str_to_float,
                            "max_temp": str_to_float})

# 気温を 摂氏から華氏に変換
x["avg_temp_f"] = x["avg_temp"] * 9/5 + 32
x["max_temp_f"] = x["max_temp"] * 9/5 + 32

out = x[["avg_temp", "avg_temp_f", "max_temp", "max_temp_f"]]
# 結果は標準出力へTSV形式で与える
out.to_csv(sys.stdout, sep="\t", index=False, header=False)
"""
filename = "temperature1.py"
with open(filename, "w") as f:
  f.write(script)


# Teradata側にスクリプトを配置
from teradataml import install_file
try:
  install_file("temperature1", file_path=filename, file_on_client=True, replace=False)
except Exception as e:
  install_file("temperature1", file_path=filename, file_on_client=True, replace=True)


# スクリプトを実行
q = f"""
SELECT * FROM

  SCRIPT(
    ON ( SELECT * FROM temperature )
    SCRIPT_COMMAND('tdpython3 {database}/temperature1.py;')
    RETURNS ('avg_temp FLOAT, avg_temp_f  FLOAT, max_temp FLOAT, max_temp_f FLOAT')
  )
"""

x = DataFrame(query=q)
x

File temperature1.py replaced in Vantage


avg_temp,avg_temp_f,max_temp,max_temp_f
30.0,86.0,34.9,94.82
18.8,65.84,25.7,78.25999999999999
22.4,72.32,27.2,80.96
11.5,52.7,13.5,56.3
16.7,62.06,22.5,72.5
18.5,65.3,23.1,73.58
18.7,65.66,24.6,76.28
20.5,68.9,22.8,73.04
-4.7,23.54,-0.3,31.46
5.6,42.08,11.7,53.06


- 上の例は1行に対して1行の結果を返すスクリプトなので、分散処理が容易なケースです
- スクリプトはAMPごとに実行され、結果を標準出力する仕組みなので、AMP内で自由にデータを集約することができます


In [37]:
# AMPごとにデータを集計する

script = r"""
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys

# STOが取得する数値データは科学計算表記で　 '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["date", "location", "avg_temp", "max_temp"],
                converters={"avg_temp": str_to_float,
                            "max_temp": str_to_float})

# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
    sys.exit()

variables = ["avg_temp", "max_temp"]
out = x[variables].mean()

# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
"""
filename = "temperature2.py"
with open(filename, "w") as f:
  f.write(script)


# Teradata側にスクリプトを配置
from teradataml import install_file
try:
  install_file("temperature2", filename, file_on_client=True, replace=False)
except Exception as e:
  install_file("temperature2", filename, file_on_client=True, replace=True)


# スクリプトを実行
q = f"""
SELECT * FROM

  SCRIPT(
    ON ( SELECT * FROM temperature )
    SCRIPT_COMMAND('tdpython3 {database}/temperature2.py;')
    RETURNS ('avg_temp FLOAT, max_temp FLOAT')
  )
"""

x = DataFrame(query=q)
x

File temperature2.py replaced in Vantage


avg_temp,max_temp
16.34968128983877,20.49673790776153
16.39501322251606,20.52372497166604
16.604632768361583,20.77709981167608
16.55687221396731,20.712407132243687


- 上の例は4つの集計値を得ることができました
- これらは、各AMPに配置されたデータを集計したものですが、今回はデータをランダムにAMP間に分散しているため、あまり意味のある集計に放っていません
- 多くのケースにおいて、データを特定の条件によって分割し、その分割ごとに計算を行いたいことがあります

In [39]:
# AMPごとにデータを集計する
# ただし、location ごとデータをAMPに配置する

script = r"""
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys

# STOが取得する数値データは科学計算表記で　 '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["date", "location", "avg_temp", "max_temp"],
                converters={"avg_temp": str_to_float,
                            "max_temp": str_to_float})

# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
    sys.exit()

location = x["location"].loc[0]  # このAMPに含まれるlocation
variables = ["avg_temp", "max_temp"]
out = x.mean()
out = [location] + out.to_list()

# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
"""
filename = "temperature3.py"
with open(filename, "w") as f:
  f.write(script)


# Teradata側にスクリプトを配置
from teradataml import install_file
try:
  install_file("temperature3", filename, file_on_client=True, replace=False)
except Exception as e:
  install_file("temperature3", filename, file_on_client=True, replace=True)

# スクリプトを実行
q = f"""
SELECT * FROM

  SCRIPT(
    ON ( SELECT * FROM temperature )  PARTITION BY location
    SCRIPT_COMMAND('tdpython3 {database}/temperature3.py;')
    
    RETURNS ('location VARCHAR(10), avg_temp FLOAT, max_temp FLOAT')
  )
"""

x = DataFrame(query=q)
x

File temperature3.py replaced in Vantage


location,avg_temp,max_temp
Fukuoka,17.791989494418907,21.85896257386737
Nagoya,16.704005252790544,21.621470781352592
Osaka,17.31726854891661,21.67577150361129
Sapporo,9.887590282337491,13.871569271175312
Naha,23.538804990151014,26.225082074852264
Tokyo,16.460866710439923,21.14917925147735
Sendai,13.636703873933024,17.99198949441891


- `PARTITION BY location` とすることで、観測点ごとに平均値を計算することができました
- 7つの観測点に対してAMPは4つで不足していますが、問題なく分割して結果が得られます

In [40]:
# 上の結果はクエリでより簡単に計算できるので、結果の同一性を確認
df = DataFrame("temperature")
df.groupby("location").mean()

location,mean_date,mean_avg_temp,mean_max_temp
Sendai,22/01/31,13.63670387393303,17.991989494418913
Osaka,22/01/31,17.317268548916616,21.675771503611298
Sapporo,22/01/31,9.887590282337488,13.871569271175307
Tokyo,22/01/31,16.460866710439923,21.149179251477342
Nagoya,22/01/31,16.704005252790548,21.621470781352592
Naha,22/01/31,23.53880499015101,26.22508207485227
Fukuoka,22/01/31,17.791989494418914,21.85896257386737
