In [1]:
import apache_beam as beam

# Build a pipeline with no explicit options, so Beam picks its default runner
# (the original note identifies this as the DirectRunner).
p = beam.Pipeline()

# No transforms are attached yet; this cell only demonstrates the
# construct -> run -> wait lifecycle of a pipeline.

# Launch the (empty) pipeline and keep the handle to the running job.
result = p.run()

# Block until the job reaches a terminal state. As the last expression in the
# cell, the returned state is what the notebook displays as the cell output.
result.wait_until_finish()


'DONE'

In [2]:
import apache_beam as beam

# Context-manager form of the same lifecycle: no explicit run() or
# wait_until_finish() call is written; the later cells rely on leaving the
# `with` block to execute the pipeline.
with beam.Pipeline() as pipeline:
    # This is a placeholder for pipeline operations
    pass


In [5]:
import apache_beam as beam

def run_pipeline():
    """Count whitespace-separated tokens in three sample sentences and print them.

    The sentences are turned into a PCollection, split on whitespace, counted
    per distinct token, and each resulting ``(token, count)`` tuple is passed
    to ``print``. Tokens are not normalised, so punctuation stays attached
    (``'processing.'`` and ``'processing'`` are counted separately).
    """
    sample_sentences = [
        'Apache Beam is a unified programming model for both batch and streaming data processing.',
        'It provides a way to write data processing pipelines and run them on any execution engine.',
        "Beam's model is easy to use yet powerful in its capabilities.",
    ]

    # The pipeline executes when the `with` block is exited.
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | 'Create PCollections' >> beam.Create(sample_sentences)
            | 'Split Words' >> beam.FlatMap(lambda sentence: sentence.split())
            | 'Count Words' >> beam.combiners.Count.PerElement()
            | 'Print Results' >> beam.Map(print)
        )


if __name__ == '__main__':
    run_pipeline()


('Apache', 1)
('Beam', 1)
('is', 2)
('a', 2)
('unified', 1)
('programming', 1)
('model', 2)
('for', 1)
('both', 1)
('batch', 1)
('and', 2)
('streaming', 1)
('data', 2)
('processing.', 1)
('It', 1)
('provides', 1)
('way', 1)
('to', 2)
('write', 1)
('processing', 1)
('pipelines', 1)
('run', 1)
('them', 1)
('on', 1)
('any', 1)
('execution', 1)
('engine.', 1)
("Beam's", 1)
('easy', 1)
('use', 1)
('yet', 1)
('powerful', 1)
('in', 1)
('its', 1)
('capabilities.', 1)


In [6]:
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

def run(argv=None):
    """Read a text file from GCS, count whitespace-separated words, and print the counts.

    Args:
        argv: Optional list of command-line arguments. When ``None``, argparse
            falls back to ``sys.argv[1:]``. ``--input`` and ``--output`` are
            consumed here; every other flag is forwarded to Beam's
            ``PipelineOptions``.

    The ``--output`` flag is parsed but currently unused because the
    ``WriteToText`` step is commented out; results go to stdout instead.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://sandeep-apache/data.txt',
                        help='Input file path in GCS.')
    parser.add_argument('--output',
                        dest='output',
                        default='gs://sandeep-apache/output.txt',
                        help='Output file path in GCS.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Define PipelineOptions with Google Cloud specific options.
    options = PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)

    # These are defaults only: a value supplied through pipeline_args
    # (e.g. --region, --temp_location) is kept instead of being overwritten.
    # GCP region names have no hyphen before the trailing digit ('us-east1');
    # the previous 'us-east-1' is AWS-style naming and is not a valid GCP region.
    google_cloud_options.region = google_cloud_options.region or 'us-east1'
    google_cloud_options.temp_location = (
        google_cloud_options.temp_location or 'gs://sandeep-apache/temp'
    )
    google_cloud_options.staging_location = (
        google_cloud_options.staging_location or 'gs://sandeep-apache/staging'
    )

    def format_output(word_count):
        """Render a ``(word, count)`` tuple as ``'word: count'``."""
        word, count = word_count
        return f'{word}: {count}'

    with beam.Pipeline(options=options) as pipeline:
        # Read from the specified input file in GCS.
        lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)

        # Count the occurrences of each whitespace-separated token.
        word_counts = (
            lines
            | 'Split Words' >> beam.FlatMap(lambda line: line.split())
            | 'Count Words' >> beam.combiners.Count.PerElement()
        )

        # Format each pair and print it to the console.
        (
            word_counts
            | 'FormatOutput' >> beam.Map(format_output)
            | 'PrintResults' >> beam.Map(print)
        )

        # Writing to GCS is disabled while results are printed instead:
        # word_counts | 'WriteToText' >> beam.io.WriteToText(known_args.output)


if __name__ == '__main__':
    run()




The: 3
Essence: 1
of: 18
Data: 15
Engineering:: 1
Building: 1
the: 16
Backbone: 1
Modern: 1
Infrastructure: 1
In: 2
today's: 1
digital: 1
age,: 1
data: 31
has: 1
emerged: 1
as: 7
a: 8
crucial: 2
asset: 1
for: 6
organizations,: 1
driving: 2
decision-making: 1
processes: 1
and: 35
enabling: 1
innovation: 2
across: 1
industries.: 1
At: 1
heart: 1
this: 2
transformation: 1
is: 12
engineering—a: 1
field: 1
dedicated: 1
to: 14
designing,: 1
constructing,: 1
maintaining: 1
infrastructure: 1
required: 1
manage: 1
process: 2
large: 1
volumes: 1
efficiently.: 1
This: 3
article: 1
delves: 1
into: 3
core: 1
aspects: 1
engineering,: 2
highlighting: 1
its: 2
importance,: 1
key: 1
components,: 1
skills: 2
necessary: 1
excel: 2
in: 7
domain.: 1
Importance: 1
Engineering: 2
engineering: 5
forms: 1
backbone: 1
any: 1
data-driven: 1
organization.: 2
It: 1
ensures: 2
that: 6
collected,: 2
stored,: 1
processed,: 1
made: 2
accessible: 1
manner: 1
both: 1
reliable: 1
scalable.: 1
Without: 1
robust: 2
practic

In [14]:
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

def run(argv=None):
    """Query name, surname and age from a BigQuery table and print every row.

    Args:
        argv: Optional list of command-line arguments. When ``None``, argparse
            falls back to ``sys.argv[1:]``. ``--output`` and ``--project`` are
            consumed here; every other flag is forwarded to Beam's
            ``PipelineOptions``.

    The ``--output`` flag is parsed but currently unused because the
    ``WriteToText`` step is commented out; rows go to stdout instead.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--output',
                        dest='output',
                        default='gs://sandeep-apache/output.txt',
                        help='Output file path in GCS.')
    parser.add_argument('--project',
                        dest='project',
                        default='techlanders-internal',
                        help='GCP project ID.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Table to query, given as dataset.table; no project is written into the
    # reference itself.
    default_table = 'notebook_dataset.users'  # Replace with your default BigQuery table

    # Define PipelineOptions with Google Cloud specific options.
    options = PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project

    # These are defaults only: a value supplied through pipeline_args
    # (e.g. --region, --temp_location) is kept instead of being overwritten.
    # GCP region names have no hyphen before the trailing digit ('us-east1');
    # the previous 'us-east-1' is AWS-style naming and is not a valid GCP region.
    google_cloud_options.region = google_cloud_options.region or 'us-east1'
    google_cloud_options.temp_location = (
        google_cloud_options.temp_location or 'gs://sandeep-apache/temp'
    )
    google_cloud_options.staging_location = (
        google_cloud_options.staging_location or 'gs://sandeep-apache/staging'
    )

    with beam.Pipeline(options=options) as pipeline:
        # Read from BigQuery using a standard-SQL query.
        query = f'SELECT name, surname, age FROM `{default_table}`'
        rows = pipeline | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            query=query,
            use_standard_sql=True
        )

        # Print each row (a dict keyed by column name) to the console.
        # NOTE(review): these rows contain personal data; clear the cell
        # output before sharing the notebook.
        rows | 'PrintResults' >> beam.Map(print)

        # Writing to GCS is disabled while results are printed instead:
        # rows | 'WriteToText' >> beam.io.WriteToText(known_args.output)


if __name__ == '__main__':
    run()




{'name': 'Anil Kumar', 'surname': 'Yellamandala', 'age': 21}
{'name': 'Sandipan', 'surname': 'Pramanik', 'age': 21}
{'name': 'Pankaj', 'surname': 'Sharma', 'age': 21}
{'name': 'Subodh', 'surname': 'Agrawal', 'age': 21}
{'name': 'Sweta', 'surname': 'Ghatak', 'age': 23}
{'name': 'Himanshu', 'surname': 'Gupta', 'age': 23}
{'name': 'Papia', 'surname': 'Dastidar', 'age': 23}
{'name': 'Rohit', 'surname': 'Manhas', 'age': 26}
{'name': 'AMIT', 'surname': 'CHOUDHARY', 'age': 26}
{'name': 'Shikha', 'surname': 'Jain', 'age': 26}
{'name': 'MAHESH', 'surname': 'SUBBAIAH', 'age': 26}
{'name': 'Shambhu', 'surname': 'Sharma', 'age': 27}
{'name': 'Ritam', 'surname': 'Bit', 'age': 27}
{'name': 'Krishna', 'surname': 'Choudhury', 'age': 27}
{'name': 'Abhishek', 'surname': 'Dubey', 'age': 27}
{'name': 'GAUTAM', 'surname': 'DUTTA', 'age': 33}
{'name': 'Pankaj', 'surname': 'Tiwary', 'age': 33}
{'name': 'K S', 'surname': 'Suraj', 'age': 33}
{'name': 'Virgina', 'surname': 'Priyadarshini D S', 'age': 33}
{'name

In [None]:
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

def run(argv=None):
    """Select every column from a configurable BigQuery table and print every row.

    Args:
        argv: Optional list of command-line arguments. When ``None``, argparse
            falls back to ``sys.argv[1:]``. ``--output``, ``--project``,
            ``--dataset`` and ``--table`` are consumed here; every other flag
            is forwarded to Beam's ``PipelineOptions``.

    The ``--output`` flag is parsed but currently unused because the
    ``WriteToText`` step is commented out; rows go to stdout instead.
    """
    parser = argparse.ArgumentParser()
    # NOTE(review): this default points at data.txt, which another cell uses
    # as its *input* file; confirm before re-enabling the write step below.
    parser.add_argument('--output',
                        dest='output',
                        default='gs://sandeep-apache/data.txt',
                        help='Output file path in GCS.')
    parser.add_argument('--project',
                        dest='project',
                        default='techlanders-internal',
                        help='GCP project ID.')
    parser.add_argument('--dataset',
                        dest='dataset',
                        default='notebook_dataset',
                        help='BigQuery dataset name.')
    parser.add_argument('--table',
                        dest='table',
                        default='users',
                        help='BigQuery table name.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Define PipelineOptions with Google Cloud specific options.
    options = PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project

    # These are defaults only: a value supplied through pipeline_args
    # (e.g. --region, --temp_location) is kept instead of being overwritten.
    # GCP region names have no hyphen before the trailing digit ('us-east1');
    # the previous 'us-east-1' is AWS-style naming and is not a valid GCP region.
    google_cloud_options.region = google_cloud_options.region or 'us-east1'
    google_cloud_options.temp_location = (
        google_cloud_options.temp_location or 'gs://sandeep-apache/temp'
    )
    google_cloud_options.staging_location = (
        google_cloud_options.staging_location or 'gs://sandeep-apache/staging'
    )

    # The query runs as standard SQL, where a fully qualified table is written
    # `project.dataset.table` inside backticks. The legacy 'project:dataset.table'
    # form is not valid there: the part before the colon would be read as a
    # domain prefix of the project ID.
    table_path = f'{known_args.project}.{known_args.dataset}.{known_args.table}'
    query = f'SELECT * FROM `{table_path}`'

    with beam.Pipeline(options=options) as pipeline:
        # Read from BigQuery.
        rows = pipeline | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            query=query,
            use_standard_sql=True
        )

        # Print each row to the console.
        rows | 'PrintResults' >> beam.Map(print)

        # Writing to GCS is disabled while results are printed instead:
        # rows | 'WriteToText' >> beam.io.WriteToText(known_args.output)


if __name__ == '__main__':
    run()
