# Import Library

In [2]:
import os
import pandas as pd
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# Prepocessing

In [13]:
df = pd.read_csv("news_articles.csv")
df = df.query("text_without_stopwords == text_without_stopwords")
df = df.groupby('label').apply(lambda s: s.sample(500)).reset_index(drop=True)
df = df[["text_without_stopwords", "label"]]
df.to_csv("data/news_articles.csv",index=False)
df

Unnamed: 0,text_without_stopwords,label
2046,,Real
2047,,Real
2048,,Real
2049,,Real
2050,,Real
2051,,Real
2052,,Real
2053,,Real
2054,,Real
2055,,Real


# Set Variabel

In [11]:
PIPELINE_NAME = "gesang_wibawono-pipeline"

DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/fake_news_transform.py"
TRAINER_MODULE_FILE = "modules/fake_news_trainer.py"

OUTPUT_BASE = "output"
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")

# Pipeline

In [12]:
def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    """
    Main
    """

    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing"
        "----direct_num_workers=0"
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        eam_pipeline_args=beam_args
    )

In [13]:
logging.set_verbosity(logging.INFO)

from modules.components import init_components

components = init_components(
    DATA_ROOT,
    transform_module=TRANSFORM_MODULE_FILE,
    trainer_module=TRAINER_MODULE_FILE,
    training_steps=20,
    eval_steps=10,
    serving_model_dir=serving_model_dir,
)

pipeline = init_local_pipeline(components, pipeline_root)
BeamDagRunner().run(pipeline=pipeline)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Pipeline root set to: output\gesang_wibawono-pipeline
INFO:absl:Generating ephemeral wheel package for 'D:\\learning\\fake-news\\modules\\fake_news_transform.py' (including modules: ['components', 'fake_news_trainer', 'fake_news_transform']).
INFO:absl:User module package has hash fingerprint version 21b7143bb664e0ef25e97060c4b3d952f2566cbaeebe6b9dcc0ba4cfb2ec4b37.
INFO:absl:Executing: ['C:\\Users\\BPS\\.conda\\envs\\a443-churn\\python.exe', 'C:\\Users\\BPS\\AppData\\Local\\Temp\\tmpuhbqlwaf\\_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', 'C:\\Users\\BPS\\AppData\\Local\\Temp\\tmpxowskk2c', '--dist-dir', 'C:\\Users\\BPS\\AppData\\Local\\Temp\\tmp45rd7r9t']
INFO:absl:Successfully built user code wheel distribution at 'output\\gesang_wibawono-pipeline\\_wheels\\tfx_user_code_Tr

INFO:absl:Running as an resolver node.
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[Latest_blessed_model_resolver] Resolved inputs: ({'model': [], 'model_blessing': []},)
INFO:absl:node Latest_blessed_model_resolver is finished.
INFO:absl:node CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "gesang_wibawono-pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "20231120-153057.594609"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "gesang_wibawono-pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spe

Error: field larger than field limit (131072) [while running 'InputToRecord/ParseCSVLine']

In [2]:
!pip freeze >> requirements.txt