<a href="https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection/blob/master/README.md">Reference 1: df-ml-anomaly-detection README</a>  <br>
<a href="https://faker.readthedocs.io/en/master/">Reference 2: Faker documentation</a>

In [None]:
import apache_beam as beam
import time

In [None]:
class UniqueCombine(beam.CombineFn):
    """Combine that outputs the number of distinct input values.

    The accumulator is a ``set`` of the values added so far, so inputs must be
    hashable; ``extract_output`` returns the size of that set.
    """

    def create_accumulator(self):
        return set()

    def add_input(self, accumulator, element):
        accumulator.add(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        # Start from an empty set: the unbound form ``set.union(*accs)`` needs
        # at least one accumulator and raises TypeError on an empty iterable.
        return set().union(*accumulators)

    def extract_output(self, accumulator):
        return len(accumulator)
    
def pick(n):
    """Return the first element of *n*.

    Suitable as a callable combiner (e.g. in ``GroupBy.aggregate_field``),
    where Beam may pass a non-indexable iterable when merging partial results.
    Indexable inputs are read with ``n[0]`` exactly as before; inputs that do
    not support indexing fall back to their first iterated item.

    Raises:
        IndexError: if *n* is empty.
    """
    try:
        return n[0]
    except TypeError:
        # Not subscriptable (e.g. an iterator or chained iterable): take the
        # first item by iteration instead.
        for item in n:
            return item
        raise IndexError("pick() argument is empty") from None

In [None]:
from datetime import datetime

In [None]:
# Display the current local time as an ISO-8601 string. datetime.now() is
# naive, so the string carries no UTC offset.
datetime.now().isoformat()

In [None]:
# Group four sample rows by (name, salary) and aggregate each group:
#   count      - number of rows in the group
#   test       - number of distinct "test" values (UniqueCombine)
#   avg_salary - mean salary of the group
# Each aggregated row is then passed through AddTimestamp and printed.
# NOTE(review): AddTimestamp is not defined in the visible cells; it must be
# defined in another cell before this one runs.
with beam.Pipeline() as pipeline:
    total_unique_elements = (
        pipeline
        | 'Create produce' >> beam.Create(
            [{"name": "idris", "salary": 500, "test": 5},
             {"name": "idris", "salary": 500, "test": 5},
             {"name": "idris", "salary": 500, "test": 6},
             {"name": "ade", "salary": 900, "test": 5}])
        | 'row' >> beam.Map(lambda row: beam.Row(name=row['name'], salary=row["salary"], test=row["test"]))
        | 'Pair with 1' >> beam.GroupBy("name", "salary")
        # Counting unique values of a grouping key is always 1, so count rows.
        .aggregate_field("name", beam.combiners.CountCombineFn(), "count")
        .aggregate_field("test", UniqueCombine(), "test")
        # Previously `pick` over "name", which yielded a name, not an average.
        .aggregate_field("salary", beam.combiners.MeanCombineFn(), "avg_salary")
    )
    # `.with_output_types(r)` removed: `r` is not defined anywhere (NameError).
    total_unique_elements | 'Add timestamp' >> beam.ParDo(AddTimestamp()) | beam.Map(print)

In [None]:
# Convert four sample dicts to beam.Row objects, run each through AddTimestamp
# and print the result (no grouping in this cell).
# NOTE(review): AddTimestamp is not defined in the visible cells; it must be
# defined in another cell before this one runs.
with beam.Pipeline() as pipeline:
    total_unique_elements = (
        pipeline
        | 'Create produce' >> beam.Create([
            {"name": "idris", "salary": 500, "test": 5},
            {"name": "idris", "salary": 500, "test": 5},
            {"name": "idris", "salary": 500, "test": 6},
            {"name": "ade", "salary": 900, "test": 5},
        ])
        | 'row' >> beam.Map(
            lambda record: beam.Row(
                name=record["name"],
                salary=record["salary"],
                test=record["test"],
            )
        )
    )
    (
        total_unique_elements
        | 'Add timestamp' >> beam.ParDo(AddTimestamp())
        | beam.Map(print)
    )

In [None]:
import ipaddress

In [None]:
from faker import Faker

# Module-level Faker instance. The variable shares its name with the `faker`
# package. NOTE(review): not referenced by the visible cells, which create
# their own `client = Faker()`.
faker = Faker()

In [None]:
# Dotted-quad netmask with 22 leading one bits, i.e. a /22 network; this
# matches the /22 prefix of "dst_subnet" in the transformed-data sketch.
# NOTE(review): not referenced by the visible cells.
maskString = "255.255.252.0"

In [None]:
import ipaddress


def ip_to_cidr(ip, subnet_mask):
    """Return the IPv4 network containing *ip* under *subnet_mask*, as CIDR text.

    ``subnet_mask`` may be a dotted netmask such as ``"255.255.255.0"`` or a
    prefix length such as ``24``. Host bits of ``ip`` are cleared
    (``strict=False``), so ``"192.168.1.10"`` with ``"255.255.255.0"`` gives
    ``"192.168.1.0/24"``. Malformed input raises ``ValueError`` (from
    :mod:`ipaddress`).
    """
    return str(ipaddress.IPv4Network(f"{ip}/{subnet_mask}", strict=False))


# Example usage
ip = "192.168.1.10"
subnet_mask = "255.255.255.0"

cidr = ip_to_cidr(ip, subnet_mask)
print(cidr)

In [None]:
# Rebind the notebook global `ip` (a string, assuming the ip_to_cidr example
# cell ran first) to an ipaddress.IPv4Address object; raises ValueError if
# it is not a valid IPv4 address.
ip = ipaddress.IPv4Address(ip)

In [None]:
# Parse ten copies of a sample JSON string into 'valid' / 'invalid' tagged
# outputs. Invalid events are written to files under data/invalid_events;
# valid events get the current wall-clock time as their timestamp, go through
# AddTimestamp and are printed.
# NOTE(review): EventParser and AddTimestamp are not defined in the visible
# cells. The labels 'Read from PubSub' and "add key" do not describe their
# steps (beam.Create / print).
pipeline = beam.Pipeline()
raw_events = (
    pipeline
    | 'Read from PubSub' >> beam.Create(['{"foo": "bar"}'] * 10)
    | 'Parse events' >> beam.ParDo(EventParser()).with_outputs('valid', 'invalid')
)

(
    raw_events['invalid']
    | 'Log invalid events' >> beam.io.fileio.WriteToFiles(
        "data/invalid_events", shards=2, max_writers_per_bundle=2)
)

(
    raw_events['valid']
    | "time" >> beam.Map(
        lambda event: beam.window.TimestampedValue(event, time.time()))
    | 'Add timestamps' >> beam.ParDo(AddTimestamp())
    | "add key" >> beam.Map(print)
)

pipeline.run()

In [None]:

import time
import json
import logging
import random
import apache_beam as beam
from utils.custom import NetworkPool, UserObject, JsonEvent
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.options.pipeline_options import PipelineOptions
        
def to_json(event):
    """Serialize *event* with ``json.dumps`` and return the UTF-8 encoded bytes."""
    text = json.dumps(event)
    return text.encode('utf-8')

class AgggregateEvent(beam.DoFn):
    """Emit a burst of synthetic events for one random user/network pair.

    Each input element produces between 5 and ``max_events_per_session``
    (inclusive) events from ``JsonEvent.generate``, all sharing one user and
    one network.

    NOTE(review): the class name looks like a typo of "AggregateEvent"; it is
    kept unchanged so existing references keep working.

    Args:
        max_events_per_session: inclusive upper bound on events per element;
            must be at least 5.

    Raises:
        ValueError: if ``max_events_per_session`` is below 5.
    """

    def __init__(self, max_events_per_session=20, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # random.randint(5, N) in process() needs N >= 5; fail at construction.
        if max_events_per_session < 5:
            raise ValueError(
                f"max_events_per_session must be >= 5, got {max_events_per_session}")
        self.max_events_per_session = max_events_per_session
        # process() reads these attributes; the original class never set them,
        # so every call raised AttributeError. Built like EventGenerator does.
        # NOTE(review): assumes NetworkPool()/UserObject() need no arguments,
        # as the notebook's local definitions do.
        self.network_pool = NetworkPool()

    def setup(self):
        self.user_obj = UserObject()

    def process(self, element):
        network = self.network_pool.get_network()
        user = self.user_obj.get_user()
        num_events = random.randint(5, self.max_events_per_session)
        for _ in range(num_events):
            # NOTE(review): two-argument call; confirm it matches the
            # JsonEvent.generate signature in utils.custom.
            event = JsonEvent.generate(user, network)
            yield event
                        
def run(args, beam_args):
    """Start a streaming pipeline that publishes synthetic events to Pub/Sub.

    Args:
        args: parsed arguments; ``args.qps`` sets the trigger interval and
            ``args.topic`` is the Pub/Sub topic written to.
        beam_args: extra command-line arguments forwarded to PipelineOptions.

    Returns:
        The result of ``pipeline.run()``.
    """
    # NOTE(review): EventGenerator is not defined in this cell. The notebook's
    # later EventGenerator requires an `anomaly` argument, so a bare
    # `EventGenerator()` would fail against it — confirm which class applies.
    # NOTE(review): PeriodicImpulse's fire_interval is in seconds, so 60/qps
    # fires qps times per minute rather than per second — confirm intent.
    pipeline_options = PipelineOptions(beam_args, save_main_session=True, streaming=True)
    p = beam.Pipeline(options=pipeline_options)
    ticks = p | "Trigger" >> PeriodicImpulse(
        start_timestamp=time.time(), fire_interval=(60 / args.qps))
    events = ticks | "Generate Events" >> beam.ParDo(EventGenerator())
    payloads = events | "JSONIFY" >> beam.Map(to_json)
    payloads | "Write to PubSub" >> beam.io.WriteToPubSub(args.topic)
    return p.run()

In [None]:
# raw data

{
  "subscriberId": (11111111111, 99999999999),
  "srcIP": "ipv4",
  "dstIP": "subnet",
  "srcPort": (1000, 5000),
  "dstPort": (1000, 5000),
  "txBytes": (0, 1000000000),
  "rxBytes": (0, 1000000000),
  "tcpFlag": (0, 65),
  "startTime": "timestamp",
  "endTime": "timestamp",
  "protocolName": "tcp|udp|http",
  "protocolNumber": (0, 255),
  "geoCountry": "country",
  "geoCity": "city",
  "latitude": (-90, 90),
  "longitude": (-180, 180),
}

# transformed data
{
  "transaction_time": "2019-10-27 23:22:17.848000",
  "subscriber_id": "100",
  "dst_subnet": "12.0.1.2/22",
  "number_of_unique_ips": "1",
  "number_of_unique_ports": "1",
  "number_of_records": "2",
  "max_tx_bytes": "15",
  "min_tx_bytes": "10",
  "avg_tx_bytes": "12.5",
  "max_rx_bytes": "40",
  "min_rx_bytes": "40",
  "avg_rx_bytes": "40.0",
  "max_duration": "100",
  "min_duration": "9",
  "avg_duration": "54.5"
}

# model features
{  "number_of_records": "2",
  "max_tx_bytes": "15",
  "min_tx_bytes": "10",
  "avg_tx_bytes": "12.5",
  "max_rx_bytes": "40",
  "min_rx_bytes": "40",
  "avg_rx_bytes": "40.0",
  "max_duration": "100",
  "min_duration": "9",
  "avg_duration": "54.5"
  }

In [None]:
# python3 pipeline.py --streaming --project "electric-armor-395015" --topic "projects/electric-armor-395015/topics/test" --qps 10 --runner "DataflowRunner" --region europe-west2 --temp_location gs://my_terraform_state_bucket/temp --staging_location gs://my_terraform_state_bucket/staging

In [None]:
# gcloud pubsub subscriptions pull test-sub --auto-ack --limit 1

In [231]:
# Print the built-in documentation for beam.io.ReadFromPubSub, including its
# constructor arguments (topic, subscription, id_label, with_attributes,
# timestamp_attribute).
help(beam.io.ReadFromPubSub)

Help on class ReadFromPubSub in module apache_beam.io.gcp.pubsub:

class ReadFromPubSub(apache_beam.transforms.ptransform.PTransform)
 |  ReadFromPubSub(topic=None, subscription=None, id_label=None, with_attributes=False, timestamp_attribute=None)
 |  
 |  A ``PTransform`` for reading from Cloud Pub/Sub.
 |  
 |  Method resolution order:
 |      ReadFromPubSub
 |      apache_beam.transforms.ptransform.PTransform
 |      apache_beam.typehints.decorators.WithTypeHints
 |      apache_beam.transforms.display.HasDisplayData
 |      typing.Generic
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, topic=None, subscription=None, id_label=None, with_attributes=False, timestamp_attribute=None)
 |      Initializes ``ReadFromPubSub``.
 |      
 |      Args:
 |        topic: Cloud Pub/Sub topic in the form
 |          "projects/<project>/topics/<topic>". If provided, subscription must be
 |          None.
 |        subscription: Existing Cloud Pub/Sub subscription to us

In [251]:
import random
from faker import Faker
from typing import NamedTuple
from datetime import datetime, timedelta

# Module-level Faker instance used by the NetworkPool and UserObject classes
# in this cell.
client = Faker()

# Destination side of a synthetic flow: address, port, and the protocol's
# name and number.
Network = NamedTuple(
    "Network",
    [
        ("ipv4", str),
        ("port", int),
        ("protocol_name", str),
        ("protocol_num", int),
    ],
)
    
# Source side of a synthetic flow: subscriber id plus the subscriber's
# address and port.
User = NamedTuple(
    "User",
    [
        ("subscriber_id", str),
        ("ipv4", str),
        ("port", int),
    ],
)

class NetworkPool:
    """Pool of synthetic destination networks.

    ``num_dest_ip`` private IPv4 addresses are generated once, at construction.
    Each ``get_network`` call then draws a protocol, one address from that
    fixed pool and a port in [1000, 5000].

    Args:
        num_dest_ip: number of destination addresses to pre-generate.
        faker_client: object providing ``ipv4_private()``; defaults to the
            module-level Faker ``client``. Pass a seeded Faker for a
            reproducible pool.
    """

    def __init__(self, num_dest_ip=10000, faker_client=None):
        # The module-level client is only looked up when no client is injected.
        self._client = client if faker_client is None else faker_client
        self._num_dest_ip = num_dest_ip
        # (protocol name, IP protocol number). HTTP and HTTPS run over TCP,
        # so they carry TCP's protocol number 6. 80/443 are their port numbers,
        # not protocol numbers, and 443 does not fit the 0-255 protocol range.
        self._protocols = [("TCP", 6), ("UDP", 17), ("HTTP", 6), ("HTTPS", 6)]
        self._ip = self._generate_ipv4()

    def _ipv4(self):
        return random.choice(self._ip)

    def _port(self):
        return random.randint(1000, 5000)

    def _protocol_info(self):
        return random.choice(self._protocols)

    def _generate_ipv4(self):
        return [self._client.ipv4_private() for _ in range(self._num_dest_ip)]

    def get_network(self):
        # Draw order: protocol, then address, then port.
        protocol = self._protocol_info()
        return Network(self._ipv4(), self._port(), protocol[0], protocol[1])

class UserObject:
    """Factory for synthetic subscribers backed by the module-level Faker ``client``.

    ``get_user`` returns a ``User`` with a Faker UUID4 subscriber id, a Faker
    IPv4 address and a random port in [1000, 5000].
    """

    def __init__(self):
        self.client = client

    def _subscriber_id(self):
        return self.client.uuid4()

    def _ipv4(self):
        return self.client.ipv4()

    def _port(self):
        lowest, highest = 1000, 5000
        return random.randint(lowest, highest)

    def get_user(self):
        # Values are drawn in field order: id, address, port.
        subscriber = self._subscriber_id()
        address = self._ipv4()
        port_number = self._port()
        return User(subscriber, address, port_number)
    
class JsonEvent:
    """Builds one synthetic network-flow event as a JSON-serializable dict."""

    # Cap applied to each normalized byte count before the anomaly multiplier.
    max_request_bytes = 5000
    # Upper bound, in seconds, on the gap between startTime and endTime.
    allowed_lag_sec = 10
    # Candidate values for "tcpFlag", picked uniformly at random.
    tcp_flag = ["SYN", "ACK", "FIN", "RST", "PSH", "URG"]

    @classmethod
    def generate(cls, user, network, anomaly=False):
        """Return one event dict for *user* talking to *network*.

        Args:
            user: object with ``subscriber_id``, ``ipv4`` and ``port``
                attributes (e.g. ``User``).
            network: object with ``ipv4``, ``port``, ``protocol_name`` and
                ``protocol_num`` attributes (e.g. ``Network``).
            anomaly: when true, byte counts are multiplied by 10. Defaults to
                False so two-argument callers keep working.

        Returns:
            dict with camelCase keys. Times are naive local-time ``isoformat``
            strings, and endTime is up to ``allowed_lag_sec`` after startTime.
        """
        start_time = datetime.now()
        time_diff = random.uniform(0, cls.allowed_lag_sec)
        end_time = start_time + timedelta(seconds=time_diff)
        return {"subscriberId": user.subscriber_id,
                "srcIP": user.ipv4,
                "srcPort": user.port,
                "dstIP": network.ipv4,
                "dstPort": network.port,
                "txBytes": cls._normalized_bytes(time_diff, anomaly),
                "rxBytes": cls._normalized_bytes(time_diff, anomaly),
                "startTime": start_time.isoformat(),
                "endTime": end_time.isoformat(),
                "tcpFlag": random.choice(cls.tcp_flag),
                "protocolName": network.protocol_name,
                "protocolNumber": network.protocol_num}

    @classmethod
    def _normalized_bytes(cls, lag_time, anomaly):
        """Return a byte count proportional to *lag_time*.

        The count is a random rate in [10, 100] times *lag_time*, capped at
        ``max_request_bytes`` and truncated to int. When *anomaly* is true the
        capped value is multiplied by 10, so anomalies can exceed the cap.
        """
        random_byte = random.uniform(10, 100)
        normalized_byte = int(min(random_byte * lag_time, cls.max_request_bytes))
        return normalized_byte * 10 if anomaly else normalized_byte

In [276]:

import time
import json
import logging
import random
import apache_beam as beam
# from utils.custom import NetworkPool, UserObject, JsonEvent
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.options.pipeline_options import PipelineOptions

# Configure the root logger at DEBUG, presumably so EventGenerator's
# logger.debug messages are shown, and get this module's logger.
# NOTE(review): DEBUG on the root logger also enables debug output from every
# other library (including Beam); consider logger.setLevel(logging.DEBUG).
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class EventGenerator(beam.DoFn):
    """Emit a burst of synthetic network events for one random user/network.

    Each input element (e.g. a PeriodicImpulse tick) produces between 5 and
    ``max_events_per_session`` events (inclusive). All events in a burst share
    one user and one network and are built by ``JsonEvent.generate``.

    Args:
        anomaly: passed through to ``JsonEvent.generate`` for every event.
        max_events_per_session: inclusive upper bound on events per element;
            must be at least 5.

    Raises:
        ValueError: if ``max_events_per_session`` is below 5.
    """

    def __init__(self, anomaly, max_events_per_session=20, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # random.randint(5, N) in process() needs N >= 5; fail at construction
        # instead of on the first element.
        if max_events_per_session < 5:
            raise ValueError(
                f"max_events_per_session must be >= 5, got {max_events_per_session}")
        self.max_events_per_session = max_events_per_session
        self.anomaly = anomaly
        # Built at construction time (not in setup), so presumably the same
        # address pool is serialized with the DoFn to every worker copy.
        self.network_pool = NetworkPool()

    def setup(self):
        self.user_obj = UserObject()

    def process(self, element):
        network = self.network_pool.get_network()
        user = self.user_obj.get_user()
        num_events = random.randint(5, self.max_events_per_session)
        # Lazy %-style args: the message is only formatted when DEBUG is enabled.
        logger.debug("Generating %d events for user %s on network %s",
                     num_events, user.subscriber_id, network.ipv4)
        for _ in range(num_events):
            yield JsonEvent.generate(user, network, self.anomaly)

def to_json(event):
    """Return *event* as JSON text encoded to UTF-8 bytes."""
    return bytes(json.dumps(event), 'utf-8')

def run():
    """Build and run a local streaming pipeline that prints synthetic events.

    A PeriodicImpulse fires every 60/5 = 12 seconds. Each tick yields a burst
    of events from ``EventGenerator(anomaly=True)``, which are serialized to
    JSON bytes and printed.

    Returns:
        The result of ``pipeline.run()``.
    """
    options = PipelineOptions(save_main_session=True, streaming=True)
    # Pass the options in; otherwise the streaming/save_main_session settings
    # built above are silently ignored.
    pipeline = beam.Pipeline(options=options)
    (
        pipeline
        | "Trigger" >> PeriodicImpulse(fire_interval=(60 / 5))
        | "Generate Events" >> beam.ParDo(EventGenerator(anomaly=True))
        | "JSONIFY" >> beam.Map(to_json)
        # NOTE: despite its label this step only prints; use
        # beam.io.WriteToPubSub(topic) to actually publish.
        | "Write to PubSub" >> beam.Map(print)
    )
    return pipeline.run()

In [None]:
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

# Define the PubSub subscription and GCS bucket
subscription = "projects/my-project/subscriptions/my-subscription"
bucket = "gs://my-bucket"

# Define the pipeline. ReadFromPubSub produces an unbounded PCollection, so
# the pipeline has to run in streaming mode.
pipeline = beam.Pipeline(
    runner="DirectRunner", options=PipelineOptions(streaming=True))

# Read from PubSub and write to GCS
(pipeline
  | "Read from PubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=subscription)
  # Without with_attributes, each message arrives as its raw bytes payload.
  # Decoding replaces the undefined placeholder `DoFn()`; put the real
  # per-message logic here.
  | "Transform data" >> beam.Map(lambda payload: payload.decode("utf-8"))
  # WriteToText is intended for bounded input. Window the stream into
  # 60-second windows and let WriteToFiles emit files per window instead.
  | "Window" >> beam.WindowInto(beam.window.FixedWindows(60))
  # Write under a prefix inside the bucket rather than to the bare bucket URL.
  | "Write to GCS" >> fileio.WriteToFiles(path=f"{bucket}/output"))

# NOTE: this cell only builds the graph; call pipeline.run() to start it.
