In [1]:
from __future__ import absolute_import
import argparse
import logging
import re
import json
import apache_beam as beam
import numpy as np
from datetime import datetime
from apache_beam.io import ReadFromText
from apache_beam.transforms import window
from apache_beam.io import WriteToText
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.dataframe.io import read_csv
from beam_nuggets.io import kafkaio


TIME_FORMAT = '%Y-%m-%d %H:%M:%S'



In [2]:
def print_windows(element, window=beam.DoFn.WindowParam,  pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
    logging.getLogger().error(f"PRINT WINDOW HAS SENT")
    print(window)
    print(pane_info)
    print(timestamp)
    print(element)
    print('-----------------')

In [3]:
class time_windowed_data(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam,  pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
        logging.getLogger().info(f"{timestamp, element}")
        yield timestamp, element

In [4]:
def to_unix_time(time_str, date_time_format=TIME_FORMAT):
    logging.getLogger().info(f"{time_str}")
    ts = datetime.strptime(time_str, date_time_format)
    return float(ts.strftime('%s'))

In [5]:
class KeyValuePair(beam.DoFn):
      # Use classes and functions to perform transformations on your PCollections
      # Yield the element(s) needed as input for the next transform
      def process(self, element):
            list_element = element.split(",")
            logging.getLogger().info(f"{list_element}")
            element = (list_element[2], {"timestamp": to_unix_time(list_element[0]), "bytes": float(list_element[1]), "zone":list_element[-1]})
            yield element

In [6]:
class KeyValuePair_json(beam.DoFn):
      # Use classes and functions to perform transformations on your PCollections
      # Yield the element(s) needed as input for the next transform
      def process(self, element):
            element = json.loads(element[1])
            logging.getLogger().info(f"{element}")
            element = (element['user_id'], {"timestamp": to_unix_time(element["timestamp"]), "bytes": float(element["bytes"]), "zone": element["zone"]})
            yield element

In [7]:
class MeanBytes(beam.DoFn):
    def process(self, element):
        logging.getLogger().info(f"{type(element[1][1])}")
        logging.getLogger().info(f"{type(element[1][0])}")
        logging.getLogger().info(f"{element[0]}")
        yield (element[1][0], (np.array(element[1][1]).mean(), element[1][1]))

In [8]:
class OutageAlert(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam,  pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
        logging.getLogger().info(f"{element}")
        if element[1][0] == 0:
            logging.getLogger().error(f"{timestamp.to_utc_datetime()} --user_id:{element[0]} experienced 5 minute average traffic of {element[1][0]} bytes --Internet Outage --last readings {element[1][1]}")
            yield (element[0], (window, timestamp, element[1]))

In [11]:
options = PipelineOptions(direct_num_workers=8, direct_running_mode='multi_threading')
# options = PipelineOptions()
input_path = 'syn-internet-usage.csv'
logging.getLogger().setLevel(logging.ERROR)
consumer_config = {"topic": "intelliblock_stream",
                   "bootstrap_servers": "localhost:9092"}

In [12]:
with beam.Pipeline(options=options) as p:
       elements = (p
        | "read data into dataframe" >> ReadFromText(input_path, skip_header_lines=1)
        | "keyPairDo" >> beam.ParDo(KeyValuePair())
        | 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
        | beam.Map(lambda e: (e[0], e[1]['bytes']))
       )
       
       results = (
           elements
           | "windowing" >> beam.WindowInto(
               beam.window.SlidingWindows(5*60,3*60),
               trigger=beam.trigger.Repeatedly(beam.trigger.AfterAny(beam.trigger.AfterProcessingTime(1*60),
                                                                     beam.trigger.AfterCount(2))),
               accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
           | "combine by key" >> beam.CombinePerKey(beam.combiners.ToListCombineFn())
           |  beam.ParDo(time_windowed_data())
           | "get bytes" >> beam.ParDo(MeanBytes())
           | "Alert if user experienc outage" >> beam.ParDo(OutageAlert()))
       # results | beam.ParDo(print_windows)

AttributeError: 'NoneType' object has no attribute 'time'

In [None]:
with beam.Pipeline(options=options) as p:
    elements = (
           p
           | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config,value_decoder=bytes.decode)
           | "keyPairDo" >> beam.ParDo(KeyValuePair_json())
           | 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
           | beam.Map(lambda e: (e[0], e[1]['bytes'])))
    results = (
        elements
           | "windowing" >> beam.WindowInto(
               beam.window.FixedWindows(5),
               trigger=beam.trigger.Repeatedly(beam.trigger.AfterAny(beam.trigger.AfterProcessingTime(1),beam.trigger.AfterCount(2))),
               accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
           | "combine by key" >> beam.CombinePerKey(beam.combiners.ToListCombineFn())
#            |  beam.ParDo(time_windowed_data())
#            | "get bytes" >> beam.ParDo(MeanBytes())
#            | "Alert if user experienc outage" >> beam.ParDo(OutageAlert())
           | beam.ParDo(print_windows)
    )