In [1]:
import os

In [2]:
# Run from the project root so the relative paths used below (schema.yaml,
# config/config.yaml, artifacts/) resolve. Set the PROJECT_ROOT environment
# variable to reuse this notebook on another machine.
os.chdir(os.environ.get("PROJECT_ROOT", "C:/Users/pc/Desktop/GitHub repos/End-to-end-ML-project-with-MLflow"))

In [3]:
# Confirm the working directory is now the project root.
%pwd

'C:\\Users\\pc\\Desktop\\GitHub repos\\End-to-end-ML-project-with-MLflow'

In [4]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings for the data-transformation stage.

    Fields are positional in this order: ``root_dir``, then ``data_path``.
    Instances are frozen, so attributes cannot be reassigned after creation.
    """

    # Output directory; DataTransformation.split_train_test writes train.csv/test.csv here.
    root_dir: Path
    # Input data path for this stage. NOTE(review): DataTransformation in this notebook
    # reads ValidationConfig.unzip_data_dir instead and never uses this field.
    data_path: Path

In [5]:
import os
from mlProject import logger
from sklearn.model_selection import train_test_split
import polars as pl
from mlProject.entity.config_entity import DataValidationConfig

In [6]:
class DataTransformation:
    """
    Data-transformation stage for the Velib bike-sharing dataset.

    Reads the raw CSV located by ``ValidationConfig.unzip_data_dir``, aggregates it to
    hourly per-station rows, builds lagged ``total_available`` features and writes a
    positional train/test split under ``TransformationConfig.root_dir``.
    """

    def __init__(self, ValidationConfig: DataValidationConfig, TransformationConfig: DataTransformationConfig):
        """
        Args:
            ValidationConfig: provides ``unzip_data_dir``, the raw CSV path.
            TransformationConfig: provides ``root_dir``, the output directory.
        """
        # Parameter/attribute names kept as-is: callers may pass them by keyword.
        self.ValidationConfig = ValidationConfig
        self.TransformationConfig = TransformationConfig

    def preprocess_data(self) -> pl.DataFrame:
        """
        Load the raw Velib CSV and reduce it to one row per (date, hour, station).

        Steps:
        1. Drop rows whose ``operative`` flag is not true.
        2. Parse ``datetime`` and derive ``date``, ``weekday`` and ``hour`` from it.
        3. Compute ``total_available`` = available_mechanical + available_electrical
           and ``free_terminals`` = capacity - total_available.
        4. Keep each station's chronologically last reading within every hour.

        Takes no arguments: the input path is ``self.ValidationConfig.unzip_data_dir``.

        Returns:
            pl.DataFrame: columns date, weekday, hour, station_name, total_available,
            available_mechanical, available_electrical, free_terminals, sorted by
            date, hour and station_name.
        """
        df = pl.read_csv(self.ValidationConfig.unzip_data_dir)

        # filter out non-operative stations
        df = df.filter(pl.col("operative") == True)  # noqa: E712 (polars expression)

        # Parse the string into a Datetime (polars' default parser handles ISO 8601 with
        # a T separator and Z suffix), then derive the calendar parts from it.
        # dt.hour() yields the same hour the former time-string slicing produced.
        df = df.with_columns(
            datetime=pl.col("datetime").str.to_datetime()
        ).with_columns(
            date=pl.col("datetime").dt.date(),
            weekday=pl.col("datetime").dt.weekday(),
            hour=pl.col("datetime").dt.hour().cast(pl.Int32),
        )

        total_available = pl.col("available_mechanical") + pl.col("available_electrical")

        # NOTE(review): lat/lon are parsed from station_geo and aggregated below, but the
        # final select drops them, so they never reach the output. Either add them to the
        # select or remove this work.
        df = df.with_columns(
            total_available=total_available,
            free_terminals=pl.col("capacity") - total_available,
            lat=pl.col("station_geo").str.split(",").list.first().cast(float),
            lon=pl.col("station_geo").str.split(",").list.last().cast(float),
        )

        # Sort chronologically first: polars keeps the incoming row order inside each
        # group, so ``last()`` means "latest reading of the hour" only when rows are
        # time-ordered. Previously the result depended on the CSV's row order.
        df = df.sort("datetime")

        # Group by date, hour and station, taking the last value in each hour
        df = df.group_by(["date", "hour", "station_name"]).agg([
            pl.col("weekday").last(),
            pl.col("lat").last(),
            pl.col("lon").last(),
            pl.col("total_available").last(),
            pl.col("available_mechanical").last(),
            pl.col("available_electrical").last(),
            pl.col("free_terminals").last(),
        ])

        # Fix the output column order (datetime was already dropped by the aggregation).
        df = df.select([
            "date",
            "weekday",
            "hour",
            "station_name",
            "total_available",
            "available_mechanical",
            "available_electrical",
            "free_terminals",
        ]).sort(["date", "hour", "station_name"])

        logger.info(f"Preprocessed data. Shape: {df.shape}")

        return df

    def create_station_mapping(self, df: pl.DataFrame) -> pl.DataFrame:
        """
        Create a deterministic station mapping (station_id <=> station_name).

        ``station_id`` is a 0-based row index assigned in alphabetical order of
        ``station_name``, so the same set of stations always gets the same ids.

        Returns:
            pl.DataFrame: columns station_id, station_name.
        """
        # unique() does not preserve order by default, so sorting must come AFTER it;
        # sorting first (as before) let ids change from run to run.
        return (
            df.select("station_name")
            .unique()
            .sort("station_name")
            .with_row_index("station_id")
        )

    def create_lagged_features(self, df: pl.DataFrame, n_lags: int = 24) -> pl.DataFrame:
        """
        Build per-station lagged copies of ``total_available``.

        Within each station, rows are ordered by (date, hour), and
        ``total_available_lag_i`` holds that station's value ``i`` rows earlier. Rows
        without a full lag history (each station's first ``n_lags`` rows), or with any
        other null in the output columns, are dropped.

        Args:
            df: output of :meth:`preprocess_data` (needs station_name, date, hour,
                total_available).
            n_lags: number of lags to create (default 24: lag_1 .. lag_24).

        Returns:
            pl.DataFrame: station_id (Categorical), date, hour, total_available,
            total_available_lag_1 .. total_available_lag_{n_lags}, sorted by
            date, hour and station_id. Empty if no station has enough history.

        Raises:
            ValueError: if ``n_lags`` < 1.
        """
        if n_lags < 1:
            raise ValueError(f"n_lags must be >= 1, got {n_lags}")

        station_mapping = self.create_station_mapping(df)
        lag_names = [f"total_available_lag_{i}" for i in range(1, n_lags + 1)]

        # Replace station_name with its id (as a categorical) and keep only the columns
        # used below, ordered so each station's rows are chronological.
        new_df = (
            df.join(station_mapping, on="station_name", how="left")
            .with_columns(pl.col("station_id").cast(pl.String).cast(pl.Categorical))
            .select(["station_id", "date", "hour", "total_available"])
            .sort(["station_id", "date", "hour"])
        )

        # NOTE(review): shift() is row-based, so lag_i means "i hours earlier" only if a
        # station has a row for every hour; missing hours (e.g. readings removed by the
        # operative filter) silently misalign the lags.
        # shift(...).over("station_id") lags within each station in one pass, replacing
        # the former per-station filter loop (one full scan of the frame per station).
        lags_df = (
            new_df.with_columns(
                [
                    pl.col("total_available").shift(i).over("station_id").alias(name)
                    for i, name in enumerate(lag_names, start=1)
                ]
            )
            .drop_nulls()
            # station_id breaks ties so rows sharing a (date, hour) get a stable order.
            .sort(["date", "hour", "station_id"])
        )

        logger.info(f"Created lagged features dataframe. Shape: {lags_df.shape}")

        return lags_df

    def split_train_test(self, df: pl.DataFrame, split_ratio: float = 0.95) -> None:
        """
        Split ``df`` by row position and write ``train.csv`` / ``test.csv``.

        The first ``int(len(df) * split_ratio)`` rows go to train and the rest to test,
        so ``df`` must already be in chronological order (as returned by
        :meth:`create_lagged_features`). Files are written to
        ``self.TransformationConfig.root_dir``.

        Args:
            df: feature frame to split.
            split_ratio: fraction of rows used for training (default 0.95).

        Raises:
            ValueError: if ``split_ratio`` is not strictly between 0 and 1.
        """
        if not 0.0 < split_ratio < 1.0:
            raise ValueError(f"split_ratio must be in (0, 1), got {split_ratio}")

        split_index = int(len(df) * split_ratio)

        # Split temporally. NOTE(review): a row-count cut can place rows from the same
        # (date, hour) in both train and test; consider cutting on a timestamp boundary.
        train = df[:split_index]
        test = df[split_index:]

        # save to csv files and store them in the given dir
        root_dir = self.TransformationConfig.root_dir
        train.write_csv(os.path.join(root_dir, "train.csv"))
        test.write_csv(os.path.join(root_dir, "test.csv"))

        logger.info("Data split into training and test sets")
        # Report shapes through the logger like the other methods, instead of print().
        logger.info(f"train.shape {train.shape}, test.shape {test.shape}")

In [7]:
# Import from the ``mlProject`` package like every other cell. Mixing in
# ``src.mlProject`` loads the same modules a second time under a different name.
from mlProject.utils.common import read_yaml

schema_filepath = Path("schema.yaml")
# NOTE(review): the loaded dtypes read 'String,', 'Int64,', ... which suggests stray
# trailing commas after each value in schema.yaml; worth fixing in the YAML itself.
read_yaml(schema_filepath)

[2025-05-15 20:19:26,755: INFO: common: yaml file: schema.yaml loaded successfully]


ConfigBox({'COLUMNS': {'datetime': 'String,', 'capacity': 'Int64,', 'available_mechanical': 'Int64,', 'available_electrical': 'Int64,', 'station_name': 'String,', 'station_geo': 'String,', 'operative': 'Boolean'}, 'TARGET_COLUMN': {'name': 'total_available'}})

In [8]:
from mlProject.config.configuration import ConfigurationManager

# Build both stage configs from the project's YAML files (validation first, then
# transformation) and wire them into the stage object used by the cells below.
config_manager = ConfigurationManager()
validation_config = config_manager.get_data_validation_config()
transformation_config = config_manager.get_data_transformation_config()
datatransformation = DataTransformation(
    ValidationConfig=validation_config,
    TransformationConfig=transformation_config,
)

[2025-05-15 20:19:26,794: INFO: common: yaml file: config\config.yaml loaded successfully]
[2025-05-15 20:19:26,797: INFO: common: yaml file: params.yaml loaded successfully]
[2025-05-15 20:19:26,800: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-05-15 20:19:26,801: INFO: common: created directory at: artifacts]
[2025-05-15 20:19:26,802: INFO: common: created directory at: artifacts/data_validation]
[2025-05-15 20:19:26,804: INFO: common: created directory at: artifacts/data_transformation]


In [9]:
# Load the raw CSV and aggregate it to one row per (date, hour, station).
processed_df = datatransformation.preprocess_data()

[2025-05-15 20:19:34,804: INFO: 2274250724: Preprocessed data. Shape: (4205848, 8)]


In [10]:
# Build 24 hourly lag features of total_available per station (4 key columns + 24 lags).
lags_df = datatransformation.create_lagged_features(processed_df)

[2025-05-15 20:19:39,516: INFO: 2274250724: Created lagged features dataframe. Shape: (4172748, 28)]


In [11]:
# Positional temporal split: first 95% of rows -> train.csv, rest -> test.csv.
datatransformation.split_train_test(lags_df)

[2025-05-15 20:19:40,432: INFO: 2274250724: Data split into training and test sets]
train.shape (3964110, 28)
test.shape (208638, 28)
