In [1]:
import numpy as np
import pandas as pd
import csv
from sklearn.impute import SimpleImputer


In [2]:

# Parse the raw asteroid CSV into an array of strings (header row included).
# newline="" is what the csv module asks for when it is handed a file object,
# so that line breaks inside quoted fields are interpreted by the reader itself.
with open("../Data/Raw/Asteroid_Updated.csv", "r", newline="") as file_descriptor:
    lines_reader = csv.reader(file_descriptor, delimiter=",")
    # Empty cells are replaced by np.nan; since the remaining cells are strings,
    # np.array stores those placeholders as the text "nan".
    data_array = np.array([[np.nan if len(item) == 0 else item for item in line]
                           for line in lines_reader])

In [3]:
# Cache the parsed table on disk in NumPy's binary format.
with open("AuxiliaryData/Asteroid_Updated.bin", "wb") as cache_file:
    np.save(cache_file, data_array)

In [4]:
# Read the cached table back from disk.
with open("AuxiliaryData/Asteroid_Updated.bin", "rb") as cache_file:
    data_array = np.load(cache_file)

In [5]:
# Positions along the second axis of data_array that are cast to numbers.
use_number_rows = (
    1, 2, 3, 4, 5, 6, 7, 8, 9,
    11, 12, 15, 17, 18, 19,
    20, 21, 25, 26, 28, 29, 30,
)
# Positions along the second axis of data_array that are kept as text.
use_string_rows = (
    13, 14, 23, 24, 27,
)

def reduce_data_array(current_data_array: np.ndarray, use_rows: tuple, type_to_cast_to: type,
                      verbose: bool = True) -> np.ndarray:
    """Select columns of a table, skip its first row, and cast what is left.

    Args:
        current_data_array: 2-D array whose first row is skipped (the callers
            in this notebook keep the column names there).
        use_rows: Indices to keep, in output order. Despite the name they
            index the second axis, i.e. they are column positions.
        type_to_cast_to: Type handed to ``astype`` for the selected cells.
        verbose: When true (the default), print every selected index on its
            own line, in selection order.

    Returns:
        Array of shape ``(number_of_rows - 1, len(use_rows))``. The result is
        always 2-D, including for a single or an empty selection.
    """
    column_indices = np.asarray(use_rows, dtype=np.intp)

    if verbose:
        for column_index in use_rows:
            print(column_index)

    # One fancy-indexing step replaces stacking column by column, which copied
    # the whole accumulated result on every iteration.
    return current_data_array[1:, column_indices].astype(type_to_cast_to)


# Split the table into a float block and a text block.
typed_numerical_data_array = reduce_data_array(current_data_array=data_array,
                                               use_rows=use_number_rows,
                                               type_to_cast_to=float)
typed_categorical_data_array = reduce_data_array(current_data_array=data_array,
                                                 use_rows=use_string_rows,
                                                 type_to_cast_to=str)

# Column names from row 0, in the same order as the selected columns.
number_header_list = list(data_array[0, list(use_number_rows)])
category_header_list = list(data_array[0, list(use_string_rows)])

print(number_header_list)
print(category_header_list)

1
2
3
4
5
6
7
8
9
11
12
15
17
18
19
20
21
25
26
28
29
30
13
14
23
24
27
['a', 'e', 'i', 'om', 'w', 'q', 'ad', 'per_y', 'data_arc', 'n_obs_used', 'H', 'diameter', 'albedo', 'rot_per', 'GM', 'BV', 'UB', 'G', 'moid', 'n', 'per', 'ma']
['neo', 'pha', 'spec_B', 'spec_T', 'class']


In [6]:
# Position of the "diameter" column inside the numerical block.
diameter_col_nr = np.flatnonzero(np.asarray(number_header_list) == "diameter")[0]

# Boolean row mask: True where the diameter value is not NaN.
lines_with_diameter = ~np.isnan(typed_numerical_data_array[:, diameter_col_nr])

# Apply the same mask to both blocks so their rows stay aligned.
typed_numerical_data_with_diameter_array = typed_numerical_data_array[lines_with_diameter]
typed_categorical_data_with_diameter_array = typed_categorical_data_array[lines_with_diameter]

In [7]:
def get_columns_with_nan_from_numeric(current_data_array: np.array) -> tuple:
    """Return the indices of the columns that hold at least one NaN.

    ``current_data_array`` is indexed as a 2-D numeric array; the matching
    column indices are returned in ascending order as a tuple of ints.
    """
    nan_columns = []
    for column_index in range(current_data_array.shape[1]):
        if np.isnan(current_data_array[:, column_index]).any():
            nan_columns.append(column_index)
    return tuple(nan_columns)

def get_columns_with_nan_from_str(current_data_array) -> tuple:
    """Return the indices of the columns that contain the text ``"nan"``.

    ``current_data_array`` is indexed as a 2-D array; a column is reported when
    at least one of its cells compares equal to the string ``"nan"``. Indices
    come back in ascending order as a tuple of ints.
    """
    flagged_columns = []
    for column_index in range(current_data_array.shape[1]):
        is_missing = current_data_array[:, column_index] == "nan"
        if np.count_nonzero(is_missing) > 0:
            flagged_columns.append(column_index)
    return tuple(flagged_columns)

def get_completed_array(partial_array: np.array, nan_columns: tuple, current_imputer: SimpleImputer) -> np.array:
    """Return a copy of ``partial_array`` with the listed columns imputed.

    Each column in ``nan_columns`` is passed on its own, reshaped to a single
    column matrix, to ``current_imputer.fit_transform``; the first column of
    the result replaces the original one. The imputer is therefore re-fitted
    for every column. ``partial_array`` itself is not modified.
    """
    filled_array = np.array(partial_array, copy=True)

    for column_index in nan_columns:
        column_as_matrix = filled_array[:, column_index].reshape(-1, 1)
        imputed_matrix = current_imputer.fit_transform(column_as_matrix)
        filled_array[:, column_index] = imputed_matrix[:, 0]

    return filled_array

def get_completed_dataframe(partial_dataframe: pd.DataFrame, nan_columns: tuple, current_imputer: SimpleImputer) -> pd.DataFrame:
    """Return a copy of ``partial_dataframe`` with the listed columns imputed.

    Args:
        partial_dataframe: Frame to complete; it is copied, not modified.
        nan_columns: Labels of the columns to run through the imputer.
        current_imputer: Object whose ``fit_transform`` receives one column at
            a time as a single-column matrix; it is re-fitted for every column.

    Returns:
        A new DataFrame in which each listed column has been replaced by the
        first column of the imputer's output.
    """
    completed_dataframe = partial_dataframe.copy()

    for column in nan_columns:
        column_as_matrix = completed_dataframe[column].to_numpy().reshape(-1, 1)
        completed_dataframe[column] = current_imputer.fit_transform(column_as_matrix)[:, 0]

    return completed_dataframe

In [8]:
# Numerical columns that still contain NaN after the diameter filter.
list_of_nan_numerical_columns = get_columns_with_nan_from_numeric(
    typed_numerical_data_with_diameter_array
)
print(list_of_nan_numerical_columns)

# Fill the gaps in those columns with a mean-strategy imputer.
completed_typed_numerical_data_with_diameter_array = get_completed_array(
    partial_array=typed_numerical_data_with_diameter_array,
    nan_columns=list_of_nan_numerical_columns,
    current_imputer=SimpleImputer(missing_values=np.nan, strategy="mean"),
)

# Re-run the detection on the completed array and print what is left.
print(get_columns_with_nan_from_numeric(completed_typed_numerical_data_with_diameter_array))

(8, 10, 12, 13, 14, 15, 16, 17)
()


In [9]:
# Categorical columns that contain the "nan" placeholder text.
list_of_nan_categorical_columns = get_columns_with_nan_from_str(
    typed_categorical_data_with_diameter_array
)
print(list_of_nan_categorical_columns)

# Wrap the text block in a DataFrame whose column labels are 0..n-1, then fill
# the flagged columns with a most-frequent-value imputer.
typed_categorical_data_with_diameter_data_frame = pd.DataFrame(
    data=typed_categorical_data_with_diameter_array,
    columns=list(range(typed_categorical_data_with_diameter_array.shape[1])),
)

completed_typed_categorical_data_with_diameter_data_frame = get_completed_dataframe(
    partial_dataframe=typed_categorical_data_with_diameter_data_frame,
    nan_columns=list_of_nan_categorical_columns,
    current_imputer=SimpleImputer(missing_values="nan", strategy="most_frequent"),
)

# Re-run the detection on the completed frame and print what is left.
print(get_columns_with_nan_from_str(completed_typed_categorical_data_with_diameter_data_frame.to_numpy()))

(2, 3)
()


In [19]:
#convert categorical data to binary format

def prefix_columns(prefix: str, data_frame: pd.DataFrame) -> pd.DataFrame:
    """Return ``data_frame`` with every column label rewritten as ``"<prefix>#<label>"``.

    The input frame is left as it is; the renamed frame is what ``rename``
    returns.
    """
    label_mapping = {label: f"{prefix}#{label}" for label in data_frame.columns}
    return data_frame.rename(columns=label_mapping)

# One-hot encode every categorical column, label the indicator columns
# "<header>#<value>", and join all encoded blocks side by side.
# A single concat over the full list handles the first column like the others
# and avoids re-copying the growing frame on every iteration. All blocks come
# from the same frame and share its index, so no reindexing is needed.
binary_completed_typed_categorical_data_with_diameter_data_frame = pd.concat(
    [
        prefix_columns(
            category_header_list[it],
            pd.get_dummies(completed_typed_categorical_data_with_diameter_data_frame[column]),
        )
        for it, column in enumerate(completed_typed_categorical_data_with_diameter_data_frame.columns)
    ],
    axis=1,
)

binary_completed_typed_categorical_data_with_diameter_array = binary_completed_typed_categorical_data_with_diameter_data_frame.to_numpy()

In [28]:
# Put the numerical block and the one-hot block side by side in one array,
# and build the matching array of column names.
normalized_data_array = np.concatenate(
    (completed_typed_numerical_data_with_diameter_array,
     binary_completed_typed_categorical_data_with_diameter_array),
    axis=1,
)

normalized_data_header = np.concatenate(
    (np.asarray(number_header_list),
     np.asarray(list(binary_completed_typed_categorical_data_with_diameter_data_frame.columns))),
    axis=0,
)

# Write both arrays to disk in NumPy's binary format.
with open("AuxiliaryData/Asteroid_Updated_Normalized_Array.bin", "wb") as array_file:
    np.save(array_file, normalized_data_array)

with open("AuxiliaryData/Asteroid_Updated_Normalized_Header.bin", "wb") as header_file:
    np.save(header_file, normalized_data_header)