# Advanced Pipeline Operations

Beyond transforming individual records or computing summaries, **Flow** offers a suite of advanced methods for manipulating and combining streams of data.

These operations help you:

- Sort records by timestamp or xG
- Assign rankings or row numbers
- Join one dataset to another
- Eliminate duplicates or sample subsets

💡 Some of these operations **hold data in memory**: `.sort()` materializes the entire stream, and `.join()` loads the whole right-hand dataset.

```python
from penaltyblog.matchflow import Flow
from pprint import pprint

matches = [
    {
        "match_id": 123,
        "competition_name": "Premier League",
        "home_team": "Manchester City",
        "away_team": "Arsenal",
        "match_date": "2023-10-08",
    },
    {
        "match_id": 456,
        "competition_name": "Champions League",
        "home_team": "Real Madrid",
        "away_team": "Bayern Munich",
        "match_date": "2023-10-10",
    },
]

events = [
    {
        "event_id": 1,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:01:30.500",
        "type_name": "Pass",
        "player_name": "Kevin De Bruyne",
        "location": [60.1, 40.3],
        "pass_recipient_name": "Erling Haaland",
        "pass_outcome_name": "Complete",
        "team_name": "Manchester City",
    },
    {
        "event_id": 2,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:01:32.100",
        "type_name": "Shot",
        "player_name": "Erling Haaland",
        "location": [85.5, 50.2],
        "shot_xg": 0.05,
        "shot_outcome_name": "Goal",
        "team_name": "Manchester City",
    },
    {
        "event_id": 3,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:02:05.000",
        "type_name": "Duel",
        "player_name": "Rodri",
        "duel_type_name": "Tackle",
        "duel_outcome_name": "Won",
        "team_name": "Manchester City",
    },
    {
        "event_id": 4,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:02:10.000",
        "type_name": "Pass",
        "player_name": "Kevin De Bruyne",
        "location": [70.0, 25.0],
        "pass_recipient_name": "Jack Grealish",
        "pass_outcome_name": "Incomplete",
        "team_name": "Manchester City",
    },
    {
        "event_id": 5,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:03:00.000",
        "type_name": "Shot",
        "player_name": "Bukayo Saka",
        "location": [75.0, 60.0],
        "shot_xg": 0.01,
        "shot_outcome_name": "Post",
        "team_name": "Arsenal",
    },
]
```

## Sorting and Ordering

### `.sort()`: Sort Records

Sort the flow by a specific field.

```python
sorted_events = Flow(events).sort(by="timestamp")
```
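Sorting on `timestamp` works here because the values are zero-padded strings in `HH:MM:SS.mmm` form, so plain string order matches chronological order. The quick check below uses only standard Python (no Flow) and assumes every timestamp follows that fixed-width format; `to_seconds` is a hypothetical helper written for the comparison.

```python
# Zero-padded "HH:MM:SS.mmm" strings sort lexicographically in time order.
timestamps = ["00:02:10.000", "00:01:30.500", "00:03:00.000", "00:01:32.100"]


def to_seconds(ts: str) -> float:
    """Hypothetical helper: convert an 'HH:MM:SS.mmm' timestamp to seconds."""
    hours, minutes, seconds = ts.split(":")
    return int(hours) * 3600 + int(minutes) * 60 + float(seconds)


by_string = sorted(timestamps)                # plain string comparison
by_time = sorted(timestamps, key=to_seconds)  # numeric comparison
print(by_string == by_time)  # True
```

In StatsBomb-style data the timestamp restarts at the beginning of each period, so for a full match you would likely sort by `["period", "timestamp"]` rather than `timestamp` alone.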

Sort shots by xG (highest first):

```python
shots_flow = (
    Flow(events)
    .filter(lambda r: r.get("type_name") == "Shot")
    .sort(by="shot_xg", reverse=True)
)

pprint(shots_flow.first())
```

```
{'event_id': 2,
 'location': [85.5, 50.2],
 'match_id': 123,
 'period': 1,
 'player_name': 'Erling Haaland',
 'shot_outcome_name': 'Goal',
 'shot_xg': 0.05,
 'team_name': 'Manchester City',
 'timestamp': '00:01:32.100',
 'type_name': 'Shot'}
```


Sort the flow by multiple fields.

```python
# Sort by team name, then by event type, both in descending order
multi_sorted = Flow(events).sort(by=["team_name", "type_name"], reverse=True)

pprint(multi_sorted.first())
```

```
{'event_id': 2,
 'location': [85.5, 50.2],
 'match_id': 123,
 'period': 1,
 'player_name': 'Erling Haaland',
 'shot_outcome_name': 'Goal',
 'shot_xg': 0.05,
 'team_name': 'Manchester City',
 'timestamp': '00:01:32.100',
 'type_name': 'Shot'}
```


💡 `.sort()` materializes the entire flow. 
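Conceptually, a multi-field sort with `reverse=True` orders records by a tuple of the chosen fields, descending. The plain-Python sketch below reproduces that ordering on a trimmed copy of the sample events with the built-in `sorted()`; it illustrates the ordering, not matchflow's implementation. It also shows why sorting must materialize: `sorted()` has to see every record before it can return the first one.

```python
# Trimmed copy of the sample events: only the fields used for sorting.
toy_events = [
    {"event_id": 1, "team_name": "Manchester City", "type_name": "Pass"},
    {"event_id": 2, "team_name": "Manchester City", "type_name": "Shot"},
    {"event_id": 3, "team_name": "Manchester City", "type_name": "Duel"},
    {"event_id": 4, "team_name": "Manchester City", "type_name": "Pass"},
    {"event_id": 5, "team_name": "Arsenal", "type_name": "Shot"},
]

# sorted() consumes the whole iterable before returning anything.
# With reverse=True, Python's sort stays stable: ties keep their input order.
ordered = sorted(
    toy_events,
    key=lambda r: (r["team_name"], r["type_name"]),
    reverse=True,
)
print([r["event_id"] for r in ordered])  # [2, 1, 4, 3, 5]
```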

### `.row_number()`: Assign Rankings

Adds a new field (named by `new_field`, e.g. `"xg_rank"`) that numbers records 1, 2, 3 and so on, in the order given by `by`.

```python
ranked = (
    Flow(events)
    .filter(lambda r: r["type_name"] == "Shot")
    .row_number(by="shot_xg", new_field="xg_rank", reverse=True)
)

pprint(ranked.head(2).collect())
```

```
[{'event_id': 2,
  'location': [85.5, 50.2],
  'match_id': 123,
  'period': 1,
  'player_name': 'Erling Haaland',
  'shot_outcome_name': 'Goal',
  'shot_xg': 0.05,
  'team_name': 'Manchester City',
  'timestamp': '00:01:32.100',
  'type_name': 'Shot',
  'xg_rank': 1},
 {'event_id': 5,
  'location': [75.0, 60.0],
  'match_id': 123,
  'period': 1,
  'player_name': 'Bukayo Saka',
  'shot_outcome_name': 'Post',
  'shot_xg': 0.01,
  'team_name': 'Arsenal',
  'timestamp': '00:03:00.000',
  'type_name': 'Shot',
  'xg_rank': 2}]
```


💡 Records where the `by` field is `None` are always ranked last and receive `None` for the row number.
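The ranking rule above can be sketched in plain Python. `add_row_number` is a hypothetical helper, not part of penaltyblog; it assumes that ties keep their original order and that a missing field is treated the same as `None`.

```python
def add_row_number(records: list[dict], by: str, new_field: str, reverse: bool = False) -> list[dict]:
    """Hypothetical helper: number records 1..n by `by`; None (or missing) values go last with None."""
    present = [r for r in records if r.get(by) is not None]
    missing = [r for r in records if r.get(by) is None]
    present.sort(key=lambda r: r[by], reverse=reverse)  # stable sort: ties keep input order
    ranked = [{**r, new_field: i} for i, r in enumerate(present, start=1)]
    ranked += [{**r, new_field: None} for r in missing]  # unrankable records at the end
    return ranked


toy_shots = [
    {"player_name": "Erling Haaland", "shot_xg": 0.05},
    {"player_name": "Bukayo Saka", "shot_xg": 0.01},
    {"player_name": "Unknown", "shot_xg": None},  # illustrative record with no xG
]
for row in add_row_number(toy_shots, by="shot_xg", new_field="xg_rank", reverse=True):
    print(row["player_name"], row["xg_rank"])
# Erling Haaland 1
# Bukayo Saka 2
# Unknown None
```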

## Taking Subsets

### `.limit(n)` / `.head(n)`: Take First n Records

```python
first_five = Flow(events).limit(5)
# or:
first_five = Flow(events).head(5)
```

### `.take_last(n)`: Take Last n Records

```python
last_three = Flow(events).take_last(3)
```
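One way to take the last `n` records of a stream in a single pass is a bounded `collections.deque`, which never holds more than `n` records at a time. This is a sketch of the idea, not necessarily how `.take_last()` is implemented; in any approach the whole stream has to be read before the last `n` records are known.

```python
from collections import deque
from typing import Iterable, Iterator


def take_last(records: Iterable[dict], n: int) -> Iterator[dict]:
    """Hypothetical helper: yield the final n records, keeping at most n in memory."""
    window = deque(records, maxlen=n)  # older records are discarded automatically
    yield from window


stream = ({"event_id": i} for i in range(1, 6))  # a lazy stream of five records
print([r["event_id"] for r in take_last(stream, 3)])  # [3, 4, 5]
```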

## Sampling

### `.sample(n, seed=None)`: Reservoir Sampling

Selects `n` records uniformly at random from the stream in a single pass, using reservoir sampling. Pass `seed` for reproducible results.

```python
random_sample = Flow(events).sample(n=3, seed=42)
```

💡 If `n` is larger than the number of records, `sample()` returns all records.
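The classic form of reservoir sampling (Algorithm R) is sketched below in plain Python. It shows why only `n` records are held in memory and why a stream shorter than `n` comes back whole. `reservoir_sample` is a hypothetical helper; the records it picks for a given seed will not match matchflow's.

```python
import random
from typing import Iterable, Optional


def reservoir_sample(records: Iterable[dict], n: int, seed: Optional[int] = None) -> list[dict]:
    """Algorithm R: a uniform sample of n records from a stream of unknown length."""
    rng = random.Random(seed)
    reservoir: list[dict] = []
    for i, record in enumerate(records):
        if i < n:
            reservoir.append(record)  # fill the reservoir with the first n records
        else:
            j = rng.randint(0, i)  # inclusive on both ends
            if j < n:
                reservoir[j] = record  # replace with probability n / (i + 1)
    return reservoir


sample = reservoir_sample(({"event_id": i} for i in range(1, 101)), n=3, seed=42)
print(len(sample))  # 3
```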

### `.sample_frac(frac, seed=None)`: Approximate Sampling

Includes each record independently with probability `frac` (e.g. 0.2 = 20%), so the size of the result is only approximately `frac` times the size of the input.

```python
approx_sample = Flow(events).sample_frac(frac=0.2, seed=123)
```
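Per-record inclusion of this kind is Bernoulli sampling: it streams lazily and needs no memory beyond the current record, at the cost of an output size that varies from run to run. A plain-Python sketch, with `sample_frac` as a hypothetical stand-in for the Flow method:

```python
import random
from typing import Iterable, Iterator, Optional


def sample_frac(records: Iterable[dict], frac: float, seed: Optional[int] = None) -> Iterator[dict]:
    """Hypothetical helper: keep each record independently with probability `frac`."""
    rng = random.Random(seed)
    for record in records:
        if rng.random() < frac:  # random() is uniform on [0.0, 1.0)
            yield record


kept = list(sample_frac(({"event_id": i} for i in range(10_000)), frac=0.2, seed=123))
print(len(kept))  # roughly 2,000; the exact count depends on the seed
```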

## Combining and Joining Data

### `.concat()`: Combine Multiple Flows

```python
flow1 = Flow.statsbomb.from_github_file(match_id=123, type="events")
flow2 = Flow.statsbomb.from_github_file(match_id=456, type="events")

combined = flow1.concat(flow2)
```

💡 Records are streamed lazily from each input.
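Lazy concatenation behaves like `itertools.chain`: nothing is read from an input until a record is requested, and the second input is not touched until the first is exhausted. The sketch below uses generators that log each record as it is produced, so the streaming order is visible; it illustrates the behaviour, not matchflow's internals.

```python
from itertools import chain

produced: list[str] = []  # records in the order they were actually generated


def numbered(label: str, count: int):
    """Generator that logs each record as it is produced."""
    for i in range(count):
        produced.append(f"{label}-{i}")
        yield {"source": label, "i": i}


combined = chain(numbered("a", 2), numbered("b", 2))
print(produced)  # []: creating the chain reads nothing
first = next(combined)
print(produced)  # ['a-0']: only one record pulled so far
```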

### `.join()`: Enrich Records from Another Flow

Use `.join()` to combine two datasets by matching key fields. `other` can be a `Flow` or an iterable of dicts.

#### Example: Adding the competition name to our events data

```python
events_flow = Flow(events)
matches_flow = Flow(matches)

events_with_matches = events_flow.join(
    other=matches_flow,
    left_on="match_id",  # Key in events_flow
    right_on="match_id",  # Key in matches_flow
    fields=["competition_name"],  # Only bring this field from matches_flow
)

results = events_with_matches.collect()

# We've added the competition name to each event
pprint(results[0])
```

```
{'competition_name': 'Premier League',
 'event_id': 1,
 'location': [60.1, 40.3],
 'match_id': 123,
 'pass_outcome_name': 'Complete',
 'pass_recipient_name': 'Erling Haaland',
 'period': 1,
 'player_name': 'Kevin De Bruyne',
 'team_name': 'Manchester City',
 'timestamp': '00:01:30.500',
 'type_name': 'Pass'}
```


💡 This adds "competition_name" to each event based on "match_id".

#### Inner Join

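Pass `how="inner"` to keep only events that have a matching match record. Every event in this sample belongs to match 123, so the result here is the same as in the previous example: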

```python
events_flow = Flow(events)
matches_flow = Flow(matches)

events_with_matches = events_flow.join(
    other=matches_flow,
    left_on="match_id",  # Key in events_flow
    right_on="match_id",  # Key in matches_flow
    fields=["competition_name"],  # Only bring this field from matches_flow
    how="inner",  # Drop events with no matching match
)

results = events_with_matches.collect()

pprint(results[0])
```

```
{'competition_name': 'Premier League',
 'event_id': 1,
 'location': [60.1, 40.3],
 'match_id': 123,
 'pass_outcome_name': 'Complete',
 'pass_recipient_name': 'Erling Haaland',
 'period': 1,
 'player_name': 'Kevin De Bruyne',
 'team_name': 'Manchester City',
 'timestamp': '00:01:30.500',
 'type_name': 'Pass'}
```


💡 The right-hand side (other) is fully loaded into memory, while the left-hand side is streamed lazily.
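That memory profile is what a classic hash join gives you: index the right-hand side in a dict, then stream the left-hand side and look up each key. The plain-Python sketch below assumes unique keys on the right and left/inner semantics as described in this section; `hash_join` and the unmatched event with `match_id` 999 are hypothetical, and matchflow's handling of duplicate right-hand keys or field-name collisions may differ.

```python
from typing import Iterable, Iterator


def hash_join(left: Iterable[dict], right: Iterable[dict], left_on: str, right_on: str,
              fields: list[str], how: str = "left") -> Iterator[dict]:
    """Hypothetical helper: hash join that holds `right` in memory and streams `left`."""
    # Build phase: the whole right-hand side is indexed by its key.
    # Assumes unique right_on values; a duplicate key keeps the last record.
    index = {row[right_on]: row for row in right}
    # Probe phase: left records are processed one at a time.
    for row in left:
        match = index.get(row.get(left_on))
        if match is None:
            if how == "left":
                yield dict(row)  # unmatched left records survive a left join
            continue  # ...and are dropped by an inner join
        yield {**row, **{f: match.get(f) for f in fields}}


toy_matches = [{"match_id": 123, "competition_name": "Premier League"}]
toy_events = [{"event_id": 1, "match_id": 123}, {"event_id": 9, "match_id": 999}]

left_rows = list(hash_join(toy_events, toy_matches, "match_id", "match_id", ["competition_name"], how="left"))
inner_rows = list(hash_join(toy_events, toy_matches, "match_id", "match_id", ["competition_name"], how="inner"))
print([r["event_id"] for r in left_rows])   # [1, 9]
print([r["event_id"] for r in inner_rows])  # [1]
```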

## Handling Duplicates

### `.drop_duplicates(...)`: Remove Duplicate Records

```python
# Remove fully identical records
unique_events = Flow(events).drop_duplicates()

# Deduplicate by player and event type
dedup = Flow(events).drop_duplicates("player_name", "type_name", keep="first")
```

- `keep="first"` (default): keeps the first occurrence of each key
- `keep="last"`: keeps the last occurrence of each key
- `keep=False`: drops every record whose key appears more than once

💡 `.drop_duplicates()` tracks the keys it has already seen, so its memory use grows with the number of unique combinations.
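The three `keep` options can be sketched in plain Python. `drop_duplicates` below is a hypothetical helper that only deduplicates on named fields (not whole records) and returns survivors in their original stream order, which is an assumption about matchflow's output order. Note that `keep="first"` could stream, while `keep="last"` and `keep=False` need to see the whole input before deciding.

```python
from collections import Counter
from typing import Union


def drop_duplicates(records: list[dict], *fields: str, keep: Union[str, bool] = "first") -> list[dict]:
    """Hypothetical helper: deduplicate on `fields` with keep='first' / 'last' / False."""
    def key(r: dict) -> tuple:
        return tuple(r.get(f) for f in fields)

    if keep == "first":
        seen: set = set()  # grows with the number of distinct keys
        out = []
        for r in records:
            k = key(r)
            if k not in seen:
                seen.add(k)
                out.append(r)
        return out
    if keep == "last":
        last: dict = {}  # key -> index of its final occurrence
        for i, r in enumerate(records):
            last[key(r)] = i
        return [r for i, r in enumerate(records) if last[key(r)] == i]
    if keep is False:
        counts = Counter(key(r) for r in records)  # needs a full pass first
        return [r for r in records if counts[key(r)] == 1]
    raise ValueError("keep must be 'first', 'last' or False")


toy_events = [
    {"event_id": 1, "player_name": "Kevin De Bruyne", "type_name": "Pass"},
    {"event_id": 2, "player_name": "Erling Haaland", "type_name": "Shot"},
    {"event_id": 4, "player_name": "Kevin De Bruyne", "type_name": "Pass"},
]
for keep in ("first", "last", False):
    ids = [r["event_id"] for r in drop_duplicates(toy_events, "player_name", "type_name", keep=keep)]
    print(keep, ids)
# first [1, 2]
# last [2, 4]
# False [2]
```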

### `.unique(...)`: Unique Field Values or Combinations

```python
unique_players = Flow(events).unique("player_name")
player_names = [r["player_name"] for r in unique_players.collect()]
```

Or get combinations:

```python
unique = Flow(events).unique("team_name", "type_name")
```

💡 Like `.drop_duplicates()`, `.unique()` holds every distinct key in memory, so on high-cardinality fields or very large datasets it can consume significant memory. Use it with care.
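The memory behaviour comes from the set of seen keys, which grows by one entry per distinct combination. A plain-Python sketch, assuming `unique` returns only the requested fields in first-seen order (matchflow's version may instead return whole records); `unique` here is a hypothetical helper:

```python
from typing import Iterable, Iterator


def unique(records: Iterable[dict], *fields: str) -> Iterator[dict]:
    """Hypothetical helper: yield each distinct combination of `fields` once, in first-seen order."""
    seen: set[tuple] = set()  # one entry per distinct combination
    for r in records:
        key = tuple(r.get(f) for f in fields)
        if key not in seen:
            seen.add(key)
            yield dict(zip(fields, key))


toy_events = [
    {"team_name": "Manchester City", "type_name": "Pass"},
    {"team_name": "Manchester City", "type_name": "Shot"},
    {"team_name": "Manchester City", "type_name": "Duel"},
    {"team_name": "Manchester City", "type_name": "Pass"},
    {"team_name": "Arsenal", "type_name": "Shot"},
]
combos = list(unique(toy_events, "team_name", "type_name"))
print(len(combos))  # 4
print(combos[-1])   # {'team_name': 'Arsenal', 'type_name': 'Shot'}
```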

## Summary

These advanced operations give you powerful ways to:

- Sort and rank your data
- Merge datasets via join
- Sample large streams efficiently
- Clean up duplicates or extract distinct values

💡 Be mindful of which methods consume memory by loading the whole stream. 

## What’s Next?

Next, we’ll look at saving and loading data from disk using formats like JSON, JSON Lines, and folders of files.
