In [1]:
import datetime
import json
from typing import Dict, Optional

from google.cloud import bigquery

from percentage_of_volume import PoV

In [2]:
# Shared BigQuery client; the PoVJobExecutor methods below use this
# module-level name directly.
bq = bigquery.Client()

In [3]:
class PoVJobExecutor:
    """Run queued PoV jobs from ``trading_terminal_poc.job_inventory``.

    Each job is fetched, marked "PROCESSING", run through ``PoV`` and then
    marked "SUCCESS" or "FAILED"; its result is streamed into
    ``trading_terminal_poc.job_pov_result``. All BigQuery access goes through
    the module-level ``bq`` client.
    """

    def __init__(self):
        pass

    def query_single_job(self) -> Optional[Dict]:
        """Fetch one PoV job whose status is "CREATED".

        Returns:
            The job row as a dict (keys: id, algorithm, param,
            job_execution_start, status, load_dt), or None when no such job
            exists.
        """
        query = '''
            SELECT id,
                   algorithm,
                   param,
                   job_execution_start,
                   status,
                   load_dt
            FROM `trading_terminal_poc.job_inventory`
            WHERE algorithm = "PoV"
                  AND `status` = "CREATED"
            LIMIT 1
        '''
        rows = bq.query(query).to_dataframe()
        # An empty queue is a normal condition, not an error: indexing row 0
        # of an empty frame would raise IndexError.
        if rows.empty:
            return None
        return rows.iloc[0].to_dict()

    def preprocess_job(self, job: Dict) -> Dict:
        """Normalise a freshly queried job in place.

        ``job['param']`` is decoded from JSON and its
        ``start_execution_datetime`` entry parsed into a ``datetime``;
        ``job['load_dt']`` is converted to an ISO-8601 string.

        Args:
            job: Job dict as returned by ``query_single_job``.

        Returns:
            The same ``job`` object, after mutation.
        """
        param = json.loads(job['param'])
        param['start_execution_datetime'] = datetime.datetime.fromisoformat(
            param['start_execution_datetime']
        )
        job['load_dt'] = job['load_dt'].to_pydatetime().isoformat()
        job['param'] = param
        return job

    def _run_update(self, query: str, job: Dict, datetime_fields) -> None:
        """Run a parameterised UPDATE for ``job`` and wait for it to finish.

        ``id``, ``status`` and ``load_dt`` are always bound; every name in
        ``datetime_fields`` is bound as an extra DATETIME parameter read from
        ``job``.
        """
        parameters = [
            bigquery.ScalarQueryParameter('id', 'INTEGER', job['id']),
            bigquery.ScalarQueryParameter('status', 'STRING', job['status']),
            bigquery.ScalarQueryParameter('load_dt', 'DATETIME', job['load_dt']),
        ]
        parameters.extend(
            bigquery.ScalarQueryParameter(name, 'DATETIME', job[name])
            for name in datetime_fields
        )
        job_config = bigquery.QueryJobConfig(query_parameters=parameters)
        # .result() blocks until the DML statement completes and raises if it failed.
        bq.query(query, job_config=job_config).result()

    def update_before_process_job(self, job: Dict):
        """Persist ``job_execution_start`` and ``status`` for the inventory row
        matching the job's ``id`` and ``load_dt``."""
        query = '''
            UPDATE `trading_terminal_poc.job_inventory`
            SET job_execution_start = @job_execution_start,
                status = @status
            WHERE id = @id
                  AND load_dt = @load_dt
        '''
        self._run_update(query, job, ['job_execution_start'])

    def update_after_process_job(self, job: Dict):
        """Persist ``job_execution_start``, ``job_execution_end`` and ``status``
        for the inventory row matching the job's ``id`` and ``load_dt``."""
        query = '''
            UPDATE `trading_terminal_poc.job_inventory`
            SET job_execution_start = @job_execution_start,
                job_execution_end = @job_execution_end,
                status = @status
            WHERE id = @id
                  AND load_dt = @load_dt
        '''
        self._run_update(query, job, ['job_execution_start', 'job_execution_end'])

    def insert_job_result(self, job: Dict, result: Dict):
        """Stream ``result`` into ``trading_terminal_poc.job_pov_result``.

        ``result`` is mutated: its ``id`` key is set to the job's id.

        Raises:
            RuntimeError: if BigQuery reports insert errors for the row.
        """
        result['id'] = job['id']
        errors = bq.insert_rows_json('trading_terminal_poc.job_pov_result', [result])
        # insert_rows_json reports rejected rows through its return value
        # instead of raising, so an unchecked call loses results silently.
        if errors:
            raise RuntimeError(
                f"Failed to insert result for job {job['id']}: {errors}"
            )

    def _run_pov(self, param: Dict) -> Dict:
        """Run the PoV algorithm with the job's decoded parameters."""
        return PoV(**param)

    def execute(self):
        """Process "CREATED" PoV jobs one at a time until none are left.

        For each job: mark it "PROCESSING", run PoV, then record the end time,
        the final status ("SUCCESS" or "FAILED") and the result row. When PoV
        raises, the result row carries the error text under ``remark``.
        """
        while True:
            job = self.query_single_job()
            if job is None:
                return
            self.preprocess_job(job)  # mutates job in place

            job['job_execution_start'] = datetime.datetime.now().isoformat()
            job['status'] = 'PROCESSING'
            self.update_before_process_job(job)

            # Reset per job so a failure never touches an unbound name or
            # re-inserts the previous job's result under this job's id.
            result = {}
            try:
                result = self._run_pov(job['param'])
                job['status'] = 'SUCCESS'
            except Exception as e:
                job['status'] = 'FAILED'
                result = {'remark': str(e)}
            finally:
                job['job_execution_end'] = datetime.datetime.now().isoformat()

                self.update_after_process_job(job)

                self.insert_job_result(job, result)
            
            

In [4]:
# Instantiate the executor and start processing jobs; the run recorded below
# was stopped by hand (see the KeyboardInterrupt at the end of the output).
executor = PoVJobExecutor()
executor.execute()

100%|██████████| 7575/7575 [00:00<00:00, 11534.65it/s]
100%|██████████| 18176/18176 [00:04<00:00, 4220.24it/s]
100%|██████████| 17113/17113 [00:01<00:00, 13179.73it/s]
  5%|▌         | 3385/63956 [00:02<00:47, 1277.04it/s]
100%|██████████| 11766/11766 [00:00<00:00, 14248.78it/s]
100%|██████████| 17066/17066 [00:05<00:00, 2925.72it/s]
100%|██████████| 15678/15678 [00:01<00:00, 11620.43it/s]
 56%|█████▌    | 6942/12457 [00:01<00:01, 4064.90it/s]
100%|██████████| 44484/44484 [00:03<00:00, 13699.95it/s]
 22%|██▏       | 6079/27173 [00:02<00:07, 2924.74it/s]
100%|██████████| 25503/25503 [00:01<00:00, 13576.70it/s]
  4%|▍         | 2188/50762 [00:01<00:33, 1465.22it/s]
100%|██████████| 212768/212768 [00:15<00:00, 13630.86it/s]
  2%|▏         | 3112/128296 [00:02<01:47, 1163.70it/s]
100%|██████████| 27863/27863 [00:01<00:00, 13979.32it/s]
 28%|██▊       | 11852/43072 [00:03<00:08, 3772.83it/s]
100%|██████████| 191800/191800 [00:15<00:00, 12339.85it/s]
  6%|▌         | 8836/158955 [00:04<01:13

KeyboardInterrupt: 

In [5]:
from decimal import Decimal

In [7]:
# Scratch check: comparing a Decimal with an int evaluates to a plain bool.
type(Decimal(0.00) == 0)

bool