### Apache Beam
**Apache Beam** is a library for data processing. It is often used for Extract-Transform-Load (ETL) jobs, where we:<br>
> Extract from a data source <br>
> Transform that data <br>
> Load that data into a data sink (like a database) <br>
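
Before bringing in Beam, the three ETL stages can be sketched in plain Python. This is only an illustration: the sample records and the function names `extract`, `transform`, and `load` are made up here, and a Python list stands in for the data sink.

```python
import io

# Hypothetical in-memory "source": two tab-separated records.
SOURCE = io.StringIO("ham\tSee you at five\nspam\tWin a free prize now\n")


def extract(source):
    """Read raw lines from the source, dropping the trailing newline."""
    return [line.rstrip("\n") for line in source]


def transform(lines):
    """Split each line into a [label, message] list."""
    return [line.split("\t") for line in lines]


def load(records, sink):
    """Append the records to the sink (a list stands in for a database)."""
    sink.extend(records)
    return sink


sink = load(transform(extract(SOURCE)), [])
print(sink)
```

A Beam pipeline follows the same shape, with a read transform as the extract step, one or more transforms in the middle, and a write transform as the load step.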

##### In this tutorial, we only need to import these libraries

In [12]:
import apache_beam as beam
import logging
import pandas as pd
import re

##### **SMSSpamCollection** is the file that we will use in this tutorial. Each line holds a label, **spam** or **ham**, followed by a tab and the message text

#### 1. Map

> **PCollection**: read the dataset from a text file <br>
> **PTransform**: split each element of the PCollection on the tab character and put the parts into a list <br>
> **Write to file**: write the results to a text file <br>
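
To see what the Map step produces without running a pipeline, the same element-wise logic can be tried on a small in-memory sample. The sketch below is a plain-Python analogy rather than Beam code; the sample lines and the helper name `split_on_tab` are invented for illustration.

```python
# Hypothetical sample lines in the same "label<TAB>message" layout as the dataset.
sample_lines = [
    "ham\tSee you at five",
    "spam\tWin a free prize now",
]


def split_on_tab(line):
    """Same logic as the lambda passed to beam.Map in the pipeline."""
    return line.split("\t")


# Map semantics: exactly one output element for each input element.
mapped = [split_on_tab(line) for line in sample_lines]
print(mapped)
```

The result has as many elements as the input, and each element is now a two-item list of label and message.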

In [13]:
# Create a pipeline and assign it to pipeline1
pipeline1 = beam.Pipeline()
outputs = (
    pipeline1
    | "Read a dataset" >> beam.io.ReadFromText('SMSSpamCollection')
    # ADDED a Map
    | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
    | 'Write results' >> beam.io.WriteToText("C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output1")
    | 'Print the text file name' >> beam.Map(print) # or beam.LogElements()

)
# To run the pipeline
pipeline1.run()

C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output1-00000-of-00001


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x180cc0dbb50>

#### 2. Filter

> **PCollection**: read the dataset from a text file <br>
> **PTransform**: split each element of the PCollection on the tab character and put the parts into a list <br>
> **PTransform**: keep only the lists whose label is **spam** <br>
> **Write to file**: write the results to a text file <br>
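
The Filter step can be pictured the same way. This plain-Python analogy (not Beam code) uses made-up records and a hypothetical helper `is_spam` that mirrors the predicate used in the pipeline.

```python
# Hypothetical records, already split into [label, message] lists.
sample_records = [
    ["ham", "See you at five"],
    ["spam", "Win a free prize now"],
    ["spam", "Claim your reward"],
]


def is_spam(record):
    """Same predicate as the lambda passed to beam.Filter in the pipeline."""
    return record[0] == "spam"


# Filter semantics: an element is kept only if the predicate returns True.
spam_only = [record for record in sample_records if is_spam(record)]
print(spam_only)
```

Unlike Map, Filter never changes an element; it only decides whether the element stays in the collection.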

In [14]:
# Create a pipeline and assign it to pipeline2
pipeline2 = beam.Pipeline()
outputs = (
    pipeline2
    | "Read a dataset" >> beam.io.ReadFromText('SMSSpamCollection')
    # ADDED a Map
    | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
    # ADDED Filter
    | 'Keep only spam' >> beam.Filter(lambda line: line[0] == "spam")
    | 'Write results' >> beam.io.WriteToText("C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output2")
    | 'Print the text file name' >> beam.Map(print) # or beam.LogElements()

)
# To run the pipeline
pipeline2.run()

C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output2-00000-of-00001


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x180cc0da010>

#### 3. FlatMap

> **PCollection**: read the dataset from a text file <br>
> **PTransform**: split each element of the PCollection on the tab character and put the parts into a list <br>
> **PTransform**: keep only the lists whose label is **spam** <br>
> **PTransform**: apply a FlatMap with the function **lambda line: re.findall(r"[a-zA-Z']+", line[1])** to extract the individual words from each message <br>
> **Write to file**: write the results to a text file <br>
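
The difference between Map and FlatMap is easiest to see side by side. The sketch below is a plain-Python analogy (not Beam code) with invented sample records; it applies the same regular expression once in Map style and once in FlatMap style.

```python
import re

WORD_PATTERN = re.compile(r"[a-zA-Z']+")

# Hypothetical spam records, already split into [label, message] lists.
spam_records = [
    ["spam", "Win a free prize now"],
    ["spam", "Don't miss out"],
]


def find_words(record):
    """Same logic as the lambda passed to beam.FlatMap in the pipeline."""
    return WORD_PATTERN.findall(record[1])


# Map-style: one output per input, so each output is a list of words.
per_record = [find_words(record) for record in spam_records]

# FlatMap-style: the returned lists are flattened into one stream of words.
all_words = [word for record in spam_records for word in find_words(record)]

print(per_record)
print(all_words)
```

With Map the collection would still hold one element per message; with FlatMap each word becomes its own element, which is what the counting step later needs.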

In [15]:
# Create a pipeline and assign it to pipeline3
pipeline3 = beam.Pipeline()
outputs = (
    pipeline3
    | "Read a dataset" >> beam.io.ReadFromText('SMSSpamCollection')
    # ADDED a Map
    | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
    # ADDED Filter
    | 'Keep only spam' >> beam.Filter(lambda line: line[0] == "spam")
    # ADDED FlatMap
    | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
    | 'Write results' >> beam.io.WriteToText("C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output3")
    | 'Print the text file name' >> beam.Map(print) # or beam.LogElements()

)
# To run the pipeline
pipeline3.run()

C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output3-00000-of-00001


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x180cc5cda50>

#### 4. Combine

> **PCollection**: read the dataset from a text file <br>
> **PTransform**: split each element of the PCollection on the tab character and put the parts into a list <br>
> **PTransform**: keep only the lists whose label is **spam** <br>
> **PTransform**: apply a FlatMap with the function **lambda line: re.findall(r"[a-zA-Z']+", line[1])** to extract the individual words from each message <br>
> **PTransform**: pair each word with the value 1 using **Map(lambda word: (word, 1))** <br>
> **PTransform**: sum the values for each word with **CombinePerKey(sum)** to get its total count <br>
> **Write to file**: write the results to a text file <br>
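
Pairing each word with 1 and then summing per key amounts to a grouped count. The following plain-Python analogy (not Beam code) spells out the two steps on a made-up word list; the variable names are chosen for illustration only.

```python
from collections import defaultdict

# Hypothetical words, as they might come out of the FlatMap step.
words = ["free", "prize", "free", "now", "Free"]

# Step 1: pair each word with the value 1 (the Map step).
pairs = [(word, 1) for word in words]

# Step 2: group the values by key, then reduce each group with sum,
# which is the role CombinePerKey(sum) plays in the pipeline.
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)

counts = {key: sum(values) for key, values in groups.items()}
print(counts)
```

Note that "free" and "Free" are counted separately, because nothing in these steps changes the case of the words.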

In [16]:
# Create a pipeline and assign it to pipeline4
pipeline4 = beam.Pipeline()
outputs = (
    pipeline4
    | "Read a dataset" >> beam.io.ReadFromText('SMSSpamCollection')
    # ADDED a Map
    | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
    # ADDED Filter
    | 'Keep only spam' >> beam.Filter(lambda line: line[0] == "spam")
    # ADDED FlatMap
    | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
    | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
    # ADDED CombinePerKey
    | 'Group and sum' >> beam.CombinePerKey(sum)
    | 'Write results' >> beam.io.WriteToText("C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output4")
    | 'Print the text file name' >> beam.Map(print) # or beam.LogElements()

)
# To run the pipeline
pipeline4.run()

C:/Users/Lenovo/OneDrive/Documents1/Apache_Beam/Project2/output4-00000-of-00001


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x180cc495110>
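
Once the job has finished, the counts can be read back for inspection. The sketch below assumes that each output line is the plain string form of a `(word, count)` tuple, which is how tuples are expected to appear when they are written as text without custom formatting. The sample lines and the helper name `top_words` are invented for illustration and are not taken from the actual output file.

```python
import ast

# Hypothetical lines in the assumed "('word', count)" layout.
sample_output_lines = [
    "('free', 3)",
    "('prize', 1)",
    "('call', 5)",
]


def top_words(lines, n):
    """Parse the tuple-formatted lines and return the n highest counts."""
    pairs = [ast.literal_eval(line) for line in lines]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)[:n]


top_two = top_words(sample_output_lines, 2)
print(top_two)
```

To use this on the real results, the lines would be read from the output file shown above instead of the sample list.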