In [2]:
import os

import pandas as pd
import pathway as pw

In [3]:
class InputSchema(pw.Schema):
    """Column names and Python types used to parse each row of the input CSV.

    Passed as ``schema=`` to ``pw.io.csv.read``. The per-user aggregation in
    this notebook reads ``user_id``, ``steps``, ``calories_burned``,
    ``active_minutes``, ``sleep_hours``, ``heart_rate_avg`` and
    ``workout_type``; the remaining columns are declared but not referenced
    by the visible cells.
    """
    user_id: int  # grouping key of the per-user aggregation
    date: str  # kept as a plain string; not parsed into a date type in the visible cells
    steps: int
    calories_burned: float
    distance_km: float  # not referenced by the visible aggregation
    active_minutes: int
    sleep_hours: float
    heart_rate_avg: int
    workout_type: str
    weather_conditions: str  # not referenced by the visible aggregation
    location: str  # not referenced by the visible aggregation
    mood: str  # not referenced by the visible aggregation

# CSV source parsed according to InputSchema.
# NOTE(review): with mode="streaming" the connector keeps watching the file,
# so a later pw.run() is not expected to return on its own (the recorded
# notebook output ends in a KeyboardInterrupt). For a one-shot batch run over
# a fixed file, mode="static" is presumably what is wanted -- confirm.
input_table = pw.io.csv.read("small_dataset.csv", mode="streaming", schema=InputSchema)

In [4]:
def normalize_fitness_score(
    steps,
    calories,
    active_mins,
    *,
    target_steps=10000,
    target_calories=500,
    target_minutes=30,
    weight_steps=0.4,
    weight_calories=0.3,
    weight_minutes=0.2,
):
    """Weighted activity score, scaled so that hitting every target gives 100.

    The score is ``100 * weighted(actual) / weighted(targets)`` where
    ``weighted(s, c, m) = s * weight_steps + c * weight_calories + m * weight_minutes``.

    Only ``*``, ``+`` and ``/`` are applied to the three activity inputs, so
    they may be plain numbers or any objects supporting that arithmetic with
    numbers (this notebook passes Pathway column expressions).

    Args:
        steps: Step count.
        calories: Calories burned.
        active_mins: Active minutes.
        target_steps: Step count that counts as "on target" (default 10000).
        target_calories: Calories that count as "on target" (default 500).
        target_minutes: Active minutes that count as "on target" (default 30).
        weight_steps: Weight applied to steps (default 0.4).
        weight_calories: Weight applied to calories (default 0.3).
        weight_minutes: Weight applied to active minutes (default 0.2).

    Returns:
        The score on a scale where 100 means "exactly on target". The value is
        not capped: exceeding the targets yields more than 100.

    Raises:
        ValueError: If the targets and weights combine to a zero maximum score.

    Note:
        The weights are applied to the raw values, not to per-metric
        fractions of the target. With the defaults, steps account for 4000 of
        the 4156 maximum points, so the score is driven almost entirely by
        the step count.
    """
    # Score obtained when every metric is exactly on target.
    max_score = (
        target_steps * weight_steps
        + target_calories * weight_calories
        + target_minutes * weight_minutes
    )
    if max_score == 0:
        raise ValueError("targets and weights must give a non-zero maximum score")

    # Same weighting applied to the observed values.
    actual_score = (
        steps * weight_steps
        + calories * weight_calories
        + active_mins * weight_minutes
    )

    return (actual_score / max_score) * 100


def _count_where(condition):
    """Reducer expression: number of rows in the group for which ``condition`` holds."""
    # pw.reducers.count() counts every row of the group and does not filter on
    # an argument, so a conditional count is written as a sum of 0/1 flags.
    return pw.reducers.sum(pw.if_else(condition, 1, 0))


def _any_where(condition):
    """Reducer expression: True when at least one row in the group satisfies ``condition``."""
    # pw.reducers.any() returns the value of an arbitrary row of the group,
    # not a logical OR, so "any row matches" is derived from the conditional count.
    return _count_where(condition) > 0


# One output row per user_id, aggregating all of that user's input rows.
user_insights_table = input_table.groupby(pw.this.user_id).reduce(
    # Basic Metrics
    user_id=pw.this.user_id,
    total_steps=pw.reducers.sum(pw.this.steps),
    avg_daily_steps=pw.reducers.avg(pw.this.steps),
    step_range=pw.reducers.max(pw.this.steps) - pw.reducers.min(pw.this.steps),

    # Activity Analysis
    avg_calories_burned=pw.reducers.avg(pw.this.calories_burned),
    total_active_minutes=pw.reducers.sum(pw.this.active_minutes),
    avg_daily_active_minutes=pw.reducers.avg(pw.this.active_minutes),
    min_active_minutes=pw.reducers.min(pw.this.active_minutes),
    max_active_minutes=pw.reducers.max(pw.this.active_minutes),
    # Number of rows with more than 30 active minutes (not the group size).
    days_over_30min_active=_count_where(pw.this.active_minutes > 30),

    # Sleep Patterns
    avg_sleep_hours=pw.reducers.avg(pw.this.sleep_hours),
    min_sleep_hours=pw.reducers.min(pw.this.sleep_hours),
    max_sleep_hours=pw.reducers.max(pw.this.sleep_hours),
    sleep_range=pw.reducers.max(pw.this.sleep_hours) - pw.reducers.min(pw.this.sleep_hours),

    # Workout Analysis
    # NOTE(review): max over strings yields the lexicographically largest
    # workout_type, not the most frequent one. A true mode needs a separate
    # groupby on (user_id, workout_type); left unchanged here.
    most_common_workout=pw.reducers.max(pw.this.workout_type),

    # Heart Health Indicators
    lowest_heart_rate=pw.reducers.min(pw.this.heart_rate_avg),
    highest_heart_rate=pw.reducers.max(pw.this.heart_rate_avg),
    avg_heart_rate=pw.reducers.avg(pw.this.heart_rate_avg),
    heart_rate_range=pw.reducers.max(pw.this.heart_rate_avg) - pw.reducers.min(pw.this.heart_rate_avg),

    # Stress and Recovery (True if at least one row matches)
    low_sleep=_any_where(pw.this.sleep_hours < 6),
    high_hr=_any_where(pw.this.heart_rate_avg > 100),

    # Health Risk Indicators (True if at least one row matches)
    sedentary=_any_where(
        (pw.this.steps < 3000) &
        (pw.this.active_minutes < 30)
    ),
    overactive=_any_where(
        (pw.this.active_minutes > 120) &
        (pw.this.sleep_hours < 7)
    ),
    # Same condition as low_sleep; both columns are kept because the
    # downstream select reads each of them.
    poor_sleep=_any_where(pw.this.sleep_hours < 6),

    # Performance Extremes
    best_step_day=pw.reducers.max(pw.this.steps),
    worst_step_day=pw.reducers.min(pw.this.steps),
    best_active_day=pw.reducers.max(pw.this.active_minutes),
    worst_active_day=pw.reducers.min(pw.this.active_minutes),

    # Overall Scores: per-row score, averaged over the user's rows.
    overall_fitness_score=pw.reducers.avg(
        normalize_fitness_score(
            pw.this.steps,
            pw.this.calories_burned,
            pw.this.active_minutes
        )
    ),

    risk_of_heart_attack=_any_where(
        (pw.this.heart_rate_avg > 130) & (pw.this.sleep_hours < 4)
    ),

    # Consistency Indicators: number of rows meeting all three thresholds.
    consistent_activity_days=_count_where(
        (pw.this.steps > 7500) &
        (pw.this.active_minutes > 30) &
        (pw.this.sleep_hours > 7)
    )
)

# Output column name -> source column of user_insights_table, in output order.
# Most columns pass through unchanged; five are renamed on the way out.
# NOTE(review): the renamed `*_days` / `*_episodes` columns are derived
# upstream from boolean conditions, so they hold flags rather than day
# counts -- confirm the naming is intended.
_RESULT_COLUMNS = {
    "user_id": "user_id",
    "total_steps": "total_steps",
    "avg_daily_steps": "avg_daily_steps",
    "step_range": "step_range",
    "avg_calories_burned": "avg_calories_burned",
    "total_active_minutes": "total_active_minutes",
    "avg_daily_active_minutes": "avg_daily_active_minutes",
    "min_active_minutes": "min_active_minutes",
    "max_active_minutes": "max_active_minutes",
    "days_over_30min_active": "days_over_30min_active",
    "avg_sleep_hours": "avg_sleep_hours",
    "min_sleep_hours": "min_sleep_hours",
    "max_sleep_hours": "max_sleep_hours",
    "sleep_range": "sleep_range",
    "most_common_workout": "most_common_workout",
    "lowest_heart_rate": "lowest_heart_rate",
    "highest_heart_rate": "highest_heart_rate",
    "avg_heart_rate": "avg_heart_rate",
    "heart_rate_range": "heart_rate_range",
    "days_low_sleep": "low_sleep",
    "high_hr_episodes": "high_hr",
    "sedentary_days": "sedentary",
    "overactive_days": "overactive",
    "poor_sleep_days": "poor_sleep",
    "best_step_day": "best_step_day",
    "worst_step_day": "worst_step_day",
    "best_active_day": "best_active_day",
    "worst_active_day": "worst_active_day",
    "overall_fitness_score": "overall_fitness_score",
    "consistent_activity_days": "consistent_activity_days",
    "risk_of_heart_attack": "risk_of_heart_attack",
}

result_table = user_insights_table.select(
    **{
        output_name: pw.this[source_name]
        for output_name, source_name in _RESULT_COLUMNS.items()
    }
)

In [6]:
# Make sure the output directory exists before the writer is attached, so a
# fresh checkout (no ./tmp yet) does not fail when the output file is opened.
output_path = "./tmp/output_stream.csv"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
pw.io.csv.write(result_table, output_path)

# Executes the dataflow defined above.
# NOTE(review): the input is read with mode="streaming", so this call is
# expected to keep running until interrupted (the recorded output of this
# cell ends in a KeyboardInterrupt).
pw.run()

Output()

KeyboardInterrupt: 