In [7]:
import apache_beam as beam
# Examples of the Values, Keys and KvSwap transforms.
# Every example runs on the same (icon, name) pairs, so the data is declared
# once and a single helper builds and runs the pipeline for each transform.
PRODUCE = [
    ('🍓', 'Strawberry'),
    ('🥕', 'Carrot'),
    ('🍆', 'Eggplant'),
    ('🍅', 'Tomato'),
    ('🥔', 'Potato'),
]


def print_transformed(transform):
    """Apply ``transform`` to the PRODUCE pairs and print every resulting element.

    A fresh pipeline is created for each call; it runs when the ``with`` block
    exits, so all elements are printed before this function returns.

    Args:
        transform: the Beam transform applied to the (icon, name) pairs,
            e.g. ``beam.Values()``.
    """
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | beam.Create(PRODUCE)
            | transform
            | beam.Map(print)
        )


# An example of Values: keeps the second item of each pair (the name).
print_transformed(beam.Values())
print("1=========================================================")
# An example of Keys: keeps the first item of each pair (the icon).
print_transformed(beam.Keys())
print("2=========================================================")
# An example of KvSwap: emits (name, icon) instead of (icon, name).
print_transformed(beam.KvSwap())

Strawberry
Carrot
Eggplant
Tomato
Potato
🍓
🥕
🍆
🍅
🥔
('Strawberry', '🍓')
('Carrot', '🥕')
('Eggplant', '🍆')
('Tomato', '🍅')
('Potato', '🥔')


In [27]:
import apache_beam as beam
# Example 1: Map with an existing callable.
# str.strip removes the surrounding whitespace (including the newline).
with beam.Pipeline() as pipeline:
    plants = pipeline | beam.Create([
        '   🍓Strawberry   \n',
        '   🥕Carrot   \n',
        '   🍆Eggplant   \n',
        '   🍅Tomato   \n',
        '   🥔Potato1   \n',
    ])
    plants | beam.Map(str.strip) | beam.Map(print)

# Example 2: Map with a lambda.
# strip('# \n') removes any leading/trailing '#', space and newline characters.
# The pipeline handle keeps its own name for the whole block; it is never
# rebound to a PCollection while the `with` statement is still active.
with beam.Pipeline() as pipeline:
    plants = pipeline | 'Gardening plants' >> beam.Create([
        '# 🍓Strawberry\n',
        '# 🥕Carrot\n',
        '# 🍆Eggplant\n',
        '# 🍅Tomato\n',
        '# 🥔Potato\n',
    ])
    plants | beam.Map(lambda x: x.strip('# \n')) | beam.Map(print)

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato1
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato


In [32]:
import apache_beam as beam
class SplitWords(beam.DoFn):
    """DoFn that breaks each input string apart on a delimiter.

    Every piece produced by the split is emitted as its own output element.
    """

    def __init__(self, delimiter=','):
        """Store the separator used by ``process`` (a comma by default)."""
        self.delimiter = delimiter

    def process(self, text):
        """Yield each substring of ``text`` obtained by splitting on ``self.delimiter``."""
        yield from text.split(self.delimiter)

# Feed two comma-separated lines through SplitWords and print every word.
with beam.Pipeline() as pipeline:
    comma_separated = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry,🥕Carrot,🍆Eggplant',
        '🍅Tomato,🥔Potato',
    ])
    plants = (
        comma_separated
        | 'Split words' >> beam.ParDo(SplitWords(','))
        | 'Print words' >> beam.Map(print)
    )
        

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato


In [34]:
import apache_beam as beam

class AnalyzeElement(beam.DoFn):
    """DoFn that emits a text report about an element's timestamp and window."""

    def process(
            self,
            elem,
            timestamp=beam.DoFn.TimestampParam,
            window=beam.DoFn.WindowParam):
        """Yield one multi-line string describing ``timestamp`` and ``window``.

        Args:
            elem: the input element; its value is not used in the report.
            timestamp: defaults to ``beam.DoFn.TimestampParam``.
            window: defaults to ``beam.DoFn.WindowParam``.
        """
        # First section: type and several renderings of the timestamp.
        timestamp_report = [
            '# timestamp',
            'type(timestamp) -> {!r}'.format(type(timestamp)),
            'timestamp.micros -> {!r}'.format(timestamp.micros),
            'timestamp.to_rfc3339() -> {!r}'.format(timestamp.to_rfc3339()),
            'timestamp.to_utc_datetime() -> {!r}'.format(
                timestamp.to_utc_datetime()),
        ]
        # Second section: type and bounds of the window, each bound shown
        # both as-is and converted to a UTC datetime.
        window_report = [
            '# window',
            'type(window) -> {!r}'.format(type(window)),
            'window.start -> {} ({})'.format(
                window.start, window.start.to_utc_datetime()),
            'window.end -> {} ({})'.format(
                window.end, window.end.to_utc_datetime()),
            'window.max_timestamp() -> {} ({})'.format(
                window.max_timestamp(),
                window.max_timestamp().to_utc_datetime()),
        ]
        # The two sections are separated by a single blank line.
        yield '\n'.join(timestamp_report + [''] + window_report)

# Build one timestamped, windowed element and print AnalyzeElement's report.
with beam.Pipeline() as pipeline:
    single_element = (
        pipeline | 'Create a single test element' >> beam.Create([':)']))
    # Attach a fixed event timestamp to the element.
    timestamped = (
        single_element
        | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, 1584675660)))
    windowed = (
        timestamped
        | 'Fixed 30sec windows' >> beam.WindowInto(
            beam.window.FixedWindows(30)))
    dofn_params = (
        windowed
        | 'Analyze element' >> beam.ParDo(AnalyzeElement())
        | beam.Map(print)
    )

# timestamp
type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
timestamp.micros -> 1584675660000000
timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)

# window
type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)


In [37]:
import apache_beam as beam

class DoFnMethods(beam.DoFn):
    """DoFn that makes each of its methods visible in the pipeline output.

    ``__init__``, ``setup``, ``start_bundle`` and ``teardown`` print their own
    name; ``process`` and ``finish_bundle`` emit marker elements instead.
    """

    def __init__(self):
        """Print a marker and start with a global window as the remembered window."""
        print('__init__')
        self.window = beam.window.GlobalWindow()

    def setup(self):
        """Print a marker showing that ``setup`` ran."""
        print('setup')

    def start_bundle(self):
        """Print a marker showing that ``start_bundle`` ran."""
        print('start_bundle')

    def process(self, element, window=beam.DoFn.WindowParam):
        """Remember the element's window and emit a '* process: ...' string."""
        # finish_bundle reuses whichever window was seen most recently.
        self.window = window
        yield '* process: ' + element

    def finish_bundle(self):
        """Emit one extra element, placed in the most recently seen window."""
        closing_element = beam.utils.windowed_value.WindowedValue(
            value='* finish_bundle: 🌱🌳🌍',
            timestamp=0,
            windows=[self.window],
        )
        yield closing_element

    def teardown(self):
        """Print a marker showing that ``teardown`` ran."""
        print('teardown')

# Run five icons through DoFnMethods and print whatever it emits.
with beam.Pipeline() as pipeline:
    inputs = (
        pipeline
        | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔']))
    results = (
        inputs
        | 'DoFn methods' >> beam.ParDo(DoFnMethods())
        | beam.Map(print)
    )

__init__
setup
start_bundle
* process: 🍓
* process: 🥕
* process: 🍆
* process: 🍅
* process: 🥔
* finish_bundle: 🌱🌳🌍
teardown


In [42]:
import apache_beam as beam
# Partition order: the position of a duration in this list is its partition index.
durations = ['annual', 'biennial', 'perennial']


def by_duration(plant, num_partitions):
    """Return the partition index for ``plant`` based on its 'duration' entry.

    Args:
        plant: mapping with a 'duration' key whose value appears in ``durations``.
        num_partitions: accepted but not used by this function.

    Raises:
        ValueError: if the plant's duration is not listed in ``durations``.
    """
    duration = plant['duration']
    return durations.index(duration)
# Split the plants into one PCollection per duration and print each group.
with beam.Pipeline() as p:
    annuals, biennials, perennials = (
        p
        | beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'annual'},
        ])
        # Reuse the by_duration helper defined above rather than repeating
        # its body in an inline lambda; one output per entry in `durations`.
        | beam.Partition(by_duration, len(durations))
    )

    # Explicit labels keep the three print steps distinct within the pipeline.
    annuals | 'Print annuals' >> beam.Map(
        lambda x: print('annual:{}'.format(x)))
    biennials | 'Print biennials' >> beam.Map(
        lambda x: print('biennials:{}'.format(x)))
    perennials | 'Print perennials' >> beam.Map(
        lambda x: print('perennials:{}'.format(x)))

perennials:{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
biennials:{'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}
perennials:{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
annual:{'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
annual:{'icon': '🥔', 'name': 'Potato', 'duration': 'annual'}


In [46]:
import apache_beam as beam
import json

def split_dataset(plant, num_partitions, ratio):
    """Deterministically assign ``plant`` to a partition according to ``ratio``.

    The element is serialised with ``json.dumps`` and the code points of the
    resulting text are summed; that sum modulo ``sum(ratio)`` is the element's
    bucket. Partition ``i`` owns the ``ratio[i]`` consecutive buckets that
    follow those owned by the partitions before it.

    Args:
        plant: any JSON-serialisable element.
        num_partitions: number of partitions; must equal ``len(ratio)``.
        ratio: sequence of weights, one per partition (e.g. ``[8, 2]``).

    Returns:
        The index of the partition the element falls into.
    """
    assert num_partitions == len(ratio)
    bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)
    total = 0
    for i, part in enumerate(ratio):
        total += part
        if bucket < total:
            return i
    # Fallback placed after the loop so every weight is examined first.
    # With positive weights the loop always returns, because bucket < sum(ratio).
    return len(ratio) - 1
# Split the plants into train and test sets with weights 8 and 2, then print both.
with beam.Pipeline() as pipeline:
    gardening_plants = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
    ])
    train_dataset, test_dataset = (
        gardening_plants
        | 'Split train/test' >> beam.Partition(split_dataset, 2, ratio=[8, 2])
    )
    train_dataset | 'Print train' >> beam.Map(
        lambda plant: print('train: {}'.format(plant)))
    test_dataset | 'Print test' >> beam.Map(
        lambda plant: print('test: {}'.format(plant)))
    

train: {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
train: {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}
test: {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
test: {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
train: {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
