<a href="https://colab.research.google.com/github/Ajay-user/Apache-beam/blob/main/foundation/Creating_an_input_transform_Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ! pip install -q --upgrade pip setuptools wheel
# ! pip install --quiet apache-beam

In [14]:
import apache_beam as beam

In [16]:
# %%writefile does not create parent directories, so make sure data/ exists
# before the cells below write into it. exist_ok=True keeps this cell
# re-runnable (unlike a bare `mkdir`, which fails once the folder exists).
import os
os.makedirs('data', exist_ok=True)

In [None]:
%%writefile data/my-text-file-1.txt
This is just a plain text file, UTF-8 strings are allowed 🎉.
Each line in the file is one element in the PCollection.

Writing data/my-text-file-1.txt


In [None]:
%%writefile data/my-text-file-2.txt
There are no guarantees on the order of the elements.
ฅ^•ﻌ•^ฅ

Writing data/my-text-file-2.txt


In [None]:
%%writefile data/penguins.csv
species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222
1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333
2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5
2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334

Writing data/penguins.csv


## A simple pipeline

In [25]:
# Copy the two input text files into sharded output files.
# The input glob is deliberately narrow: a broad 'data/*.txt' would also match
# the 'data/output-*.txt' shard(s) this pipeline writes, so every re-run would
# read the previous run's output back in as input and duplicate lines.
with beam.Pipeline() as pipe:
  (pipe
   |'read'>>beam.io.ReadFromText('data/my-text-file-*.txt')
   |'write'>>beam.io.WriteToText('data/output', file_name_suffix='.txt'))

In [28]:
# Show the shard file(s) written by WriteToText above (prefix 'data/output', suffix '.txt').
! cat data/output*.txt

There are no guarantees on the order of the elements.
ฅ^•ﻌ•^ฅ
This is just a plain text file, UTF-8 strings are allowed 🎉.
Each line in the file is one element in the PCollection.


## Creating an input transform

In [34]:
import apache_beam as beam
from apache_beam.options import pipeline_options
from typing import Iterable


@beam.ptransform_fn
@beam.typehints.with_input_types(beam.pvalue.PBegin)
@beam.typehints.with_output_types(int)
def Count(pbegin:beam.pvalue.PBegin, n:int)->beam.PCollection[int]:
  """Input transform that emits the integers 0, 1, ..., n - 1.

  It takes a ``PBegin``, so it can be applied directly to a pipeline object
  just like a built-in source.

  Args:
    pbegin: the pipeline's starting point.
    n: how many numbers to produce.

  Returns:
    A PCollection of ints.
  """

  # Generators used by the transform can live inside it; this one expands the
  # single seed element `limit` into the numbers below it.
  def _numbers_below(limit:int)->Iterable[int]:
    yield from range(limit)

  seed = pbegin | 'create inputs' >> beam.Create([n])
  return seed | 'generate numbers' >> beam.FlatMap(_numbers_below)
  

# Count from 0 up to (but not including) n and print every number.
n = 5

with beam.Pipeline() as pipe:
  numbers = pipe | f'count to {n}' >> Count(n)
  numbers | 'print' >> beam.Map(print)



0
1
2
3
4


## Beam file system

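Beam already supports several different file systems besides local files (for example, `gs://` paths on Google Cloud Storage); `FileSystems` picks the matching implementation from each path's scheme.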

In [46]:
from apache_beam.io.filesystems import FileSystems as beam_fs

# match() takes a list of glob patterns and returns a list of MatchResult objects, one per pattern.
beam_fs.match(['data/*.csv'])

[<apache_beam.io.filesystem.MatchResult at 0x7fba6b3b48d0>]

In [49]:
# Each MatchResult carries a FileMetadata (path, size in bytes, last-updated time) per matched file.
beam_fs.match(['data/*.csv'])[0].metadata_list

[FileMetadata(data/penguins.csv, 528, 1659180717.7101028)]

In [51]:
# Path of the first (here: the only) file matched by the pattern.
beam_fs.match(['data/*.csv'])[0].metadata_list[0].path

'data/penguins.csv'

In [64]:
from apache_beam.io.filesystems import FileSystems as beam_fs
import csv
import codecs

file_path = 'data/penguins.csv'

# beam_fs.open hands back bytes, while csv.DictReader needs text:
# decode the byte lines as UTF-8 on the fly before parsing.
with beam_fs.open(file_path) as csv_bytes:
  text_lines = codecs.iterdecode(csv_bytes, 'utf-8')
  for record in csv.DictReader(text_lines):
    print(dict(record))

{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}
{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}
{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}
{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}
{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}
{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_l

## Reading CSV files

In [63]:
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems as beam_fs
from apache_beam.options.pipeline_options import PipelineOptions
import codecs
import csv
from typing import Dict, Iterable, List

@beam.ptransform_fn
@beam.typehints.with_input_types(beam.pvalue.PBegin)
@beam.typehints.with_output_types(Dict[str,str])
def ReadCSVfiles(pbegin:beam.pvalue.PBegin, file_patterns:List[str])->beam.PCollection[Dict[str,str]]:
  """Input transform that reads every CSV file matching the given patterns.

  Each data row of each matched file becomes one ``dict`` that maps the
  file's header names to the row's (string) values.

  Args:
    pbegin: the pipeline's starting point.
    file_patterns: glob patterns understood by Beam's FileSystems, e.g.
      ``['data/*.csv']``. A single pattern string is also accepted.

  Returns:
    A PCollection of ``Dict[str, str]``, one element per CSV data row.
  """
  # beam.Create iterates its argument, so a bare string would be split into
  # one "pattern" per character; wrap a single pattern in a list instead.
  if isinstance(file_patterns, str):
    file_patterns = [file_patterns]

  def expand_pattern(file_pattern:str)->Iterable[str]:
    # match() returns one MatchResult per requested pattern; we request one.
    for metadata in beam_fs.match(patterns=[file_pattern])[0].metadata_list:
      yield metadata.path

  def readCSVfile(file_name:str)->Iterable[Dict[str,str]]:
    with beam_fs.open(file_name) as f:
      # csv.DictReader needs text, but beam_fs.open gives bytes,
      # so decode the lines using codecs.iterdecode.
      for row in csv.DictReader(codecs.iterdecode(f,'utf-8')):
        yield dict(row)

  return (
      pbegin
      |'create input'>>beam.Create(file_patterns)
      |'expand patterns'>>beam.FlatMap(expand_pattern)
      |'read csv file'>>beam.FlatMap(readCSVfile)
  )



# Read every CSV under data/, with all of Beam's additional type-checking
# features switched on ('all').
input_patterns = ['data/*.csv']
options = PipelineOptions(flags=[], type_check_additional='all')

with beam.Pipeline(options=options) as pipe:
  rows = pipe | 'reading csv files' >> ReadCSVfiles(input_patterns)
  rows | 'printing' >> beam.Map(print)

{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}
{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}
{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}
{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}
{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}
{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_l

## Reading from a SQLite database

In [83]:
import sqlite3

db_name = 'moon-phase.db'

# Create (or refresh) the moon_phases table and print its contents.
# `with connection:` only commits or rolls back the transaction on exit; it
# never closes the connection, so close it explicitly in `finally`.
db = sqlite3.connect(db_name)
try:
  with db:
    cursor = db.cursor()

    cursor.execute('''
      create table if not exists moon_phases (
      id integer primary key,
      phase_emoji text not null,
      peak_datetime datetime not null,
      emoji text not null
      )''')
    # NOTE: despite the names, `phase_emoji` stores the emoji and `emoji`
    # stores the phase's name. They are kept as-is because the queries
    # further down select these exact column names.

    # `insert or replace` keeps this cell re-runnable: a plain `insert` raises
    # a UNIQUE constraint error once these ids already exist in the .db file.
    cursor.executemany('''
       insert or replace into moon_phases
       (id, phase_emoji, peak_datetime, emoji)
       values (?,?,?,?)
       ''', [
        (1, '🌕', '2017-12-03 15:47:00', 'Full Moon'),
        (2, '🌗', '2017-12-10 07:51:00', 'Last Quarter'),
        (3, '🌑', '2017-12-18 06:30:00', 'New Moon'),
        (4, '🌓', '2017-12-26 09:20:00', 'First Quarter'),
        (5, '🌕', '2018-01-02 02:24:00', 'Full Moon'),
        (6, '🌗', '2018-01-08 22:25:00', 'Last Quarter'),
        (7, '🌑', '2018-01-17 02:17:00', 'New Moon'),
        (8, '🌓', '2018-01-24 22:20:00', 'First Quarter'),
        (9, '🌕', '2018-01-31 13:27:00', 'Full Moon')
        ])

    cursor.execute('select * from moon_phases')

    for row in cursor.fetchall():
      print(row)
finally:
  db.close()

(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')
(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')
(3, '🌑', '2017-12-18 06:30:00', 'New Moon')
(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')
(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')
(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')
(7, '🌑', '2018-01-17 02:17:00', 'New Moon')
(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')
(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')


In [92]:
from typing import Iterable, List, Tuple


class SQLiteSelect(beam.DoFn):
  """DoFn that runs simple column selections against a SQLite database file.

  Each input element is a ``(table, columns)`` query. Every row of ``table``
  is emitted as a dict that maps each requested column name to its value.
  The DoFn opens one connection in ``setup`` and closes it in ``teardown``.
  """

  def __init__(self, database_file:str):
    super().__init__()
    self.database_file = database_file
    # Opened in setup(), not here: a live sqlite3 connection cannot be
    # pickled, and the DoFn object is serialized when the pipeline runs.
    self.connection = None

  def setup(self):
    self.connection = sqlite3.connect(self.database_file)

  def process(self, query:Tuple[str, List[str]])->Iterable[Dict[str,str]]:
    table, columns = query
    # NOTE: table/column names are formatted into the SQL text because
    # identifiers cannot be bound as `?` parameters. Only pass trusted,
    # hard-coded names here, never user-supplied input.
    cursor = self.connection.cursor()
    try:
      cursor.execute(f"SELECT {','.join(columns)} FROM {table}")
      rows = cursor.fetchall()
    finally:
      cursor.close()
    for row in rows:
      yield dict(zip(columns, row))

  def teardown(self):
    # Guard against setup() having failed or never run.
    if self.connection is not None:
      self.connection.close()
      self.connection = None


@beam.ptransform_fn
@beam.typehints.with_input_types(beam.pvalue.PBegin)
@beam.typehints.with_output_types(Dict[str,str])
def SelectFromSQLite(pbegin:beam.pvalue.PBegin,
                     db_name:str,
                     query:List[Tuple[str, List[str]]])->beam.PCollection[Dict[str,str]]:
  """Input transform that runs a list of selections against a SQLite file.

  Args:
    pbegin: the pipeline's starting point.
    db_name: path of the SQLite database file.
    query: a list of ``(table, [column, ...])`` queries. Each query is sent
      to the SQLiteSelect DoFn as a separate element.

  Returns:
    A PCollection with one dict per selected row, across all queries.
  """
  # The return annotation is subscripted (PCollection[...]) like the other
  # transforms; calling PCollection(...) would build a bogus PCollection
  # object when the function is defined.
  return (
      pbegin
      |'create input'>>beam.Create(query)
      |'querying sqlite'>>beam.ParDo(SQLiteSelect(db_name))
  )




# Each query is (table_name, [column1, column2, ...]); rows of every query
# end up in the same output PCollection.
queries = [
    ('moon_phases', ['phase_emoji', 'peak_datetime', 'emoji']),
    ('moon_phases', ['phase_emoji', 'peak_datetime']),
]

with beam.Pipeline() as pipe:
  selected_rows = pipe | 'read from sqlite' >> SelectFromSQLite(db_name, queries)
  selected_rows | 'printing' >> beam.Map(print)

{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'emoji': 'Full Moon'}
{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'emoji': 'Last Quarter'}
{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'emoji': 'New Moon'}
{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'emoji': 'First Quarter'}
{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'emoji': 'Full Moon'}
{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'emoji': 'Last Quarter'}
{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'emoji': 'New Moon'}
{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'emoji': 'First Quarter'}
{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'emoji': 'Full Moon'}
{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00'}
{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00'}
{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00'}
{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00