# Data Processing Pipeline

In [1]:
from kedro.pipeline import Pipeline, node
from spaceflights.pipelines.data_processing.nodes import CompanyPreprocessor, ShuttlesPreprocessor, create_master_table

In [2]:

def create_pipeline(**kwargs):
    """Assemble the data-processing pipeline.

    Three nodes are wired together:

    * ``preprocess_companies_node`` applies a ``CompanyPreprocessor`` instance
      to the ``companies`` dataset and emits ``preprocessed_companies``.
    * ``preprocess_shuttles_node`` applies a ``ShuttlesPreprocessor`` instance
      to the ``shuttles`` dataset and emits ``preprocessed_shuttles``.
    * ``create_master_table_node`` calls ``create_master_table`` on the two
      preprocessed datasets plus ``reviews`` and emits ``master_table``.

    Args:
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        A ``Pipeline`` containing the three nodes described above.
    """
    # Instantiate both callable preprocessors up front, before building nodes.
    company_step = CompanyPreprocessor()
    shuttle_step = ShuttlesPreprocessor()

    companies_node = node(
        func=company_step,
        inputs="companies",
        outputs="preprocessed_companies",
        name="preprocess_companies_node",
    )
    shuttles_node = node(
        func=shuttle_step,
        inputs="shuttles",
        outputs="preprocessed_shuttles",
        name="preprocess_shuttles_node",
    )
    master_table_node = node(
        func=create_master_table,
        inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
        outputs="master_table",
        name="create_master_table_node",
    )

    return Pipeline([companies_node, shuttles_node, master_table_node])

In [3]:
# Build the data-processing pipeline, passing no keyword arguments.
pipeline = create_pipeline()

In [4]:
# Show the pipeline's repr so the nodes it contains can be inspected.
pipeline

Pipeline([
Node(<spaceflights.pipelines.data_processing.nodes.ShuttlesPreprocessor object at 0x11f57fb50>, 'shuttles', 'preprocessed_shuttles', 'preprocess_shuttles_node'),
Node(<spaceflights.pipelines.data_processing.nodes.CompanyPreprocessor object at 0x11f57f520>, 'companies', 'preprocessed_companies', 'preprocess_companies_node'),
Node(create_master_table, ['preprocessed_shuttles', 'preprocessed_companies', 'reviews'], 'master_table', 'create_master_table_node')
])

In [5]:
from kedro.runner import SequentialRunner

In [6]:
# Run the pipeline with Kedro's SequentialRunner.
# NOTE(review): `catalog` is not defined in any visible cell — presumably it is
# injected by the Kedro Jupyter/IPython session; confirm before running on a
# fresh kernel.
runner = SequentialRunner()
runner.run(pipeline, catalog)

2021-11-11 19:08:21,912 - kedro.io.data_catalog - INFO - Loading data from `shuttles` (ExcelDataSet)...
2021-11-11 19:08:30,727 - kedro.pipeline.node - INFO - Running node: preprocess_shuttles_node: <spaceflights.pipelines.data_processing.nodes.ShuttlesPreprocessor object at 0x11f57fb50>([shuttles]) -> [preprocessed_shuttles]
2021-11-11 19:08:30,812 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_shuttles` (CSVDataSet)...
2021-11-11 19:08:31,079 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2021-11-11 19:08:31,079 - kedro.io.data_catalog - INFO - Loading data from `companies` (CSVDataSet)...
2021-11-11 19:08:31,108 - kedro.pipeline.node - INFO - Running node: preprocess_companies_node: <spaceflights.pipelines.data_processing.nodes.CompanyPreprocessor object at 0x11f57f520>([companies]) -> [preprocessed_companies]
2021-11-11 19:08:31,135 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_companies` (CSVDataSet)...
2021-11-11 19:08:31,222 -

{}