In [None]:
import cudf

def analyze_failed_logins(df: cudf.DataFrame, failure_threshold: int = 2) -> cudf.DataFrame:
    """
    Flag users whose number of failed logins reaches a threshold.

    Rows of ``df`` whose ``status`` column equals ``"failure"`` are counted per
    ``user``. Users whose count is at least ``failure_threshold`` are returned.

    Args:
        df (cudf.DataFrame): Authentication log; the ``user`` and ``status``
            columns are read.
        failure_threshold (int, optional): Smallest failure count (inclusive)
            that gets a user flagged. Defaults to 2.

    Returns:
        cudf.DataFrame: One row per flagged user with the columns ``user`` and
        ``failure_count``. Rows keep the index they had in the per-user count
        table; the index is not renumbered after filtering.
    """

    # Boolean mask selecting the rows that record a failed attempt.
    is_failure = df["status"] == "failure"
    failures_only = df[is_failure]

    # Collapse to one row per user carrying the number of failed attempts.
    per_user = failures_only.groupby("user").size()
    counts = per_user.reset_index(name="failure_count")

    # Inclusive comparison: a count equal to the threshold is flagged as well.
    reaches_threshold = counts["failure_count"] >= failure_threshold

    return counts[reaches_threshold]

In [None]:
# Path to the sample authentication log; reused as the pipeline source in later cells.
input_file = 'data/simple_user_log.jsonlines'
# lines=True: the file is parsed as JSON Lines.
df = cudf.read_json(input_file, lines=True)

# Run the analysis with the default failure_threshold (2).
analyze_failed_logins(df)

Unnamed: 0,user,failure_count
2,user123,2


In [None]:
# Lower the threshold to 1 so that every user with at least one failed login is flagged.
analyze_failed_logins(df, failure_threshold=1)

Unnamed: 0,user,failure_count
0,user864,1
1,user789,1
2,user123,2
3,user456,1


In [None]:
import typing
import logging

from IPython.display import Image

import cudf

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline

from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from add_metadata_and_tasks_stage import AddMetadataAndTaskStage

from morpheus.messages import ControlMessage, MessageMeta

from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage

from morpheus.cli.register_stage import register_stage

from morpheus.utils.logger import configure_logging, reset_logging

import mrc
from mrc.core import operators as ops

In [None]:
# Metadata handed to AddMetadataAndTaskStage below. AnalyzeFailedLogins.on_data
# only runs its analysis when this flag is present on the message and truthy.
metadata = {
    "needs_failed_login_analysis": True
}

In [None]:
# Tasks handed to AddMetadataAndTaskStage below, keyed by task name. The inner
# dict is unpacked as keyword arguments to analyze_failed_logins() by the
# AnalyzeFailedLogins stage.
tasks = {
    "analyze_failed_logins": {"failure_threshold": 1}
}

In [None]:
# Baseline pipeline: read the log file, convert it to ControlMessages, attach
# the metadata and tasks, and collect the resulting messages in memory.
config = Config()

pipeline = LinearPipeline(config)

pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

# Use DeserializeStage to convert to ControlMessage messages.
pipeline.add_stage(DeserializeStage(config))

# Use custom AddMetadataAndTaskStage to add our metadata and tasks to the control messages.
pipeline.add_stage(AddMetadataAndTaskStage(config, metadata=metadata, tasks=tasks))

# Keep a handle on the sink so the collected messages can be inspected afterwards.
in_mem_sink = pipeline.add_stage(InMemorySinkStage(config))

pipeline.build()
# Awaited directly at the top level of the cell; this relies on the notebook's
# support for top-level await.
await pipeline.run_async()

In [None]:
# Take the first message collected by the in-memory sink.
cm = in_mem_sink.get_messages()[0]

In [None]:
# Show the metadata carried by the message; the output contains the flag from `metadata`.
cm.get_metadata()

{'needs_failed_login_analysis': True}

In [None]:
# Show the tasks carried by the message. In the output each task name maps to a
# list of argument dicts, not to a single dict.
cm.get_tasks()

{'analyze_failed_logins': [{'failure_threshold': 1}]}

In [None]:
# Show the payload DataFrame. This pipeline has no analysis stage, so the output
# is the raw log data.
cm.payload().get_data()

Unnamed: 0,timestamp,user,ip_address,request_time,status,error_message
0,2025-02-01T10:15:30Z,user123,192.168.1.10,200.45,success,
1,2025-02-01T10:17:00Z,user123,192.168.1.20,150.55,failure,Invalid credentials
2,2025-02-01T10:18:10Z,user456,10.0.0.5,180.6,success,
3,2025-02-01T10:19:25Z,user789,192.168.1.30,215.25,failure,Timeout
4,2025-02-01T10:20:00Z,user456,10.0.0.6,120.1,success,
5,2025-02-01T10:22:30Z,user123,192.168.1.40,175.35,failure,Access denied
6,2025-02-01T10:23:45Z,user321,192.168.1.50,205.5,success,
7,2025-02-01T10:25:05Z,user864,192.168.1.60,190.15,failure,Invalid session
8,2025-02-01T10:26:20Z,user123,192.168.1.70,210.8,success,
9,2025-02-01T10:27:40Z,user456,10.0.0.7,160.95,failure,Account locked


---

In [None]:
@register_stage("analyze-failed-logins")
class AnalyzeFailedLogins(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage):
    """
    Pipeline stage that runs ``analyze_failed_logins`` on messages that request it.

    A message is analyzed only when its ``needs_failed_login_analysis`` metadata
    entry is truthy and it carries an ``analyze_failed_logins`` task. Any other
    message is forwarded unchanged.
    """

    @property
    def name(self) -> str:
        """Return the stage name (the same string the stage is registered under)."""
        return "analyze-failed-logins"

    def accepted_types(self) -> tuple:
        """Return the accepted input message types: only ``ControlMessage``."""
        # Note that this custom stage expects message type `ControlMessage`.
        return (ControlMessage, )

    def supports_cpp_node(self) -> bool:
        """Return ``False``; this stage does not report C++ node support."""
        return False

    def on_data(self, message: ControlMessage) -> ControlMessage:
        """
        Analyze the message payload when the message asks for it.

        Args:
            message (ControlMessage): Incoming message.

        Returns:
            ControlMessage: The incoming message itself when no analysis is
            requested. Otherwise a newly created ``ControlMessage`` whose
            payload is the analysis result; only the payload is set on the new
            message, so the metadata and tasks of ``message`` are not copied.
        """
        wants_analysis = (message.has_metadata("needs_failed_login_analysis")
                          and message.get_metadata()["needs_failed_login_analysis"])
        if not wants_analysis:
            return message

        if not message.has_task("analyze_failed_logins"):
            return message

        # Only the first entry stored for this task is used; its dict is passed
        # on as keyword arguments to the analysis function.
        first_task_args = message.get_tasks()["analyze_failed_logins"][0]
        login_log = message.payload().get_data()
        flagged_users = analyze_failed_logins(login_log, **first_task_args)

        # Wrap the result in a fresh message rather than modifying the input.
        result_meta = MessageMeta(flagged_users)
        result = ControlMessage()
        result.payload(result_meta)

        return result

    def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
        """
        Create this stage's node and connect it to the upstream node.

        Args:
            builder (mrc.Builder): Builder used to create the node and the edge.
            input_node (mrc.SegmentObject): Upstream node feeding this stage.

        Returns:
            mrc.SegmentObject: The new node, which maps ``on_data`` over its input.
        """
        stage_node = builder.make_node(self.unique_name, ops.map(self.on_data))
        builder.make_edge(input_node, stage_node)

        return stage_node

In [None]:
# Same pipeline as the baseline, with the custom AnalyzeFailedLogins stage
# inserted between the metadata/task stage and the sink.
config = Config()

pipeline = LinearPipeline(config)

pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

pipeline.add_stage(DeserializeStage(config))

pipeline.add_stage(AddMetadataAndTaskStage(config, metadata=metadata, tasks=tasks))

# Use our new custom stage.
pipeline.add_stage(AnalyzeFailedLogins(config))

# Keep a handle on the sink so the collected messages can be inspected afterwards.
in_mem_sink = pipeline.add_stage(InMemorySinkStage(config))

pipeline.build()

In [None]:
# Reset the logging setup, then configure it at DEBUG level so that the pipeline
# run emits detailed log output.
reset_logging()
configure_logging(log_level=logging.DEBUG)

====Starting Pipeline====[0m
====Pipeline Started====[0m
====Building Segment: linear_segment_0====[0m
Added source: <from-file-4; FileSourceStage(filename=data/simple_user_log.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, filter_null_columns=None, parser_kwargs=None)>
  └─> morpheus.MessageMeta[0m
Added stage: <deserialize-5; DeserializeStage(ensure_sliceable_index=True, task_type=None, task_payload=None)>
  └─ morpheus.MessageMeta -> morpheus.ControlMessage[0m
Added stage: <add-metadata-and-task-6; AddMetadataAndTaskStage(metadata={'needs_failed_login_analysis': True}, tasks={'analyze_failed_logins': {'failure_threshold': 1}})>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage[0m
Added stage: <analyze-failed-logins-7; AnalyzeFailedLogins()>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage[0m
Added stage: <to-mem-8; InMemorySinkStage()>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage[0m
====Building Segment Complete!====

In [None]:
# Run the pipeline that includes the AnalyzeFailedLogins stage (top-level await
# relies on the notebook's support for it).
await pipeline.run_async()

In [None]:
# Take the first message collected by the sink after the custom stage has run.
cm = in_mem_sink.get_messages()[0]

In [None]:
# Empty in the output: AnalyzeFailedLogins.on_data returns a newly created
# ControlMessage and sets only its payload, so the input's metadata is not copied.
cm.get_metadata()

{}

In [None]:
# Empty in the output: the ControlMessage created by AnalyzeFailedLogins.on_data
# has no tasks set on it.
cm.get_tasks()

{}

In [None]:
# The payload is now the analysis result (flagged users and their failure
# counts) instead of the raw log data.
cm.payload().get_data()

Unnamed: 0,user,failure_count
0,user864,1
1,user789,1
2,user123,2
3,user456,1


In [None]:
# Third run: the same stages as the analysis pipeline, but AddMetadataAndTaskStage
# receives no metadata or tasks, which exercises the pass-through path of
# AnalyzeFailedLogins.
config = Config()

pipeline = LinearPipeline(config)

pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

pipeline.add_stage(DeserializeStage(config))

# NOTE: For this run we are not passing in any metadata (or tasks).
pipeline.add_stage(AddMetadataAndTaskStage(config))

# With no metadata provided, we would expect this stage to act as a passthrough.
pipeline.add_stage(AnalyzeFailedLogins(config))

# Keep a handle on the sink so the collected messages can be inspected afterwards.
in_mem_sink = pipeline.add_stage(InMemorySinkStage(config))

pipeline.build()

In [None]:
# Run the pipeline that was built without metadata or tasks (top-level await
# relies on the notebook's support for it).
await pipeline.run_async()

In [None]:
# Take the first message collected by the sink for the run without metadata.
cm = in_mem_sink.get_messages()[0]

In [None]:
# Empty in the output: no metadata was passed to AddMetadataAndTaskStage for this run.
cm.get_metadata()

{}

In [None]:
# Empty in the output: no tasks were passed to AddMetadataAndTaskStage for this run.
cm.get_tasks()

{}

In [None]:
# The payload is the unmodified input log: without the metadata flag,
# AnalyzeFailedLogins.on_data returned the incoming message as-is.
cm.payload().get_data()

Unnamed: 0,timestamp,user,ip_address,request_time,status,error_message
0,2025-02-01T10:15:30Z,user123,192.168.1.10,200.45,success,
1,2025-02-01T10:17:00Z,user123,192.168.1.20,150.55,failure,Invalid credentials
2,2025-02-01T10:18:10Z,user456,10.0.0.5,180.6,success,
3,2025-02-01T10:19:25Z,user789,192.168.1.30,215.25,failure,Timeout
4,2025-02-01T10:20:00Z,user456,10.0.0.6,120.1,success,
5,2025-02-01T10:22:30Z,user123,192.168.1.40,175.35,failure,Access denied
6,2025-02-01T10:23:45Z,user321,192.168.1.50,205.5,success,
7,2025-02-01T10:25:05Z,user864,192.168.1.60,190.15,failure,Invalid session
8,2025-02-01T10:26:20Z,user123,192.168.1.70,210.8,success,
9,2025-02-01T10:27:40Z,user456,10.0.0.7,160.95,failure,Account locked
