# Car dataset normalization and ingestion

Ingestion begins with establishment of the appropriate database with PostgreSQL and reading the data to be ingested from CSV file.


In [18]:
import logging
import typing
import re
import unicodedata
import pandas as pd
from sqlalchemy import create_engine, types, text
from sqlalchemy.exc import SQLAlchemyError
from IPython.display import display

logging.basicConfig()
logging.getLogger('sqlalchemy').setLevel(logging.ERROR)
DB_NAME = 'cars'
DB_URL = 'postgresql://postgres:postgres@127.0.0.1:5432'
try:
    engine = create_engine(DB_URL, echo=False, isolation_level="AUTOCOMMIT")
    with engine.connect() as connection:
        connection.execute(text(f'CREATE DATABASE {DB_NAME}'))
        connection.close()
        engine.dispose()
except SQLAlchemyError as e:
    logging.error(f'Encountered error when establishing db environment: {e}')

try:
    engine = create_engine(f'{DB_URL}/{DB_NAME}',
                           echo=False, isolation_level="AUTOCOMMIT")
except SQLAlchemyError as e:
    logging.error(f'Encountered error when establishing db environment: {e}')

try:
    data = pd.read_csv('./data.csv')
except SQLAlchemyError as e:
    logging.error(f'Error reading the data file: {e}')

## Helper classes


In [19]:
class Constraints:
    '''
        Class helper holding operations conveying basic relational constraints pre-insertion into the RDBMS.
    '''

    @staticmethod
    def semantic_integrity(df: pd.DataFrame):
        '''
            Ensure non negative numerical values. 
        '''
        for column in df.columns:
            if re.match(r'(float|int)[0-9]{0,2}', str(df[column].dtype)):
                is_empty = (df[column] < 0).any()
                if is_empty:
                    df = df[df[column] >= 0]
        return df

    @staticmethod
    def domain_constraint(df: pd.DataFrame):
        '''
            Allow only values that match their context. 
        '''
        for column in df.columns:
            regex = ''
            match column:
                case 'gearbox':
                    regex = r'(\d-Speed|CVT)'
                case 'fuel':
                    regex = r'(diesel|petrol|petrol_cng)'
                case 'transmission':
                    regex = r'(automatic|manual)'

            if regex:
                edge_cases = df[~df[column].str.match(regex)]
                if len(edge_cases):
                    df = df[~df[column].str.match(regex)]
                    logging.warning(
                        f"Unseen value in {column} - {df[column]}.\nConsider adjusting the code or rejecting the entry.")
            return df

    @staticmethod
    def null_constraint(df: pd.DataFrame):
        '''
            Check for NaN or null values.
        '''
        if len(df[df.isna().any(axis=1)]) != 0:
            raise LookupError(
                'Null entries in the dataset, pausing pipeline execution. Handle them before starting cleaning pipeline again.')
        return df

    @staticmethod
    def check_constraint(df: pd.DataFrame, col: str):
        '''
            Check year value validity.
        '''
        has_edge_cases = (df[col] > 2023).any()
        if has_edge_cases:
            logging.warning(
                f"Invalid valuse in column {col}.\nConsider adjusting the code or rejecting the entry.")
        return df

In [20]:
class NormalizationUtils:
    '''
        Class helper holding miscallenous normalization operations specificially for the Car Dealer project. 
    '''

    @staticmethod
    def normalize_text(df: pd.DataFrame):
        '''
            Method ensuring consistency of encoded values and their representation across the entire dataset.
        '''
        for column in df.columns:
            if df[column].dtype == 'object':
                df[column] = df[column].apply(lambda txt: unicodedata.normalize(
                    'NFD', txt).encode('ascii', 'ignore').decode("ascii"))
        return df

    @staticmethod
    def dedup(df: pd.DataFrame):
        '''
            Remove duplicated entries iff all fields are the same.
        '''
        if len(df[df.duplicated()]) > 0:
            df = df.drop_duplicates(ignore_index=True)
        return df

    @staticmethod
    def order_to_numerical(df: pd.DataFrame, col):
        '''
            Method replacing ordered owners (1st, 2nd etc) to integer representation(1, 2 etc)
        '''
        def match_digit(expr):
            regex = r'\d'
            try:
                matched = re.match(regex, expr)
                return matched.group(0)
            except AttributeError as e:
                logging.error(
                    f'Invalid entry, failed to extract the digit in {expr}')

        df[col] = df[col].apply(match_digit)
        return df

    @staticmethod
    def change_types(df: pd.DataFrame):
        '''
            Cast recognized types to their logical and contextual datatype.
        '''
        for column in df.columns:
            match column:
                case 'kmpl' | 'power' | 'torque':
                    df[column] = pd.to_numeric(
                        df[column], errors='coerce', downcast='float')
                case 'mileage' | 'total_owners' | 'price' | 'production_year' | 'seats_nr' | 'tank_capacity' | 'displacement':
                    df[column] = pd.to_numeric(
                        df[column], errors='coerce', downcast='integer')
            if df[column].dtype == 'object':
                df.astype({column: 'string'})
            if not df[column].notna().all():
                logging.warning(
                    f'Conversion introduced NaN values in column {column}. Investigate the data or adjust the db structure accordingly.')
        return df

    @staticmethod
    def change_name_convention(df: pd.DataFrame):
        '''
            Changing the naming convention to lowercased to easily distinguish SQL queries (capital letters) from queried attributes.
        '''
        df = df.rename(columns={
            'Engine_Type': 'engine_description',
            'CC_Displacement': 'displacement',
            'Transmission': 'gearbox',
            'Transmission_Type': 'transmission',
            'Fuel_Type': 'fuel',
            'Fuel_Tank_Capacity(L)': 'tank_capacity',
            'Make_Year': 'production_year',
            'Mileage_Run': 'mileage',
            'No_of_Owners': 'total_owners',
            'Power(BHP)': 'power',
            'Torque(Nm)': 'torque',
            'Mileage(kmpl)': 'kmpl',
            'Body_Type': 'shape',
            'Seating_Capacity': 'seats_nr',
            'Car_Name': 'series',
            'Make': 'producer'
        })
        df.columns = df.columns.str.lower()
        return df

    @staticmethod
    def clean_bigints(df: pd.DataFrame, cols=[]):
        '''
            Remove comas from ints with more than 3 digits.
        '''
        for column in cols:
            df[column] = df[column].apply(lambda entry: entry.replace(',', ''))
        return df

## Cleaning pipeline


In [21]:
df = (data
      .pipe(NormalizationUtils.change_name_convention)
      .pipe(Constraints.semantic_integrity)
      .pipe(Constraints.null_constraint)
      .pipe(Constraints.domain_constraint)
      .pipe(Constraints.check_constraint, col='production_year')
      .pipe(NormalizationUtils.dedup)
      .pipe(NormalizationUtils.order_to_numerical, col='total_owners')
      .pipe(NormalizationUtils.clean_bigints, cols=['price', 'mileage'])
      .pipe(NormalizationUtils.change_types)
      .pipe(NormalizationUtils.normalize_text)
      ).reset_index(drop=True)

display(df.head(10))



Unnamed: 0,series,producer,model,production_year,color,shape,mileage,total_owners,seats_nr,fuel,tank_capacity,engine_description,displacement,gearbox,transmission,power,torque,kmpl,emission,price
0,Volkswagen Ameo [2016-2017] Highline 1.5L AT (D),Volkswagen,Ameo,2017,silver,sedan,44611,1,5,diesel,45,1.5L TDI Engine,1498,7-Speed,Automatic,109.0,250.0,21.66,BS IV,657000
1,Hyundai i20 Active [2015-2020] 1.2 SX,Hyundai,i20 Active,2016,red,crossover,20305,1,5,petrol,45,1.2L Kappa 5 Speed Manual Transmission,1197,5-Speed,Manual,82.0,115.0,17.190001,BS V,682000
2,Honda WR-V VX i-VTEC,Honda,WR-V,2019,white,suv,29540,2,5,petrol,40,i-VTEC Petrol engine,1199,5-Speed,Manual,88.5,110.0,16.5,BS IV,793000
3,Renault Kwid 1.0 RXT AMT,Renault,Kwid,2017,bronze,hatchback,35680,1,5,petrol,28,1.0L,999,5-Speed,Manual,67.0,91.0,21.700001,BS IV,414000
4,Hyundai Grand i10 [2017-2020] Asta 1.2 Kappa VTVT,Hyundai,Grand i10,2017,orange,hatchback,25126,1,5,petrol,43,Kappa VTVT Petrol Engine,1197,5-Speed,Manual,81.860001,113.75,18.9,BS V,515000
5,Hyundai Elite i20 [2014-2018] Sportz 1.2,Hyundai,Elite i20,2016,red,hatchback,52261,1,5,petrol,45,Kappa VTVT Petrol Engine,1197,5-Speed,Manual,81.830002,114.699997,18.6,BS IV,604000
6,Honda Brio [2011-2016] V MT,Honda,Brio,2012,grey,hatchback,28108,2,5,petrol,35,4 cylinder inline petrol,1198,5-Speed,Manual,86.800003,109.0,19.4,BS III,316000
7,Tata Harrier XZ,Tata,Harrier,2019,grey,suv,92603,1,5,diesel,50,Kryotec 2.0 L Turbocharge,1956,6-Speed,Automatic,138.0,350.0,17.0,BS IV,1419000
8,Hyundai Grand i10 Nios Sportz AMT 1.2 Kappa VTVT,Hyundai,Grand i10 Nios,2021,blue,hatchback,16304,1,5,petrol,37,1.2 L Kappa Petrol,1197,5-Speed,Manual,81.860001,113.75,20.07,BS IV,710000
9,Renault Kwid 1.0 RXT Opt,Renault,Kwid,2019,bronze,hatchback,26350,2,5,petrol,28,Petrol Engine,999,5-Speed,Manual,67.0,91.0,22.0,BS IV,392000


### Normalize to 2NF

Empirical test/observation for redundant and partially dependent attributes determined the following conclusion:

- producer, model, shape, seats capacity, tank capacity, fuel, engine, power, torgue, emission - are functionally dependent on series + producer + model
- color, production_year, mileage, total_owners, price, displacement, transmission, kmpl, gearbox - are functionally dependent on id


In [22]:
df_car = df[['series', 'color', 'production_year',
             'mileage', 'total_owners', 'price', 'displacement', 'transmission', 'kmpl', 'gearbox']]
df_car_series = df[['series', 'producer', 'model', 'shape', 'seats_nr', 'tank_capacity', 'fuel', 'engine_description',
                    'power', 'torque', 'emission']].drop_duplicates(ignore_index=True)
assert len(df_car.columns) + len(df_car_series.columns) == len(df.columns)+1

## Ingest both tables into the db


In [23]:
try:
    df_car.to_sql(
        'sold_cars',
        engine,
        if_exists='replace',
        index=True,
        dtype={
            'series': types.VARCHAR(70),
            'color': types.VARCHAR(10),
            'production_year': types.SMALLINT,
            'mileage': types.INT,
            'total_owners': types.SMALLINT,
            'price': types.INT,
            'displacement': types.INT,
            'transmission': types.VARCHAR(20),
            'kmpl': types.DECIMAL(17, 15),
            'gearbox': types.VARCHAR(20),
        }
    )
except SQLAlchemyError as e:
    logging.error(f'Failed to ingest data into PostgreSQL: {e}')

In [24]:
try:
    df_car_series.to_sql(
        'model_details',
        engine,
        if_exists='replace',
        index=False,
        dtype={
            'series': types.VARCHAR(70),
            'producer': types.VARCHAR(20),
            'model': types.VARCHAR(20),
            'shape': types.VARCHAR(10),
            'seats_nr': types.SMALLINT,
            'tank_capacity': types.INT,
            'fuel': types.VARCHAR(10),
            'engine_type': types.VARCHAR(100),
            'power': types.FLOAT,
            'torque': types.FLOAT,
            'emission': types.VARCHAR(20),
        }
    )
except SQLAlchemyError as e:
    logging.error(f'Failed to ingest data into PostgreSQL: {e}')