# The BigFlow tutorial part 3

### What you will learn in part 3
* Creating custom components.
* Creating table sensors and checks.
* Loading a pandas DataFrame into a BQ table.
* Setting a custom schedule interval for a workflow.
* Setting a custom number of retries for a workflow step.

In [None]:
import bigflow as bf

dataset = bf.Dataset(
    project_id='',  # put your own project ID
    dataset_name='transaction_aggregates',
    internal_tables=['user_transaction_metrics', 'global_transaction_metrics'])

### Beyond queries
You can create, run and schedule components that are plain Python code. Inside them you can do pretty much anything you can do in a Jupyter notebook.

As the example below shows:
* A custom component takes a dataset as an argument.
* Inside a custom component you can use all standard dataset methods (`create_table`, `write_truncate`, ...).
* You don't need to run components inside a custom component - BigFlow will run them with the proper date automatically.
* You can use the `load_table_from_dataframe` method to save a DataFrame to a table. It works with both partitioned and non-partitioned tables.

In [None]:
import pandas as pd
from datetime import datetime
from datetime import timedelta

@bf.component(ds=dataset)
def calculate_user_transaction_value_median_and_save_to_table(ds):
    ds.create_table('''
    CREATE TABLE IF NOT EXISTS global_transaction_metrics (
        metric_name STRING,
        metric_value STRING,

        partition_timestamp TIMESTAMP)
    PARTITION BY DATE(partition_timestamp)
    ''', operation_name='create_table')
    
    daily_user_transaction_cost_dataframe = ds.collect('''
    SELECT CAST(metric_value as FLOAT64) as metric_value,
        partition_timestamp
    FROM `{user_transaction_metrics}`
    WHERE DATE(partition_timestamp) = DATE('{dt}')
    AND metric_name = 'USER_TRANSACTION_VALUE'
    ''', operation_name='fetch_metrics')
    
    metric_median = daily_user_transaction_cost_dataframe.loc[:,'metric_value'].median()
    metric_median_dataframe = pd.DataFrame([{
        'metric_name': 'USER_TRANSACTION_VALUE_MEDIAN',
        'metric_value': str(metric_median),
        'partition_timestamp': pd.Timestamp(ds.dt, tz='UTC')
    }])
    
    ds.load_table_from_dataframe('global_transaction_metrics', metric_median_dataframe)
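The pandas part of this component can be tried locally, without BigQuery. The sketch below is only an illustration: the sample values are made up and stand in for the result of `ds.collect`, and the hard-coded date string `'2019-01-01'` is an assumed stand-in for `ds.dt`. It computes the median and builds a one-row DataFrame in the same shape as the `global_transaction_metrics` table.

```python
import pandas as pd

# Hypothetical sample rows standing in for the result of ds.collect(...).
daily_user_transaction_cost_dataframe = pd.DataFrame({
    'metric_value': [10.0, 20.0, 40.0],
    'partition_timestamp': pd.to_datetime(['2019-01-01'] * 3, utc=True),
})

# Median of the metric_value column, computed as in the component above.
metric_median = daily_user_transaction_cost_dataframe.loc[:, 'metric_value'].median()

# One-row DataFrame matching the global_transaction_metrics schema.
# metric_value is a STRING column, so the median is converted with str().
# '2019-01-01' is an assumed stand-in for ds.dt.
metric_median_dataframe = pd.DataFrame([{
    'metric_name': 'USER_TRANSACTION_VALUE_MEDIAN',
    'metric_value': str(metric_median),
    'partition_timestamp': pd.Timestamp('2019-01-01', tz='UTC'),
}])

print(metric_median_dataframe)
```

Checking the DataFrame shape and column types this way, before calling `load_table_from_dataframe`, makes schema mismatches easier to spot.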

### Custom component methods 
A custom component behaves like the components that you created in the previous parts of the tutorial.

In [None]:
calculate_user_transaction_value_median_and_save_to_table.run('2019-01-01')

You can also peek at, or run, a single operation inside your custom component.

In [None]:
calculate_user_transaction_value_median_and_save_to_table.peek('2019-01-01', operation_name='fetch_metrics', limit=5)

### How it works
Creating an inline component is just a shortcut for creating a custom component. When you write this:

In [None]:
create_global_daily_transaction_metrics_table = dataset.create_table('''
CREATE TABLE IF NOT EXISTS global_daily_transaction_metrics (
    metric_name STRING,
    metric_value STRING,

    partition_timestamp TIMESTAMP)
PARTITION BY DATE(partition_timestamp)
''')

You are implicitly writing this:

In [None]:
@bf.component(ds=dataset)
def create_global_daily_transaction_metrics_table(ds):
    ds.create_table('''
    CREATE TABLE IF NOT EXISTS global_daily_transaction_metrics (
        metric_name STRING,
        metric_value STRING,

        partition_timestamp TIMESTAMP)
    PARTITION BY DATE(partition_timestamp)
    ''')
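To build intuition for the decorator form, here is a toy, pure-Python sketch of how a `component`-style decorator could turn a function into an object with a `run` method. Everything in it (`ToyDataset`, `ToyComponent`, `toy_component`) is hypothetical and written only for illustration; it is not BigFlow's actual implementation.

```python
class ToyDataset:
    """Hypothetical stand-in for a dataset; it only records the calls it receives."""

    def __init__(self):
        self.dt = None
        self.executed = []

    def create_table(self, sql):
        # A real dataset would send the statement to BigQuery; the toy only logs it.
        self.executed.append((self.dt, sql.strip()))


class ToyComponent:
    """Wraps a function and runs it for a given date."""

    def __init__(self, func, ds):
        self.func = func
        self.ds = ds

    def run(self, dt):
        # Set the runtime date on the dataset, then call the wrapped function.
        self.ds.dt = dt
        return self.func(self.ds)


def toy_component(ds):
    """Decorator factory that mirrors the shape of @bf.component(ds=dataset)."""
    def decorator(func):
        return ToyComponent(func, ds)
    return decorator


toy_dataset = ToyDataset()


@toy_component(ds=toy_dataset)
def create_example_table(ds):
    ds.create_table('CREATE TABLE IF NOT EXISTS example (metric_name STRING)')


create_example_table.run('2019-01-01')
print(toy_dataset.executed)
```

The point of the sketch is only that the decorated name ends up bound to a component object rather than a plain function, which is why both forms can be run the same way.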

### Making your own table check/sensor
What if you want to wait for a table before you start processing? You need a component that checks whether the table you depend on is ready.

In [None]:
@bf.component(ds=dataset)
def user_transaction_metrics_is_ready(ds):
    result = ds.collect('''
    SELECT count(*) > 0 as table_ready
    FROM `{user_transaction_metrics}`
    WHERE DATE(partition_timestamp) = DATE('{dt}')
    ''')

    if not result.iloc[0]['table_ready']:
        raise ValueError('user_transaction_metrics is not ready')

In [None]:
# Should raise ValueError
user_transaction_metrics_is_ready.run('2022-01-01')

### Making use of the check
Let's create a workflow that uses the `user_transaction_metrics_is_ready` component and the `calculate_user_transaction_value_median_and_save_to_table` component.

The `to_job` method takes `retry_count` and `retry_pause_sec` arguments. Because `user_transaction_metrics_is_ready` raises an error while the table is not ready, combining it with those arguments lets you implement a table check or sensor.
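The idea of "a failing check plus retries equals a sensor" can be sketched in plain Python. The helper `run_with_retries` below is hypothetical and not part of BigFlow; it assumes that `retry_count` means the number of additional attempts after the first failure and that `retry_pause_sec` is the pause between attempts. The table is simulated so that it becomes ready on the third attempt.

```python
import time


def run_with_retries(check, retry_count, retry_pause_sec, sleep=time.sleep):
    """Call check() until it succeeds, allowing up to retry_count retries.

    Hypothetical helper for illustration; not part of BigFlow.
    Returns the number of attempts that were made.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            check()
            return attempts
        except ValueError:
            # Give up once the retries are exhausted.
            if attempts > retry_count:
                raise
            sleep(retry_pause_sec)


# Simulated table that becomes ready on the third attempt.
state = {'calls': 0}


def table_is_ready():
    state['calls'] += 1
    if state['calls'] < 3:
        raise ValueError('user_transaction_metrics is not ready')


# sleep is replaced with a no-op so the example finishes instantly.
attempts = run_with_retries(table_is_ready, retry_count=100, retry_pause_sec=40,
                            sleep=lambda seconds: None)
print(attempts)
```

Under these assumptions, `retry_count=100` with `retry_pause_sec=40` would keep waiting for a bit over an hour (100 pauses of 40 seconds) before the step finally fails.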

In [None]:
workflow = bf.Workflow(
    definition=[
        user_transaction_metrics_is_ready.to_job(
            retry_count=100,
            retry_pause_sec=40),
        calculate_user_transaction_value_median_and_save_to_table.to_job()],
    schedule_interval='* * * * *')
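The `schedule_interval` value above has the form of a standard five-field cron expression, and in cron syntax `'* * * * *'` means "every minute". Assuming BigFlow passes the value through as a cron schedule (an assumption, not something this tutorial confirms), the hypothetical helper below labels the five fields, which can make it easier to write a different interval.

```python
CRON_FIELDS = ('minute', 'hour', 'day_of_month', 'month', 'day_of_week')


def describe_cron(expression):
    """Map each field of a five-field cron expression to its name.

    Hypothetical helper for illustration; it does not validate field values.
    """
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError('expected 5 fields, got %d' % len(parts))
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron('* * * * *'))  # every minute
print(describe_cron('0 6 * * *'))  # every day at 06:00
```

For a daily workflow, an expression such as `'0 6 * * *'` is usually a better fit than an every-minute schedule.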

In [None]:
workflow.run('2019-01-01')

### Deployment
If you want to deploy this workflow, remove the `run` and `peek` calls. Then generate and deploy the DAG the same way you did in part 2.

### Want more?
If you want to discover more features, write to us at **chibox-team@allegrogroup.com**. We will deliver the next chapters.