In [None]:
# # start dataflow job
# python dataflow_python_examples/data_transformation.py \
#   --project=$PROJECT \
#   --region=us-east5 \
#   --runner=DataflowRunner \
#   --staging_location=gs://$PROJECT/test \
#   --temp_location gs://$PROJECT/test \
#   --input gs://$PROJECT/data_files/head_usa_names.csv \
#   --save_main_session

In [None]:
# !pip install "apache-beam[interactive]"

In [2]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.jdbc import WriteToJdbc

In [208]:
# customizing pipeline options
class Myoptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input', 
            help='Input for the pipeline', 
            default='gs://dataflow-samples/shakespeare/kinglear.txt')
        
        parser.add_argument(
            '--output',
            help='Output for the pipeline',
            default='gs://$PROJECT/output/')

# beam.Row and infering schema

In [14]:
import typing

class Transaction(typing.NamedTuple):
    bank: str
    purchase_amount: float

p = beam.Pipeline()
output = (
          p
          | beam.Create([{"bank": "Wells Fargo", "purchase_amount": 103.30}])
          | beam.Map(lambda item : beam.Row(bank=item["bank"], purchase_amount = item["purchase_amount"])).with_output_types(Transaction)
          | beam.Map(print)
        )

p.run()

Row(bank='Wells Fargo', purchase_amount=103.3)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe212ecaeb0>

# beam.Select and infering schema

In [15]:
p = beam.Pipeline()

output = (
          p
          | beam.Create([{"bank": "Wells Fargo", "purchase_amount": 103.30}])
          | beam.Select(bank = lambda item: item["bank"], purchase_amount = lambda item: item["purchase_amount"]).with_output_types(Transaction)
          | beam.Map(print)
        )

p.run()

Row(bank='Wells Fargo', purchase_amount=103.3)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe212b5e7f0>

# creating a DoFn

In [49]:
class ComputeWordLength(beam.DoFn):
    def process(self, element):
        return [(element , len(element))]
    
p = beam.Pipeline()
# creating Pcollection from data in memory
words = beam.Create('who is the man from the moon'.split())
wordlengths = p | words | beam.ParDo(ComputeWordLength()) | beam.Map(print)
p.run()

('who', 3)
('is', 2)
('the', 3)
('man', 3)
('from', 4)
('the', 3)
('moon', 4)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe1f5ac78b0>

# passing side inputs to ParDo

In [421]:
text = """
Lorem ipsum dolor sit amet consectetur adipisicing elit. Quisquam vitae unde voluptatibus dolores perspiciatis, quis amet eveniet aperiam atque placeat laborum? Consequuntur illo accusamus, praesentium doloremque eaque recusandae earum perspiciatis!."""

p = beam.Pipeline()

words = p | beam.Create(text.split())

def filter_using_length(word, lower_bound, upper_bound=float('inf')):
    if lower_bound <= len(word) <= upper_bound:
        yield word

average_word_len = (
    words
    | beam.Map(len)
    | beam.CombineGlobally(beam.combiners.MeanCombineFn()))

small_words = words | "small words" >> beam.FlatMap(filter_using_length, lower_bound=1, upper_bound=3)
larger_words = words | "large words" >> beam.FlatMap(filter_using_length, lower_bound= beam.pvalue.AsSingleton(average_word_len))
p.run()

# tagging multiple outputs

In [436]:
p = beam.Pipeline()
numbers = p | beam.Create([1,2,3,4,5,6,7,8,9,10])

def even_odd_filter(x):
    yield beam.pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
    if x % 10 == 0:
        yield x

results = numbers | beam.FlatMap(even_odd_filter).with_outputs()#('odd', 'even')
p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa297107310>

# creating composite transforms

In [441]:
class ComputeWordLength(beam.PTransform):
    def expand(self, pcoll):
        return pcoll | beam.Map(lambda x: len(x))
    
    p = beam.Pipeline()

# Creating a CombineFn

In [369]:
class ComputeAverag(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)
    
    def add_input(self, sum_count, input):
        (sum, count) = sum_count
        return sum + input, count + 1
    
    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)
    
    def extract_output(self, sum_count):
        (sum, count) = sum_count
        return sum/count if count else float('NaN')

p = beam.Pipeline()
# creating Pcollection from data in memory
pcoll = beam.Create([1, 2, 3, 4, 5])
average = p | pcoll | beam.CombineGlobally(ComputeAverag()) | beam.Map(print)
p.run()

15


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fa2c2c13220>

In [19]:
p = beam.Pipeline()

emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

emails = p | 'CreateEmails' >> beam.Create(emails_list)
phones = p | 'CreatePhones' >> beam.Create(phones_list)

results = ({'emails': emails, 'phones': phones} | beam.CoGroupByKey()) | beam.Map(print)
p.run()

('amy', {'emails': ['amy@example.com'], 'phones': ['111-222-3333', '333-444-5555']})
('james', {'emails': [], 'phones': ['222-333-4444']})
('carl', {'emails': ['carl@example.com', 'carl@email.com'], 'phones': ['444-555-6666']})
('julia', {'emails': ['julia@example.com'], 'phones': []})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe1f373db20>

In [None]:
input_data = 'data/IBM.csv'
pipeline = beam.Pipeline()

outputs = (
    pipeline
    | 'ReadData' >> beam.io.ReadFromText(input_data, skip_header_lines=1)
    | 'SplitData' >> beam.Map(lambda x: x.split(','))
    | 'FilterData' >> beam.Filter(lambda x: x[1] == 'Yes')
    | 'flatmap' >> beam.FlatMap(lambda x: x)
    | 'pair words with 1' >> beam.Map(lambda x: (x, 1))
    # | "group by key" >> beam.GroupByKey()
    # | 'count words' >> beam.Map(lambda x: (x[0], sum(x[1])))
    | 'group and sum' >> beam.CombinePerKey(sum)
    | 'print' >> beam.Map(print)
)
pipeline.run()

In [None]:
ib.show_graph(p)

In [17]:
pc = p | beam.Create([1, 10, 100, 1000])

def bounded_sum(values, bound=500):
  return min(sum(values), bound)

small_sum = pc | "small sum" >> beam.CombineGlobally(bounded_sum)  # [500]
large_sum = pc | "large sum" >> beam.Map(lambda x : x**2) | beam.Map(print)

In [126]:
config = {
"dbname":"bigdata",
"user":"postgres",
"password":"*****",
"host":"localhost",
"port":"5432"}

conn = psycopg2.connect(**config)
cur = conn.cursor()
cur.execute("select * from employee")
headers = [desc.name for desc in cur.description]

p = beam.Pipeline()
res = (
    p
    | "input" >> beam.Create(cur.fetchall())
    | beam.Map(lambda row : beam.Row(name= row[1], dept=row[2], salary = row[3]))
    | beam.GroupBy("dept")\
        .aggregate_field("name", beam.combiners.CountCombineFn(), "total_employees")\
            .aggregate_field("salary", sum, "total_salary")\
                .aggregate_field("salary", beam.combiners.MeanCombineFn(), "avg_salaries")
    | "format" >> beam.Map(lambda row : f"{row.dept},{row.total_employees},{row.total_salary},{row.avg_salaries}")
    | "save to file" >> beam.io.WriteToText("data/output.txt")
    )
p.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fc53176c040>