In [1]:
import argparse
import logging
import snappy
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.core import CombinePerKey, GroupByKey
from apache_beam.io.avroio import ReadFromAvro
from apache_beam.transforms.combiners import Sample
import json
from operator import itemgetter
import numpy as np
from apache_beam.transforms.userstate import BagStateSpec, CombiningValueStateSpec,  TimerSpec, on_timer, StateSpec
from apache_beam.coders.coders import VarIntCoder, PickleCoder, BytesCoder, StrUtf8Coder, FastPrimitivesCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.combiners import CountCombineFn
import time
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
import unittest
from apache_beam.metrics.metric import Metrics
from apache_beam.runners.direct.direct_runner import DirectRunner

  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


In [2]:
from apache_beam.transforms import core
from apache_beam.transforms import cy_combiners
from apache_beam.transforms import ptransform
from apache_beam.transforms import window
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.typehints import KV
from apache_beam.typehints import Any
from apache_beam.typehints import Dict
from apache_beam.typehints import Iterable
from apache_beam.typehints import List
from apache_beam.typehints import Tuple
from apache_beam.typehints import TypeVariable
from apache_beam.typehints import Union
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types

In [3]:
# Generic placeholder type variables for Beam type hints, created in the
# order T, K, V.
T, K, V = (TypeVariable(label) for label in ('T', 'K', 'V'))

In [41]:
class LevelsDoFn(beam.DoFn):
    """Stateful DoFn that folds each element's value into combining state.

    Every input is indexed as ``element[1]`` to obtain the value that is
    added to the ``LEVELS`` state; the state's current combined result is
    emitted after each addition.
    """

    # Combining state registered under the name 'bids'. The merge logic lives
    # in DepthCombineFn, which is defined outside this cell.
    LEVELS = CombiningValueStateSpec('bids', FastPrimitivesCoder(), DepthCombineFn())

    def process(self, element, lvl_state=beam.DoFn.StateParam(LEVELS)):
        """Add ``element[1]`` to the state and yield the updated combined value."""
        level = element[1]
        lvl_state.add(level)

        combined = lvl_state.read()
        yield combined

In [55]:
class CombineTest(unittest.TestCase):
  """Runs LevelsDoFn over a tiny keyed input and checks its emitted values."""

  def test_builtin_combines(self):
    """Two values under the key "hi" should produce two successive outputs."""
    keyed_levels = [("hi", (1, 1)), ("hi", (2, 1))]
    expected_outputs = [{1: 1}, {1: 1, 2: 1}]

    with TestPipeline() as p:
      emitted = p | beam.Create(keyed_levels) | beam.ParDo(LevelsDoFn())

      # assert_that must be attached before the pipeline context exits,
      # because leaving the `with` block is what runs the pipeline.
      assert_that(emitted, equal_to(expected_outputs))


In [56]:
if __name__ == '__main__':
    # Raise the root logger to INFO so Beam's runner messages are shown.
    logging.root.setLevel(logging.INFO)
    # argv is replaced with a dummy program name so unittest does not try to
    # parse the host process's command line; exit=False stops unittest from
    # calling sys.exit() once the tests have run.
    unittest.main(exit=False, argv=['first-arg-is-ignored'])

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:root:Running ((((((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_ParDo(LevelsDoFn)_4))+(ref_AppliedPTransform_assert_that/WindowInto(WindowIntoFn)_8))+(ref_AppliedPTransform_assert_that/ToVoidKey_9))+(ref_AppliedPTransform_assert_that/Group/pair_with_1_12))+(assert_that/Group/Flatten/Transcode/1))+(assert_that/Group/Flatten/Write/1)
INFO:root:Running (((ref_AppliedPTransform_assert_that/Create/Read_7)+(ref_AppliedPTransform_assert_that/Group/pair_with_0_11))+(assert_that/Group/Flatten/Transcode/0))+(assert_that/Group/Flatten/Write/0)
INFO:root:Running (assert_that/Group/Flatten/Read)+(assert_that/Group/GroupByKey/Write)
INFO:root:Running (((assert_that/Group/GroupByKey/Read)+(ref_AppliedPTransform_assert_that/Group/Map(_merge_tagged_vals_under_key)_18))+(ref_AppliedPTransform_assert_that/Unkey_19))+(ref_AppliedPTransform_assert_that/Match_20)
.
---------------------

In [28]:
class MapDepthIdKVDoFn(beam.DoFn):
    """Keys each depth event by its underscore-joined ``event_id`` parts."""

    def process(self, element):
        """Return a one-item list holding ``(joined_event_id, element)``.

        ``element['event_id']`` must be an iterable of strings.
        """
        # NOTE(review): ParseTestDepthFn puts the event time in event_id, so
        # this key looks unique per event - confirm that is the intended
        # grouping key for the stateful stages that follow.
        id_parts = element['event_id']
        key = '_'.join(id_parts)
        keyed_event = (key, element)
        return [keyed_event]

In [29]:
class ParseTestDepthFn(beam.DoFn):
  """Turns a raw test depth row into a depth-update event dictionary.

  Each input row is read by index:
    elem[0]: event time, stored as ``event_time_ms`` and appended (as a
             string) to ``event_id``
    elem[1]: ask levels, converted with ``gen_levels``
    elem[2]: bid levels, converted with ``gen_levels``

  Exchange, event type and asset pair are hard-coded to the okex_spot
  ETH/BTC depthUpdate stream. Rows that cannot be converted are counted in
  the ``num_parse_errors`` metric, logged, and dropped.
  """
  def __init__(self):
    super(ParseTestDepthFn, self).__init__()
    self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')

  def process(self, elem):
    """Yield one event dict for ``elem``, or nothing if it cannot be parsed."""
    try:
      event = {
          "event_id": ["okex_spot", "depthUpdate", "ETH", "BTC", str(elem[0])],
          "event_time_ms": elem[0],
          "event_type": "depthUpdate",
          "exchange": "okex_spot",
          "quote_asset": "BTC",
          "base_asset": "ETH",
          "bids": gen_levels(elem[2]),
          "asks": gen_levels(elem[1]),
      }
    except Exception:  # pylint: disable=broad-except
      # Log and count parse errors, then drop the row.
      self.num_parse_errors.inc()
      logging.error('Parse error on "%s"', elem)
      return

    # The yield sits outside the try block so that exceptions raised into the
    # generator at this point (e.g. GeneratorExit on close) are not mistaken
    # for parse errors.
    yield event

In [106]:
class EnrichDepthsDoFn(beam.DoFn):
    """Buffers keyed depth events and emits them sorted by event time.

    ``process`` serialises every incoming ``(key, event)`` element to JSON and
    stores it in bag state, and (re)arms a watermark timer for
    ``ALLOWED_LATENESS`` past the end of the element's window. When the timer
    fires, ``expiry`` emits a single ``(key, [events sorted by
    'event_time_ms'])`` pair and clears the state.

    The output is keyed because the next stage in ``EnrichDepths``
    (``EnrichLevelsDoFn``) is itself stateful and iterates ``element[1]`` as
    the list of events.
    """
    MAX_BUFFER_SIZE = 500  # not referenced anywhere in this class

    # JSON-encoded (key, event) pairs awaiting the expiry timer.
    BUFFER_STATE = BagStateSpec('buffer', StrUtf8Coder())
    # Running count of buffered elements; maintained but not read here.
    COUNT_STATE = CombiningValueStateSpec(
        'count',
        VarIntCoder(),
        CountCombineFn()
    )

    EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
    MAX_BUFFER_DURATION = 5  # not referenced anywhere in this class
    ALLOWED_LATENESS = 5

    def process(
            self,
            element,
            w=beam.DoFn.WindowParam,
            buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
            count_state=beam.DoFn.StateParam(COUNT_STATE),
            expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)
    ):
        """Buffer ``element`` as JSON and arm the expiry timer; emits nothing."""
        expiry_timer.set(w.end + EnrichDepthsDoFn.ALLOWED_LATENESS)
        buffer_state.add(json.dumps(element))
        count_state.add(1)

    @on_timer(EXPIRY_TIMER)
    def expiry(
            self,
            buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
            count_state=beam.DoFn.StateParam(COUNT_STATE)
    ):
        """Emit ``(key, events sorted by 'event_time_ms')`` and reset state."""
        # Materialise the buffer once. The previous version wrapped it in a
        # generator, drained that generator in a debug print loop, and then
        # sorted the already-empty generator, so the output was always [].
        keyed_events = [json.loads(raw) for raw in buffer_state.read()]

        # Everything needed is now held in local memory, so the state can be
        # released before emitting.
        count_state.clear()
        buffer_state.clear()

        if not keyed_events:
            # Nothing was buffered, so there is no key to emit under.
            return

        # Each buffered entry is the JSON form of the (key, event) element
        # seen by process(); state is per key, so all entries share one key.
        key = keyed_events[0][0]
        sorted_depths = sorted(
            (event for _, event in keyed_events),
            key=itemgetter('event_time_ms')
        )

        yield key, sorted_depths

In [6]:
class EnrichLevelsDoFn(beam.DoFn):
    """Accumulates ask and bid levels in combining state, one snapshot per event.

    ``element[1]`` is iterated as a sequence of depth events, each a mapping
    with ``"asks"`` and ``"bids"`` iterables. For every event, all of its ask
    levels and then all of its bid levels are added to state, after which the
    current combined asks and bids are emitted as a dict.
    How levels are merged is decided by DepthCombineFn, defined outside this
    cell.
    """

    BIDS_STATE = CombiningValueStateSpec('bids', PickleCoder(), DepthCombineFn())
    ASKS_STATE = CombiningValueStateSpec('asks', PickleCoder(), DepthCombineFn())

    def process(
            self,
            element,
            w=beam.DoFn.WindowParam,
            asks_state=beam.DoFn.StateParam(ASKS_STATE),
            bids_state=beam.DoFn.StateParam(BIDS_STATE),
    ):
        """Fold each event's levels into state and yield ``{'asks', 'bids'}``."""
        # Debug trace of every incoming element (kept from the original).
        print(element)

        depth_events = element[1]
        for depth in depth_events:
            # Asks are folded in before bids, matching the read order below.
            for side_state, side_name in ((asks_state, "asks"), (bids_state, "bids")):
                for level in depth[side_name]:
                    side_state.add(level)

            snapshot = {
                'asks': asks_state.read(),
                'bids': bids_state.read(),
            }
            yield snapshot

NameError: name 'FastPrimitivesCoder' is not defined

In [56]:
class EnrichDepths(beam.PTransform):
  """Windows depth events, keys them by event id, then enriches them.

  Stages, in order: fixed windowing, ``MapDepthIdKVDoFn``,
  ``EnrichDepthsDoFn`` and ``EnrichLevelsDoFn``.
  """
  def expand(self, pcoll):
    # (5*60)*10 == 3000.
    # NOTE(review): the original comment called this "3 sec". FixedWindows
    # sizes are in seconds, so that only holds if element timestamps are raw
    # milliseconds - confirm the intended window length.
    window_size = (5*60)*10
    windowed = pcoll | 'DiscreteWindows' >> beam.WindowInto(
        window.FixedWindows(window_size, 0))
    keyed = windowed | 'MapDepthIdKV' >> beam.ParDo(MapDepthIdKVDoFn())
    buffered = keyed | "EnrichEvents" >> beam.ParDo(EnrichDepthsDoFn())
    return buffered | "EnrichLevels" >> beam.ParDo(EnrichLevelsDoFn())

In [31]:
def gen_levels(side):
    """Convert raw level pairs into level dictionaries.

    Args:
        side: Iterable of indexable pairs, where index 0 is the quantity and
            index 1 is the price.

    Returns:
        A list with one ``{"quantity": ..., "price": ...}`` dict per input
        pair, in input order. An empty ``side`` gives an empty list.
    """
    return [{"quantity": level[0], "price": level[1]} for level in side]

In [115]:
# class EnrichDepthsDoFnTest(unittest.TestCase):

#     LABELS = [
#         ["okex_spot", "depthUpdate", "ETH", "BTC"]
#     ]

#     SAMPLE_DATA = [
#         [
#           1557669000145, 
#           [
#               [1,1],
#               [1,2],
#               [1,3],
#               [1,4]
#           ],
#           [
#               [1,1],
#               [1,2],
#               [1,3],
#               [1,4]
#           ]
#         ],
#         [
#           1557669000146, 
#           [
#               [0,1],
#               [0,2],
#               [0,3],
#               [0,4]
#           ],
#           [
#               [0,1],
#               [0,2],
#               [0,3],
#               [0,4]
#           ]
#         ]
#     ]

    
#     def create_data(self, p):
#         return (p
#             | beam.Create(EnrichDepthsDoFnTest.SAMPLE_DATA)
#             | beam.ParDo(ParseTestDepthFn())
#             | beam.Map(lambda e:beam.window.TimestampedValue(e, e['event_time_ms'])))

#     def test_hourly_team_score(self):
#         with TestPipeline() as p:
#             result = (
#                     self.create_data(p)
#                     | "EnrichEvents" >> EnrichDepths()
#             )
            
#             td = EnrichDepthsDoFnTest.SAMPLE_DATA[0]

#             assert_that(
#               result, equal_to([
#                   {
#                         "event_id": ["okex_spot", "depthUpdate", "ETH", "BTC", str(td[0])],
#                         "event_time_ms": td[0],
#                         "event_type": "depthUpdate",
#                         "exchange": "okex_spot",
#                         "quote_asset": "BTC", 
#                         "base_asset": "ETH",
#                         "bids": gen_levels(td[2]), 
#                         "asks": gen_levels(td[1]),
#                   }
#               ])
#             )


In [118]:
if __name__ == '__main__':
    # Raise the root logger to INFO so Beam's runner messages are shown.
    logging.root.setLevel(logging.INFO)
    # argv is replaced with a dummy program name so unittest does not try to
    # parse the host process's command line; exit=False stops unittest from
    # calling sys.exit() once the tests have run.
    unittest.main(exit=False, argv=['first-arg-is-ignored'])

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:root:Running (((((((((((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_ParDo(ParseTestDepthFn)_4))+(ref_AppliedPTransform_Map(<lambda at <ipython-input-115-ed684db18926>:45>)_5))+(ref_AppliedPTransform_EnrichEvents/DiscreteWindows_7))+(ref_AppliedPTransform_EnrichEvents/MapDepthIdKV_8))+(ref_AppliedPTransform_EnrichEvents/EnrichEvents_9))+(ref_AppliedPTransform_EnrichEvents/EnrichLevels_10))+(ref_AppliedPTransform_assert_that/WindowInto(WindowIntoFn)_14))+(ref_AppliedPTransform_assert_that/ToVoidKey_15))+(ref_AppliedPTransform_assert_that/Group/pair_with_1_18))+(assert_that/Group/Flatten/Transcode/0))+(assert_that/Group/Flatten/Write/0)
E

['hello', [{'event_id': ['okex_spot', 'depthUpdate', 'ETH', 'BTC', '1557669000145'], 'event_time_ms': 1557669000145, 'event_type': 'depthUpdate', 'exchange': 'okex_spot', 'quote_asset': 'BTC', 'base_asset': 'ETH', 'bids': [{'quantity': 1, 'price': 1}, {'quantity': 1, 'price': 2}, {'quantity': 1, 'price': 3}, {'quantity': 1, 'price': 4}], 'asks': [{'quantity': 1, 'price': 1}, {'quantity': 1, 'price': 2}, {'quantity': 1, 'price': 3}, {'quantity': 1, 'price': 4}]}]]
{'quantity': 1, 'price': 1}
{'quantity': 1, 'price': 2}



ERROR: test_hourly_team_score (__main__.EnrichDepthsDoFnTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 744, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 563, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 634, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
  File "apache_beam/runners/common.py", line 831, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 846, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "<ipython-input-117-8f9e25a208c8>", line 26, in process
    asks_state.add(a)
  File "/home/thorad/.conda/envs/axiom/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 305, in add
    self._combinefn.add_input(accumulator, value))
  File "<

In [541]:
def run(argv=None):
    """Build and run the depth-update batch pipeline, blocking until it ends.

    Args:
        argv: Optional list of command-line flags handed to
            ``PipelineOptions``. With the default ``None`` the options are
            constructed exactly as a bare ``PipelineOptions()`` would be.
    """

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    # argv is forwarded so that callers can actually supply pipeline flags;
    # previously the parameter was accepted but ignored.
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = False
    p = beam.Pipeline(options=pipeline_options)

    # ==================================================================>
    # Trades From Avro
    # ==================================================================>

#     tradeRecords = \
#         p | 'readTrades' >> ReadFromAvro(
#             "../resources/trades.avro",
#             use_fastavro=True
#         )

    # ==================================================================>
    # DepthUpdate from Avro
    # ==================================================================>

    depthUpdateRecords = \
        p | 'readDepthUpdates' >> ReadFromAvro(
            "../resources/depths/*.avro",
            use_fastavro=True
        )

    # Maintains orderly depth state and emits full levels for each
    # depth update event received.
    # NOTE(review): the window size here is (2*60)*100 = 12000, while the
    # commented-out draft below uses (2*60)*1000 and labels it "2min" -
    # confirm which one is intended.
    enrichedDepths = (
        depthUpdateRecords
        | 'FormatPubsub' >> beam.ParDo(FormatPubsubDoFn())
        | 'DiscreteWindows' >> beam.WindowInto(window.FixedWindows((2*60)*100, 0))
        | 'AddSessionInfo' >> beam.ParDo(AddSessionInfoDoFn())
        | 'MapToWindowedPartitionIdKV' >> beam.ParDo(MapToWindowedPartitionIdKVDoFn())
        | 'FilterByPartition' >> beam.Filter(lambda kv: kv[0] == "okex_spot_depthUpdate_ETH_BTC")
        | 'BatchDepthUpdates' >> beam.ParDo(BatchDepthUpdatesDoFn())
#         | 'EnrichDepths' >> beam.ParDo(EnrichDepthsDoFn())
        | 'PrintOutputs' >> beam.ParDo(PrintDoFn())
    )

#     mergedEvents = ((enrichedDepths,tradeRecords) | 'MergePCollections' >> beam.Flatten())

#     windowed = (
#        enrichedDepths
#        | 'AddEventTimestamp' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time_ms"]))
#        | 'DiscreteWindows' >> beam.WindowInto(window.FixedWindows((2*60)*1000, 0))  # 2min
#        | 'AddSessionInfo' >> beam.ParDo(AddSessionInfoDoFn())
#        | 'MapToWindowedPartitionIdKV' >> beam.ParDo(MapToWindowedPartitionIdKVDoFn())
#        | 'FilterByPartition' >> beam.Filter(lambda kv: kv[0] == "okex_spot_depthUpdate_ETH_BTC")
#      | 'ExtractDoFn' >> beam.ParDo(ExtractDoFn())
#      | 'FilterDuplicates' >> FilterDuplicates()
#      )

#     tradeAggregation = (windowed
# #          | 'FilterDepths' >> beam.ParDo(FilterTrades())

#          | 'MapToWindowedPartitionIdKV' >> beam.ParDo(MapToWindowedPartitionIdKVDoFn())
#          | 'GroupByWindowKey' >> GroupByKey()
#          | 'AggregateTrades' >> beam.ParDo(AggregateTradesDoFn())
#          | 'PrintOutputs' >> beam.ParDo(PrintDoFn())
#     )

#     depthUpdateAggregation = (windowed
# #        | 'FilterDepths' >> beam.ParDo(FilterDepths())
# #        | 'MapToWindowedPartitionIdKV' >> beam.ParDo(MapToWindowedPartitionIdKVDoFn())
# #        | 'GroupByWindowKey' >> GroupByKey()
#        | 'AggregateDepths' >> beam.ParDo(AggregateDepthsDoFn())
#        | 'PrintDepthOutputs' >> beam.ParDo(PrintDoFn())
#     )

#        | 'Merge trade and depth aggregations'
#        | 'JoinByWindow' >> beam.ParDo(FilterDuplicatesDoFn())
#        | 'AggregateFeatures' >> beam.ParDo(AggregateFeaturesDoFn())

    # ==================================================================>
    # Windowed Aggregations and Ingress
    # ==================================================================>

    #     filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
    #       './outputData.txt')

    # Planned per-window processing (not yet implemented):
    # --------------------------------->
    # combine
    # window depth updates and trades
    # remove duplicates
    # update depth cache
    # aggregate trade events
    # aggregate depth events
    # join depth and trade aggregations by window
    # emit full row
    # store row in avro with partitioned file layout

    result = p.run()
    result.wait_until_finish()

In [553]:
run()



TypeError: '_ConcatIterable' object does not support item assignment [while running 'EnrichDepths']