## Scenario:

We simulated website logs in which each entry holds a user ID, a timestamp, and the type of action the user performed (e.g., "login", "view_item", "purchase"). The main objectives were:

* Count the occurrences of each action type within each fixed 10-minute window.
* Identify users who performed a "login" action followed by a "purchase" action within a 10-minute window.
* Filter out any bots, assumed to be users with more than 100 actions within a 10-minute window.

## Apache Beam Features Demonstrated:

* Pipeline Creation: Established the foundation to build the data processing tasks.

* Reading Data: Used beam.Create() to load the in-memory mock log data into the pipeline, standing in for an external IO source.

* ParDo and DoFn: Demonstrated parallel processing by creating custom functions (DoFn) like FilterBots and FindLoginThenPurchase.

* Windowing: Partitioned data into fixed 10-minute windows using beam.WindowInto(window.FixedWindows(600)).

* Triggers: Introduced triggers to control when the data in a window becomes available to downstream operations. AfterCount and AfterWatermark are combined with AfterAny, so a window fires on whichever condition is met first.

* GroupByKey: Grouped data by user ID, allowing us to process all actions performed by each user within a window.

* Composite Transforms: A composite transform (referred to as CountWords) groups multiple transforms into one logical operation. The code cells shown here import PTransform but do not include the CountWords definition.

* Filtering and Mapping: Utilized beam.Map() and custom filtering functions to transform and filter data based on specific criteria.
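The fixed windowing listed above comes down to simple arithmetic on the timestamp. The following is a minimal plain-Python sketch of that assignment, not Beam's own code; the helper name `fixed_window` and its `offset` parameter are illustrative assumptions.

```python
def fixed_window(timestamp, size=600, offset=0):
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    # Round the timestamp down to the nearest multiple of `size` (shifted by `offset`).
    start = timestamp - (timestamp - offset) % size
    return start, start + size


print(fixed_window(10))   # (0, 600)
print(fixed_window(599))  # (0, 600)
print(fixed_window(600))  # (600, 1200)
```

Every timestamp in the mock data used here is below 600 seconds, so all records land in the single window starting at 0.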

## Outcomes:

1. Action Counts: The pipeline outputted counts for each action type within the specified window.

2. User Identification: Identified and printed users whose "login" action was directly followed by a "purchase" no more than 10 minutes later within the window (user2 in the sample data).

3. Bot Filtering: Successfully filtered out users (bots) performing an unusually high number of actions within the window.

In [4]:
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.51.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.7 MB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
  Preparing metadata (setup.py) ... done
Collecting orjson<4,>=3.9.7 (from apache-beam)
  Downloading orjson-3.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (138 kB)
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)

In [13]:
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import PTransform, ParDo, DoFn

In [14]:
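class FilterBots(DoFn):
    """Drops users with more than 100 actions in a window (treated as bots)."""

    def process(self, element):
        user, actions = element
        # Materialize the grouped values in case the runner supplies a lazy iterable.
        actions = list(actions)
        if len(actions) <= 100:
            yield user, actions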

In [15]:
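class FindLoginThenPurchase(DoFn):
    """Emits users with a 'login' directly followed by a 'purchase' within 600 s."""

    def process(self, element):
        user, actions = element
        # Sort a copy by timestamp; a DoFn should not mutate its input element.
        actions = sorted(actions, key=lambda x: x[1])
        # Only adjacent pairs are compared, so login -> view_item -> purchase does not match.
        for i in range(len(actions) - 1):
            if actions[i][0] == 'login' and actions[i + 1][0] == 'purchase' and (actions[i + 1][1] - actions[i][1]) <= 600:
                yield user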

In [16]:
def main():
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(['--allow_unsafe_triggers'])) as pipeline:
        raw_data = [
            ("user1", "login", 10),
            ("user1", "view_item", 15),
            ("user1", "purchase", 30),
            ("user2", "login", 50),
            ("user2", "purchase", 55),
            ("bot1", "view_item", 10)] + [("bot1", "view_item", i) for i in range(11, 112)]

        actions = (
            pipeline
            | 'ReadData' >> beam.Create(raw_data)
            | 'AssignTimestamps' >> beam.Map(lambda x: window.TimestampedValue(x, x[2]))
            | 'WindowInto' >> beam.WindowInto(
                window.FixedWindows(600),
                trigger=beam.trigger.AfterAny(beam.trigger.AfterCount(5), beam.trigger.AfterWatermark()),
                accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
            )
        )


        # PairWithOne: transform each record into a key-value pair (action type, 1).
        # GroupAndSum: CombinePerKey(sum) groups by action type and sums the values,
        #              giving the count of each action type.
        # PrintActionCounts: print the results.

        action_counts = (
            actions
            | 'PairWithOne' >> beam.Map(lambda x: (x[1], 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)
            | 'PrintActionCounts' >> beam.Map(print)
        )

        # ToUserAction: transform each record into (user ID, (action type, timestamp)).
        # GroupByUser: group by user ID to collect each user's actions within the 10-minute window.
        # FilterBots: drop users (bots) with more than 100 actions in the 10-minute window.
        # FindLoginThenPurchase: scan each user's actions for a "login" followed by a "purchase"
        #                        within the 10-minute window.
        # PrintUsers: print the user IDs that match the criteria.

        users_login_then_purchase = (
            actions
            | 'ToUserAction' >> beam.Map(lambda x: (x[0], (x[1], x[2])))
            | 'GroupByUser' >> beam.GroupByKey()
            | 'FilterBots' >> ParDo(FilterBots())
            | 'FindLoginThenPurchase' >> ParDo(FindLoginThenPurchase())
            | 'PrintUsers' >> beam.Map(lambda x: print(f"User {x} made a login followed by a purchase within 10 minutes"))
        )

if __name__ == "__main__":
    main()



('login', 2)
('view_item', 103)
('purchase', 2)
User user2 made a login followed by a purchase within 10 minutes
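The counts printed above can be cross-checked without Beam. The sketch below tallies the same mock data in plain Python, assuming all records fall into one window (true here, since every timestamp is below 600 seconds); the names `action_counts`, `per_user`, and `bots` are illustrative and not part of the pipeline.

```python
from collections import Counter

# Same mock log as in main(): (user_id, action, timestamp_in_seconds).
raw_data = [
    ("user1", "login", 10),
    ("user1", "view_item", 15),
    ("user1", "purchase", 30),
    ("user2", "login", 50),
    ("user2", "purchase", 55),
    ("bot1", "view_item", 10),
] + [("bot1", "view_item", i) for i in range(11, 112)]

# Count occurrences of each action type.
action_counts = Counter(action for _, action, _ in raw_data)

# Count actions per user and flag anyone above the 100-action threshold.
per_user = Counter(user for user, _, _ in raw_data)
bots = {user for user, n in per_user.items() if n > 100}

print(dict(action_counts))  # {'login': 2, 'view_item': 103, 'purchase': 2}
print(bots)                 # {'bot1'}
```

bot1 contributes 102 of the 103 view_item events, which is why it exceeds the threshold used by FilterBots.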


## Working

1. We first create a simulated log with users and a bot. Each log entry contains a user ID, an action, and a timestamp.
2. We apply timestamps to each log entry and window the data into 10-minute windows.
3. We attach a trigger to each 10-minute window: it fires as soon as at least 5 elements have arrived or the watermark passes the end of the window, whichever happens first. A non-repeating trigger like this can finish after its first firing, which Beam flags as unsafe because later data may be dropped, hence the --allow_unsafe_triggers option.
4. The action_counts pipeline counts the occurrences of each action type within the window.
5. The users_login_then_purchase pipeline first groups actions by user. Then, it filters out bots using the FilterBots ParDo. After that, the FindLoginThenPurchase ParDo reports users whose login is directly followed by a purchase no more than 10 minutes later. With the sample data only user2 qualifies, because user1 has a view_item between the login and the purchase.
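If a purchase should count even when other actions occur between it and the login, the per-user check could be sketched as below. This is a plain-Python illustration under that assumption, not the notebook's DoFn; the function name `login_then_purchase` and the `max_gap` parameter are hypothetical.

```python
def login_then_purchase(actions, max_gap=600):
    """Return True if any purchase occurs within max_gap seconds after an earlier login.

    `actions` is an iterable of (action_type, timestamp) pairs for one user.
    """
    last_login = None
    # Walk the actions in time order, remembering the most recent login.
    for action, ts in sorted(actions, key=lambda a: a[1]):
        if action == "login":
            last_login = ts
        elif action == "purchase" and last_login is not None and ts - last_login <= max_gap:
            return True
    return False


user1 = [("login", 10), ("view_item", 15), ("purchase", 30)]
user2 = [("login", 50), ("purchase", 55)]
print(login_then_purchase(user1))  # True
print(login_then_purchase(user2))  # True
```

Tracking only the most recent login is enough, since it is the login closest in time to any later purchase. Inside the pipeline, the same logic would sit in the body of a DoFn's process method and yield the user once when the function returns True.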