# Funções Customizadas para Mapeamento

ParDo é uma transformação Beam para processamento paralelo genérico. O paradigma de processamento ParDo é semelhante à fase “Map” de um algoritmo no estilo Map/Shuffle/Reduce: uma transformação ParDo considera cada elemento na PCollection de entrada, executa alguma função de processamento (seu código de usuário) naquele elemento e emite zero, um ou vários elementos em uma PCollection de saída.

https://beam.apache.org/documentation/programming-guide/#pardo

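`beam.ParDo(MinhaDoFn())` — o `ParDo` recebe uma instância de uma classe que herda de `beam.DoFn`. Essa classe pode implementar os seguintes métodos: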

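- `__init__`: recebe a configuração da DoFn (por exemplo, o caractere separador)
- `process`: chamado para cada elemento de entrada; emite zero ou mais elementos de saída (por exemplo, via `yield`)
- `setup`: chamado uma vez por instância da DoFn, antes do processamento dos bundles
- `teardown`: chamado (em regime de melhor esforço) quando a instância da DoFn é descartada
- `start_bundle`: chamado no início de cada bundle (lote) de elementos
- `finish_bundle`: chamado ao final de cada bundle de elementos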

Para criar uma função customizada, define-se uma classe que herda de `beam.DoFn`.

In [2]:
import apache_beam as beam

# Baseline: FlatMap flattens the list returned by split(), so every field of
# every line becomes its own element and is printed on its own line.
p1 = beam.Pipeline()

par_do_example = (
    p1
    | beam.io.ReadFromText("data_sample.txt")
    | beam.FlatMap(lambda record: record.split(","))
    # print() returns None, not an iterable, so Map (one call per element,
    # used only for its side effect) is the right primitive here;
    # FlatMap(print) relied on the runner tolerating a None return.
    | beam.Map(print)
)

# run() starts execution; wait_until_finish() blocks until the pipeline is
# done, which matters on runners that execute asynchronously.
p1.run().wait_until_finish()



ID
Nome
Nota
Turma
Disciplina
1
Iury
10
T1
Mat
2
Davi
8
T1
Mat
3
Vini
6.7
T1
Mat
4
Carol
5.4
T1
Mat
5
Helena
7.8
T1
Mat
6
Michel
7.7
T2
Mat
7
Fernando
5.2
T2
Mat
8
Gabriel
8.9
T2
Mat
9
Carla
8.2
T2
Mat
10
Pablo
10
T2
Mat
11
Pedro
6.2
T2
Mat
12
Emerson
2.1
T2
Mat
13
Iury
9.2
T1
Port
14
Davi
8.4
T1
Port
15
Vini
7.8
T1
Port
16
Carol
4.3
T1
Port
17
Helena
6.2
T1
Port
18
Michel
9.5
T2
Port
19
Fernando
2.5
T2
Port
20
Gabriel
9.0
T2
Port
21
Carla
10
T2
Port
22
Pablo
8.5
T2
Port
23
Pedro
7.5
T2
Port
24
Emerson
9.2
T2
Port


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2b731d41150>

In [17]:
import apache_beam as beam

# Fresh pipeline for this cell, built with the default runner and options.
p1 = beam.Pipeline(runner=None, options=None)

class SplitRow(beam.DoFn):
    """DoFn that splits one delimited text line into a list of fields.

    Every input line yields exactly one output element: the complete list
    produced by ``str.split`` (the list itself is not flattened).

    Args:
        split_char: Delimiter handed to ``str.split``.
    """

    def __init__(self, split_char, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        # Delimiter consumed by process().
        self.split_char = split_char

    def process(self, record):
        """Yield ``record`` split on ``self.split_char`` as a single list."""
        fields = record.split(self.split_char)
        yield fields

# SplitRow yields the whole list returned by split(), so each input line
# becomes ONE list element. Its one-liner equivalent is
# beam.Map(lambda record: record.split(",")); a FlatMap (as in the first
# cell) would instead emit every field as a separate element.
par_do_example = (
    p1
    | beam.io.ReadFromText("data_sample.txt")
    | beam.ParDo(SplitRow(split_char=","))
    # print() returns None, so use Map for this side-effect-only step.
    | beam.Map(print)
)

# Block until the pipeline has finished (needed on asynchronous runners).
p1.run().wait_until_finish()

['ID', 'Nome', 'Nota', 'Turma', 'Disciplina']
['1', 'Iury', '10', 'T1', 'Mat']
['2', 'Davi', '8', 'T1', 'Mat']
['3', 'Vini', '6.7', 'T1', 'Mat']
['4', 'Carol', '5.4', 'T1', 'Mat']
['5', 'Helena', '7.8', 'T1', 'Mat']
['6', 'Michel', '7.7', 'T2', 'Mat']
['7', 'Fernando', '5.2', 'T2', 'Mat']
['8', 'Gabriel', '8.9', 'T2', 'Mat']
['9', 'Carla', '8.2', 'T2', 'Mat']
['10', 'Pablo', '10', 'T2', 'Mat']
['11', 'Pedro', '6.2', 'T2', 'Mat']
['12', 'Emerson', '2.1', 'T2', 'Mat']
['13', 'Iury', '9.2', 'T1', 'Port']
['14', 'Davi', '8.4', 'T1', 'Port']
['15', 'Vini', '7.8', 'T1', 'Port']
['16', 'Carol', '4.3', 'T1', 'Port']
['17', 'Helena', '6.2', 'T1', 'Port']
['18', 'Michel', '9.5', 'T2', 'Port']
['19', 'Fernando', '2.5', 'T2', 'Port']
['20', 'Gabriel', '9.0', 'T2', 'Port']
['21', 'Carla', '10', 'T2', 'Port']
['22', 'Pablo', '8.5', 'T2', 'Port']
['23', 'Pedro', '7.5', 'T2', 'Port']
['24', 'Emerson', '9.2', 'T2', 'Port']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2b76752ba90>

In [20]:
import apache_beam as beam

# Fresh pipeline for this cell, built with the default runner and options.
p1 = beam.Pipeline(runner=None, options=None)

class SplitRow(beam.DoFn):
    """DoFn that splits a delimited line and prints a marker from each hook.

    Methods are listed in the order of the DoFn lifecycle described in the
    Beam programming guide: setup -> start_bundle -> process (per element)
    -> finish_bundle -> teardown.

    Args:
        split_char: Delimiter handed to ``str.split``.
    """

    def __init__(self, split_char, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        # Delimiter consumed by process().
        self.split_char = split_char

    def setup(self):
        """Per-instance initialization hook; prints a marker."""
        print("__setup__")

    def start_bundle(self):
        """Hook run when a bundle of elements begins; prints a marker."""
        print("start_bundle")

    def process(self, record):
        """Yield ``record`` split on ``self.split_char`` as a single list."""
        fields = record.split(self.split_char)
        yield fields

    def finish_bundle(self):
        """Hook run when a bundle of elements ends; prints a marker."""
        print("finish_bundle")

    def teardown(self):
        """Per-instance cleanup hook (best effort in Beam); prints a marker."""
        print("teardown")
    


# SplitRow yields the whole list returned by split(), so each input line
# becomes ONE list element. Its one-liner equivalent is
# beam.Map(lambda record: record.split(",")); a FlatMap (as in the first
# cell) would instead emit every field as a separate element.
par_do_example = (
    p1
    | beam.io.ReadFromText("data_sample.txt")
    | beam.ParDo(SplitRow(split_char=","))
    # print() returns None, so use Map for this side-effect-only step.
    | beam.Map(print)
)

# Block until the pipeline has finished (needed on asynchronous runners).
p1.run().wait_until_finish()

['ID', 'Nome', 'Nota', 'Turma', 'Disciplina']
['1', 'Iury', '10', 'T1', 'Mat']
['2', 'Davi', '8', 'T1', 'Mat']
['3', 'Vini', '6.7', 'T1', 'Mat']
['4', 'Carol', '5.4', 'T1', 'Mat']
['5', 'Helena', '7.8', 'T1', 'Mat']
['6', 'Michel', '7.7', 'T2', 'Mat']
['7', 'Fernando', '5.2', 'T2', 'Mat']
['8', 'Gabriel', '8.9', 'T2', 'Mat']
['9', 'Carla', '8.2', 'T2', 'Mat']
['10', 'Pablo', '10', 'T2', 'Mat']
['11', 'Pedro', '6.2', 'T2', 'Mat']
['12', 'Emerson', '2.1', 'T2', 'Mat']
['13', 'Iury', '9.2', 'T1', 'Port']
['14', 'Davi', '8.4', 'T1', 'Port']
['15', 'Vini', '7.8', 'T1', 'Port']
['16', 'Carol', '4.3', 'T1', 'Port']
['17', 'Helena', '6.2', 'T1', 'Port']
['18', 'Michel', '9.5', 'T2', 'Port']
['19', 'Fernando', '2.5', 'T2', 'Port']
['20', 'Gabriel', '9.0', 'T2', 'Port']
['21', 'Carla', '10', 'T2', 'Port']
['22', 'Pablo', '8.5', 'T2', 'Port']
['23', 'Pedro', '7.5', 'T2', 'Port']
['24', 'Emerson', '9.2', 'T2', 'Port']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2b7654f0c90>