## Enriching Data with moving statistics

The ```%%bigquery``` cell magic allows rapid exploratory development, but it clutters the notebook with large result sets.

In [70]:
%load_ext google.cloud.bigquery

The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery


In [71]:
%%bigquery
SELECT 
    DEP_T, AIRLINE, ARR, DEP_DELAY, ARR_DELAY
from `going-tfx.examples.ATL_JUNE_SIGNATURE` 
where date='2006-06-12'
order by dep_t limit 2

   DEP_T AIRLINE  ARR  DEP_DELAY  ARR_DELAY
0    545      US  PHX        4.0       -2.0
1    600      YV  IAD       15.0       18.0


---
Running the query through ```google.datalab.bigquery``` instead loads the result into a DataFrame without displaying the large result set.

In [94]:
import google.datalab.bigquery as bq
samples=bq.Query("""
SELECT 
    DEP_T, AIRLINE, ARR, DEP_DELAY, ARR_DELAY
FROM 
    `going-tfx.examples.ATL_JUNE_SIGNATURE` 
WHERE 
    date='2006-06-12'
ORDER BY 
    dep_t
""").execute().result().to_dataframe()

In [99]:
two_records = samples[:2].to_dict(orient='records')
print("%s samples, showing first 2:" % len(samples))
two_records

1075 samples, showing first 2:


[{u'AIRLINE': 'US',
  u'ARR': 'PHX',
  u'ARR_DELAY': -2.0,
  u'DEP_DELAY': 4.0,
  u'DEP_T': 545},
 {u'AIRLINE': 'YV',
  u'ARR': 'IAD',
  u'ARR_DELAY': 18.0,
  u'DEP_DELAY': 15.0,
  u'DEP_T': 600}]

---
### Beam Transform ```DoFn```s

In [88]:
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions

#### Add Timestamp

In [89]:
class AddTimeStampDoFn(beam.DoFn):

    def __init__(self, offset, *args, **kwargs):
        self.offset = offset
        # Name this class in super() so that beam.DoFn.__init__ itself is called
        super(AddTimeStampDoFn, self).__init__(*args, **kwargs)
    
    def process(self, element):
        timestamp = (self.offset + 
                      (element['DEP_T'] // 100) * 3600 +
                      (element['DEP_T'] % 100) * 60)
        time_stamped = beam.window.TimestampedValue(element, timestamp)
        yield time_stamped
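The timestamp arithmetic in `AddTimeStampDoFn` can be checked without Beam. The following is a minimal sketch, assuming `DEP_T` is an integer in HHMM form (as in the sample records above); the helper name `dep_t_to_seconds` is hypothetical and not part of the notebook.

```python
def dep_t_to_seconds(dep_t):
    """Convert an HHMM integer (e.g. 545 for 05:45) to seconds since midnight."""
    hours, minutes = divmod(dep_t, 100)
    return hours * 3600 + minutes * 60


# The two sample departure times shown earlier in the notebook
print(dep_t_to_seconds(545))  # 5 h 45 min after midnight
print(dep_t_to_seconds(600))  # 6 h 00 min after midnight
```

Adding the `offset` passed to the `DoFn` to this value gives the event timestamp that Beam uses for windowing.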

#### Add hour of day

In [90]:
class Add_HOD(beam.DoFn):
    def process(self, element):
        element=element.copy()
        dep_t = element['DEP_T']
        element['HOD'] = dep_t // 100
        yield element

#### Key out DEP_DELAY for averaging

In [91]:
class DEP_DELAY_by_HOD(beam.DoFn):
    def process(self, element):
        element=element.copy()
        yield element['HOD'], element['DEP_DELAY']

#### Key the whole records
Keying the records allows us to apply ```CoGroupByKey``` once the windowed statistics are available.

In [92]:
class Record_by_HOD(beam.DoFn):
    def process(self, element):
        element=element.copy()
        yield element['HOD'], element

#### Unnest 
Unnest the resulting structure coming from ```CoGroupByKey``` to simple records

In [79]:
class Flatten_EnrichedFN(beam.DoFn):
    def process(self, element):
        hod=element[0]
        avg=element[1]['avg'][0]
        cnt=element[1]['cnt'][0]
        records=element[1]['rec'][0]
        for record in records:
            # Copy first: Beam expects DoFn inputs to be treated as immutable
            record = dict(record)
            record['CNT_BTH']=cnt
            record['AVG_DEP_DELAY_BTH']=avg
            yield record
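To make the nested structure concrete, here is a plain-Python sketch of the same unnesting logic. It assumes each element produced by ```CoGroupByKey``` looks like `(key, {'avg': [...], 'cnt': [...], 'rec': [[...]]})`, which is what the indexing in `Flatten_EnrichedFN` implies. The function name `flatten_enriched` and the hand-built `example_element` are illustrative, not taken from the pipeline.

```python
def flatten_enriched(element):
    """Plain-Python mirror of the unnesting step (illustrative only)."""
    _hod, grouped = element
    avg = grouped['avg'][0]      # single mean per key
    cnt = grouped['cnt'][0]      # single count per key
    records = grouped['rec'][0]  # the grouped list of records for this key
    for record in records:
        enriched = dict(record)  # copy so the input stays untouched
        enriched['CNT_BTH'] = cnt
        enriched['AVG_DEP_DELAY_BTH'] = avg
        yield enriched


# Hand-built element for hour 5, using the first sample record
example_element = (5, {
    'avg': [4.0],
    'cnt': [1],
    'rec': [[{'AIRLINE': 'US', 'ARR': 'PHX', 'ARR_DELAY': -2.0,
              'DEP_DELAY': 4.0, 'DEP_T': 545, 'HOD': 5}]],
})
flat = list(flatten_enriched(example_element))
print(flat)
```

Each output record carries the hourly count and mean next to its original fields, so downstream steps no longer need the grouping key.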

### Creating the pipeline

We'll add timestamps relative to midnight (UTC) of the current day, as if all the records were today's records.

In [82]:
import time
# Midnight (UTC) of the current day, in seconds since the epoch
OFFSET = int(time.time() // 86400 * 86400)
OFFSET

1543622400
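As a sanity check, the offset printed above can be converted back to a calendar date. This is a small sketch assuming Python 3 (for `datetime.timezone`); it hard-codes the value from the output above rather than recomputing it from the clock.

```python
import datetime

OFFSET = 1543622400  # value printed by the cell above

# Interpret the offset as seconds since the Unix epoch, in UTC
midnight = datetime.datetime.fromtimestamp(OFFSET, tz=datetime.timezone.utc)
print(midnight.isoformat())

# A multiple of 86400 seconds always falls on a UTC day boundary
print(OFFSET % 86400)
```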

In [101]:
data = samples.to_dict(orient='records')

In [102]:
windowed = (
    data
    | "Add_timestamp" >> beam.ParDo(AddTimeStampDoFn(OFFSET))
    | "Add_HOD" >> beam.ParDo(Add_HOD())
    | "Window_1h" >> beam.WindowInto(window.FixedWindows(3600))
)
len(windowed)

1075
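Because `OFFSET` is a multiple of 86400 (and hence of 3600), the one-hour fixed windows line up with the hours of the day. The sketch below illustrates this with plain arithmetic; it assumes fixed windows with zero offset start at `timestamp - timestamp % size`, and the helper `fixed_window_start` is a hypothetical stand-in, not a Beam function.

```python
WINDOW_SIZE = 3600   # one hour, as in FixedWindows(3600)
OFFSET = 1543622400  # midnight UTC, value from the output above


def fixed_window_start(timestamp, size=WINDOW_SIZE):
    """Start of the fixed window that contains the timestamp (zero window offset assumed)."""
    return timestamp - timestamp % size


# Event time of the first sample record (DEP_T = 545, i.e. 05:45)
ts = OFFSET + 5 * 3600 + 45 * 60
start = fixed_window_start(ts)
print((start - OFFSET) // 3600)  # hour of day at which the window begins
```

Under this assumption every record falls into the window that begins at its own `HOD`, so grouping by `HOD` within a window and grouping by window select the same records.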

#### Counts by the hour

In [109]:
records_by_hod = (
    windowed
    | "Record_by_HOD" >> beam.ParDo(Record_by_HOD())
    | "Group_by_HOD" >> beam.GroupByKey()
)
counts_by_hod = (
    records_by_hod
    | "Count" >> beam.CombineValues(beam.combiners.CountCombineFn())
)
counts_by_hod[:5]

[(5, 1), (7, 47), (6, 10), (9, 106), (8, 64)]

In [110]:
#records_by_hod[2]

#### Averages by the hour

In [111]:
avgs_by_hod = (
    windowed    
    | "Make_HOD" >> beam.ParDo(DEP_DELAY_by_HOD())
    | "Avg_by_HOD" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
)
avgs_by_hod[:3]

[(5, 4.0), (7, 4.148936170212766), (6, 0.8)]
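The counts and means can be cross-checked without Beam. The following sketch computes both per hour of day in plain Python; the function name `count_and_mean_by_hod` is hypothetical, and the input is only the two sample records shown earlier, so hour 6 here does not reproduce the full-data result (10 records, mean 0.8).

```python
from collections import defaultdict

# Only the two records displayed earlier, not the full 1075-row result
sample = [
    {'DEP_T': 545, 'DEP_DELAY': 4.0},
    {'DEP_T': 600, 'DEP_DELAY': 15.0},
]


def count_and_mean_by_hod(records):
    """Return {hour_of_day: (count, mean departure delay)}."""
    delays = defaultdict(list)
    for record in records:
        delays[record['DEP_T'] // 100].append(record['DEP_DELAY'])
    return {hod: (len(values), sum(values) / len(values))
            for hod, values in delays.items()}


stats = count_and_mean_by_hod(sample)
print(stats)
```

Running the same function on the full `data` list should give values comparable to `counts_by_hod` and `avgs_by_hod`, provided the windows coincide with the hours of the day.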

#### Co-Group and Flatten

In [113]:
combined = ( {'cnt': counts_by_hod, 'avg': avgs_by_hod, 'rec': records_by_hod }
 | "Co_Group_HOD" >> beam.CoGroupByKey()
 | "Flatten" >> beam.ParDo(Flatten_EnrichedFN())
)
combined[:3]

[{u'AIRLINE': 'US',
  u'ARR': 'PHX',
  u'ARR_DELAY': -2.0,
  'AVG_DEP_DELAY_BTH': 4.0,
  'CNT_BTH': 1,
  u'DEP_DELAY': 4.0,
  u'DEP_T': 545,
  'HOD': 5},
 {u'AIRLINE': 'DL',
  u'ARR': 'PHL',
  u'ARR_DELAY': -2.0,
  'AVG_DEP_DELAY_BTH': 4.148936170212766,
  'CNT_BTH': 47,
  u'DEP_DELAY': 7.0,
  u'DEP_T': 700,
  'HOD': 7},
 {u'AIRLINE': 'EV',
  u'ARR': 'IAH',
  u'ARR_DELAY': -3.0,
  'AVG_DEP_DELAY_BTH': 4.148936170212766,
  'CNT_BTH': 47,
  u'DEP_DELAY': 0.0,
  u'DEP_T': 700,
  'HOD': 7}]