In [7]:
import dask.dataframe as dd  # Import dask dataframe
import pandas as pd

In [8]:
used_cars_csv = '../used_cars_data.csv'

# Read column names of dataset
columns = pd.read_csv(used_cars_csv, nrows=0).columns.tolist()

# Drop columns that are empty or not meaningful for
# predicting the price
drop_columns = [
    'bed',
    'bed_length',
    'bed_height',
    'cabin',
    'combine_fuel_economy',
    'dealer_zip',
    'description',
    'engine_type',
    'is_certified',
    'listed_date',
    'main_picture_url',
    'major_options',
    'power',
    'sp_id',
    'transmission',
    'trimId',
    'vehicle_damage_category',
    'vin',
    'wheel_system_display'
]

# Remove the columns
used_columns = [col for col in columns if col not in drop_columns]
print(f'Used features ({len(used_columns)}): {used_columns}')

Used features (47): ['back_legroom', 'body_type', 'city', 'city_fuel_economy', 'daysonmarket', 'engine_cylinders', 'engine_displacement', 'exterior_color', 'fleet', 'frame_damaged', 'franchise_dealer', 'franchise_make', 'front_legroom', 'fuel_tank_volume', 'fuel_type', 'has_accidents', 'height', 'highway_fuel_economy', 'horsepower', 'interior_color', 'isCab', 'is_cpo', 'is_new', 'is_oemcpo', 'latitude', 'length', 'listing_color', 'listing_id', 'longitude', 'make_name', 'maximum_seating', 'mileage', 'model_name', 'owner_count', 'price', 'salvage', 'savings_amount', 'seller_rating', 'sp_name', 'theft_title', 'torque', 'transmission_display', 'trim_name', 'wheel_system', 'wheelbase', 'width', 'year']


In [9]:
# Read dataset in chunks
data = dd.read_csv(used_cars_csv, low_memory=False,
usecols=used_columns,
dtype={
    'vin': str,
    'back_legroom': str,
    'bed': str,
    'bed_height': str,
    'bed_length': str,
    'body_type': str,
    'cabin': str,
    'city': str,
    'city_fuel_economy': float,
    'combine_fuel_economy': float,
    'daysonmarket': int,
    'dealer_zip': int,
    'description': str,
    'engine_cylinders': str,
    'engine_displacement': float,
    'engine_type': str,
    'exterior_color': str,
    # 'fleet': bool,
    # 'frame_damaged': bool,
    'franchise_dealer': bool,
    'franchise_make': str,
    'front_legroom': str,
    'fuel_tank_volume': str,
    'fuel_type': str,
    # 'has_accidents': bool,
    'height': str,
    'highway_fuel_economy': float,
    'horsepower': float,
    'interior_color': str,
    # 'isCab': bool,
    # 'is_certified': bool,
    # 'is_cpo': bool,
    'is_new': bool,
    # 'is_oemcpo': bool,
    'latitude': float,
    'length': str,
    'listed_date': str,
    'listinc_color': str,
    'listing_id': int,
    'longitude': float,
    'main_picture_url': str,
    'major_options': str,
    'make_name': str,
    'maximum_seating': str,
    'mileage': float,
    'model_name': str,
    'owner_count': float,
    'power': str,
    'price': float,
    # 'salvage': bool,
    'savings_amount': int,
    'seller_rating': float,
    'sp_id': str,
    'sp_name': str,
    # 'theft_title': bool,
    'torque': str,
    'transmission': str,
    'transmission_display': str,
    'trimId': str,
    'trim_name': str,
    'vehicle_damage_category': str,
    'wheel_system': str,
    'wheel_system_display': str,
    'wheelbase': str,
    'width': str,
    'year': int
},
converters={
    'fleet': lambda b : 1.0 if b else 0.0,
    'frame_damaged': lambda b : 1.0 if b else 0.0,
    'has_accidents': lambda b : 1.0 if b else 0.0,
    'is_certified': lambda b : 1.0 if b else 0.0,
    'is_cpo': lambda b : 1.0 if b else 0.0,
    'is_oemcpo': lambda b : 1.0 if b else 0.0,
    'isCab': lambda b : 1.0 if b else 0.0,
    'salvage': lambda b : 1.0 if b else 0.0,
    'theft_title': lambda b : 1.0 if b else 0.0,
})

In [10]:
# Get info of data
data.info(verbose=True, memory_usage=True)

<class 'dask.dataframe.core.DataFrame'>
Int64Index: 3000040 entries, 0 to 20454
Data columns (total 47 columns):
 #   Column                Non-Null Count  Dtype
---  ------                --------------  -----
 0   back_legroom          2840771 non-null      object
 1   body_type             2986497 non-null      object
 2   city                  3000040 non-null      object
 3   city_fuel_economy     2508755 non-null      float64
 4   daysonmarket          3000040 non-null      int32
 5   engine_cylinders      2899459 non-null      object
 6   engine_displacement   2827654 non-null      float64
 7   exterior_color        3000014 non-null      object
 8   fleet                 3000040 non-null      float64
 9   frame_damaged         3000040 non-null      float64
10   franchise_dealer      3000040 non-null      bool
11   franchise_make        2427405 non-null      object
12   front_legroom         2840771 non-null      object
13   fuel_tank_volume      2840771 non-null      object
14  

In [11]:
# Columns with units like 'in' (inch)
unit_columns = {
    'back_legroom': ' in',
    'front_legroom': ' in',
    'height': ' in',
    'length': ' in',
    'wheelbase': ' in',
    'width': ' in',
    'fuel_tank_volume': ' gal',
    'maximum_seating': ' seats'
}

# Remove units from columns and convert to
# float type
for col, unit in unit_columns.items():
    data[col] = data[col].replace([unit, '--'], ['', 'NaN'], regex=True).astype(float)

# Extract torque value as a float
data['torque'] = data['torque'].replace(r' .*', '', regex=True).astype(float)

In [12]:
from dask_ml.preprocessing import Categorizer, DummyEncoder

# Convert categorical columns into an one-hot encoding
categorical_columns = [
    'body_type',
    'city',
    'engine_cylinders',
    'exterior_color',
    'franchise_make',
    'fuel_type',
    'interior_color',
    'listing_color',
    'make_name',
    'model_name',
    'sp_name',
    'transmission_display',
    'trim_name',
    'wheel_system',
    'year'
]

# Get categorical data
only_cat = data[categorical_columns].compute()

# Combine small categories under 1% to the cateogory 'other'
for cat in categorical_columns:
    series = only_cat[cat].value_counts()
    
    mask = series[(series/series.sum() * 100) < 1].index
    data[cat] = data[cat].mask(data[cat].isin(mask), "other")

# Convert columns to categorical dtype
ce = Categorizer(columns=categorical_columns)
data = ce.fit_transform(data)

# One-hot encode the categorical columns
enc = DummyEncoder(columns=categorical_columns)
data = enc.fit_transform(data)

In [13]:
from dask_ml.impute import SimpleImputer

# Impute NaN values to mean of the column
# and convert all columns to float
si = SimpleImputer(strategy='mean')
data = si.fit_transform(data).astype(float)

In [15]:
# Save the preprocessed data in an HDF file
used_cars_hdf = 'used_cars_preprocessed.hdf'
data.to_hdf(used_cars_hdf, '/data-*', compute=True, scheduler='processes')

['used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_cars_preprocessed.hdf',
 'used_c