In [7]:
import os
import pandas as pd
from abc import ABC, abstractmethod



# Interface for loading data
class LoadDataProcess(ABC):
    """Abstract contract for objects that load data.

    Subclasses must override :meth:`read_data`; until they do, the class
    cannot be instantiated.
    """

    @abstractmethod
    def read_data(self):
        """Load the data. The base implementation does nothing and returns ``None``."""
        return None
    
    
# Interface for cleaning data
class DataCleaningProcess(ABC):
    """Abstract contract for a data-cleaning stage.

    All three hooks below must be overridden before a subclass can be
    instantiated. Each base implementation does nothing and returns ``None``;
    the actual semantics are defined by the subclass.
    """

    @abstractmethod
    def validate_data(self):
        """Hook for the validation step."""
        return None

    @abstractmethod
    def transform_data(self):
        """Hook for the transformation step."""
        return None

    @abstractmethod
    def print_information(self):
        """Hook for printing information."""
        return None

In [2]:
class FileHandler(LoadDataProcess):
    """Read every supported file under ``path`` into a single DataFrame.

    Supported extensions are ``.txt`` (pipe-separated), ``.csv``, ``.xlsx``
    and ``.json``. ``path`` may be a directory (all supported files directly
    inside it are concatenated) or a single supported file.

    Attributes:
        path: Location given to the constructor.
        data: Result of :meth:`read_data`, run once at construction time;
            ``None`` when nothing could be loaded.
    """

    def __init__(self, path):
        self.path = path
        # Loading happens eagerly so ``self.data`` is available right away.
        self.data = self.read_data()

    @staticmethod
    def _read_file(file_path):
        """Read one file according to its extension.

        Returns:
            A DataFrame, or ``None`` when the extension is not supported.
        """
        if file_path.endswith(".txt"):
            return pd.read_csv(file_path, sep='|', dtype=str)
        if file_path.endswith(".csv"):
            return pd.read_csv(file_path, dtype=str)
        if file_path.endswith(".xlsx"):
            return pd.read_excel(file_path, dtype=str)
        if file_path.endswith(".json"):
            # Unlike the other readers, this one lets pandas infer the dtypes.
            return pd.read_json(file_path)
        return None

    def read_data(self):
        """Load and concatenate all supported files found at ``self.path``.

        Returns:
            A DataFrame with a fresh integer index, or ``None`` if the path
            does not exist, contains no supported file, or any read fails
            (a message is printed in each of those cases).
        """
        try:
            if os.path.isdir(self.path):
                # os.listdir() returns entries in arbitrary order; sorting
                # makes the row order of the combined frame reproducible.
                candidates = [
                    os.path.join(self.path, name)
                    for name in sorted(os.listdir(self.path))
                ]
            elif os.path.isfile(self.path):
                candidates = [self.path]
            else:
                print(f"Error reading data: path not found: {self.path}")
                return None

            frames = []
            for file_path in candidates:
                frame = self._read_file(file_path)
                if frame is not None:
                    frames.append(frame)

            if not frames:
                # pd.concat([]) would raise a cryptic "No objects to
                # concatenate"; report the real cause instead.
                print(f"Error reading data: no supported files found in {self.path}")
                return None

            return pd.concat(frames, ignore_index=True)
        except Exception as e:
            # Deliberately broad: callers check for a ``None`` result rather
            # than catching reader-specific exceptions.
            print(f"Error reading data: {e}")
            return None

In [3]:
class DataProcessor(DataCleaningProcess):
    """Clean, export and describe the accident DataFrame.

    Attributes:
        data: The DataFrame being processed (may be ``None``). It is replaced
            by the cleaned frame after a successful :meth:`validate_data`.
        keep_cols: Source columns retained by the latest :meth:`validate_data`.
        numeric_cols: Source columns converted to a numeric dtype.
        date_col: Source column holding the date string.
    """

    def __init__(self, data):
        # The frame is not copied here; validate_data() builds its own copy
        # before modifying anything, so the caller's object stays untouched.
        self.data = data
        self.keep_cols = []
        self.numeric_cols = ['DT_HOUR', 'MS_ACCT', 'MS_ACCT_WITH_DEAD', 'MS_ACCT_WITH_DEAD_30_DAYS',
                             'MS_ACCT_WITH_MORY_INJ', 'MS_ACCT_WITH_SERLY_INJ', 'MS_ACCT_WITH_SLY_INJ']
        self.date_col = 'DT_DAY'

    @staticmethod
    def _to_nullable_int(values):
        """Convert a Series of strings to ``Int64``; unparseable entries become ``<NA>``."""
        numeric = pd.to_numeric(values, errors='coerce')
        # Int64 cannot hold fractional values, so blank those out instead of
        # letting astype() raise.
        numeric = numeric.where(numeric.mod(1).eq(0))
        return numeric.astype('Int64')

    def validate_data(self):
        """Clean ``self.data`` and store the result back on the instance.

        Steps: drop columns ending in ``_FR`` or starting with ``CD``, strip
        the date strings, convert ``numeric_cols`` to numbers, derive
        ``Year`` / ``Month`` / ``Day_of_Month`` / ``total_Deaths``, drop the
        date column, fill blank or missing provinces with ``'Brussels'`` and
        rename the columns.

        The input frame is never modified in place; ``self.data`` is only
        replaced once every step has succeeded.

        Returns:
            The cleaned DataFrame, or ``None`` (after printing a message)
            when ``self.data`` is ``None``.
        """
        if self.data is None:
            print("Data is empty")
            return None

        # Rebuilt on every call so repeated calls do not accumulate entries.
        self.keep_cols = [
            col for col in self.data.columns
            if not (col.endswith("_FR") or col.startswith("CD"))
        ]
        # Explicit copy: avoids mutating the caller's frame and avoids
        # assigning into a slice of it.
        cleaned = self.data[self.keep_cols].copy()

        # Remove leading/trailing spaces
        cleaned[self.date_col] = cleaned[self.date_col].str.strip()

        # Convert the measure columns to numeric; bad values become NaN
        cleaned[self.numeric_cols] = cleaned[self.numeric_cols].apply(pd.to_numeric, errors='coerce')

        # Feature engineering from the date string (sliced as YYYY-MM-DD)
        date_text = cleaned[self.date_col]
        cleaned['Year'] = self._to_nullable_int(date_text.str[:4])
        cleaned['Month'] = self._to_nullable_int(date_text.str[5:7])
        cleaned['Day_of_Month'] = self._to_nullable_int(date_text.str[8:10])
        cleaned['total_Deaths'] = cleaned['MS_ACCT_WITH_DEAD'] + cleaned['MS_ACCT_WITH_DEAD_30_DAYS']

        # The date column is no longer needed once its parts are extracted
        cleaned = cleaned.drop(columns=self.date_col)

        # Fill blank or missing province values with Brussels
        cleaned['TX_PROV_DESCR_NL'] = (
            cleaned['TX_PROV_DESCR_NL']
            .replace(['', ' '], 'Brussels')
            .fillna('Brussels')
        )

        # Rename the columns; keys absent from the frame are ignored
        columns_rename = {
            'DT_DAY': 'Day', 'DT_HOUR': 'Hour', 'TX_DAY_OF_WEEK_DESCR_NL': 'DayOfWeek',
            'TX_BUILD_UP_AREA_DESCR_NL': 'BuiltUpArea', 'TX_COLL_TYPE_DESCR_NL': 'CollisionType',
            'TX_LIGHT_COND_DESCR_NL': 'LightCondition', 'TX_ROAD_TYPE_DESCR_NL': 'RoadType',
            'TX_MUNTY_DESCR_NL': 'Municipality', 'TX_ADM_DSTR_DESCR_NL': 'District',
            'TX_PROV_DESCR_NL': 'Province', 'TX_RGN_DESCR_NL': 'Region', 'MS_ACCT': 'Accident',
            'MS_ACCT_WITH_DEAD': 'AccidentsWithFatalities', 'MS_ACCT_WITH_DEAD_30_DAYS': 'AccidentsWithFatalities30Days',
            'MS_ACCT_WITH_MORY_INJ': 'AccidentsWithMinorInjuries', 'MS_ACCT_WITH_SERLY_INJ': 'AccidentsWithSeriousInjuries',
            'MS_ACCT_WITH_SLY_INJ': 'AccidentsWithSlightInjuries'
        }
        cleaned = cleaned.rename(columns=columns_rename)

        self.data = cleaned
        print("Data cleaned successfully")
        return self.data

    def transform_data(self, data, output_path):
        """Write ``data`` to ``output_path`` as CSV without the index.

        Any existing file at that path is removed first. The path is also
        remembered on the instance as ``output_Path``.
        """
        self.output_Path = output_path
        if os.path.exists(self.output_Path):
            os.remove(self.output_Path)
        data.to_csv(self.output_Path, index=False)
        print(f"Data saved successfully at {self.output_Path}")

    def print_information(self, data_to_print):
        """Print shape, info, statistics, columns, dtypes, null counts and head of a DataFrame."""
        print("-" * 50)
        print(f"Shape of the data: {data_to_print.shape}\n")
        print("Data Info:\n")
        data_to_print.info()
        print("\nStatistics of Data:\n")
        print(data_to_print.describe().T)
        print("\nColumns of Dataframe:\n")
        print(data_to_print.columns)
        print("\nDatatypes of Dataframe:\n")
        print(data_to_print.dtypes)
        print("\nAre there any null values:\n")
        print(data_to_print.isnull().sum())
        print("Top 5 rows of the dataframe:\n")
        print(data_to_print.head())
        print("-" * 50)

In [4]:
class AccidentDataPipeline:
    """Load the raw files, export them, clean them and export the cleaned result.

    Attributes:
        path: Input location handed to ``FileHandler``.
    """

    def __init__(self, path):
        self.path = path

    def process_data(self):
        """Run the pipeline end to end.

        Writes ``Merge_Data/Orignal_data.csv`` (raw) and
        ``Merge_Data/Clean_data.csv`` (cleaned).

        Returns:
            The processor's DataFrame after cleaning.

        Raises:
            ValueError: If no data could be loaded from ``self.path``.
        """
        reader = FileHandler(self.path)
        # FileHandler loads the files in its constructor; reuse that result
        # instead of reading every file a second time.
        original_df = reader.data
        if original_df is None:
            # Fail here with a clear message rather than later with an
            # AttributeError on ``None.to_csv``.
            raise ValueError(f"No data could be loaded from {self.path!r}")

        processor = DataProcessor(original_df)
        os.makedirs("Merge_Data", exist_ok=True)
        # Export the raw frame before cleaning so both versions are kept.
        processor.transform_data(original_df, 'Merge_Data/Orignal_data.csv')
        clean_df = processor.validate_data()
        processor.transform_data(clean_df, 'Merge_Data/Clean_data.csv')
        return processor.data

In [8]:
import argparse
def main(path='Data'):
    """Run the accident-data pipeline and print the first rows of the result.

    Args:
        path: Input location passed to ``AccidentDataPipeline``.
            Defaults to ``'Data'``.
    """
    try:
        # Initialize the pipeline and process the data
        pipeline = AccidentDataPipeline(path)
        df = pipeline.process_data()

        # Print the first 5 rows of the dataframe
        print(df.head())
    except Exception as e:
        # Best-effort entry point: report the failure instead of raising.
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()

Data saved successfully at Merge_Data/Orignal_data.csv
Data cleaned successfully
Data saved successfully at Merge_Data/Clean_data.csv
   Hour DayOfWeek          BuiltUpArea                          CollisionType  \
0    18   vrijdag  Buiten bebouwde kom  Tegen een hindernis buiten de rijbaan   
1    18  woensdag  Buiten bebouwde kom                       Niet beschikbaar   
2    13   maandag     Niet beschikbaar                     Met een voetganger   
3    14    zondag  Binnen bebouwde kom                            Langs opzij   
4     9  woensdag  Binnen bebouwde kom  Frontale botsing (of bij het kruisen)   

                          LightCondition  \
0  Nacht, ontstoken openbare verlichting   
1  Nacht, ontstoken openbare verlichting   
2                    Bij klaarlichte dag   
3                    Bij klaarlichte dag   
4                    Bij klaarlichte dag   

                                 RoadType Municipality  \
0  Gewestweg, provincieweg of gemeenteweg   Aartselaar  