# 파이프라인 Flow

### [ BigQuery -> Google Cloud Tasks Queue -> MongoDB ]

1. BigQuery에서 "extract_token_addr_bigquery.sql"로 추출한
   "tokenAddr_value" 테이블로부터 token_address, value 값을 가져옴
2. Google Cloud Tasks queue 실행 (index 범위: 100000 이상 108856 미만)
   - token_address, value 값을 OpenSea asset 조회 API에 보내어
     해당 자산 소유자의 EOA address를 가져옴
   - [eoa_address, token_address, value] 형식으로 MongoDB에 저장함

## 1. BigQuery에서 token_address, value 가져오기

In [1]:
%load_ext google.cloud.bigquery

In [2]:
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
import google.cloud.bigquery.magics
# Tell the BigQuery IPython magics (loaded above via %load_ext) to use the
# BigQuery Storage API when downloading query results.
# NOTE(review): newer google-cloud-bigquery releases may use the Storage API by
# default / deprecate this flag — confirm against the installed version.
google.cloud.bigquery.magics.context.use_bqstorage_api = True

In [3]:
# Resolve Application Default Credentials together with their default project.
credentials, project_id = google.auth.default()

# Two clients sharing the same credentials: `bqclient` runs queries, and
# `bqstorageclient` is handed to `to_dataframe` to download the results.
bqclient = bigquery.Client(project=project_id, credentials=credentials)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

In [4]:
# Query that reads every (token_address, value) row of the prepared table.
query_string = """
SELECT * 
FROM `bigquery-279701.take_home.tokenAddr_value`
"""
# Execute the query, wait for it to finish, and load the rows into a pandas
# DataFrame, downloading through the BigQuery Storage client.
tokenAddr_value_df = bqclient.query(query_string).result().to_dataframe(
    bqstorage_client=bqstorageclient
)
print(tokenAddr_value_df.head())

                                token_address     value
0  0x273f7f8e6489682df756151f5525576e322d51a3  20010647
1  0x273f7f8e6489682df756151f5525576e322d51a3  30260186
2  0x273f7f8e6489682df756151f5525576e322d51a3  20370001
3  0x273f7f8e6489682df756151f5525576e322d51a3  20150059
4  0x273f7f8e6489682df756151f5525576e322d51a3  30090008


In [5]:
# Display the (rows, columns) count of the fetched table.
tokenAddr_value_df.shape

(108856, 2)

In [6]:
# Display the first rows of the fetched table.
tokenAddr_value_df.head()

Unnamed: 0,token_address,value
0,0x273f7f8e6489682df756151f5525576e322d51a3,20010647
1,0x273f7f8e6489682df756151f5525576e322d51a3,30260186
2,0x273f7f8e6489682df756151f5525576e322d51a3,20370001
3,0x273f7f8e6489682df756151f5525576e322d51a3,20150059
4,0x273f7f8e6489682df756151f5525576e322d51a3,30090008


## 2. Google Cloud Tasks Queue 실행
* OpenSea API로 자산 소유자의 eoa_address를 가져와 MongoDB에 저장하기
* OpenSea API 문서: [link](https://docs.opensea.io/reference#getting-assets)

In [11]:
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

# Target Cloud Tasks queue.
project = "bigquery-279701"
location = "us-central1"
queue = "queue-mongoDB"

# Cloud Tasks API client and the fully-qualified path of the queue above.
task_client = tasks_v2.CloudTasksClient()
parent = task_client.queue_path(project, location, queue)

# Base task template: every task is an HTTP POST to the "mongo" Cloud Function.
task = dict(
    http_request=dict(
        http_method='POST',
        url="https://us-central1-bigquery-279701.cloudfunctions.net/mongo",
    ),
)

In [12]:
# Task-creation helper: sends `payload` (value, token_address) to the task
# queue as the JSON body of an HTTP task.

def _json_default(obj):
    """Fallback for ``json.dumps``: unwrap numpy scalars into native Python values.

    DataFrame cells may be numpy scalars (e.g. ``numpy.int64``), which the
    ``json`` module cannot serialize on its own.
    """
    item = getattr(obj, 'item', None)
    if callable(item):
        return item()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def create_task(project, queue, location, payload=None, in_seconds=None, task_name=None):
    """Enqueue one HTTP task, built from the module-level ``task`` template.

    Every call works on a deep copy of ``task``, so the headers, body,
    schedule time, or name set by one call never leak into a later call.

    Args:
        project: GCP project id that owns the queue.
        queue: Cloud Tasks queue name.
        location: Region of the queue (e.g. "us-central1").
        payload: Optional dict sent as the JSON request body, e.g.
            {'value': '111111', 'token_address': '0x34324'} where ``value`` is
            the token id and ``token_address`` is the token's address.
        in_seconds: Optional delay, in seconds from now, before the task runs.
        task_name: Optional task name to assign.

    Returns:
        Whatever ``task_client.create_task`` returns for the created task.
    """
    import copy
    import datetime
    import json

    # Use the queue identified by the arguments rather than the global `parent`.
    queue_parent = task_client.queue_path(project, location, queue)
    # Never mutate the shared template; per-call fields go on a private copy.
    request_task = copy.deepcopy(task)

    if payload is not None:
        request_task['http_request']['headers'] = {
            'Content-type': 'application/json'}
        request_task['http_request']['body'] = json.dumps(
            payload, default=_json_default).encode()

    if in_seconds is not None:
        # Naive UTC datetime; protobuf's Timestamp.FromDatetime treats naive values as UTC.
        run_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)
        timestamp = timestamp_pb2.Timestamp()
        timestamp.FromDatetime(run_at)
        request_task['schedule_time'] = timestamp

    if task_name is not None:
        request_task['name'] = task_name

    # Keyword arguments work with both the 1.x and 2.x google-cloud-tasks clients.
    return task_client.create_task(parent=queue_parent, task=request_task)

In [13]:
import json

# Convenience wrapper around create_task for the queue configured in this notebook.
def dispatch_task(payload):
    """Enqueue ``payload`` on the queue named by the global ``project``/``queue``/``location``.

    Args:
        payload: Dict with the ``value`` and ``token_address`` to forward.

    Returns:
        The response of ``create_task`` (previously discarded).
    """
    return create_task(project=project, queue=queue, location=location, payload=payload)

In [14]:
import json
from tqdm.notebook import tqdm

# Enqueue one task on "queue-mongoDB" for every row of the BigQuery result
# whose index lies in the half-open range [START_INDEX, END_INDEX).
START_INDEX, END_INDEX = 100000, 108856
for i in tqdm(range(START_INDEX, END_INDEX)):
    # .loc[row, column] reads each scalar directly instead of first building a row Series.
    data = {
        'value': tokenAddr_value_df.loc[i, 'value'],
        'token_address': tokenAddr_value_df.loc[i, 'token_address'],
    }
    dispatch_task(data)

HBox(children=(FloatProgress(value=0.0, max=20000.0), HTML(value='')))


