Dataflow is a programming model designed to process large volumes of data in a *parallel* and scalable manner. It is a simple yet powerful model that lets developers design flexible parallel pipelines for data transformation across a variety of execution environments. Pipelines are written with the Apache Beam SDK. They simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes, including Apache Flink, Apache Spark, and Google Cloud Dataflow (a managed cloud service). They can also run on a local machine or on resource managers such as Apache Mesos and Apache YARN.



| Criteria | Dataflow | Dataproc |
| ------- | -------- | -------- |
| Recommended for | New data processing pipelines, unified batch and streaming | Existing Hadoop/Spark applications, ML and data science ecosystem, large batch jobs, preemptible VMs |
| Fully managed | Yes | No |
| Autoscaling | Yes, transform-by-transform (adaptive) | Yes, based on cluster utilization (reactive) |
| Expertise | Apache Beam | Hadoop, Hive, Pig, Spark, Spark ML, SparkR, Presto, Zeppelin, ... |
| Serverless | Yes | No |

![pipeline](Media/pipeline.png)

![constructpipeline](Media/constructpipeline.png)

![constructBranchingPipeline](Media/constructBranchingPipeline.png)

**A pipeline is a directed graph of steps**  

    p = beam.Pipeline()
    p.run()

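A complete word-count pipeline:

    import sys

    import apache_beam as beam


    def count_words(line):
        # Assumed helper (not defined in the original notes):
        # emit a (word, 1) pair for every whitespace-separated word.
        for word in line.split():
            yield (word, 1)


    if __name__ == '__main__':
        with beam.Pipeline(argv=sys.argv) as p:
            (p
             | beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')  # read input
             | beam.FlatMap(count_words)  # split each line into (word, 1) pairs
             | beam.CombinePerKey(sum)  # sum the counts for each word
             | beam.io.WriteToText('gs://my-bucket/counts.txt')  # write output
            )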

Configure the pipeline to run on Cloud Dataflow:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = {
        'project': 'my-project',  # replace with your project ID
        'runner': 'DataflowRunner',  # where to run the pipeline: Cloud Dataflow
        'staging_location': 'gs://my-bucket/staging',
        'region': 'us-central1',  # use the Dataflow region closest to your location
        'setup_file': './setup.py',  # replace with the path to your setup.py
        'temp_location': 'gs://my-bucket/temp',
        'job_name': 'unique-job-name',
    }
    pipeline_options = PipelineOptions(flags=[], **options)
    pipeline = beam.Pipeline(options=pipeline_options)  # create the pipeline

Executing the pipeline, locally or with the Dataflow runner

Run locally (without `--runner`, Beam uses the local DirectRunner):
```shell
python ./main.py
```
Run on Cloud Dataflow:
```shell
python ./main.py \
--runner DataflowRunner \
--project $PROJECT_ID  \
--job_name $JOB_NAME  \
--temp_location gs://$BUCKET/tmp/  \
--staging_location gs://$BUCKET/staging/  \
--region $REGION
```

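Read data from the local file system, Cloud Storage, Pub/Sub, or BigQuery. Each read transform is applied to the pipeline object `p`:
```python
with beam.Pipeline(options=pipeline_options) as p:
    pass  # apply the read transforms shown below to p here
```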
Read from Cloud Storage (each element is one line of the file, as a string):
```python
lines = p | beam.io.ReadFromText('gs://BUCKET_NAME/FILE_NAME')
```
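Read from Pub/Sub (each element is a message payload; in the Python SDK `ReadFromPubSub` yields `bytes`, so decode the payload to get a string):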
```python
lines = p | beam.io.ReadFromPubSub(topic='projects/PROJECT_ID/topics/TOPIC_NAME')
```
Read from BigQuery (each element is a table row, represented as a dictionary):
```python
query = 'SELECT * FROM `PROJECT_ID.DATASET.TABLE`'
# ReadFromBigQuery supersedes the deprecated beam.io.Read(beam.io.BigQuerySource(...)) pattern.
bq_data = p | beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
```

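**Write to a BigQuery table**

Establish a reference to the BigQuery table:
```python
from apache_beam.io.gcp.internal.clients import bigquery

# PROJECT_ID, DATASET_ID and TABLE_ID are placeholders for your own values.
table_spec = bigquery.TableReference(
    projectId=PROJECT_ID,
    datasetId=DATASET_ID,
    tableId=TABLE_ID)
```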
Write to the BigQuery table:
```python
bq_data | beam.io.WriteToBigQuery(
    table_spec,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
```

# Create a PCollection from in-memory data
```python
city_zip_list = [
    ('berlin', '10115'),
    ('berlin', '10999'),
    ('flensburg', '24937'),
    ('flensburg', '24944'),
    ('bremen', '28195'),
    ('bremen', '28759'),
    ('hamburg', '20095'),
    ('hamburg', '20148'),
]
citycodes = p | 'CreateCityCodes' >> beam.Create(city_zip_list)  # 'CreateCityCodes' is the display name of this pipeline step
```


Map and FlatMap
```python
# Map: a 1:1 transformation between input and output elements
'wordLength' >> beam.Map(lambda word: len(word))

# FlatMap: a 1:n transformation; each input may produce zero or more outputs
def my_grep(line, term):
    if term in line:
        yield line  # a generator: yields the line only when it contains the term

'grep' >> beam.FlatMap(lambda line: my_grep(line, 'cat'))  # the search term is 'cat'
```
FlatMap is similar to Map, but it can emit zero, one, or many output values for each input element.
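
The difference can be seen without a Beam runtime. The following is a plain-Python sketch, not Beam code: `apply_map` and `apply_flat_map` are hypothetical helpers that mimic the element-wise behavior of `beam.Map` and `beam.FlatMap` on an ordinary list, and the sample lines are made up for illustration.

```python
from itertools import chain


def apply_map(fn, elements):
    """Mimic beam.Map: exactly one output per input element."""
    return [fn(element) for element in elements]


def apply_flat_map(fn, elements):
    """Mimic beam.FlatMap: fn returns an iterable, and the results are flattened."""
    return list(chain.from_iterable(fn(element) for element in elements))


def my_grep(line, term):
    # Yields the line only when it contains the term, so 0 or 1 outputs per input.
    if term in line:
        yield line


lines = ['the cat sat', 'a dog barked', 'cat and dog']

word_counts = apply_map(lambda line: len(line.split()), lines)
matches = apply_flat_map(lambda line: my_grep(line, 'cat'), lines)
words = apply_flat_map(str.split, lines)

print(word_counts)  # one value per line
print(matches)      # only the lines containing 'cat'
print(len(words))   # every word of every line
```

The output of `apply_map` always has as many elements as its input, while `apply_flat_map` can shrink the collection (the grep) or grow it (the word split).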

ParDo implements parallel processing
```python
words = ...  # input PCollection of strings

# A DoFn holds the per-element processing logic applied to a PCollection
class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        return [len(element)]  # process() returns an iterable of outputs

word_lengths = words | beam.ParDo(ComputeWordLengthFn())  # output: PCollection of integers
```

### section 2


**GroupByKey explicitly shuffles key-value pairs**
```python
# `records` is assumed to be a PCollection whose elements are lists of fields,
# for example ['Lexington', '40513']; each element becomes a (city, zipcode) pair.
cityAndZipCodes = records | beam.Map(lambda fields: (fields[0], fields[1]))
grouped = cityAndZipCodes | beam.GroupByKey()
```
| cityAndZipCodes | grouped |
|-----------------|--------|
|Lexington, 40513 <br> Nashville, 37027 <br> Lexington, 40502 <br>Seattle, 98125 <br>Mountain View, 94041<br>Seattle, 98133<br>Lexington, 40591<br>Mountain View, 94085<br>| Lexington, [40513, 40502, 40591] <br>Nashville, [37027]<br>Seattle, [98125, 98133]<br> Mountain View, [94041, 94085] |





 





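Data skew makes grouping less efficient at scale: when a few keys hold most of the values, the workers that handle those keys become the bottleneck.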
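
To make the grouping and the skew remark concrete, here is a plain-Python sketch (no Beam involved) that groups the city and zip code pairs from the table and then measures how unevenly the values are spread over the keys. `group_by_key` is a hypothetical stand-in for the result of `beam.GroupByKey()`, not its implementation.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Mimic the result of GroupByKey: collect all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


city_and_zipcodes = [
    ('Lexington', 40513),
    ('Nashville', 37027),
    ('Lexington', 40502),
    ('Seattle', 98125),
    ('Mountain View', 94041),
    ('Seattle', 98133),
    ('Lexington', 40591),
    ('Mountain View', 94085),
]

grouped = group_by_key(city_and_zipcodes)

# Values per key: a very uneven distribution is what "data skew" refers to.
group_sizes = {city: len(zipcodes) for city, zipcodes in grouped.items()}
largest_share = max(group_sizes.values()) / len(city_and_zipcodes)

print(grouped)
print(group_sizes)
print(f'largest key holds {largest_share:.0%} of the values')
```

In a distributed run all values for one key end up on the same worker, so the larger `largest_share` gets, the more of the work that single worker has to do.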

![CoGroupByKey](Media/CoGroupByKey.png)

Combine (reduce) a PCollection
```python
# Applied to a PCollection of values: produces a single total
totalAmount = salesAmounts | beam.CombineGlobally(sum)
# Applied to a PCollection of key-value pairs: produces one total per key
totalSalesPerPerson = salesRecords | beam.CombinePerKey(sum)
```
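
As a rough illustration of what the two transforms return, the following plain-Python sketch reduces a small list the same two ways. The sales figures and the helpers `combine_globally` and `combine_per_key` are hypothetical and only model the results, not how Beam distributes the work.

```python
from collections import defaultdict

# Hypothetical sample data: (salesperson, amount) pairs.
sales_records = [('ana', 120.0), ('ben', 80.0), ('ana', 45.5), ('cy', 10.0), ('ben', 20.0)]


def combine_globally(combine_fn, values):
    """Mimic CombineGlobally: reduce every value to a single result."""
    return combine_fn(values)


def combine_per_key(combine_fn, pairs):
    """Mimic CombinePerKey: reduce the values of each key independently."""
    values_by_key = defaultdict(list)
    for key, value in pairs:
        values_by_key[key].append(value)
    return {key: combine_fn(values) for key, values in values_by_key.items()}


total_amount = combine_globally(sum, [amount for _, amount in sales_records])
total_sales_per_person = combine_per_key(sum, sales_records)

print(total_amount)
print(total_sales_per_person)
```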


**CombineFn works by overriding existing operations**
```python
class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0) # sum, count
    def add_input(self, sum_count, input):
        (sum, count) = sum_count
        return sum + input, count + 1
    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)
    def extract_output(self, sum_count):
        (sum, count) = sum_count
        return sum / count if count else float('NaN')
pc = p | beam.Create([1, 2, 3, 4, 5])
average = pc | beam.CombineGlobally(AverageFn())
```
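
Why four methods? A runner may split the input into bundles, build one accumulator per bundle, and merge the partial accumulators afterwards. The sketch below simulates that in plain Python, under the assumption that this is roughly the order in which the methods are called. `AverageFn` here repeats the same four methods without the Beam base class, and `run_combine` is a hypothetical driver, not a Beam API.

```python
class AverageFn:
    """Same four methods as the CombineFn above, without the Beam base class."""

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, running count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')


def run_combine(combine_fn, bundles):
    """Hypothetical driver: one accumulator per bundle, then merge, then extract."""
    partial_accumulators = []
    for bundle in bundles:
        accumulator = combine_fn.create_accumulator()
        for value in bundle:
            accumulator = combine_fn.add_input(accumulator, value)
        partial_accumulators.append(accumulator)
    merged = combine_fn.merge_accumulators(partial_accumulators)
    return combine_fn.extract_output(merged)


# The same input split into bundles in two different ways, as two workers might see it.
average_a = run_combine(AverageFn(), [[1, 2], [3, 4, 5]])
average_b = run_combine(AverageFn(), [[1], [2, 3, 4], [5]])

print(average_a, average_b)
```

Keeping a (sum, count) pair as the accumulator, rather than a running average, is what makes the result independent of how the input is split.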

![combine](Media/combine.png)

![flatten](Media/flatten.png)

![partition](Media/partition.png)

![sideinput](Media/sideinputs.png)

How side inputs work
```python
from apache_beam import pvalue

words = p | beam.Create(['cat', 'dog', 'elephant', 'rat', 'rat', 'cat'])

def filter_using_length(word, lower_bound, upper_bound=float('inf')):
    if lower_bound <= len(word) <= upper_bound:
        yield word

# Constant extra arguments are passed straight through to the function
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)

avg_word_length = (words
                   | 'length' >> beam.Map(len)
                   | 'average' >> beam.CombineGlobally(beam.combiners.MeanCombineFn()))

# A side input is a view of another PCollection; AsSingleton exposes its single value
larger_than_average = words | 'larger' >> beam.Filter(
    lambda w, avg: len(w) > avg, avg=pvalue.AsSingleton(avg_word_length))

# The same side input, passed to FlatMap as a keyword argument
at_least_average = words | 'at_least_average' >> beam.FlatMap(
    filter_using_length, lower_bound=pvalue.AsSingleton(avg_word_length))
```
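
Conceptually, a side input is an extra value, computed from one collection, that every call of the per-element function can read. The following plain-Python sketch (no Beam involved) shows that idea with the same word list. It models only the data flow, not how a runner materializes the view.

```python
from statistics import mean

words = ['cat', 'dog', 'elephant', 'rat', 'rat', 'cat']


def filter_using_length(word, lower_bound, upper_bound=float('inf')):
    # Emit the word only when its length lies within the bounds.
    if lower_bound <= len(word) <= upper_bound:
        yield word


# Main input: the words. Side input: one value derived from the whole collection.
avg_word_length = mean(len(word) for word in words)

small_words = [w for word in words for w in filter_using_length(word, 0, 3)]
larger_than_average = [
    w for word in words for w in filter_using_length(word, lower_bound=avg_word_length)
]

print(avg_word_length)
print(small_words)
print(larger_than_average)
```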



**Every PCollection is processed within a window**

* The default window is the global window: it starts when the data is input and ends when the last element in the window is processed.
* In bounded PCollections, the elements are commonly all marked as occurring at the same time. For example, TextIO does this, so the global window effectively ignores the timestamps.
* The global window is not useful for aggregating unbounded PCollections, because the window never closes.

Setting a single global window for a PCollection
```python
from apache_beam import window
globally_windowed_items = items | 'window' >> beam.WindowInto(window.GlobalWindows())
```

Time-based windows can be useful for processing time-series data:
1. You may have to prepare the date-timestamp. In this example, the timestamp of the data (the time the log entry was written) becomes the element time, so the elements now have different timestamps from one another.
2. With time-based windowing, the data is processed in groups. In the example, each group gets its own average.
3. There are different kinds of windowing (fixed, sliding, session).
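
To show how timestamps map onto two of these window kinds, here is a plain-Python sketch of fixed and session window assignment. The event times, the 60-second window size, and the 30-second session gap are made-up values, and both helpers are hypothetical: they model the assignment rule only, not Beam's implementation.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window of the given size containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def session_windows(timestamps, gap):
    """Group timestamps into sessions: a new session starts after a pause of at least `gap`."""
    sessions = []
    for timestamp in sorted(timestamps):
        if sessions and timestamp - sessions[-1][-1] < gap:
            sessions[-1].append(timestamp)
        else:
            sessions.append([timestamp])
    return sessions


# Hypothetical event times, in seconds.
event_times = [3, 10, 58, 61, 62, 130, 135]

fixed = {}
for t in event_times:
    fixed.setdefault(fixed_window(t, size=60), []).append(t)

sessions = session_windows(event_times, gap=30)

print(fixed)
print(sessions)
```

Fixed windows cut the timeline at regular boundaries regardless of the data, so the events at 58 and 61 land in different windows. Session windows follow the data, so the same two events end up in one session.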

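Using windowing with batch data (group by time)
```python
# extract_timestamp and PrintWindowFn are user-defined helpers that are not shown here:
# the first parses the event time from a log line, the second is a DoFn that prints each result.
lines = p | beam.io.ReadFromText('access.log')
windowed_counts = (lines
                   | 'Timestamp' >> beam.Map(lambda x: beam.window.TimestampedValue(x, extract_timestamp(x)))
                   | 'Window' >> beam.WindowInto(beam.window.SlidingWindows(60, 30))  # 60 s windows, starting every 30 s
                   | 'Count' >> beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults()
                   )
windowed_counts = windowed_counts | beam.ParDo(PrintWindowFn())
```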
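
With sliding windows of size 60 and period 30, every element belongs to two overlapping windows, so it is counted twice. The plain-Python sketch below works that out for a handful of made-up timestamps. `sliding_windows` is a hypothetical helper that models the assignment rule with integer seconds and does not call Beam.

```python
from collections import Counter


def sliding_windows(timestamp, size, period):
    """Return every [start, end) window of length `size`, starting every `period`,
    that contains the timestamp."""
    last_start = timestamp - (timestamp % period)
    starts = range(last_start, timestamp - size, -period)
    return [(start, start + size) for start in starts]


# Hypothetical log timestamps, in seconds since the start of the log.
log_times = [5, 20, 35, 50, 65, 95]

counts = Counter()
for t in log_times:
    for window in sliding_windows(t, size=60, period=30):
        counts[window] += 1

for window in sorted(counts):
    print(window, counts[window])
```

The per-window counts add up to twice the number of elements, which is the price of the overlap. A fixed window is the special case where the period equals the size.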

Dataflow templates enable rapid deployment of standard job types and let non-developers run jobs. Templates can be executed from the Cloud Console, the gcloud command-line tool, or the REST API. Template parameters are passed with `--parameters` as comma-separated `name=value` pairs, and the names must match those the chosen template declares:
```shell
gcloud dataflow jobs run myjob \
  --gcs-location gs://dataflow-templates/latest/Word_Count \
  --parameters inputTopic=projects/myproject/topics/mytopic,outputTable=project:dataset.table
```
