# Custom inputs - ParDo

`ParDo` is the recommended way to implement a source, since using the `Source` interface can be tricky.

To learn more about *when* to use a `Source`, see the [When to use the Source interface](https://beam.apache.org/documentation/io/developing-io-overview/index.html#when-to-use-source) page.

To learn more *about* `BoundedSource`, see [Custom Inputs - BoundedSource](https://colab.research.google.com/drive/1GLFB4adNS1oifRklCmyF4MFRQuea6MaV).

# Setup

First, let's install `apache-beam`.

In [2]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
run('pip install --quiet apache-beam')

>> pip install --quiet apache-beam



# Example 1: Count generator

We do the following in this example:
1. Create a generator function.
1. Iterate over the generator.
1. Apply the function using `ParDo`.

First, we create a generator function that accepts a single input and yields zero or more elements. Notice that we use the `yield` keyword instead of `return`: each `yield` hands one element to the caller and pauses the function, which resumes from that point when the next element is requested. For now, this is a simple wrapper over the `range` built-in function.

To learn more about generators, see the [Generators](https://docs.python.org/2/tutorial/classes.html#generators) page.

In [0]:
def count(n):
  for i in range(n):
    yield i

In Python, we can iterate over a generator using a `for` loop.

In [4]:
# Here's how we iterate over a generator.
for i in count(10):
  print(i)

0
1
2
3
4
5
6
7
8
9
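
A `for` loop is not the only way to consume a generator. As a minimal sketch in plain Python (no Beam involved), the built-in `next` function pulls one element at a time, which makes the pause-and-resume behavior of `yield` visible. The variable names below are only illustrative.

```python
def count(n):
  for i in range(n):
    yield i

gen = count(3)     # Nothing runs yet; this only creates the generator object.
first = next(gen)  # Runs the function body until the first `yield`.
second = next(gen) # Resumes right after the previous `yield`.
rest = list(gen)   # Drains whatever elements are left.

print(first, second, rest)  # prints: 0 1 [2]
```

Once a generator is drained it stays empty, so call `count(n)` again to iterate a second time.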


In Apache Beam, we must create our inputs before we apply the generator function. In this case, we use the `beam.Create` transform with our `n` value as the input. `beam.Create` takes an iterable, and since we only need a single `n`, we use a list with a single value.

After we generate our input, we can apply our generator function using the `beam.ParDo` transform. This will take our input values and generate elements.

Finally, we inspect the elements to see what's inside our `PCollection`. Remember that there are no ordering guarantees in the `PCollection`, unlike in the `for` loop iteration.

In [5]:
import apache_beam as beam
import logging

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Create inputs' >> beam.Create([10])
      | 'Generate elements' >> beam.ParDo(count)
      | 'Inspect elements' >> beam.Map(logging.warning)
  )



Note that we can pass a generator function directly to `beam.ParDo`, even though `beam.ParDo` is designed to work with a `beam.DoFn`. Internally, if `beam.ParDo` receives a callable, it automatically wraps it in a `beam.DoFn` whose `process()` method calls our function. It is roughly equivalent to the following:

```python
class CountDoFn(beam.DoFn):
  def process(self, element):
    # Return the generator; Beam iterates over it to emit each element.
    return count(element)

with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Create inputs' >> beam.Create([10])
      | 'Generate elements' >> beam.ParDo(CountDoFn())
      | 'Inspect elements' >> beam.Map(logging.warning)
  )
```

We can also wrap the `ParDo` in a `PTransform` to get a nicer, source-like interface.

In [6]:
class Count(beam.PTransform):
  def __init__(self, n):
    self.n = n

  def expand(self, pcollection):
    return (
      pcollection
      | 'Create N' >> beam.Create([self.n])
      | 'Generate elements' >> beam.ParDo(count)
    )

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Count' >> Count(10)
      | 'Inspect elements' >> beam.Map(logging.warning)
  )



# Example 2: Range generator

Now let's see how we can implement a `range` generator.

We create multiple input ranges that go into a `PCollection`. The generator for each input runs sequentially, but the runner can process different inputs in parallel. The full set of elements is therefore produced by bundles of inputs that can run in parallel.

In [7]:
import apache_beam as beam
import logging

def generate_range(start_stop_step):
  start, stop, step = start_stop_step
  for i in range(start, stop, step):
    yield i

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Create inputs' >> beam.Create([
          (0, 5, 1),
          (0, 50, 10),
          (0, 500, 100),
      ])
      | 'Generate elements' >> beam.ParDo(generate_range)
      | 'Inspect elements' >> beam.Map(logging.warning)
  )
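
To build intuition for "sequential per input, parallel across inputs", here is a rough analogy in plain Python using a thread pool. This is only a sketch of the idea and not how Beam or any runner is implemented; the pool size and variable names are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def generate_range(start_stop_step):
  start, stop, step = start_stop_step
  for i in range(start, stop, step):
    yield i

inputs = [(0, 5, 1), (0, 50, 10), (0, 500, 100)]

# Each input is handed to its own task. Inside a task, `list` drains the
# generator sequentially, one element after another.
with ThreadPoolExecutor(max_workers=3) as pool:
  per_input = list(pool.map(lambda x: list(generate_range(x)), inputs))

# Flatten the per-input results into a single collection of elements.
elements = [element for chunk in per_input for element in chunk]
print(sorted(elements))
```

The elements are sorted before printing because, as in a `PCollection`, the order in which parallel tasks finish should not be relied upon.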



# Example 3: Reading lines from files

Apache Beam already provides the `beam.io.ReadAllFromText()` transform, which reads lines from files, but the following example shows how to implement a simple version of it from scratch.

First, let's create some text files for our example.

In [8]:
run('mkdir -p data1 data2')

>> mkdir -p data1 data2



In [9]:
%%writefile data1/sample.txt
Hello from Apache Beam!
Each line is a different element.
csv,rows,must,span,one,line,per,element
{'json': 'elements', 'should': 'also', 'be': 1, 'line': '.'}

Writing data1/sample.txt


In [10]:
%%writefile data1/emojis.txt
UTF-8 files are also supported!
(*・‿・)ノ⌒*:･ﾟ✧
(ﾉ◕ヮ◕)ﾉ*:・ﾟ✧
(ﾉ☉ヮ⚆)ﾉ ⌒*:･ﾟ✧
※\(^o^)/※

Writing data1/emojis.txt


In [11]:
%%writefile data2/more-data.txt
Hello from another directory!
ฅ^•ﻌ•^ฅ

Writing data2/more-data.txt


To read the lines from these files, we do the following:

1. Create a PCollection of the input file patterns.
1. Expand these file patterns into a list of all the files that match the pattern using the [`glob`](https://docs.python.org/2/library/glob.html) module.
1. Use `beam.FlatMap` for the glob transform to get a single filename per element instead of a list of filenames.
1. Apply the `read_lines` function to every filename using `beam.ParDo`, which yields each line as an individual element.

Each file is read sequentially, but all files could be processed in parallel.

In [12]:
import apache_beam as beam
import logging
import glob

def read_lines(filename):
  with open(filename) as f:
    for line in f:
      yield line.strip()

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Create inputs' >> beam.Create([
          'data1/*.txt',
          'data2/*.txt',
        ])
      | 'Expand file patterns' >> beam.FlatMap(glob.glob)
      | 'Read lines' >> beam.ParDo(read_lines)
      | 'Inspect elements' >> beam.Map(logging.warning)
  )
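
It can help to run the same steps in plain Python to see what the pipeline should produce. The sketch below assumes Python 3 (it passes `encoding='utf-8'` to `open`) and writes two small hypothetical files into a temporary directory, so it does not depend on the sample files created earlier.

```python
import glob
import os
import tempfile

def read_lines(filename):
  # Explicit encoding so that UTF-8 files read the same on every platform.
  with open(filename, encoding='utf-8') as f:
    for line in f:
      yield line.strip()

# Hypothetical sample files, created only for this sketch.
tmp_dir = tempfile.mkdtemp()
for name, text in [('a.txt', 'one\ntwo\n'), ('b.txt', 'three\n')]:
  with open(os.path.join(tmp_dir, name), 'w', encoding='utf-8') as f:
    f.write(text)

# Step 1: the file pattern. Step 2: expand it into filenames.
pattern = os.path.join(tmp_dir, '*.txt')
filenames = sorted(glob.glob(pattern))  # glob makes no ordering promise.

# Steps 3 and 4: one filename at a time, one line per element.
lines = [line for filename in filenames for line in read_lines(filename)]
print(lines)
```

The nested comprehension plays the role of `beam.FlatMap` followed by `beam.ParDo`: every filename is visited once, and every line becomes its own element.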



# Example 4: Reading from a SQLite database

In this example, we set up a simple SQLite database with sample data and read it from Apache Beam.

For the sample data, we'll take some rows from the `bigquery-public-data.moon_phases.moon_phases` [BigQuery Public Dataset](https://cloud.google.com/bigquery/public-data/).

In [13]:
import sqlite3

with sqlite3.connect('moon-phases.db') as db:
  cursor = db.cursor()

  # Create the moon_phases table.
  cursor.execute('''
    CREATE TABLE IF NOT EXISTS moon_phases (
      id INTEGER PRIMARY KEY,
      phase_emoji TEXT NOT NULL,
      peak_datetime DATETIME NOT NULL,
      phase TEXT NOT NULL
    )
  ''')

  # Truncate the table if it's already populated.
  cursor.execute('DELETE FROM moon_phases')

  # Insert some sample data.
  insert_moon_phase = '''
    INSERT INTO moon_phases(phase_emoji, peak_datetime, phase)
    VALUES(?, ?, ?)
  '''
  cursor.execute(insert_moon_phase, (u'🌕', '2017-12-03 15:47:00', 'Full Moon'))
  cursor.execute(insert_moon_phase, (u'🌗', '2017-12-10 07:51:00', 'Last Quarter'))
  cursor.execute(insert_moon_phase, (u'🌑', '2017-12-18 06:30:00', 'New Moon'))
  cursor.execute(insert_moon_phase, (u'🌓', '2017-12-26 09:20:00', 'First Quarter'))
  cursor.execute(insert_moon_phase, (u'🌕', '2018-01-02 02:24:00', 'Full Moon'))
  cursor.execute(insert_moon_phase, (u'🌗', '2018-01-08 22:25:00', 'Last Quarter'))
  cursor.execute(insert_moon_phase, (u'🌑', '2018-01-17 02:17:00', 'New Moon'))
  cursor.execute(insert_moon_phase, (u'🌓', '2018-01-24 22:20:00', 'First Quarter'))
  cursor.execute(insert_moon_phase, (u'🌕', '2018-01-31 13:27:00', 'Full Moon'))

  # Query for the data in the table to make sure it's populated.
  cursor.execute('SELECT phase_emoji, peak_datetime, phase FROM moon_phases')
  for row in cursor.fetchall():
    print(u'{} {} {}'.format(row[0], row[1], row[2]))

🌕 2017-12-03 15:47:00 Full Moon
🌗 2017-12-10 07:51:00 Last Quarter
🌑 2017-12-18 06:30:00 New Moon
🌓 2017-12-26 09:20:00 First Quarter
🌕 2018-01-02 02:24:00 Full Moon
🌗 2018-01-08 22:25:00 Last Quarter
🌑 2018-01-17 02:17:00 New Moon
🌓 2018-01-24 22:20:00 First Quarter
🌕 2018-01-31 13:27:00 Full Moon


To read from the database, we create a function that does the following:
1. Connects to our database.
1. Performs the query.
1. Yields each row as a new element. 

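We can pass the database filename as an extra argument to `beam.ParDo`, which forwards it to our function. This is the same mechanism used to pass [side inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs).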

In [14]:
import apache_beam as beam
import logging
import sqlite3

def sqlite_select(query, database_file):
  with sqlite3.connect(database_file) as db:
    cursor = db.cursor()
    cursor.execute(query)
    for row in cursor.fetchall():
      yield row

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Create inputs' >> beam.Create([
          'SELECT phase_emoji, peak_datetime, phase FROM moon_phases',
        ])
      | 'Select from SQLite' >> beam.ParDo(sqlite_select, 'moon-phases.db')
      | 'Format rows' >> beam.Map(
          lambda row: u'{} {} {}'.format(row[0], row[1], row[2]))
      | 'Inspect row' >> beam.Map(logging.warning)
  )
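
One detail worth knowing: using a `sqlite3` connection as a context manager commits or rolls back the transaction, but it does not close the connection. A possible variant of `sqlite_select`, sketched below, wraps the connection in `contextlib.closing` so that it is closed explicitly. The temporary database file and the `phases` table are hypothetical and exist only to make the sketch runnable on its own.

```python
import contextlib
import os
import sqlite3
import tempfile

def sqlite_select(query, database_file):
  # `closing` calls `db.close()` on exit; sqlite3's own `with` block would
  # only commit or roll back the transaction.
  with contextlib.closing(sqlite3.connect(database_file)) as db:
    cursor = db.cursor()
    cursor.execute(query)
    for row in cursor.fetchall():
      yield row

# Hypothetical database used only for this sketch.
database_file = os.path.join(tempfile.mkdtemp(), 'example.db')
with contextlib.closing(sqlite3.connect(database_file)) as db:
  db.execute('CREATE TABLE phases (phase TEXT NOT NULL)')
  db.executemany(
      'INSERT INTO phases(phase) VALUES (?)',
      [('Full Moon',), ('New Moon',)])
  db.commit()

rows = list(sqlite_select(
    'SELECT phase FROM phases ORDER BY rowid', database_file))
print(rows)
```

The function keeps the same signature, so it can be passed to `beam.ParDo` in the same way as the version above.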



We can also wrap this process in a `PTransform` to get a nicer, source-like interface.

In [15]:
class ReadFromSQLite(beam.PTransform):
  def __init__(self, database_file, query):
    self.database_file = database_file
    self.query = query

  def expand(self, pcollection):
    return (
      pcollection
      | 'Create query' >> beam.Create([self.query])
      | 'Execute query' >> beam.ParDo(sqlite_select, self.database_file)
    )

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Read from SQLite' >> ReadFromSQLite(
          'moon-phases.db',
          'SELECT phase_emoji, peak_datetime, phase FROM moon_phases',
        )
      | 'Format rows' >> beam.Map(
          lambda row: u'{} {} {}'.format(row[0], row[1], row[2]))
      | 'Inspect row' >> beam.Map(logging.warning)
  )



This is the simplest approach and works well for the common case of processing a small number of queries. However, it opens a new connection for every query, which becomes wasteful when there are many queries to process.

Another approach is to create a `DoFn` that stores the database connection object, so that a connection is reused across queries instead of being opened for each one. Each worker has to initialize its own connection. We can't create the connection in the constructor, since the `DoFn` is constructed when the pipeline is built and is then serialized and sent to the workers, and a connection object can't be serialized.

We can use the `start_bundle()` and `finish_bundle()` methods to open and close the connection, respectively. These are called once for each bundle of queries, so all the queries in a bundle share one connection, which is closed when the bundle finishes. Each `DoFn` instance therefore holds at most one open connection at a time.

In [16]:
import apache_beam as beam
import logging
import sqlite3

class SQLiteSelect(beam.DoFn):
  def __init__(self, database_file):
    self.database_file = database_file
    self.db = None

  def start_bundle(self):
    logging.warning('sqlite3 connect')
    self.db = sqlite3.connect(self.database_file)
  
  def finish_bundle(self):
    logging.warning('sqlite3 close')
    self.db.close()

  def process(self, query):
    cursor = self.db.cursor()
    cursor.execute(query)
    for row in cursor.fetchall():
      yield row

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Create inputs' >> beam.Create([
          'SELECT phase_emoji, peak_datetime, phase FROM moon_phases',
        ])
      | 'Select from SQLite' >> beam.ParDo(SQLiteSelect('moon-phases.db'))
      | 'Format rows' >> beam.Map(
          lambda row: u'{} {} {}'.format(row[0], row[1], row[2]))
      | 'Inspect row' >> beam.Map(logging.warning)
  )
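
To see the bundle lifecycle without running a pipeline, the sketch below calls the same three methods by hand. The `run_bundle` helper is a hypothetical stand-in for what a runner does with one bundle, the class deliberately does not inherit from `beam.DoFn`, and the `connections_opened` counter, the temporary database, and the `phases` table are all assumptions added for illustration.

```python
import contextlib
import os
import sqlite3
import tempfile

class SQLiteSelect(object):
  """Same methods as the DoFn above, without the beam.DoFn base class."""

  def __init__(self, database_file):
    self.database_file = database_file
    self.db = None
    self.connections_opened = 0  # Illustrative counter, not part of Beam.

  def start_bundle(self):
    self.db = sqlite3.connect(self.database_file)
    self.connections_opened += 1

  def finish_bundle(self):
    self.db.close()
    self.db = None

  def process(self, query):
    cursor = self.db.cursor()
    cursor.execute(query)
    for row in cursor.fetchall():
      yield row

def run_bundle(fn, bundle):
  """Hypothetical stand-in for a runner processing a single bundle."""
  fn.start_bundle()
  try:
    return [row for element in bundle for row in fn.process(element)]
  finally:
    fn.finish_bundle()

# Hypothetical database used only for this sketch.
database_file = os.path.join(tempfile.mkdtemp(), 'example.db')
with contextlib.closing(sqlite3.connect(database_file)) as db:
  db.execute('CREATE TABLE phases (phase TEXT NOT NULL)')
  db.executemany(
      'INSERT INTO phases(phase) VALUES (?)',
      [('Full Moon',), ('New Moon',)])
  db.commit()

fn = SQLiteSelect(database_file)
rows = run_bundle(fn, [
    'SELECT phase FROM phases ORDER BY rowid',
    'SELECT COUNT(*) FROM phases',
])
print(rows, fn.connections_opened)
```

Both queries in the bundle share a single connection, which is the behavior the `start_bundle()` and `finish_bundle()` pair is meant to provide.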

