In [10]:
import pandas as pd

def read_csv_file(filepath, **kwargs):
    """Load the contents of a CSV file into a pandas DataFrame.

    This is a thin convenience wrapper: every argument is handed to
    ``pd.read_csv`` unchanged.

    Args:
        filepath (str): Location of the CSV file to load.
        **kwargs: Extra options forwarded verbatim to ``pd.read_csv``
            (for example ``sep`` or ``usecols``).
    Returns:
        pd.DataFrame: The parsed table.
    """
    loaded_frame = pd.read_csv(filepath, **kwargs)
    return loaded_frame

# Example usage: load "./lpdg_test.csv" (a path relative to the current working
# directory) into `df`, the frame that the later cells pass to DataProcessor.
df = read_csv_file("./lpdg_test.csv")

In [None]:
# Data processing class
# input   - takes a DataFrame and works on a copy of it.
# process - performs the data cleaning and feature engineering operations.
# output  - returns the clean data, which can then be used for model training and development.

class DataProcessor:
    """Chainable cleaning / feature-engineering pipeline over a DataFrame.

    The processor works on a private copy of the frame it is given, so the
    caller's DataFrame is never modified. ``clean``, ``add_features`` and
    ``aggregate_user_level`` each update ``self.df`` and return ``self`` so
    they can be chained; ``get_data`` ends the chain and returns the frame.

    Columns read by the stages: ``user_id``, ``purchase_amount``,
    ``session_duration_sec`` and ``pages_visited``.
    """

    def __init__(self, df):
        """Store a copy of ``df`` so later stages never touch the caller's frame.

        Args:
            df (pd.DataFrame): Raw input data.
        """
        self.df = df.copy()

    def read_csv_file(self, filepath, **kwargs):
        """Read a CSV file and return it as a new DataFrame.

        This does not modify ``self.df`` and returns a DataFrame rather than
        the processor, so it cannot be used as a link in the method chain.

        Args:
            filepath (str): Path to the CSV file.
            **kwargs: Additional keyword arguments for ``pd.read_csv``.
        Returns:
            pd.DataFrame: Loaded DataFrame.
        """
        return pd.read_csv(filepath, **kwargs)

    def clean(self):
        """Drop rows without a ``user_id`` and floor ``purchase_amount`` at 0.

        Missing purchase amounts are left as NaN; only negative values are
        raised to 0.

        Returns:
            DataProcessor: ``self``, for chaining.
        """
        self.df = (
            self.df.dropna(subset=["user_id"])
            .assign(purchase_amount=lambda frame: frame["purchase_amount"].clip(lower=0))
        )
        return self

    def add_features(self, high_value_threshold=10000):
        """Add the ``engagement_score`` and ``high_value_flag`` columns.

        ``engagement_score`` is ``session_duration_sec * pages_visited``;
        ``high_value_flag`` is True where ``purchase_amount`` is strictly
        greater than ``high_value_threshold``.

        Args:
            high_value_threshold (float): Purchase amount above which a row is
                flagged as high value. Defaults to 10000.
        Returns:
            DataProcessor: ``self``, for chaining.
        """
        self.df["engagement_score"] = (
            self.df["session_duration_sec"] * self.df["pages_visited"]
        )
        self.df["high_value_flag"] = self.df["purchase_amount"] > high_value_threshold
        return self

    def aggregate_user_level(self):
        """Attach per-user ``total_spend`` and ``avg_engagement`` to every row.

        ``total_spend`` is the sum of ``purchase_amount`` and ``avg_engagement``
        the mean of ``engagement_score`` within each ``user_id``. Requires
        ``add_features`` to have created ``engagement_score`` first. The merge
        replaces the frame's index with a fresh 0..n-1 range.

        Returns:
            DataProcessor: ``self``, for chaining.
        """
        agg = (
            self.df.groupby("user_id").agg(
                total_spend=("purchase_amount", "sum"),
                avg_engagement=("engagement_score", "mean")
            ).reset_index()
        )
        # Drop stale copies of the aggregate columns first: otherwise a second
        # call (or an input that already carries them) makes merge emit
        # total_spend_x / total_spend_y instead of refreshing the values.
        base = self.df.drop(columns=["total_spend", "avg_engagement"], errors="ignore")
        self.df = base.merge(agg, on="user_id", how="left")
        return self

    def aggreagate_user_level(self):
        """Misspelled alias of ``aggregate_user_level``, kept for existing callers.

        Returns:
            DataProcessor: ``self``, for chaining.
        """
        return self.aggregate_user_level()

    def get_data(self):
        """Return the processed DataFrame held by this processor.

        Returns:
            pd.DataFrame: The current state of ``self.df`` (not a copy).
        """
        return self.df


In [None]:
# Run the full pipeline on the frame loaded earlier. The chain starts at
# clean(): DataProcessor has no `processor` attribute, and read_csv_file()
# returns a plain DataFrame, so neither can begin a chain of processor stages.
processor = DataProcessor(df)
final_df = (
    processor.clean()
    .add_features()
    .aggreagate_user_level()
    .get_data()
)

In [None]:
#final_df --- output df 

How can this data pipeline be used from another file?

from datapipeline import read_csv_file, DataProcessor

# Load the raw rows from disk.
df = read_csv_file("./lpdg_test.csv")

# Run each stage in order; every stage updates the processor's own frame,
# so the calls can be made one after another instead of as a single chain.
processor = DataProcessor(df)
processor.clean()
processor.add_features()
processor.aggreagate_user_level()
final_df = processor.get_data()

# Now use final_df for training, etc.


Unnamed: 0,user_id,age,signup_date,purchase_amount,country,device_type,traffic_source,session_duration_sec,pages_visited,is_repeat_user,engagement_score,high_value_flag,total_spend,avg_engagement
0,101.0,25.0,2023-01-15,500.0,IN,mobile,organic,120,3,0,360,False,1300.0,630.0
1,102.0,,2023-02-10,1200.0,US,desktop,paid,300,6,1,1800,False,1200.0,2300.0
2,103.0,35.0,2023-02-20,0.0,IN,mobile,paid,45,1,0,45,False,4000.0,172.5
3,104.0,45.0,invalid_date,7000.0,UK,desktop,organic,600,10,1,6000,False,7000.0,6000.0
4,106.0,,2023-03-15,15000.0,US,mobile,paid,900,15,1,13500,True,15000.0,13500.0
5,101.0,25.0,2023-01-20,800.0,IN,mobile,organic,180,5,1,900,False,1300.0,630.0
6,102.0,40.0,2023-02-25,,US,desktop,paid,400,7,1,2800,False,1200.0,2300.0
7,103.0,,,4000.0,UK,tablet,organic,150,2,0,300,False,4000.0,172.5
8,107.0,29.0,2023-03-30,200.0,IN,mobile,referral,60,1,0,60,False,200.0,60.0


In [None]:
####