ToString

In [4]:
import apache_beam as beam

# Two keyed sample datasets that share the same person names as keys
jobs = [
    ("sam", "Engineer"),
    ("sandeep", "DevOps Engineer"),
    ("john", "Data Scientist"),
    ("emma", "Product Manager"),
]

city = [
    ("sam", "Delhi"),
    ("sandeep", "Pune"),
    ("john", "San Francisco"),
    ("emma", "New York"),
]

# Build the pipeline explicitly on the DirectRunner
p = beam.Pipeline(beam.runners.DirectRunner())

# Turn each list into a PCollection and co-group both by key; the dict tags
# ('jobs', 'city') label each input's values in the grouped result
cogbk = {
    'jobs': p | "Create jobs" >> beam.Create(jobs),
    'city': p | "Create city" >> beam.Create(city),
} | "CoGroupByKey" >> beam.CoGroupByKey()

# Render every grouped key/value pair as a string, then print it
(
    cogbk
    | "ToString.Kvs" >> beam.ToString.Kvs()
    | "Print" >> beam.Map(print)
)

# Execute the pipeline; the final expression waits for completion and its
# return value is what the cell displays
result = p.run()
result.wait_until_finish()


sam,{'jobs': ['Engineer'], 'city': ['Delhi']}
sandeep,{'jobs': ['DevOps Engineer'], 'city': ['Pune']}
john,{'jobs': ['Data Scientist'], 'city': ['San Francisco']}
emma,{'jobs': ['Product Manager'], 'city': ['New York']}


'DONE'

Keys

In [5]:
import apache_beam as beam

# Sample (key, value) pairs; 'category1' appears twice on purpose
data = [
    ('category1', 100),
    ('category2', 200),
    ('category1', 150),
    ('category3', 300),
]

with beam.Pipeline() as p:
    # Keep only the key of each pair and print it
    (
        p
        | "Create Data" >> beam.Create(data)
        | "Extract Keys" >> beam.Keys()
        | "Print Keys" >> beam.Map(print)
    )


category1
category2
category1
category3


Values

In [6]:
import apache_beam as beam

# Sample (key, value) pairs used as pipeline input
data = [
    ('category1', 100),
    ('category2', 200),
    ('category1', 150),
    ('category3', 300),
]

with beam.Pipeline() as p:
    # Keep only the value of each pair and print it
    (
        p
        | "Create Data" >> beam.Create(data)
        | "Extract Values" >> beam.Values()
        | "Print Values" >> beam.Map(print)
    )


100
200
150
300


KvSwap

In [7]:
import apache_beam as beam

# Sample (key, value) pairs used as pipeline input
data = [
    ('category1', 100),
    ('category2', 200),
    ('category1', 150),
    ('category3', 300),
]

with beam.Pipeline() as p:
    # Flip each (key, value) pair into (value, key) and print the result
    (
        p
        | "Create Data" >> beam.Create(data)
        | "Swap KVs" >> beam.KvSwap()
        | "Print Swapped KVs" >> beam.Map(print)
    )


(100, 'category1')
(200, 'category2')
(150, 'category1')
(300, 'category3')


Distinct

In [8]:
import apache_beam as beam

# Input strings; 'apple' and 'banana' each occur twice
data = ['apple', 'banana', 'apple', 'orange', 'banana', 'grape']

with beam.Pipeline() as p:
    # Load the strings, drop repeated elements, and print what remains
    (
        p
        | "Create Input Data" >> beam.Create(data)
        | "Remove Duplicates" >> beam.Distinct()
        | "Print Distinct Elements" >> beam.Map(print)
    )


apple
banana
orange
grape


CombineGlobally

In [9]:
import apache_beam as beam

# Input: the integers 1 through 10
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with beam.Pipeline() as p:
    # Reduce the whole PCollection to one value with the built-in sum,
    # then print that single result
    (
        p
        | "Create Input Data" >> beam.Create(numbers)
        | "Sum Globally" >> beam.CombineGlobally(sum)
        | "Print Summed Value" >> beam.Map(print)
    )


55


CombinePerKey

In [10]:
import apache_beam as beam

# Input (key, value) pairs; 'cat' and 'dog' each have two values
key_value_pairs = [
    ('cat', 1),
    ('dog', 2),
    ('cat', 3),
    ('dog', 4),
    ('mouse', 5),
]

with beam.Pipeline() as p:
    # Combine the values of each key with the built-in max and print
    # one (key, maximum) pair per key
    (
        p
        | "Create Input Data" >> beam.Create(key_value_pairs)
        | "Max Per Key" >> beam.CombinePerKey(max)
        | "Print Max Per Key" >> beam.Map(print)
    )


('cat', 3)
('dog', 4)
('mouse', 5)


Count.Globally

In [11]:
import apache_beam as beam

# Input: ten integers to be counted
elements = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with beam.Pipeline() as p:
    # Count every element in the PCollection and print the total
    (
        p
        | "Create Input Data" >> beam.Create(elements)
        | "Count Globally" >> beam.combiners.Count.Globally()
        | "Print Total Count" >> beam.Map(print)
    )


10


Count.PerKey

In [12]:
import apache_beam as beam

# Input (key, value) pairs; 'cat' and 'dog' each appear twice
key_value_pairs = [
    ('cat', 1),
    ('dog', 2),
    ('cat', 3),
    ('dog', 4),
    ('mouse', 5),
]

with beam.Pipeline() as p:
    # Count how many pairs carry each key and print (key, count)
    (
        p
        | "Create Input Data" >> beam.Create(key_value_pairs)
        | "Count Per Key" >> beam.combiners.Count.PerKey()
        | "Print Counts Per Key" >> beam.Map(print)
    )


('cat', 2)
('dog', 2)
('mouse', 1)


Count.PerElement

In [13]:
import apache_beam as beam

# Input integers; 1, 2 and 3 each occur twice
elements = [1, 2, 3, 1, 2, 3, 4, 5]

with beam.Pipeline() as p:
    # Count occurrences of each distinct element and print (element, count)
    (
        p
        | "Create Input Data" >> beam.Create(elements)
        | "Count Per Element" >> beam.combiners.Count.PerElement()
        | "Print Counts Per Element" >> beam.Map(print)
    )


(1, 2)
(2, 2)
(3, 2)
(4, 1)
(5, 1)


Mean.Globally

In [14]:
import apache_beam as beam

# Input: the integers 1 through 5
numbers = [1, 2, 3, 4, 5]

with beam.Pipeline() as p:
    # Average all elements of the PCollection and print the single result
    (
        p
        | "Create Input Data" >> beam.Create(numbers)
        | "Calculate Mean Globally" >> beam.combiners.Mean.Globally()
        | "Print Mean Globally" >> beam.Map(print)
    )


3.0
