<a href="https://colab.research.google.com/github/MpRonald/Machine-Learning/blob/main/ApacheBeamPipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [77]:
# %pip install apache-beam  # uncomment on a fresh Colab runtime; pin a version for reproducible runs

In [78]:
import warnings
warnings.simplefilter("ignore")

In [79]:
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

In [80]:
# Options for the Beam pipeline below. Passing flags=[] stops PipelineOptions
# from parsing sys.argv. Inside a Jupyter/Colab kernel, sys.argv holds the kernel
# launcher's arguments (e.g. `-f <connection file>`), not pipeline flags.
# No --runner is given, so Beam uses its default local runner.
pipeline_options = PipelineOptions(flags=[])
pipeline = beam.Pipeline(options=pipeline_options)

In [81]:
def text2list(element, delimeter='|'):
    """Split one raw text line into its list of string fields.

    Args:
        element: a single line of the input file.
        delimeter: field separator (spelling kept for backward compatibility);
            defaults to '|'.

    Returns:
        The list of fields, exactly as produced by ``str.split``.
    """
    fields = element.split(delimeter)
    return fields

In [82]:
def list2dict(element, cols):
    """Pair each field value with its column name.

    Args:
        element: sequence of field values for one row.
        cols: sequence of column names, in the same order as the fields.

    Returns:
        A dict mapping column name -> value. Pairing stops at the shorter of
        the two sequences, so extra fields or extra column names are dropped.
    """
    return {name: value for name, value in zip(cols, element)}

In [83]:
def data_treat(element):
    """Return a copy of a record with an ``ano_mes`` (year-month) field added.

    ``ano_mes`` is built from the first two '-'-separated parts of
    ``data_iniSE``. Presumably the dates are 'YYYY-MM-DD', so '2015-11-08'
    becomes '2015-11'. TODO confirm the date format against the dataset.

    Args:
        element: dict for one row; must contain the key ``'data_iniSE'``.

    Returns:
        A new dict with every key of ``element`` plus ``'ano_mes'``.
        The input dict is not modified.
    """
    year_month = '-'.join(element['data_iniSE'].split('-')[:2])
    # Beam requires user code not to mutate its input elements, so build a new
    # dict instead of assigning into `element`.
    return {**element, 'ano_mes': year_month}

In [84]:
def key_uf(element):
    """Key a record by its state code, for the GroupByKey step.

    Args:
        element: dict for one row; must contain the key ``'uf'``.

    Returns:
        The tuple ``(element['uf'], element)``. The record itself is passed
        through unchanged, as the same object.
    """
    return element['uf'], element

In [85]:
def dengue_cases(element):
    """Expand one ``(uf, records)`` group into per-record case counts.

    Args:
        element: a pair ``(uf, registers)``. ``registers`` is an iterable of
            dicts, each holding ``'casos'`` (a string) and ``'ano_mes'``.

    Yields:
        ``("<uf>-<ano_mes>", cases)`` for every record, in input order.
        ``cases`` is ``float(casos)`` when ``casos`` contains at least one
        digit, and ``0.0`` otherwise (e.g. an empty field).
        NOTE(review): a value that contains digits but is not a valid float
        (e.g. '1,5') raises ValueError.
    """
    uf, registers = element
    for register in registers:
        has_digit = re.search(r'\d', register['casos'])
        month_key = f"{uf}-{register['ano_mes']}"
        count = float(register['casos']) if has_digit else 0.0
        yield month_key, count

In [86]:
# Column names for each '|'-separated row of the dengue file, in file order.
# list2dict pairs these with the split fields.
cols_df_dengue = [
    'id',
    'data_iniSE',
    'casos',
    'ibge_code',
    'cidade',
    'uf',
    'cep',
    'latitude',
    'longitude',
]

In [87]:
# Location of the raw dengue dataset (a mounted Google Drive path in Colab).
# Change this one constant to run the notebook against another copy of the file.
DENGUE_CASES_PATH = '/content/drive/MyDrive/Datasets/casos_dengue.txt'

# Build the transform graph on `pipeline`; the steps execute when pipeline.run() is called.
# The result is the total case count per "<uf>-<year>-<month>" key.
# NOTE: Beam rejects duplicate step labels within one pipeline, so re-running this
# cell on the same `pipeline` object fails. Recreate the pipeline before re-running.
# `df_dengue` is the PCollection emitted by the print step (its elements are print's
# None return values), not a DataFrame.
df_dengue = (
    pipeline
    | "Read dengue dataset" >> ReadFromText(DENGUE_CASES_PATH, skip_header_lines=1)
    | "Text to list" >> beam.Map(text2list)
    | "List to dict" >> beam.Map(list2dict, cols_df_dengue)
    | "Creating year-month" >> beam.Map(data_treat)
    | "Creating key by state" >> beam.Map(key_uf)
    | "Group by state" >> beam.GroupByKey()
    | "Unzip dengue cases" >> beam.FlatMap(dengue_cases)
    | "Sum cases by state and month" >> beam.CombinePerKey(sum)
    | "Show results" >> beam.Map(print)
)



In [88]:
# Execute every transform registered on `pipeline`. The printed (key, total) tuples
# come from the "Show results" step; the cell's displayed value is the runner's result object.
pipeline.run()



('CE-2015-11', 718.0)
('CE-2015-12', 566.0)
('CE-2019-12', 537.0)
('CE-2015-09', 950.0)
('CE-2015-06', 9605.0)
('CE-2015-05', 18660.0)
('CE-2018-09', 635.0)
('CE-2018-10', 393.0)
('CE-2016-10', 1956.0)
('CE-2016-12', 1339.0)
('CE-2016-03', 5502.0)
('CE-2016-07', 6850.0)
('CE-2017-01', 4304.0)
('CE-2017-07', 2704.0)
('CE-2017-10', 1133.0)
('CE-2018-05', 1265.0)
('CE-2018-06', 932.0)
('CE-2019-11', 567.0)
('CE-2018-04', 3192.0)
('CE-2017-09', 1438.0)
('CE-2018-07', 1019.0)
('CE-2017-03', 11215.0)
('CE-2017-06', 4546.0)
('CE-2019-09', 1635.0)
('CE-2015-10', 635.0)
('CE-2017-08', 1093.0)
('CE-2016-08', 3303.0)
('CE-2016-09', 1958.0)
('CE-2016-11', 1551.0)
('CE-2019-04', 5604.0)
('CE-2019-05', 5198.0)
('CE-2019-06', 4981.0)
('CE-2019-07', 2693.0)
('CE-2019-03', 4167.0)
('CE-2017-12', 529.0)
('CE-2018-12', 334.0)
('CE-2019-10', 690.0)
('CE-2015-03', 7290.0)
('CE-2016-05', 16837.0)
('CE-2016-06', 8875.0)
('CE-2015-01', 1747.0)
('CE-2015-02', 2697.0)
('CE-2015-04', 9292.0)
('CE-2015-07', 4778.

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7e0f17f73190>