In [2]:
from bokeh.core.has_props import abstract

from models.ingestion import CSVDatasource
from models.storage import Landing
from pandas import DataFrame
import pandas as pd

In [46]:
class LandingMotorVehicleCollisionsCrashes(Landing):
    def __init__(
        self, i_filepath: str, i_separator: str = ",", i_low_memory: bool = False
    ):
        self.m_filepath = i_filepath
        self.m_separator = i_separator
        self.m_low_memory = i_low_memory

    @staticmethod
    def to_snake_case(x: str) -> str:
        return x.lower().replace(" ", "_")

    @staticmethod
    def _count_null_values(df: DataFrame) -> dict[str, int]:
        null_values = df.isnull().sum().to_dict()
        return {k: v for k, v in null_values.items()}

    def _drop_missing_on_critical_columns(self, df: DataFrame) -> DataFrame:
        return df[
            (~df["crash_date"].isnull())
            & (~df["crash_time"].isnull())
            & (~df["collision_id"].isnull())
        ]

    def _overwrite_invalid_sum(
        self, df: DataFrame, x: str, y: [str]
    ) -> (DataFrame, int):
        sum_y = df[y].sum(axis=1)
        discrepancies = len(df[df[x] != df[x].where(df[x] >= sum_y, sum_y)])
        df[x] = df[x].where(df[x] >= sum_y, sum_y)
        return (df, discrepancies)
    
    def _validate_coordinates(self, df: DataFrame, lat_col: str, lon_col: str) -> DataFrame:
        return df[
            ((df[lat_col] >= -90) & (df[lat_col] <= 90) &
             (df[lon_col] >= -180) & (df[lon_col] <= 180)) |
            (df[lat_col].isnull() | df[lon_col].isnull())
        ]

    def _read(self) -> DataFrame:
        return pd.read_csv(
            self.m_filepath, sep=self.m_separator, low_memory=self.m_low_memory
        )

    def get(self) -> DataFrame:
        df = self._read()
        df.columns = list(
            map(LandingMotorVehicleCollisionsCrashes.to_snake_case, df.columns)
        )
        self.m_metadata[
            "raw_missing_values"
        ] = LandingMotorVehicleCollisionsCrashes._count_null_values(df)
        df = self._drop_missing_on_critical_columns(df)

        df, discrepancies_injured = self._overwrite_invalid_sum(
            df,
            "number_of_persons_injured",
            [
                "number_of_pedestrians_injured",
                "number_of_cyclist_injured",
                "number_of_motorist_injured",
            ],
        )

        df, discrepancies_killed = self._overwrite_invalid_sum(
            df,
            "number_of_persons_killed",
            [
                "number_of_pedestrians_killed",
                "number_of_cyclist_killed",
                "number_of_motorist_killed",
            ],
        )

        self.m_metadata["discrepancies"] = {
            "injured": discrepancies_injured,
            "killed": discrepancies_killed,
        }
        
        df = self._validate_coordinates(df, 'latitude', 'longitude')