# Tiny Telemetry Pipeline: Cleaning Game Session Logs

## Story

You’ve been asked to help a small indie game studio make sense of the data coming from their new online platform. Every time a player starts a session, a raw event is sent to a central log with inconsistent types, missing fields, and half-baked flags. The lead engineer wants a lightweight, testable Python “mini-pipeline” built from functions that can be reused later in a bigger project. Your job is to turn this messy event list into a cleaned table and a small set of key performance indicators (KPIs), using functions that call each other like a simple data-engineering pipeline.

## Tasks

### **Task 1 — Load & Basic Cleaning**
Create a function `build_raw_dataframe(raw_events)` that:

- Converts the `raw_events` list of dicts into a pandas DataFrame.  
- Safely converts `minutes_played` to numeric, handling invalid entries (`"abc"`, `None`, etc.).  
- Converts `ended_normally` into a sensible boolean/None representation.  
- Returns the resulting DataFrame.

Inspect `.head()` and `.dtypes` to confirm correctness.

---

### **Task 2 — Core Cleaning Function (Pipeline Step 1)**
Write a function `clean_sessions(df, min_minutes=5.0)` that:

- Removes or flags sessions with:
  - Missing or non-parsable play time  
  - Non-positive minutes (`<= 0`)  
- Adds a new column `crashed`:
  - `True` when `ended_normally` is False **and** `crash_error_code` is not null  
  - Otherwise `False`
- Filters sessions to only those with `minutes_played >= min_minutes`.

Return the cleaned DataFrame.

---

### **Task 3 — KPI Aggregator (Pipeline Step 2)**
Create a function `compute_kpis(clean_df, group_by="game")` that:

- Groups by a chosen dimension (`"game"` by default).  
- Computes:
  - Number of sessions  
  - Total minutes played  
  - Average minutes per session  
  - Crash rate (mean of `crashed`)  
- Returns a compact summary DataFrame.

Try calling the function with `group_by="region"` as well.

---

### **Task 4 — Optional Stretch Goal: Mini Pipeline**
Design `run_telemetry_pipeline(raw_events, min_minutes=5, group_by="game", allowed_regions=None)`:

- Calls, in sequence:
  1. `build_raw_dataframe(...)`
  2. `clean_sessions(...)`
  3. `compute_kpis(...)`
- If `allowed_regions` is not `None`, filter to only those regions before computing KPIs.
- Return:
  - KPI summary  
  - Cleaned session table (optional but useful for debugging)

---


## Data

In [55]:
raw_events = [
    {
        "session_id": "s1",
        "user_id": 101,
        "game": "SkyQuest",
        "minutes_played": "34",
        "ended_normally": "True",
        "crash_error_code": None,
        "region": "EU",
        "timestamp": "2025-12-01 18:03:10"
    },
    {
        "session_id": "s2",
        "user_id": 102,
        "game": "DungeonDive",
        "minutes_played": "7.5",
        "ended_normally": "False",
        "crash_error_code": "ERR42",
        "region": "NA",
        "timestamp": "2025-12-01 18:07:55"
    },
    {
        "session_id": "s3",
        "user_id": 103,
        "game": "SkyQuest",
        "minutes_played": None,
        "ended_normally": "True",
        "crash_error_code": None,
        "region": "EU",
        "timestamp": "2025-12-01 18:09:20"
    },
    {
        "session_id": "s4",
        "user_id": 104,
        "game": "FarmWorld",
        "minutes_played": "0",
        "ended_normally": "False",
        "crash_error_code": None,
        "region": "APAC",
        "timestamp": "2025-12-01 18:10:02"
    },
    {
        "session_id": "s5",
        "user_id": 101,
        "game": "DungeonDive",
        "minutes_played": "15",
        "ended_normally": "False",
        "crash_error_code": "ERR_NET",
        "region": "NA",
        "timestamp": "2025-12-01 18:20:44"
    },
    {
        "session_id": "s6",
        "user_id": 106,
        "game": "SkyQuest",
        "minutes_played": "abc",  # clearly bad data
        "ended_normally": "True",
        "crash_error_code": None,
        "region": "EU",
        "timestamp": "2025-12-01 18:25:00"
    },
    {
        "session_id": "s7",
        "user_id": 107,
        "game": "FarmWorld",
        "minutes_played": "52",
        "ended_normally": "False",
        "crash_error_code": "ERR_TIMEOUT",
        "region": "APAC",
        "timestamp": "2025-12-01 18:35:12"
    },
    {
        "session_id": "s8",
        "user_id": 108,
        "game": "DungeonDive",
        "minutes_played": 3,
        "ended_normally": None,   # unknown outcome
        "crash_error_code": None,
        "region": "NA",
        "timestamp": "2025-12-01 18:42:09"
    },
    {
        "session_id": "s9",
        "user_id": 109,
        "game": "SkyQuest",
        "minutes_played": "26",
        "ended_normally": "False",
        "crash_error_code": "ERR42",
        "region": "EU",
        "timestamp": "2025-12-01 18:55:33"
    },
    {
        "session_id": "s10",
        "user_id": 110,
        "game": "FarmWorld",
        "minutes_played": "11",
        "ended_normally": "True",
        "crash_error_code": None,
        "region": "APAC",
        "timestamp": "2025-12-01 19:05:03"
    },
]


## Libraries

In [56]:
import pandas as pd
import numpy as np

## Task 1
Create a function `build_raw_dataframe(raw_events)` that:

- Converts the `raw_events` list of dicts into a pandas DataFrame.  
- Safely converts `minutes_played` to numeric, handling invalid entries (`"abc"`, `None`, etc.).  
- Converts `ended_normally` into a sensible boolean/None representation.  
- Returns the resulting DataFrame.

Inspect `.head()` and `.dtypes` to confirm correctness.

In [67]:
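def build_raw_dataframe(raw_events):
    # Build the DataFrame: one row per raw event dict
    df = pd.DataFrame(raw_events)

    # Minutes played: coerce to float; unparsable values ("abc", None) become NaN
    df['minutes_played'] = pd.to_numeric(df['minutes_played'], errors='coerce')

    # Ended normally: map "True"/"False" strings (or real booleans) to bool;
    # anything else (None, unknown text) becomes NaN
    def parse_normal(x):
        if x in ['True', True]:
            return True
        elif x in ['False', False]:
            return False
        return np.nan

    df['ended_normally'] = df['ended_normally'].apply(parse_normal)

    return df
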

In [68]:
test = build_raw_dataframe(raw_events)

In [69]:
test

   session_id  user_id         game  minutes_played ended_normally crash_error_code region            timestamp
0          s1      101     SkyQuest            34.0           True             None     EU  2025-12-01 18:03:10
1          s2      102  DungeonDive             7.5          False            ERR42     NA  2025-12-01 18:07:55
2          s3      103     SkyQuest             NaN           True             None     EU  2025-12-01 18:09:20
3          s4      104    FarmWorld             0.0          False             None   APAC  2025-12-01 18:10:02
4          s5      101  DungeonDive            15.0          False          ERR_NET     NA  2025-12-01 18:20:44
5          s6      106     SkyQuest             NaN           True             None     EU  2025-12-01 18:25:00
6          s7      107    FarmWorld            52.0          False      ERR_TIMEOUT   APAC  2025-12-01 18:35:12
7          s8      108  DungeonDive             3.0            NaN             None     NA  2025-12-01 18:42:09
8          s9      109     SkyQuest            26.0          False            ERR42     EU  2025-12-01 18:55:33
9         s10      110    FarmWorld            11.0           True             None   APAC  2025-12-01 19:05:03


In [70]:
test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   session_id        10 non-null     object 
 1   user_id           10 non-null     int64  
 2   game              10 non-null     object 
 3   minutes_played    8 non-null      float64
 4   ended_normally    9 non-null      object 
 5   crash_error_code  4 non-null      object 
 6   region            10 non-null     object 
 7   timestamp         10 non-null     object 
dtypes: float64(1), int64(1), object(6)
memory usage: 772.0+ bytes
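
The `info()` output shows `ended_normally` stored as `object`, because the column mixes booleans with a missing value. One possible alternative, sketched here under assumptions and not part of the solution above, is pandas' nullable `boolean` dtype, which keeps unknown outcomes as `pd.NA`. The helper name `to_nullable_bool` is hypothetical.

```python
import pandas as pd


def to_nullable_bool(series):
    """Map "True"/"False" strings (or real booleans) to pandas' nullable boolean dtype.

    Hypothetical helper: anything that is not one of the four known values
    (None, NaN, numbers, unknown text) becomes a missing value.
    """
    mapping = {"True": True, "False": False, True: True, False: False}
    values = [
        mapping.get(v) if isinstance(v, (str, bool)) else None
        for v in series
    ]
    return pd.Series(pd.array(values, dtype="boolean"), index=series.index)


# Small demo with the kinds of values seen in the raw events
demo = pd.Series(["True", "False", None, True, "maybe"])
converted = to_nullable_bool(demo)
print(converted.dtype)
print(converted.isna().tolist())
```

With this dtype, a comparison such as `converted == False` propagates missing values as `pd.NA` and does not silently treat them as `False`, so downstream code has to decide explicitly how to handle unknown outcomes.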


## Task 2
Write a function `clean_sessions(df, min_minutes=5.0)` that:

- Removes or flags sessions with:
  - Missing or non-parsable play time  
  - Non-positive minutes (`<= 0`)  
- Adds a new column `crashed`:
  - `True` when `ended_normally` is False **and** `crash_error_code` is not null  
  - Otherwise `False`
- Filters sessions to only those with `minutes_played >= min_minutes`.

Return the cleaned DataFrame.


In [None]:
def clean_sessions(df, min_minutes=5.0):
    # Work on a copy so the caller's DataFrame is left untouched
    df = df.copy()

    # Play time: drop sessions whose minutes are missing, unparsable (NaN), or <= 0
    invalid = df['minutes_played'].isna() | (df['minutes_played'] <= 0)
    if invalid.any():
        print(f'Dropping {invalid.sum()} corrupted session(s)')
    df = df[~invalid].copy()

    # Crashed: the session did not end normally AND an error code was recorded
    df['crashed'] = df['ended_normally'].eq(False) & df['crash_error_code'].notna()

    # Filter: keep only sessions that meet the minimum play time
    df = df[df['minutes_played'] >= min_minutes]

    return df
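
The task allows sessions to be removed or flagged, and `clean_sessions` removes them. A minimal sketch of the flagging option follows, assuming two hypothetical column names (`valid_minutes`, `long_enough`) and a hypothetical function name `flag_sessions`; none of these come from the exercise itself.

```python
import numpy as np
import pandas as pd


def flag_sessions(df, min_minutes=5.0):
    """Keep every row and add boolean flags instead of dropping sessions.

    Hypothetical variant of the cleaning step: column names are illustrative.
    """
    out = df.copy()
    minutes = out["minutes_played"]
    # Comparisons with NaN evaluate to False, so missing values are flagged invalid
    out["valid_minutes"] = minutes.notna() & (minutes > 0)
    out["long_enough"] = out["valid_minutes"] & (minutes >= min_minutes)
    return out


demo = pd.DataFrame(
    {
        "session_id": ["a", "b", "c", "d"],
        "minutes_played": [34.0, np.nan, 0.0, 3.0],
    }
)
flagged = flag_sessions(demo)
print(flagged)
```

Flagging keeps the bad rows available for debugging, at the cost of having to filter on the flags before any aggregation.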


## Task 3 — KPI Aggregator (Pipeline Step 2)
Create a function `compute_kpis(clean_df, group_by="game")` that:

- Groups by a chosen dimension (`"game"` by default).  
- Computes:
  - Number of sessions  
  - Total minutes played  
  - Average minutes per session  
  - Crash rate (mean of `crashed`)  
- Returns a compact summary DataFrame.

Try calling the function with `group_by="region"` as well.

In [85]:
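def compute_kpis(clean_df, group_by="game"):
    # Named aggregation: one output column per KPI
    summary = clean_df.groupby(by=group_by).agg(
        number_sessions=("session_id", "count"),         # sessions per group
        total_played=("minutes_played", "sum"),          # total minutes played
        average_min_session=("minutes_played", "mean"),  # average minutes per session
        mean_crashed=("crashed", "mean"),                # crash rate (share of crashed sessions)
    )

    print("Summary")
    print("-------")
    print(summary)
    return summary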

In [75]:
test = build_raw_dataframe(raw_events)

In [76]:
test = clean_sessions(test)

In [84]:
compute_kpis(test, group_by='region')

Summary
-------
        number_sessions  total_played  average_min_session  mean_crashed
region                                                                  
APAC                  2          63.0                31.50           0.5
EU                    2          60.0                30.00           0.5
NA                    2          22.5                11.25           1.0
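
The region summary can be checked by hand. The sketch below rebuilds only the six sessions that survive cleaning with the default `min_minutes=5.0` (s1, s2, s5, s7, s9, s10), with values copied from the raw events, and applies the same named aggregation. The variable names `clean` and `by_region` are illustrative.

```python
import pandas as pd

# The six sessions that pass cleaning, with the columns the KPIs need
clean = pd.DataFrame(
    {
        "session_id": ["s1", "s2", "s5", "s7", "s9", "s10"],
        "region": ["EU", "NA", "NA", "APAC", "EU", "APAC"],
        "minutes_played": [34.0, 7.5, 15.0, 52.0, 26.0, 11.0],
        "crashed": [False, True, True, True, True, False],
    }
)

by_region = clean.groupby("region").agg(
    number_sessions=("session_id", "count"),
    total_played=("minutes_played", "sum"),
    average_min_session=("minutes_played", "mean"),
    mean_crashed=("crashed", "mean"),
)

# NA: 7.5 + 15.0 = 22.5 minutes over 2 sessions, both crashed
print(by_region.loc["NA"])
```

For NA, the total is 7.5 + 15.0 = 22.5 minutes, the average is 22.5 / 2 = 11.25, and both sessions crashed, giving a crash rate of 1.0.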


## Task 4 — Optional Stretch Goal: Mini Pipeline
Design `run_telemetry_pipeline(raw_events, min_minutes=5, group_by="game", allowed_regions=None)`:

- Calls, in sequence:
  1. `build_raw_dataframe(...)`
  2. `clean_sessions(...)`
  3. `compute_kpis(...)`
- If `allowed_regions` is not `None`, filter to only those regions before computing KPIs.
- Return:
  - KPI summary  
  - Cleaned session table (optional but useful for debugging)


In [88]:
def run_telemetry_pipeline(raw_events, min_minutes=5, group_by="game", allowed_regions=None):
    # Build dataframe
    df = build_raw_dataframe(raw_events)

    # Clean sessions
    df_clean = clean_sessions(df, min_minutes=min_minutes)

    # Optional region filter
    if allowed_regions is not None:
        df_clean = df_clean[df_clean['region'].isin(allowed_regions)]

    # Compute KPIs
    summary = compute_kpis(df_clean, group_by=group_by)

    return df_clean, summary
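
The region filter relies on `Series.isin`, which expects a list-like argument, so passing a single string such as `"EU"` raises a `TypeError`. A small sketch of a more forgiving filter is shown below; the helper name `filter_regions` is hypothetical and not part of the exercise.

```python
import pandas as pd


def filter_regions(df, allowed_regions=None):
    """Return only rows whose region is allowed; None means no filtering.

    Hypothetical helper: accepts either a single region string or an iterable.
    """
    if allowed_regions is None:
        return df
    if isinstance(allowed_regions, str):
        # Wrap a lone string so isin() receives a list-like value
        allowed_regions = [allowed_regions]
    return df[df["region"].isin(list(allowed_regions))]


demo = pd.DataFrame(
    {
        "session_id": ["s1", "s2", "s7", "s9"],
        "region": ["EU", "NA", "APAC", "EU"],
    }
)
only_eu = filter_regions(demo, "EU")
na_and_apac = filter_regions(demo, ["NA", "APAC"])
print(only_eu["session_id"].tolist())
print(na_and_apac["session_id"].tolist())
```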


In [89]:
run_telemetry_pipeline(raw_events)

Summary
-------
             number_sessions  total_played  average_min_session  mean_crashed
game                                                                         
DungeonDive                2          22.5                11.25           1.0
FarmWorld                  2          63.0                31.50           0.5
SkyQuest                   2          60.0                30.00           0.5


(  session_id  user_id         game  minutes_played ended_normally  \
 0         s1      101     SkyQuest            34.0           True   
 1         s2      102  DungeonDive             7.5          False   
 4         s5      101  DungeonDive            15.0          False   
 6         s7      107    FarmWorld            52.0          False   
 8         s9      109     SkyQuest            26.0          False   
 9        s10      110    FarmWorld            11.0           True   
 
   crash_error_code region            timestamp  crashed  
 0             None     EU  2025-12-01 18:03:10    False  
 1            ERR42     NA  2025-12-01 18:07:55     True  
 4          ERR_NET     NA  2025-12-01 18:20:44     True  
 6      ERR_TIMEOUT   APAC  2025-12-01 18:35:12     True  
 8            ERR42     EU  2025-12-01 18:55:33     True  
 9             None   APAC  2025-12-01 19:05:03    False  ,
              number_sessions  total_played  average_min_session  mean_crashed
 game          

# === End of Challenge ===