In [3]:
from past.builtins import unicode
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | "Create" >> beam.Create(["cat dog", "snake cat", "dog cat cat"])
    counts = (
        lines 
        | "Split" >> (beam.FlatMap(lambda x: x.split(" "))
                           .with_output_types(unicode))
        | "Pair with one" >> beam.Map(lambda x: (x, 1))
        | "Group and sum" >> beam.CombinePerKey(sum)
        
    )
    counts | "Print" >> beam.ParDo(
        lambda w_c: print('%s: %s' % (w_c[0], w_c[1]))
    )

cat: 4
dog: 2
snake: 1


In [12]:
lines.__dict__

{'pipeline': <apache_beam.pipeline.Pipeline at 0x297e65e8788>,
 'tag': None,
 'element_type': str,
 'producer': AppliedPTransform(Cell 3: Create/Map(decode), ParDo),
 'is_bounded': True,
 '_windowing': Windowing(<apache_beam.transforms.window.GlobalWindows object at 0x00000297F778E2C8>, DefaultTrigger(), 1, 1)}

# Apache Beam

*Apache Beam* is an open-source SDK which allows you to build multiple data pipelines from batch or stream based integrations and run it in a direct or distributed way.

*pcollection* - Data structure
*transform* - operations

Create(value)	Creates a PCollection from an iterable. <br/>
Filter(fn)	Use callable fn to filter out elements.<br/>
Map(fn)	Use callable fn to do a one-to-one transformation.<br/>
FlatMap(fn)	Similar to Map, but fn needs to return an iterable of zero or more elements, and these iterables will be flattened into one PCollection.<br/>
Flatten()	Merge several PCollections into a single one.<br/>
Partition(fn)	Split a PCollection into several partitions. fn is a PartitionFn or a callable that accepts two arguments - element, num_partitions.<br/>
GroupByKey()	Works on a PCollection of key/value pairs (two-element tuples), groups by common key, and returns (key, iter<value>) pairs.<br/>
CoGroupByKey()	Groups results across several PCollections by key. e.g. input (k, v) and (k, w), output (k, (iter<v>, iter<w>)).<br/>
RemoveDuplicates()	Get distint values in PCollection.<br/>
CombinePerKey(fn)	Similar to GroupByKey, but combines the values by a CombineFn or a callable that takes an iterable, such as sum, max.<br/>
CombineGlobally(fn)	Reduces a PCollection to a single value by applying fn.<br/>

In [13]:
# import apache_beam as beam
# from apache_beam.options.pipeline_options import PipelineOptions


class ToLower(beam.DoFn):
    def process(self, element):
        return[{'Data': element.lower()}]


class ToReverse(beam.DoFn):
    def process(self, el):
        d = el['Data']
        return [d[::-1]]


if __name__ == '__main__':
    in_file = 'news.txt'
    out_file = 'processed'
    options = PipelineOptions()

    with beam.Pipeline(options=PipelineOptions()) as p:
        r = (
            p | beam.io.ReadFromText(in_file)
            | beam.ParDo(ToLower())
            | beam.ParDo(ToReverse())
            | beam.io.WriteToText(out_file, file_name_suffix='.txt')
        )

        result = p.run()

ERROR:apache_beam.io.filebasedsink:Exception in _rename_batch. src: F:\projects\Data\projects\ads_class\introduction_to_apache_beam\beam-temp-processed-06f1d2828a4611ea8e837470fd124444\3085380e-6214-4eb8-a2cd-deb77a101a91.processed.txt, dst: processed-00000-of-00001.txt, err: [WinError 183] Cannot create a file when that file already exists: 'F:\\projects\\Data\\projects\\ads_class\\introduction_to_apache_beam\\beam-temp-processed-06f1d2828a4611ea8e837470fd124444\\3085380e-6214-4eb8-a2cd-deb77a101a91.processed.txt' -> 'processed-00000-of-00001.txt'


Exception: Encountered exceptions in finalize_write: [OSError(FileExistsError(17, 'Cannot create a file when that file already exists'))] [while running 'Cell 13: WriteToText/Write/WriteImpl/FinalizeWrite']

In [14]:
r

<PCollection[Cell 13: WriteToText/Write/WriteImpl/FinalizeWrite.None] at 0x297f7531dc8>

In [15]:
r.__dict__

{'pipeline': <apache_beam.pipeline.Pipeline at 0x297e65e8c08>,
 'tag': None,
 'element_type': Any,
 'producer': AppliedPTransform(Cell 13: WriteToText/Write/WriteImpl/FinalizeWrite, ParDo),
 'is_bounded': True,
 '_windowing': Windowing(<apache_beam.transforms.window.GlobalWindows object at 0x00000297F77AFD08>, DefaultTrigger(), 1, 1)}