## 데이터 로더(Data Loaders)

이 노트북은 `pyTigerGraph` 에서 **graph loader** 의 사용을 보여줍니다. 데이터 로더의 역할은 `TigerGraph` 데이터베이스에서 데이터를 가져오는 것입니다. 현재 다음 데이터 로더가 제공됩니다.

* EdgeLoader, 간선의 배치(batch)를 반환합니다.
* VertexLoader, 정점의 배치(batch)를 반환합니다.
* GraphLoader, 무작위로 샘플링된(연결이 끊긴) 하위 그래프(subgraphs)를 pandas의 `dataframe`, `PyG` 또는 `DGL` 형식으로 반환 합니다.
* NeighborLoader, 이웃 샘플링을 사용하여 하위 그래프(subgraphs)를 `dataframe`, `PyG` 또는 `DGL`  형식으로 반환 합니다.
* EdgeNeighborLoader, 간선들에서 이웃 샘플링을 사용하여 하위 그래프(subgraphs)를 `dataframe`, `PyG` 또는 `DGL` 형식으로 반환 합니다.

위의 모든 데이터 로더는 모든 배치를 HTTP 응답(기본값)으로 가져오거나 Kafka를 통해 모든 배치를 스트리밍할 수 있습니다. 전자의 메커니즘은 작은 그래프로 테스트하기에 좋고 빠르지만 데이터 크기 제한이 2GB입니다. 큰 그래프의 경우 크기 제한 및 네트워크 연결 문제로 인해 HTTP 채널이 실패할 수 있습니다. Kafka를 통한 스트리밍은 데이터 견고성과 확장성을 위해 제공됩니다. 또한 Kafka는 다중 소비자(multi-consumer) 사용 사례에 탁월하며 동일한 데이터의 다중 소비자가 있는 경우 모델 검색 또는 하이퍼파라미터 튜닝에 효율적입니다.

데이터 로더는 동종 및 이종 그래프를 모두 지원합니다. 기본적으로 모든 정점 및 간선 유형에서 로드하고 그래프를 동종 그래프로 처리합니다. 그러나 사용자가 로드할 정점 및 간선 유형과 각 유형에서 로드할 속성을 지정할 수도 있습니다. 이 방법으로 사용자는 이기종 그래프 출력을 얻을 수 있습니다.

**NOTE**: 현재 ML Workbench에서 제공하는 모든 기능을 사용하려면 데이터베이스를 한 번만 활성화(activage)해야 합니다. ML Workbench on Cloud를 사용하는 경우 activator가 포함되며 아래 셀을 실행(먼저 주석 해제)하여 활성화(activage)할 수 있습니다. 다른 버전의 Workbench의 경우 https://act.tigergraphlabs.com 에서 활성기(activator)를 다운로드할 수 있습니다. 자세한 지침은 해당 웹사이트에도 포함되어 있습니다.


In [None]:
# 아래 주석을 제거하고 필요한 정보를 입력하십시오. 자세한 지침은 https://act.tigergraphlabs.com을 참조하십시오.
# !mlwb activate [database address] -u [username] -p [password] -s [secret]

### 데이터베이스 연결 

이 `TigerGraphConnection` 클래스는 TigerGraph 데이터베이스에 대한 연결을 나타냅니다. 후드 아래에는 데이터베이스와 통신하는 데 필요한 정보가 저장됩니다. 꽤 많은 데이터베이스 작업을 수행할 수 있습니다. 자세한 내용은 해당 [문서](https://docs.tigergraph.com/pytigergraph/current/intro/) 를 참조하십시오.

**Note**: 2022년 7월 5일 이후에 생성된 TG 클라우드 DB는 사용자 username/password 대신 Secret가 필요합니다. 그렇지 않으면 비워 둘 수 있습니다.

In [1]:
from pyTigerGraph import TigerGraphConnection

In [2]:
conn = TigerGraphConnection(
    host="http://127.0.0.1", # 데이터베이스 서버의 주소로 변경
    graphname="Cora",
    username="tigergraph",
    password="tigergraph",
    gsqlSecret="" # 2022년 7월 5일 이후에 생성된 TG 클라우드 DB에는 user/pass 대신 secret이 필요합니다.  
)

<span style="color:red">아래 셀의 주석을 제거하고 토큰 인증이 활성화된 경우 토큰을 가져오고 설정하기 위해 실행합니다.</span>. 
* 이것은 tgcloud의 모든 데이터베이스에 필요합니다.
* `<secret>`은 사용자 암호입니다. 자세한 내용은 https://docs.tigergraph.com/tigergraph-server/current/user-access/managing-credentials#_secrets 를 참조하세요.
* secret을 모르는 경우  `secret=conn.createSecret()`를 사용하여 secret을 만들 수 있습니다.

In [None]:
#conn.getToken(<secret>)

In [3]:
# 모든 정점 유형에 대한 정점 수
conn.getVertexCount('*')

{'Paper': 2708}

In [4]:
# 모든 간선 유형에 대한 간선 수
conn.getEdgeCount()

{'Cite': 10556}

### 그래프 로더(Graph Loader)

`GraphLoader`는 데이터베이스에서 (연결이 끊긴) 하위 그래프를 가져옵니다. 연결된 하위 그래프를 생성하는 NeighborLoader와 달리 이 로더는 해당 에지에 연결된 에지 및 정점의 배치(임의)를 가져옵니다.

**Note**: TigerGraph의 그래프에서 로더를 처음 초기화하는 경우 해당 쿼리를 데이터베이스에 설치하고 최적화하므로 초기화에 1분이 걸릴 수 있습니다. 하지만 쿼리 설치는 한 번만 하면 되므로 동일한 TG 그래프에서 다시 로더를 초기화할 때 시간이 걸리지 않습니다.
        
데이터 로더를 사용하는 방법에는 두 가지가 있습니다. 예시는 [여기](https://github.com/TigerGraph-DevLabs/mlworkbench-docs/blob/main/tutorials/basics/2_dataloaders.ipynb)를 참조하세요.
* 첫째, iterable로 사용할 수 있습니다. 즉, 루프를 통해 모든 데이터 배치를 가져올 수 있습니다. 모든 데이터를 한 번에 로드하는 경우(`num_batches=1`), 반복자에는 하나의 배치(모든 데이터 중)만 있습니다.
* 둘째, 클래스의 `data` 속성에 직접 접근할 수 있습니다. 로드할 데이터 배치가 하나만 있는 경우 반복자 대신 배치를 직접 제공하므로 이 경우 더 합리적일 수 있습니다. 로드할 데이터 배치가 여러 개인 경우 로더 자체를 반환합니다.

인수(Args):
* v_in_feats (list or dict, optional):
        입력 기능(feature)으로 사용할 정점 속성입니다.
        목록인 경우 모든 정점 유형의 목록에 있는 속성이 선택됩니다. 
        특정 속성이 모든 정점 유형에 존재하지 않으면 오류가 발생합니다. 
        dict인 경우 dict의 키는 선택할 정점 유형이고 값은 각 정점 유형에 대해 
        선택해야 하는 속성의 목록입니다.
        숫자 및 부울 속성만 허용됩니다. 
        속성 유형은 데이터베이스 스키마에서 자동으로 결정됩니다. 기본값은 없음입니다.
* v_out_labels(목록 또는 사전, 선택 사항):
        예측을 위한 레이블로 사용할 정점 속성입니다.
        목록인 경우 모든 정점 유형의 목록에 있는 속성이 선택됩니다. 
        특정 속성이 모든 정점 유형에 존재하지 않으면 오류가 발생합니다. 
        dict인 경우 dict의 키는 선택할 정점 유형이고 값은 각 정점 유형에 대해 
        선택해야 하는 속성의 목록입니다.
        숫자 및 부울 속성만 허용됩니다. 기본값은 없음입니다.
* v_extra_feats (list or dict, optional):
        학습/테스트 데이터의 지표와 같이 가져올 기타 속성입니다.
        목록인 경우 모든 정점 유형의 목록에 있는 속성이 선택됩니다. 
        특정 속성이 모든 정점 유형에 존재하지 않으면 오류가 발생합니다. 
        dict인 경우 dict의 키는 선택할 정점 유형이고 값은 각 정점 유형에 대해 
        선택해야 하는 속성의 목록입니다.
        모든 유형의 속성이 허용됩니다. 기본값은 없음입니다.
* e_in_feats (list or dict, optional):
        입력 기능(feature)으로 사용할 에지 속성입니다.
        목록인 경우 모든 에지 유형의 목록에 있는 속성이 선택됩니다. 
        특정 속성이 모든 에지 유형에 존재하지 않는 경우 오류가 발생합니다. 
        dict인 경우 dict의 키는 선택해야 하는 에지 유형이고 값은 에지 유형별로 
        선택해야 하는 속성의 목록입니다.
        숫자 및 부울 속성만 허용됩니다. 
        속성 유형은 데이터베이스 스키마에서 자동으로 결정됩니다. 기본값은 없음입니다.
* e_out_labels (list or dict, optional):
        예측을 위한 레이블로 사용할 에지 속성입니다.
        목록인 경우 모든 에지 유형의 목록에 있는 속성이 선택됩니다. 
        특정 속성이 모든 에지 유형에 존재하지 않는 경우 오류가 발생합니다. 
        dict인 경우 dict의 키는 선택해야 하는 에지 유형이고 값은 에지 유형별로 
        선택해야 하는 속성의 목록입니다.
        숫자 및 부울 속성만 허용됩니다. 기본값은 없음입니다.
* e_extra_feats (list or dict, optional):
        훈련/테스트 데이터의 지표와 같이 얻을 다른 에지 속성.
        목록인 경우 모든 에지 유형의 목록에 있는 속성이 선택됩니다. 
        특정 속성이 모든 에지 유형에 존재하지 않는 경우 오류가 발생합니다. 
        dict인 경우 dict의 키는 선택해야 하는 에지 유형이고 값은 에지 유형별로 
        선택해야 하는 속성의 목록입니다.
        모든 유형의 속성이 허용됩니다. 기본값은 없음입니다.
* batch_size (int, optional):
        각 배치의 정점 수입니다.
        기본값은 없음입니다.
* num_batches (int, optional):
        에지를 분할할 배치 수입니다.
        기본값은 1입니다.
* shuffle (bool, optional):
        로드하기 전에 데이터를 섞을지 여부입니다.
        기본값은 False입니다.
* filter_by (str, optional):
        포함할 수 있는 에지를 나타내는 데 사용되는 부울 속성입니다.
        기본값은 없음입니다.
* output_format (str, optional):
        로더의 출력 데이터 형식입니다.
        "PyG", "DGL" 및 "dataframe"만 지원됩니다. 기본값은 "데이터 프레임"입니다.
* add_self_loop (bool, optional):
        그래프에 자체 루프를 추가할지 여부입니다. 기본값은 False입니다.
* loader_id (str, optional):
        임의의 문자열이 될 수 있는 로더의 식별자입니다. Kafka 주제 이름으로도 사용됩니다. 
        `None`이면 임의의 문자열이 생성됩니다. 기본값은 없음입니다.
* buffer_size (int, optional):
        메모리에 프리페치하고 저장할 데이터 배치 수입니다. 기본값은 4입니다.
* timeout (int, optional):
        GSQL 쿼리에 대한 시간 초과 값(ms)입니다. 기본값은 300000입니다.

#### 전체 그래프를 PyG 그래프로 로컬에 직접 로드 

In [6]:
%%time
graph_loader = conn.gds.graphLoader(
    num_batches=1,
    v_in_feats = ["x"],
    v_out_labels = ["y"],
    v_extra_feats = ["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format = "PyG")

Installing and optimizing queries. It might take a minute if this is the first time you use this loader.
Query installation finished.
CPU times: user 220 ms, sys: 58.1 ms, total: 278 ms
Wall time: 49.9 s


In [7]:
%%time
# `data` 속성을 통해 데이터의 유일한 배치를 가져옵니다.
data = graph_loader.data

CPU times: user 1.71 s, sys: 176 ms, total: 1.89 s
Wall time: 1.7 s


In [8]:
data

Data(edge_index=[2, 10556], edge_feat=[10556], is_train=[10556], is_val=[10556], x=[2708, 1433], y=[2708], train_mask=[2708], val_mask=[2708], test_mask=[2708])

#### http를 통해 하위 그래프(subgraph) 가져오기

In [9]:
%%time
graph_loader = conn.gds.graphLoader(
    num_batches=10,
    v_in_feats = ["x"],
    v_out_labels = ["y"],
    v_extra_feats = ["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format = "PyG",
    shuffle=True,
    filter_by=None
)

CPU times: user 3.08 ms, sys: 1.99 ms, total: 5.07 ms
Wall time: 7.76 ms


In [10]:
%%time
for i, batch in enumerate(graph_loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
Data(edge_index=[2, 938], edge_feat=[938], is_train=[938], is_val=[938], x=[954, 1433], y=[954], train_mask=[954], val_mask=[954], test_mask=[954])
----Batch 1----
Data(edge_index=[2, 1089], edge_feat=[1089], is_train=[1089], is_val=[1089], x=[1260, 1433], y=[1260], train_mask=[1260], val_mask=[1260], test_mask=[1260])
----Batch 2----
Data(edge_index=[2, 1086], edge_feat=[1086], is_train=[1086], is_val=[1086], x=[1237, 1433], y=[1237], train_mask=[1237], val_mask=[1237], test_mask=[1237])
----Batch 3----
Data(edge_index=[2, 1062], edge_feat=[1062], is_train=[1062], is_val=[1062], x=[1256, 1433], y=[1256], train_mask=[1256], val_mask=[1256], test_mask=[1256])
----Batch 4----
Data(edge_index=[2, 1059], edge_feat=[1059], is_train=[1059], is_val=[1059], x=[1149, 1433], y=[1149], train_mask=[1149], val_mask=[1149], test_mask=[1149])
----Batch 5----
Data(edge_index=[2, 1020], edge_feat=[1020], is_train=[1020], is_val=[1020], x=[997, 1433], y=[997], train_mask=[997], val_mask=

#### 이종(heterogeneous) 그래프의 경우

`Cora`는 동종(homogeneous) 그래프 이므로 다른 그래프에 연결하여 이종(demostrate) 그래프의 사용 사례를 시연합니다.

In [11]:
conn = TigerGraphConnection(
    host="http://127.0.0.1", # 데이터베이스 서버의 주소로 변경
    graphname="hetero",
    username="tigergraph",
    password="tigergraph",
    gsqlSecret="" # 2022년 7월 5일 이후에 생성된 TG 클라우드 DB에는 user/pass 대신 secret이 필요합니다.  
)

# 토큰 인증이 활성화된 경우 토큰을 가져오고 설정하려면 아래 주석을 제거하십시오.
#conn.getToken(<secret>) # <secret>은 사용자 암호입니다. 자세한 내용은 https://docs.tigergraph.com/tigergraph-server/current/user-access/managing-credentials#_secrets를 참조하세요.

In [12]:
print(conn.gsql("ls"))

---- Graph hetero
Vertex Types:
- VERTEX v0(PRIMARY_ID id INT, x LIST<DOUBLE>, y INT, train_mask BOOL, val_mask BOOL, test_mask BOOL) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
- VERTEX v1(PRIMARY_ID id INT, x LIST<DOUBLE>, train_mask BOOL, val_mask BOOL, test_mask BOOL) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
- VERTEX v2(PRIMARY_ID id INT, x LIST<DOUBLE>, train_mask BOOL, val_mask BOOL, test_mask BOOL) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
Edge Types:
- DIRECTED EDGE v0v0(FROM v0, TO v0, is_train BOOL, is_val BOOL)
- DIRECTED EDGE v1v1(FROM v1, TO v1, is_train BOOL, is_val BOOL)
- DIRECTED EDGE v1v2(FROM v1, TO v2, is_train BOOL, is_val BOOL)
- DIRECTED EDGE v2v0(FROM v2, TO v0, is_train BOOL, is_val BOOL)
- DIRECTED EDGE v2v1(FROM v2, TO v1, is_train BOOL, is_val BOOL)
- DIRECTED EDGE v2v2(FROM v2, TO v2, is_train BOOL, is_val BOOL)

Graphs:
- Graph hetero(v0:v, v1:v, v2:v, v0v0:e, v1v1:e, v1v2:e, v2v0:e,

In [13]:
loader = conn.gds.graphLoader(
    v_in_feats={"v0": ["x"],
                "v1": ["x"]},
    v_out_labels={"v0": ["y"]},
    v_extra_feats={"v0": ["train_mask", "val_mask", "test_mask"]},
    e_extra_feats={"v0v0": ["is_train", "is_val"],
                    "v1v1": ["is_train", "is_val"]},
    batch_size=1024,
    shuffle=False,
    filter_by=None,
    output_format="PyG",
    add_self_loop=False,
    loader_id=None,
    buffer_size=4
)

Installing and optimizing queries. It might take a minute if this is the first time you use this loader.
Query installation finished.


In [14]:
%%time
for i, batch in enumerate(loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
HeteroData(
  [1mv0[0m={
    x=[76, 77],
    y=[76],
    train_mask=[76],
    val_mask=[76],
    test_mask=[76]
  },
  [1mv1[0m={ x=[110, 57] },
  [1m(v0, v0v0, v0)[0m={
    edge_index=[2, 413],
    is_train=[413],
    is_val=[413]
  },
  [1m(v1, v1v1, v1)[0m={
    edge_index=[2, 558],
    is_train=[558],
    is_val=[558]
  }
)
----Batch 1----
HeteroData(
  [1mv0[0m={
    x=[76, 77],
    y=[76],
    train_mask=[76],
    val_mask=[76],
    test_mask=[76]
  },
  [1mv1[0m={ x=[110, 57] },
  [1m(v0, v0v0, v0)[0m={
    edge_index=[2, 297],
    is_train=[297],
    is_val=[297]
  },
  [1m(v1, v1v1, v1)[0m={
    edge_index=[2, 486],
    is_train=[486],
    is_val=[486]
  }
)
CPU times: user 70.4 ms, sys: 11 ms, total: 81.4 ms
Wall time: 98.4 ms


### Kafka를 통한 스트리밍 

**Note**: Kafka 스트리밍 기능은 Enterprise Edition에서만 사용할 수 있습니다. 사용하려면 Enterprise Edition을 활성화해야 합니다.

In [None]:
conn = TigerGraphConnection(
    host="http://127.0.0.1", # 데이터베이스 서버의 주소로 변경
    graphname="hetero",
    username="tigergraph",
    password="tigergraph",
    gsqlSecret="" # 2022년 7월 5일 이후에 생성된 TG 클라우드 DB에는 user/pass 대신 secret이 필요합니다.  
)

# 토큰 인증이 활성화된 경우 토큰을 가져오고 설정하려면 아래 주석을 제거하십시오.
#conn.getToken(<secret>) # <secret>은 사용자 암호입니다. 자세한 내용은 https://docs.tigergraph.com/tigergraph-server/current/user-access/managing-credentials#_secrets를 참조하세요.

#### 카프카(Kafka) 설정 
여기에서 Kafka를 설정합니다. 구성되면 설정이 새로 생성된 모든 데이터 로더와 공유되며 각 로더에 대해 Kafka를 설정할 필요가 없습니다. 자세한 설정 은 공식 [문서](https://docs.tigergraph.com/pytigergraph/current/gds/gds#_configurekafka) 를 참조하십시오.

In [None]:
conn.gds.configureKafka(
    kafka_address="127.0.0.1:9092",
    kafka_security_protocol="SASL_PLAINTEXT",
    kafka_sasl_mechanism="PLAIN",
    kafka_sasl_plain_username="your username",
    kafka_sasl_plain_password="your password"
)

#### 하위 그래프(subgraphs) 가져오기

In [11]:
%%time
graph_loader = conn.gds.graphLoader(
    num_batches=10,
    v_in_feats = ["x"],
    v_out_labels = ["y"],
    v_extra_feats = ["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format = "PyG",
    shuffle=True,
    filter_by=None,
)



Installing and optimizing queries. It might take a minute if this is the first time you use this loader.
Query installation finished.
CPU times: user 55.7 ms, sys: 15.1 ms, total: 70.8 ms
Wall time: 26 s


In [12]:
%%time
for i, batch in enumerate(graph_loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
Data(edge_index=[2, 1018], edge_feat=[1018], is_train=[1018], is_val=[1018], x=[1006, 1433], y=[1006], train_mask=[1006], val_mask=[1006], test_mask=[1006])
----Batch 1----
Data(edge_index=[2, 1036], edge_feat=[1036], is_train=[1036], is_val=[1036], x=[1226, 1433], y=[1226], train_mask=[1226], val_mask=[1226], test_mask=[1226])
----Batch 2----
Data(edge_index=[2, 1083], edge_feat=[1083], is_train=[1083], is_val=[1083], x=[1234, 1433], y=[1234], train_mask=[1234], val_mask=[1234], test_mask=[1234])
----Batch 3----
Data(edge_index=[2, 1074], edge_feat=[1074], is_train=[1074], is_val=[1074], x=[1263, 1433], y=[1263], train_mask=[1263], val_mask=[1263], test_mask=[1263])
----Batch 4----
Data(edge_index=[2, 1023], edge_feat=[1023], is_train=[1023], is_val=[1023], x=[1107, 1433], y=[1107], train_mask=[1107], val_mask=[1107], test_mask=[1107])
----Batch 5----
Data(edge_index=[2, 1072], edge_feat=[1072], is_train=[1072], is_val=[1072], x=[1040, 1433], y=[1040], train_mask=[1040