# Data Engineer coding task
Convert level 3 (L3) market data to level 1 (L1) data.

# Why Beam? Beam=Batch+strEAM

Apache Beam (https://beam.apache.org/) is an advanced unified programming model for implementing batch and streaming data processing jobs that run on any execution engine.

Apache Beam supports several distributed processing back-ends, including Apache Flink, Apache Spark, and Google Cloud Dataflow.

As a result, an Apache Beam application can be deployed on Google Cloud Dataflow as a SERVERLESS solution.

A batch Apache Beam application can also run in Colab.


# Batch implementation

In order to provide a **Serverless** solution, this task was completed as a notebook. 
This notebook was developed and tested on Colab (https://colab.research.google.com/) with the Apache Beam DirectRunner.

 <font color=red>Colab will report some errors during the Apache Beam installation. Please ignore them; they do not affect the cells that follow.</font> 

In [None]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
run('pip install --quiet apache-beam')

>> pip install --quiet apache-beam



## Data Source

The original data was uploaded to Google Cloud Storage in advance, so it does not need to be provided separately.

The next cell downloads the data to the local file system.

In [None]:
run('gsutil cp gs://level3-to-level1-data/market_data_v2.csv .')

>> gsutil cp gs://level3-to-level1-data/market_data_v2.csv .
Copying gs://level3-to-level1-data/market_data_v2.csv...
- [1 files][  2.6 MiB/  2.6 MiB]                                                
Operation completed over 1 objects/2.6 MiB.                                      



In [None]:
!ls -l

total 4680
-rw-r--r-- 1 root root 2026912 Dec 13 09:38 L1_market_data-00000-of-00001.csv
-rw-r--r-- 1 root root 2757472 Dec 13 14:00 market_data_v2.csv
drwxr-xr-x 1 root root    4096 Dec  2 22:04 sample_data


## The Primary Process Code 
 <font color=red>Intentionally deleted</font>

## The Apache Beam ParDo wrapper

In [None]:
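import apache_beam as beam

class OneRowParDo(beam.DoFn):
    # Applies processOneMsg to each input element.
    # process() must return an iterable of output elements.
    def process(self, element):
        return processOneMsg(element)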

## The Beam pipeline for batch processing, reading from a CSV file

In [None]:
from apache_beam.options.pipeline_options import PipelineOptions

streamingMode = False
pubSubTopic = "projects/grasshopper-298307/topics/L3toL1"

def run_pipeline():
    if streamingMode:
        # save_main_session=True makes the notebook's global definitions available to the workers.
        options = PipelineOptions(save_main_session=True, streaming=True)
    else:
        options = PipelineOptions(streaming=False)

    p = beam.Pipeline(options=options)

    if streamingMode:
        read = (
            p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=pubSubTopic)
        )
    else:
        read = (
            p
            | 'Read from CSV' >> beam.io.ReadFromText('market_data_v2.csv')
        )
    process = (
        read
        | 'Structure data' >> beam.ParDo(OneRowParDo())
        | 'Save L1 data into a file' >> beam.io.WriteToText('L1_market_data', file_name_suffix='.csv')
    )

    result = p.run()
    result.wait_until_finish()  # Block until the pipeline finishes.

run_pipeline()

## The generated L1 data has been validated against expected_L1_market_data.csv

For every matched row, the remaining columns carry the same values as in the expected file.
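A check of this kind could be sketched as follows. This is only an illustration, not the validation that was actually run: it assumes both files have a header row, and the function names (`load_rows`, `mismatches`) and the idea of matching on a caller-supplied list of key columns are hypothetical.

```python
import csv
import io
from typing import Dict, List, Sequence, Tuple

Key = Tuple[str, ...]


def load_rows(text: str, key_fields: Sequence[str]) -> Dict[Key, Dict[str, str]]:
    """Index CSV rows (header row assumed) by the values of the key columns."""
    reader = csv.DictReader(io.StringIO(text))
    return {tuple(row[k] for k in key_fields): row for row in reader}


def mismatches(generated: str, expected: str, key_fields: Sequence[str]) -> List[Key]:
    """Return the keys present in both files whose remaining columns differ."""
    gen = load_rows(generated, key_fields)
    exp = load_rows(expected, key_fields)
    # Only rows that appear in both files are compared; unmatched rows are skipped.
    return sorted(key for key in gen.keys() & exp.keys() if gen[key] != exp[key])
```

The functions take the file contents as text, so they can be called with the result of reading `L1_market_data-00000-of-00001.csv` and `expected_L1_market_data.csv`; an empty list means every matched row agrees.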

You may download the generated data.

In [None]:
!ls -l

total 4680
-rw-r--r-- 1 root root 2026912 Dec 13 14:00 L1_market_data-00000-of-00001.csv
-rw-r--r-- 1 root root 2757472 Dec 13 14:00 market_data_v2.csv
drwxr-xr-x 1 root root    4096 Dec  2 22:04 sample_data


# Implementation Discussion

## Architecture

Because the order book is shared in memory, the implementation is not scalable.

Even STATEFUL processing in Apache Beam does not provide that scalability.

To be scalable, the shared order book should be moved into an external persistent database, such as Google Cloud Bigtable.
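The idea can be pictured with a minimal sketch, which is not the notebook's design: a worker loads the book for a symbol from a store, applies one update, and writes it back, so no state lives in the worker itself. `BookStore`, `InMemoryStore`, and `apply_update` are hypothetical names, the order fields are assumed, and the dictionary-backed store only stands in for an external database; no Bigtable API is used here.

```python
from typing import Dict, Protocol, Tuple

# Assumed book layout: order_id -> (side, price, quantity)
Book = Dict[str, Tuple[str, float, int]]


class BookStore(Protocol):
    """Hypothetical interface an external order book store would expose."""

    def load(self, symbol: str) -> Book: ...

    def save(self, symbol: str, book: Book) -> None: ...


class InMemoryStore:
    """Stand-in for an external database, used only for illustration."""

    def __init__(self) -> None:
        self._books: Dict[str, Book] = {}

    def load(self, symbol: str) -> Book:
        # Return a copy so callers cannot mutate the stored book directly.
        return dict(self._books.get(symbol, {}))

    def save(self, symbol: str, book: Book) -> None:
        self._books[symbol] = dict(book)


def apply_update(store: BookStore, symbol: str, order_id: str,
                 side: str, price: float, quantity: int) -> None:
    """Stateless update: read the book, change one order, write it back."""
    book = store.load(symbol)
    if quantity == 0:
        book.pop(order_id, None)  # a zero quantity removes the order
    else:
        book[order_id] = (side, price, quantity)
    store.save(symbol, book)
```

With a real database, the read-modify-write in `apply_update` would need to be atomic, or updates for one symbol would need to be processed in order, otherwise concurrent workers could overwrite each other's changes.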

Whether this is needed depends on the business requirements. If one node can process the data in time and the history can be replayed quickly to rebuild the order book, a single-node model is acceptable.
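Replaying the history on a single node might look like the sketch below. The message layout `(action, order_id, side, price, quantity)` and the names `replay` and `top_of_book` are assumptions made for illustration; the real L3 schema and the deleted processing code may differ.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical L3 message: (action, order_id, side, price, quantity)
Message = Tuple[str, str, str, float, int]
Book = Dict[str, Tuple[str, float, int]]


def replay(history: Iterable[Message]) -> Book:
    """Rebuild the in-memory order book by applying every message in order."""
    book: Book = {}
    for action, order_id, side, price, quantity in history:
        if action == "delete":
            book.pop(order_id, None)
        else:  # "add" and "modify" both overwrite the order
            book[order_id] = (side, price, quantity)
    return book


def top_of_book(book: Book) -> Tuple[Optional[float], Optional[float]]:
    """Return (best bid, best ask); a side with no orders yields None."""
    bids = [price for side, price, _ in book.values() if side == "B"]
    asks = [price for side, price, _ in book.values() if side == "S"]
    return (max(bids) if bids else None, min(asks) if asks else None)
```

After a restart, the node would call `replay` on the stored history once and then continue with live messages; how long that replay takes is what decides whether the single-node model is acceptable.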
