In [None]:
import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions
from apache_beam.transforms.window import FixedWindows

from apache_beam.io.kafka import ReadFromKafka

# Define opciones de pipeline
dataflow_options =["--project=gcp-test-chaki",
                    "--region=us-central1",
                    "--runner=DataflowRunner",
                    "--temp_location=gs://bucket-kafka-test/temp/",
                    "--job_name=test-job1",
                    "--worker_machine_type=n1-standard-1",
                    "--num_workers=1"]
options = PipelineOptions(dataflow_options)
gcloud = options.view_as(GoogleCloudOptions)

# Define opciones de configuración
setup_options = options.view_as(SetupOptions)
setup_options.setup_file = 'setup.py'

# Define opciones de Kafka
kafka_options = {
    "bootstrap_servers": "127.0.0.1:9092",  # Dirección IP y puerto de Kafka
    "topics": ["test_topic"],  # Nombre del topic de Kafka
    "consumer_config": {"group.id": "my-group"}  # Configuración de consumidor de Kafka
}

# Define una función para procesar mensajes de Kafka
def process_message(message):
    # Decodifica el mensaje
    message_str = message.value.decode('utf-8')

    # Carga el mensaje como un objeto JSON
    message_json = json.loads(message_str)

    # Procesa el mensaje
    # En este ejemplo, simplemente imprimimos el mensaje en la consola
    print(message_json)

# Define el pipeline
with beam.Pipeline(options=options) as p:
    # Lee datos de Kafka utilizando el transform ReadFromKafka
    # Este transform lee datos de Kafka en formato PCollection
    kafka_data = (p
                  | "ReadFromKafka" >> ReadFromKafka(**kafka_options)
                  | "ProcessData" >> beam.Map(process_message))


In [16]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.io.gcp.bigquery import WriteToBigQuery


# DATAFLOW CONFIGURATION (pipeline options)
dataflow_options = [
    '--project=gcp-test-chaki',
    '--region=us-central1',
    '--runner=DataflowRunner',
    '--temp_location=gs://bucket-kafka-test/temp/',
    '--job_name=test-job1',
    '--worker_machine_type=n1-standard-1',
    '--num_workers=1'
]

options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)


# KAFKA CONFIGURATION
kafka_topic = 'my-kafka-topic'
kafka_bootstrap_servers = 'localhost:9092'
kafka_read_time = None  # Change to datetime.datetime(2022, 1, 1) to read from a specific time

kafka_consumer_config = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'group.id': 'my-group-id'
}


# BIGQUERY CONFIGURATION
project_id = 'gcp-test-chaki'
dataset_id = 'test_kafka_dataflow'
table_id = 'my-bigquery-table'

table_spec = f'{project_id}:{dataset_id}.{table_id}'


# IMPLEMENTACION DEL PIPELINE
with beam.Pipeline(options=options) as p:
    lines = p | 'Read from Kafka' >> ReadFromKafka(
        consumer_config=kafka_consumer_config,
        topics=[kafka_topic],
        start_read_time=kafka_read_time,
        # Use 'stop_read_time' to read up to a specific timestamp
    )

    lines | 'Write to BigQuery' >> WriteToBigQuery(
        table_spec,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        project=project_id
    )


ValueError: Expected a table reference (PROJECT:DATASET.TABLE or DATASET.TABLE) instead of gcp-test-chaki:my-bigquery-dataset.my-bigquery-table.