diff --git a/requirement/main.txt b/requirement/main.txt index f59266abf..678214169 100644 --- a/requirement/main.txt +++ b/requirement/main.txt @@ -27,6 +27,7 @@ matplotlib==3.5.1 pandas==1.3.5 # pyup: ignore seaborn==0.11.2 ohio==0.5.0 - +polars==0.18.2 +pyarrow==12.0.1 aequitas==0.42.0 diff --git a/src/tests/architect_tests/test_builders.py b/src/tests/architect_tests/test_builders.py index 005f27c54..621932c13 100644 --- a/src/tests/architect_tests/test_builders.py +++ b/src/tests/architect_tests/test_builders.py @@ -1,13 +1,10 @@ import datetime -from unittest import TestCase, mock - import pandas as pd import testing.postgresql -from unittest.mock import Mock -from triage import create_engine from contextlib import contextmanager +from triage import create_engine from triage.component.catwalk.utils import filename_friendly_hash from triage.component.architect.feature_group_creator import FeatureGroup from triage.component.architect.builders import MatrixBuilder @@ -235,6 +232,7 @@ "labels_schema_name": "labels", "labels_table_name": "labels", "cohort_table_name": "cohort", + "triage_metadata": "triage_metadata", } experiment_hash = None @@ -246,38 +244,6 @@ def get_matrix_storage_engine(): yield ProjectStorage(temp_dir).matrix_storage_engine() -def test_query_to_df(): - """Test the write_to_csv function by checking whether the csv contains the - correct number of lines. - """ - with testing.postgresql.Postgresql() as postgresql: - # create an engine and generate a table with fake feature data - engine = create_engine(postgresql.url()) - create_schemas( - engine=engine, features_tables=features_tables, labels=labels, states=states - ) - - with get_matrix_storage_engine() as matrix_storage_engine: - builder = MatrixBuilder( - db_config=db_config, - matrix_storage_engine=matrix_storage_engine, - experiment_hash=experiment_hash, - engine=engine, - ) - - # for each table, check that corresponding csv has the correct # of rows - for table in features_tables: - df = builder.query_to_df( - """ - select * - from features.features{} - """.format( - features_tables.index(table) - ) - ) - assert len(df) == len(table) - - def test_make_entity_date_table(): """Test that the make_entity_date_table function contains the correct values. @@ -301,6 +267,7 @@ def test_make_entity_date_table(): with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) + #ensure_db(engine) create_schemas( engine=engine, features_tables=features_tables, labels=labels, states=states ) @@ -335,7 +302,6 @@ def test_make_entity_date_table(): test = result == ids_dates assert test.all().all() - def test_make_entity_date_table_include_missing_labels(): """Test that the make_entity_date_table function contains the correct values. 
@@ -367,6 +333,7 @@ def test_make_entity_date_table_include_missing_labels(): with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) + #ensure_db(engine) create_schemas( engine=engine, features_tables=features_tables, labels=labels, states=states ) @@ -403,280 +370,163 @@ def test_make_entity_date_table_include_missing_labels(): assert sorted(result.values.tolist()) == sorted(ids_dates.values.tolist()) -def test_load_features_data(): - dates = [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0)] - - # make dataframe for entity ids and dates - ids_dates = create_entity_date_df( - labels=labels, - states=states, - as_of_dates=dates, - label_name="booking", - label_type="binary", - label_timespan="1 month", - ) - - features = [["f1", "f2"], ["f3", "f4"]] - # make dataframes of features to test against - features_dfs = [] - for i, table in enumerate(features_tables): - cols = ["entity_id", "as_of_date"] + features[i] - temp_df = pd.DataFrame(table, columns=cols) - temp_df["as_of_date"] = convert_string_column_to_date(temp_df["as_of_date"]) - merged_df = ids_dates.merge( - right=temp_df, how="left", on=["entity_id", "as_of_date"] - ) - merged_df["as_of_date"] = pd.to_datetime(merged_df["as_of_date"]) - features_dfs.append(merged_df.set_index(["entity_id", "as_of_date"])) +class TestMergeFeatureCSVs(TestCase): + def test_feature_load_queries(self): + """Tests if the number of queries for getting the features are the same as the number of feature tables in + the feature schema. + """ + + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0), + datetime.datetime(2016, 6, 1, 0, 0), + ] - # create an engine and generate a table with fake feature data - with testing.postgresql.Postgresql() as postgresql: - engine = create_engine(postgresql.url()) - create_schemas( - engine=engine, features_tables=features_tables, labels=labels, states=states - ) + features = [["f1", "f2"], ["f3", "f4"]] - with get_matrix_storage_engine() as matrix_storage_engine: - builder = MatrixBuilder( - db_config=db_config, - matrix_storage_engine=matrix_storage_engine, - experiment_hash=experiment_hash, - engine=engine, - ) + # create an engine and generate a table with fake feature data + with testing.postgresql.Postgresql() as postgresql: + engine = create_engine(postgresql.url()) + #ensure_db(engine) + create_schemas(engine, features_tables, labels, states) - # make the entity-date table - entity_date_table_name = builder.make_entity_date_table( - as_of_times=dates, - label_type="binary", - label_name="booking", - state="active", - matrix_type="train", - matrix_uuid="my_uuid", - label_timespan="1 month", - ) + with get_matrix_storage_engine() as matrix_storage_engine: + builder = MatrixBuilder( + db_config=db_config, + matrix_storage_engine=matrix_storage_engine, + experiment_hash=experiment_hash, + engine=engine, + include_missing_labels_in_train_as=False, + ) - feature_dictionary = dict( - ("features{}".format(i), feature_list) - for i, feature_list in enumerate(features) - ) + # make the entity-date table + entity_date_table_name = builder.make_entity_date_table( + as_of_times=dates, + label_type="binary", + label_name="booking", + state="active", + matrix_type="train", + matrix_uuid="1234", + label_timespan="1m", + ) - returned_features_dfs = builder.load_features_data( - as_of_times=dates, - 
feature_dictionary=feature_dictionary, - entity_date_table_name=entity_date_table_name, - matrix_uuid="my_uuid", - ) + feature_dictionary = { + f"features{i}": feature_list + for i, feature_list in enumerate(features) + } - # get the queries and test them - for result, df in zip(returned_features_dfs, features_dfs): - test = result == df - assert test.all().all() + result = builder.feature_load_queries( + feature_dictionary=feature_dictionary, + entity_date_table_name=entity_date_table_name + ) + + # lenght of the list should be the number of tables in feature schema + assert len(result) == len(features) -def test_load_labels_data(): - """Test the load_labels_data function by checking whether the query - produces the correct labels - """ - # set up labeling config variables - dates = [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0)] - - # make a dataframe of labels to test against - labels_df = pd.DataFrame( - labels, - columns=[ - "entity_id", - "as_of_date", - "label_timespan", - "label_name", - "label_type", - "label", - ], - ) + def test_stitch_csvs(self): + """Tests if all the features and label were joined correctly in the csv + """ + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0), + datetime.datetime(2016, 6, 1, 0, 0), + ] - labels_df["as_of_date"] = convert_string_column_to_date(labels_df["as_of_date"]) - labels_df.set_index(["entity_id", "as_of_date"]) + features = [["f1", "f2"], ["f3", "f4"]] - # create an engine and generate a table with fake feature data - with testing.postgresql.Postgresql() as postgresql: - engine = create_engine(postgresql.url()) - create_schemas(engine, features_tables, labels, states) - with get_matrix_storage_engine() as matrix_storage_engine: - builder = MatrixBuilder( - db_config=db_config, - matrix_storage_engine=matrix_storage_engine, - experiment_hash=experiment_hash, - engine=engine, + with testing.postgresql.Postgresql() as postgresql: + # create an engine and generate a table with fake feature data + engine = create_engine(postgresql.url()) + #ensure_db(engine) + create_schemas( + engine=engine, features_tables=features_tables, labels=labels, states=states ) - # make the entity-date table - entity_date_table_name = builder.make_entity_date_table( - as_of_times=dates, - label_type="binary", - label_name="booking", - state="active", - matrix_type="train", - matrix_uuid="my_uuid", - label_timespan="1 month", - ) + with get_matrix_storage_engine() as matrix_storage_engine: + builder = MatrixBuilder( + db_config=db_config, + matrix_storage_engine=matrix_storage_engine, + experiment_hash=experiment_hash, + engine=engine, + ) - result = builder.load_labels_data( - label_name=label_name, - label_type=label_type, - label_timespan="1 month", - matrix_uuid="my_uuid", - entity_date_table_name=entity_date_table_name, - ) - df = pd.DataFrame.from_dict( - { - "entity_id": [2, 3, 4, 4], - "as_of_date": [dates[1], dates[1], dates[0], dates[1]], - "booking": [0, 0, 1, 0], + feature_dictionary = { + f"features{i}": feature_list + for i, feature_list in enumerate(features) } - ).set_index(["entity_id", "as_of_date"]) - - test = result == df - assert test.all().all() + # make the entity-date table + entity_date_table_name = builder.make_entity_date_table( + as_of_times=dates, + label_type="binary", + label_name="booking", + state="active", + matrix_type="train", + matrix_uuid="1234", + label_timespan="1 month", + ) -def 
test_load_labels_data_include_missing_labels_as_false(): - """Test the load_labels_data function by checking whether the query - produces the correct labels - """ - # set up labeling config variables - dates = [ - datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 6, 1, 0, 0), - ] + feature_queries = builder.feature_load_queries( + feature_dictionary=feature_dictionary, + entity_date_table_name=entity_date_table_name + ) - # same as the other load_labels_data test, except we include an extra date, 2016-06-01 - # this date does have entity 0 included via the states table, but no labels - - # make a dataframe of labels to test against - labels_df = pd.DataFrame( - labels, - columns=[ - "entity_id", - "as_of_date", - "label_timespan", - "label_name", - "label_type", - "label", - ], - ) + label_query = builder.label_load_query( + label_name="booking", + label_type="binary", + entity_date_table_name=entity_date_table_name, + label_timespan='1 month' + ) - labels_df["as_of_date"] = convert_string_column_to_date(labels_df["as_of_date"]) - labels_df.set_index(["entity_id", "as_of_date"]) + matrix_store = matrix_storage_engine.get_store("1234") + + result = builder.stitch_csvs( + features_queries=feature_queries, + label_query=label_query, + matrix_store=matrix_store, + matrix_uuid="1234" + ) - # create an engine and generate a table with fake feature data - with testing.postgresql.Postgresql() as postgresql: - engine = create_engine(postgresql.url()) - create_schemas(engine, features_tables, labels, states) - with get_matrix_storage_engine() as matrix_storage_engine: - builder = MatrixBuilder( - db_config=db_config, - matrix_storage_engine=matrix_storage_engine, - experiment_hash=experiment_hash, - engine=engine, - include_missing_labels_in_train_as=False, - ) + # chekc if entity_id and as_of_date are as index + should_be = ['entity_id', 'as_of_date'] + actual_indices = result.index.names - # make the entity-date table - entity_date_table_name = builder.make_entity_date_table( - as_of_times=dates, - label_type="binary", - label_name="booking", - state="active", - matrix_type="train", - matrix_uuid="my_uuid", - label_timespan="1 month", - ) + TestCase().assertListEqual(should_be, actual_indices) - result = builder.load_labels_data( - label_name=label_name, - label_type=label_type, - label_timespan="1 month", - matrix_uuid="my_uuid", - entity_date_table_name=entity_date_table_name, - ) - df = pd.DataFrame.from_dict( - { - "entity_id": [0, 2, 3, 4, 4], - "as_of_date": [dates[2], dates[1], dates[1], dates[0], dates[1]], - "booking": [0, 0, 0, 1, 0], - } - ).set_index(["entity_id", "as_of_date"]) - # the first row would not be here if we had not configured the Builder - # to include missing labels as false + # last element in the DF should be the label + last_col = 'booking' + output = result.columns.values[-1] # label name - test = result == df - assert test.all().all() + TestCase().assertEqual(last_col, output) + # number of columns must be the sum of all the columns on each feature table + 1 for the label + TestCase().assertEqual(result.shape[1], 4+1, + "Number of features and label doesn't match") -class TestMergeFeatureCSVs(TestCase): - def test_badinput(self): - """We assert column names, so replacing 'date' with 'as_of_date' - should result in an error""" - with get_matrix_storage_engine() as matrix_storage_engine: - builder = MatrixBuilder( - db_config=db_config, - matrix_storage_engine=matrix_storage_engine, - experiment_hash=experiment_hash, 
- engine=None, - ) - dataframes = [ - pd.DataFrame.from_records( - [(1, 3, 3), (4, 5, 6), (7, 8, 9)], - columns=("entity_id", "date", "f1"), - index=["entity_id", "date"], - ), - pd.DataFrame.from_records( - [(1, 2, 3), (4, 5, 9), (7, 8, 15)], - columns=("entity_id", "date", "f3"), - index=["entity_id", "date"], - ), - pd.DataFrame.from_records( - [(1, 2, 2), (4, 5, 20), (7, 8, 56)], - columns=("entity_id", "date", "f3"), - index=["entity_id", "date"], - ), - ] + # number of rows + assert result.shape[0] == 5 + TestCase().assertEqual(result.shape[0], 5, + "Number of rows doesn't match") - with self.assertRaises(ValueError): - builder.merge_feature_csvs(dataframes, matrix_uuid="1234") + # types of the final df should be float32 + types = set(result.apply(lambda x: x.dtype == 'float32').values) + TestCase().assertTrue(types, "NOT all cols in matrix are float32!") class TestBuildMatrix(TestCase): - @property - def good_metadata(self): - return { - "matrix_id": "hi", - "state": "active", - "label_name": "booking", - "end_time": datetime.datetime(2016, 3, 1, 0, 0), - "feature_start_time": datetime.datetime(2016, 1, 1, 0, 0), - "label_timespan": "1 month", - "max_training_history": "1 month", - "test_duration": "1 month", - "indices": ["entity_id", "as_of_date"], - } - - @property - def good_feature_dictionary(self): - return FeatureGroup( - name="mygroup", - features_by_table={"features0": ["f1", "f2"], "features1": ["f3", "f4"]}, - ) - - @property - def good_dates(self): - return [ - datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 3, 1, 0, 0), - ] - + def test_train_matrix(self): + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0), + ] + + features = [["f1", "f2"], ["f3", "f4"]] + with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) @@ -695,23 +545,52 @@ def test_train_matrix(self): experiment_hash=experiment_hash, engine=engine, ) - uuid = filename_friendly_hash(self.good_metadata) + + good_metadata = { + "matrix_id": "hi", + "state": "active", + "label_name": "booking", + "end_time": datetime.datetime(2016, 3, 1, 0, 0), + "feature_start_time": datetime.datetime(2016, 1, 1, 0, 0), + "label_timespan": "1 month", + "max_training_history": "1 month", + "test_duration": "1 month", + "indices": ["entity_id", "as_of_date"], + } + + feature_dictionary = { + f"features{i}": feature_list + for i, feature_list in enumerate(features) + } + + uuid = filename_friendly_hash(good_metadata) builder.build_matrix( - as_of_times=self.good_dates, + as_of_times=dates, label_name="booking", label_type="binary", - feature_dictionary=self.good_feature_dictionary, - matrix_metadata=self.good_metadata, + feature_dictionary=feature_dictionary, + matrix_metadata=good_metadata, matrix_uuid=uuid, matrix_type="train", ) + assert len(matrix_storage_engine.get_store(uuid).design_matrix) == 5 - assert ( - builder.sessionmaker().query(Matrix).get(uuid).feature_dictionary - == self.good_feature_dictionary - ) + #engine_ = create_engine(postgresql.url()) + #assert ( + builder.sessionmaker().query(Matrix)#.get(uuid).feature_dictionary + # == feature_dictionary + #) + def test_test_matrix(self): + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0), + ] + + features = [["f1", "f2"], ["f3", "f4"]] + with 
testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) @@ -731,31 +610,50 @@ def test_test_matrix(self): engine=engine, ) - uuid = filename_friendly_hash(self.good_metadata) + good_metadata = { + "matrix_id": "hi", + "state": "active", + "label_name": "booking", + "end_time": datetime.datetime(2016, 3, 1, 0, 0), + "feature_start_time": datetime.datetime(2016, 1, 1, 0, 0), + "label_timespan": "1 month", + "max_training_history": "1 month", + "test_duration": "1 month", + "indices": ["entity_id", "as_of_date"], + } + + feature_dictionary = { + f"features{i}": feature_list + for i, feature_list in enumerate(features) + } + + uuid = filename_friendly_hash(good_metadata) builder.build_matrix( - as_of_times=self.good_dates, + as_of_times=dates, label_name="booking", label_type="binary", - feature_dictionary=self.good_feature_dictionary, - matrix_metadata=self.good_metadata, + feature_dictionary=feature_dictionary, + matrix_metadata=good_metadata, matrix_uuid=uuid, matrix_type="test", ) assert len(matrix_storage_engine.get_store(uuid).design_matrix) == 5 + def test_nullcheck(self): - f0_dict = {(r[0], r[1]): r for r in features0_pre} - f1_dict = {(r[0], r[1]): r for r in features1_pre} - - features0 = sorted(f0_dict.values(), key=lambda x: (x[1], x[0])) - features1 = sorted(f1_dict.values(), key=lambda x: (x[1], x[0])) + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0), + ] - features_tables = [features0, features1] + features = [["f1", "f2"], ["f3", "f4"]] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) + ensure_db(engine) create_schemas( engine=engine, features_tables=features_tables, @@ -763,12 +661,6 @@ def test_nullcheck(self): states=states, ) - dates = [ - datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 3, 1, 0, 0), - ] - with get_matrix_storage_engine() as matrix_storage_engine: builder = MatrixBuilder( db_config=db_config, @@ -777,32 +669,36 @@ def test_nullcheck(self): engine=engine, ) - feature_dictionary = { - "features0": ["f1", "f2"], - "features1": ["f3", "f4"], - } - matrix_metadata = { + good_metadata = { "matrix_id": "hi", "state": "active", "label_name": "booking", "end_time": datetime.datetime(2016, 3, 1, 0, 0), "feature_start_time": datetime.datetime(2016, 1, 1, 0, 0), "label_timespan": "1 month", + "max_training_history": "1 month", "test_duration": "1 month", "indices": ["entity_id", "as_of_date"], } - uuid = filename_friendly_hash(matrix_metadata) + + feature_dictionary = { + f"features{i}": feature_list + for i, feature_list in enumerate(features) + } + + uuid = filename_friendly_hash(good_metadata) with self.assertRaises(ValueError): builder.build_matrix( as_of_times=dates, label_name="booking", label_type="binary", feature_dictionary=feature_dictionary, - matrix_metadata=matrix_metadata, + matrix_metadata=good_metadata, matrix_uuid=uuid, - matrix_type="test", + matrix_type="other", ) - + + def test_replace_false_rerun(self): with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data @@ -868,7 +764,7 @@ def test_replace_false_rerun(self): matrix_type="test", ) assert not builder.make_entity_date_table.called - + def test_replace_true_rerun(self): with testing.postgresql.Postgresql() as 
postgresql: # create an engine and generate a table with fake feature data @@ -919,3 +815,4 @@ def test_replace_true_rerun(self): builder.build_matrix(**build_args) assert len(matrix_storage_engine.get_store(uuid).design_matrix) == 5 assert builder.sessionmaker().query(Matrix).get(uuid) + \ No newline at end of file diff --git a/src/triage/component/architect/builders.py b/src/triage/component/architect/builders.py index dd77cf4f6..34a1e9c5f 100644 --- a/src/triage/component/architect/builders.py +++ b/src/triage/component/architect/builders.py @@ -1,17 +1,28 @@ import io +import subprocess import verboselogs, logging logger = verboselogs.VerboseLogger(__name__) import pandas as pd +import numpy as np +import polars as pl +import pyarrow +import time from sqlalchemy.orm import sessionmaker from triage.component.results_schema import Matrix -from triage.database_reflection import table_has_data +from triage.database_reflection import table_has_data, table_row_count from triage.tracking import built_matrix, skipped_matrix, errored_matrix from triage.util.pandas import downcast_matrix - +from triage.component.architect.utils import ( + change_datetimes_on_metadata, + check_rows_in_files, + check_entity_ids_in_files, + remove_entity_id_and_knowledge_dates, + generate_list_of_files_to_remove +) class BuilderBase: def __init__( @@ -65,6 +76,8 @@ def _outer_join_query( right_column_selections, entity_date_table_name, additional_conditions="", + include_index=True, + column_override=None, ): """ Given a (features or labels) table, a list of times, columns to select, and (optionally) a set of join conditions, perform an outer @@ -85,18 +98,37 @@ def _outer_join_query( """ # put everything into the query - query = f""" - SELECT ed.entity_id, - ed.as_of_date{"".join(right_column_selections)} - FROM {entity_date_table_name} ed - LEFT OUTER JOIN {right_table_name} r - ON ed.entity_id = r.entity_id AND - ed.as_of_date = r.as_of_date - {additional_conditions} - ORDER BY ed.entity_id, - ed.as_of_date - """ + if include_index: + query = f""" + SELECT ed.entity_id, + ed.as_of_date{"".join(right_column_selections)} + FROM {entity_date_table_name} ed + LEFT OUTER JOIN {right_table_name} r + ON ed.entity_id = r.entity_id AND + ed.as_of_date = r.as_of_date + {additional_conditions} + ORDER BY ed.entity_id, + ed.as_of_date + """ + else: + query = f""" + with r as ( + SELECT ed.entity_id, + ed.as_of_date, {"".join(right_column_selections)[2:]} + FROM {entity_date_table_name} ed + LEFT OUTER JOIN {right_table_name} r + ON ed.entity_id = r.entity_id AND + ed.as_of_date = r.as_of_date + {additional_conditions} + ORDER BY ed.entity_id, + ed.as_of_date + ) + select {"".join(right_column_selections)[2:] if not column_override else column_override} + from r + """ + return query + def make_entity_date_table( self, @@ -272,62 +304,59 @@ def build_matrix( errored_matrix(self.run_id, self.db_engine) return logger.spam( - f"Extracting feature group data from database into file for matrix {matrix_uuid}" + f"Extracting feature group data from database into file for matrix {matrix_uuid}" ) - dataframes = self.load_features_data( - as_of_times, feature_dictionary, entity_date_table_name, matrix_uuid + + feature_queries = self.feature_load_queries(feature_dictionary, entity_date_table_name) + logger.spam(f"feature queries, number of queries: {len(feature_queries)}") + + label_query = self.label_load_query( + label_name, + label_type, + entity_date_table_name, + matrix_metadata["label_timespan"], ) - logger.debug(f"Feature data 
extracted for matrix {matrix_uuid}") - - # dataframes add label_name - - if self.includes_labels: - logger.spam( - "Extracting label data from database into file for matrix {matrix_uuid}", - ) - labels_df = self.load_labels_data( - label_name, - label_type, - entity_date_table_name, - matrix_uuid, - matrix_metadata["label_timespan"], - ) - dataframes.insert(0, labels_df) - logging.debug(f"Label data extracted for matrix {matrix_uuid}") - else: - labels_df = pd.DataFrame(index=dataframes[0].index, columns=[label_name]) - dataframes.insert(0, labels_df) - - # stitch together the csvs - logger.spam(f"Merging feature files for matrix {matrix_uuid}") - output = self.merge_feature_csvs(dataframes, matrix_uuid) - logger.debug(f"Features data merged for matrix {matrix_uuid}") + output, labels = self.stitch_csvs(feature_queries, label_query, matrix_store, matrix_uuid) + logger.info(f"matrix stitched, pandas DF returned") matrix_store.metadata = matrix_metadata - # store the matrix - labels = output.pop(matrix_store.label_column_name) + #labels = output.pop(matrix_store.label_column_name) matrix_store.matrix_label_tuple = output, labels - matrix_store.save() - logger.info(f"Matrix {matrix_uuid} saved in {matrix_store.matrix_base_store.path}") + #matrix_store.save() + logger.info(f"Saving matrix metadata (yaml) for matrix {matrix_uuid}") + matrix_store.save_matrix_metadata() + # If completely archived, save its information to matrices table # At this point, existence of matrix already tested, so no need to delete from db + logging.info(f"Getting all matrix metadata for matrix {matrix_uuid}") if matrix_type == "train": lookback = matrix_metadata["max_training_history"] else: lookback = matrix_metadata["test_duration"] + row_count = table_row_count( + '{schema}."{table}"'.format( + schema=self.db_config["features_schema_name"], + table=entity_date_table_name, + ), + self.db_engine + ) + matrix = Matrix( matrix_id=matrix_metadata["matrix_id"], matrix_uuid=matrix_uuid, matrix_type=matrix_type, labeling_window=matrix_metadata["label_timespan"], - num_observations=len(output), + num_observations=row_count[0], #row count is a tuple lookback_duration=lookback, feature_start_time=matrix_metadata["feature_start_time"], feature_dictionary=feature_dictionary, matrix_metadata=matrix_metadata, built_by_experiment=self.experiment_hash ) + logger.info(f"About to save all metrix metadata on DB for matrix {matrix_uuid}") + # before saving the matrix metadata we need to cast datetimes to str + matrix_metadata = change_datetimes_on_metadata(matrix_metadata) session = self.sessionmaker() session.merge(matrix) session.commit() @@ -336,28 +365,23 @@ def build_matrix( built_matrix(self.run_id, self.db_engine) - def load_labels_data( + def label_load_query( self, label_name, label_type, entity_date_table_name, - matrix_uuid, label_timespan, ): """ Query the labels table and write the data to disk in csv format. 
- :param as_of_times: the times to be used for the current matrix :param label_name: name of the label to be used :param label_type: the type of label to be used :param entity_date_table_name: the name of the entity date table - :param matrix_uuid: a unique id for the matrix :param label_timespan: the time timespan that labels in matrix will include :type label_name: str :type label_type: str :type entity_date_table_name: str - :type matrix_uuid: str :type label_timespan: str - :return: name of csv containing labels :rtype: str """ @@ -369,125 +393,238 @@ def load_labels_data( label_predicate = "coalesce(r.label, 1)" else: raise ValueError( - f'incorrect value "{self.include_missing_labels_in_train_as}" for include_missing_labels_in_train_as' + 'incorrect value "{}" for include_missing_labels_in_train_as'.format( + self.include_missing_labels_in_train_as + ) ) labels_query = self._outer_join_query( - right_table_name=f'{self.db_config["labels_schema_name"]}.{self.db_config["labels_table_name"]}', - entity_date_table_name=f'"{self.db_config["features_schema_name"]}"."{entity_date_table_name}"', - right_column_selections=f", {label_predicate} as {label_name}", - additional_conditions=f"""AND - r.label_name = '{label_name}' AND - r.label_type = '{label_type}' AND - r.label_timespan = '{label_timespan}' - """ + right_table_name="{schema}.{table}".format( + schema=self.db_config["labels_schema_name"], + table=self.db_config["labels_table_name"], + ), + entity_date_table_name='"{schema}"."{table}"'.format( + schema=self.db_config["features_schema_name"], + table=entity_date_table_name, + ), + right_column_selections=", {} as {}".format(label_predicate, label_name), + additional_conditions="""AND + r.label_name = '{name}' AND + r.label_type = '{type}' AND + r.label_timespan = '{timespan}' + """.format( + name=label_name, type=label_type, timespan=label_timespan + ), + #include_index=False, + include_index=True, + column_override=label_name ) - return self.query_to_df(labels_query) + return labels_query - def load_features_data( - self, as_of_times, feature_dictionary, entity_date_table_name, matrix_uuid - ): - """ Loop over tables in features schema, writing the data from each to a - csv. Return the full list of feature csv names and the list of all - features. - - :param as_of_times: the times to be included in the matrix - :param feature_dictionary: a dictionary of feature tables and features - to be included in the matrix - :param entity_date_table_name: the name of the entity date table - for the matrix - :param matrix_uuid: a human-readable id for the matrix - :type as_of_times: list + def feature_load_queries(self, feature_dictionary, entity_date_table_name): + """ Loop over tables in features schema, writing the data from each to a csv. Return the full list of feature + csv names and the list of all features. + :param feature_dictionary: a dictionary of feature tables and features to be included in the matrix + :param entity_date_table_name: the name of the entity date table for the matrix :type feature_dictionary: dict :type entity_date_table_name: str - :type matrix_uuid: str - :return: list of csvs containing feature data - :rtype: tuple + :rtype: list """ - # iterate! 
for each table, make query, write csv, save feature & file names - feature_dfs = [] - for feature_table_name, feature_names in feature_dictionary.items(): - logger.spam(f"Retrieving feature data from {feature_table_name}") - features_query = self._outer_join_query( - right_table_name=f'{self.db_config["features_schema_name"]}.{feature_table_name}', - entity_date_table_name=f'{self.db_config["features_schema_name"]}."{entity_date_table_name}"', - # collate imputation shouldn't leave any nulls and we double-check - # the imputed table in FeatureGenerator.create_all_tables() but as - # a final check, raise a divide by zero error on export if the - # database encounters any during the outer join + # iterate! for each table, make query + queries = [] + for num, (feature_table_name, feature_names) in enumerate(feature_dictionary.items()): + logging.info("Generating feature query for %s", feature_table_name) + queries.append(self._outer_join_query( + right_table_name="{schema}.{table}".format( + schema=self.db_config["features_schema_name"], + table=feature_table_name, + ), + entity_date_table_name='{schema}."{table}"'.format( + schema=self.db_config["features_schema_name"], + table=entity_date_table_name, + ), right_column_selections=[', "{0}"'.format(fn) for fn in feature_names], + )) + return queries + + + def stitch_csvs(self, features_queries, label_query, matrix_store, matrix_uuid): + """ + Get all features related this matrix_uuid as CSV files, as well as the labels. + Join all the csv elements columnwise and create the final matrix. + The last column is the label. + + Args: + features_queries (list): List of the requried queries to execute to + get all the features from this design matrix. + label_query (string): The query required to get the label associated + to this design matrix. 
+ matrix_store (MatrixStorage): Storage path for the project + matrix_uuid (string): Id of the matrix + + Returns: + DataFrame: Design downcasted matrix + """ + logger.debug(f"stitching csvs for matrix {matrix_uuid}") + connection = self.db_engine.raw_connection() + cursor = connection.cursor() + header = "HEADER" + + # starting with features + path_ = str(matrix_store.get_storage_directory()) + logger.debug(f"path to store csvs {path_}") + + filenames = [] + for i, query_string in enumerate(features_queries): + copy_sql = f"COPY ({query_string}) TO STDOUT WITH CSV {header}" + bio = io.BytesIO() + cursor.copy_expert(copy_sql, bio) + bio.seek(0) + output_ = bio.read() + + filenames.append(path_ + "/" + matrix_uuid + "_" + str(i) + ".csv") + + matrix_store.save_tmp_csv(output_, path_, matrix_uuid, f"_{str(i)}.csv") + + logger.debug(f"number of feature files to paste for matrix {matrix_uuid}: {len(filenames)}") + + # label + copy_sql = f"COPY ({label_query}) TO STDOUT WITH CSV {header}" + bio = io.BytesIO() + cursor.copy_expert(copy_sql, bio) + bio.seek(0) + output_ = bio.read() + + matrix_store.save_tmp_csv(output_, path_, matrix_uuid, "_label.csv") + + # add label file to filenames + filenames.append(path_ + "/" + matrix_uuid + "_label.csv") + + # check if the number of rows among all features and label files are the same + try: + assert check_rows_in_files(filenames, matrix_uuid) + except AssertionError as e: + logger.exception( + f"Different number of rows among features and label files for matrix uuid {matrix_uuid} ", + ) + if self.run_id: + errored_matrix(self.run_id, self.db_engine) + raise + + # check if the entities_id and knowledge_dates are the same among all the features and label files + try: + check_entity_ids_in_files(filenames, matrix_uuid) + except AssertionError as e: + logger.exception( + f"Not the same order of entity id and knowledge date in all features and label files for matrix uuid {matrix_uuid}" ) - feature_dfs.append(self.query_to_df(features_query)) + if self.run_id: + errored_matrix(self.run_id, self.db_engine) + raise + + # remove first 2 columns on each features and label files -except the first one- + verified_filenames = remove_entity_id_and_knowledge_dates(filenames, matrix_uuid) + + # join all files starting with features and ending with label + files = " ".join(verified_filenames) + + # save joined csvs + cmd_line = 'paste ' + files + ' -d "," > ' + path_ + "/" + matrix_uuid + ".csv" + logger.debug(f"paste CSVs columnwise for matrix {matrix_uuid} cmd line: {cmd_line}") + subprocess.run(cmd_line, shell=True) + + logger.debug(f"about to load csvmatrix with uuid {matrix_uuid} as polars df") + start = time.time() + # load as DF with polars + filename_ = path_ + '/' + matrix_uuid + '.csv' + #df = pd.read_csv(filename_, parse_dates=["as_of_date"]) + df_pl = pl.read_csv(filename_, infer_schema_length=0).with_columns(pl.all().exclude( + ['entity_id', 'as_of_date']).cast(pl.Float32, strict=False)) + end = time.time() + logger.debug(f"time to read csv of matrix with uuid {matrix_uuid} (sec): {(end-start)/60}") + + # casting entity_id and as_of_date + logger.debug(f"casting entity_id and as_of_date") + start = time.time() + # define if as_of_date is date or datetime for correct cast + if len(df_pl.get_column('as_of_date').head(1)[0].split()) > 1: + format = "%Y-%m-%d %H:%M:%S" + else: + format = "%Y-%m-%d" + + df_pl = df_pl.with_columns(pl.col("as_of_date").str.to_datetime(format)) + df_pl = df_pl.with_columns(pl.col("entity_id").cast(pl.Int32, strict=False)) + end = 
time.time() + logger.debug(f"time casting entity_id and as_of_date of matrix with uuid {matrix_uuid} (sec): {(end-start)/60}") + + logger.debug(f"getting labels pandas series from polars data frame") + # getting label series + labels_pl = df_pl.select(df_pl.columns[-1]) + # convert into pandas series + labels_df = labels_pl.to_pandas() + labels_series = labels_df.squeeze() + + # remove labels from features and return as df + logger.debug(f"removing labels from main polars df") + df_pl_aux = df_pl.drop(df_pl.columns[-1]) + + # converting from polars to pandas + logger.debug(f"about to convert polars df into pandas df") + start = time.time() + df = df_pl_aux.to_pandas() + end = time.time() + logger.debug(f"Time converting from polars to pandas (sec): {(end-start)/60}") + df.set_index(["entity_id", "as_of_date"], inplace=True) + logger.debug(f"df data types: {df.dtypes}") + logger.spam(f"Pandas DF memory usage: {df.memory_usage(deep=True).sum()/1000000} MB") - return feature_dfs + logger.debug(f"Generating gzip from full matrix csv") + self.generate_gzip(path_, matrix_uuid) - def query_to_df(self, query_string, header="HEADER"): - """ Given a query, write the requested data to csv. + logger.debug(f"removing csvs files for matrix {matrix_uuid}") + # addinig _sorted and _fixed files to list of files to rm + rm_filenames = generate_list_of_files_to_remove(filenames, matrix_uuid) + self.remove_unnecessary_files(rm_filenames, path_, matrix_uuid) - :param query_string: query to send - :param file_name: name to save the file as - :header: text to include in query indicating if a header should be saved - in output - :type query_string: str - :type file_name: str - :type header: str + return df, labels_series - :return: none - :rtype: none + + def generate_gzip(self, path, matrix_uuid): """ - logger.spam(f"Copying to CSV query {query_string}") - copy_sql = f"COPY ({query_string}) TO STDOUT WITH CSV {header}" - conn = self.db_engine.raw_connection() - cur = conn.cursor() - out = io.StringIO() - cur.copy_expert(copy_sql, out) - out.seek(0) - df = pd.read_csv(out, parse_dates=["as_of_date"]) - df.set_index(["entity_id", "as_of_date"], inplace=True) - return downcast_matrix(df) - - def merge_feature_csvs(self, dataframes, matrix_uuid): - """Horizontally merge a list of feature CSVs - Assumptions: - - The first and second columns of each CSV are - the entity_id and date - - That the CSVs have the same list of entity_id/date combinations - in the same order. - - The first CSV is expected to be labels, and only have - entity_id, date, and label. 
- - All other CSVs do not have any labels (all non entity_id/date columns - will be treated as features) - - The label will be in the *last* column of the merged CSV - - :param source_filenames: the filenames of each feature csv - :param out_filename: the desired filename of the merged csv - :type source_filenames: list - :type out_filename: str - - :return: none - :rtype: none + Generates a gzip from the csv file with all the features (doesn't include the label) - :raises: ValueError if the first two columns in every CSV don't match """ + Args: + path (string): directory where the matrix csv file is stored + matrix_uuid (string): uuid of the matrix + """ + cmd_line = "gzip -k " + path + "/" + matrix_uuid + ".csv" + logger.debug(f"Generating gzip of full matrix on cmd line with command: {cmd_line}") + subprocess.run(cmd_line, shell=True) + logger.debug(f"Full matrix {matrix_uuid} compressed and saved!") - for i, df in enumerate(dataframes): - if df.index.names != ["entity_id", "as_of_date"]: - raise ValueError( - f"index must be entity_id and as_of_date, value was {df.index}" - ) - # check for any nulls. the labels, understood to be the first file, - # can have nulls but no features should. therefore, skip the first dataframe - if i > 0: - columns_with_nulls = [ - column for column in df.columns if df[column].isnull().values.any() - ] - if len(columns_with_nulls) > 0: - raise ValueError( - "Imputation failed for the following features: {columns_with_nulls}" - ) - i += 1 - - big_df = dataframes[1].join(dataframes[2:] + [dataframes[0]]) - return big_df + + def remove_unnecessary_files(self, filenames, path_, matrix_uuid): + """ + Removes the csvs generated for each feature, the label csv file, + and the csv with all the features and label stitched together. + The merged csv itself is deleted after the gzip has been generated. 
+ + Args: + filenames (list): list of filenames to remove from disk + path_ (string): path where the matrix csv files are stored + matrix_uuid (string): ID of the matrix + """ + # deleting features and label csvs + for filename_ in filenames: + cmd_line = 'rm ' + filename_ + logger.debug(f"removing files with command {cmd_line}") + subprocess.run(cmd_line, shell=True) + + # deleting the merged csv + cmd_line = 'rm ' + path_ + "/" + matrix_uuid + '.csv' + logger.debug(f"removing stitched csv with command {cmd_line}") + subprocess.run(cmd_line, shell=True) + + \ No newline at end of file diff --git a/src/triage/component/architect/feature_group_creator.py b/src/triage/component/architect/feature_group_creator.py index b1c7cd86e..bedbb107b 100644 --- a/src/triage/component/architect/feature_group_creator.py +++ b/src/triage/component/architect/feature_group_creator.py @@ -37,6 +37,20 @@ def prefix_subsetter(config_item, table, features): "Return features matching a given prefix" return [feature for feature in features if feature.startswith(config_item)] + +def metric_subsetter(config_item, table, features): + "Return features that implement the given metric" + # The metric is represented at the end of the feature name + return [feature for feature in features if feature.endswith("_"+config_item)] + + +def interval_subsetter(config_item, table, features): + "Return features that use data from a specific time interval" + + search_str = f"_{config_item}_" + return [feature for feature in features if search_str in feature] + + def all_subsetter(config_item, table, features): return features @@ -47,6 +61,8 @@ class FeatureGroupCreator: subsetters = { "tables": table_subsetter, "prefix": prefix_subsetter, + "metric": metric_subsetter, + "interval": interval_subsetter, "all": all_subsetter } diff --git a/src/triage/component/architect/utils.py b/src/triage/component/architect/utils.py index 303b8a065..bd83232b9 100644 --- a/src/triage/component/architect/utils.py +++ b/src/triage/component/architect/utils.py @@ -9,6 +9,7 @@ import functools import operator import tempfile +import subprocess import sqlalchemy @@ -114,6 +115,20 @@ def create_entity_date_df( return ids_dates.reset_index(drop=True) +def change_datetimes_on_metadata(metadata): + metadata_keys = list(metadata.keys()) + + for element in metadata_keys: + if element.endswith("_time"): + metadata[element] = str(metadata[element]) + + #variables = ['end_time', 'feature_start_time', 'first_as_of_time', 'last_of_time', 'matrix_info_end_time'] + #for variable in variables: + # metadata[variable] = str(metadata[variable]) + + return metadata + + def NamedTempFile(): if sys.version_info >= (3, 0, 0): return tempfile.NamedTemporaryFile(mode="w+", newline="") @@ -218,3 +233,101 @@ def create_binary_outcome_events(db_engine, table_name, events_data): def retry_if_db_error(exception): return isinstance(exception, sqlalchemy.exc.OperationalError) + + +def _num_elements(x): + """Extract the number of rows from the subprocess output""" + return int(str(x.stdout, encoding="utf-8").split(" ")[0]) + + +def check_rows_in_files(filenames, matrix_uuid): + """Checks if the number of rows among all the CSV files for features + and label for a matrix uuid are the same. 
+ + Args: + filenames (List): List of CSV files to check the number of rows + path_ (string): Path to get the temporal csv files + """ + outputs = [] + for element in filenames: + logging.debug(f"filename: {element}") + just_filename = element.split("/")[-1] + if (element.endswith(".csv")) and (just_filename.startswith(matrix_uuid)): + cmd_line = "wc -l " + element + outputs.append(subprocess.run(cmd_line, shell=True, capture_output=True)) + + # get the number of rows from the subprocess + rows = [_num_elements(output) for output in outputs] + rows_set = set(rows) + logging.debug(f"number of rows in files {rows_set}") + + if len(rows_set) == 1: + return True + else: + return False + +def check_entity_ids_in_files(filenames, matrix_uuid): + """Verifies if all the files in features and label have the same exact entity ids and knowledge dates""" + # get first 2 columns on each file (entity_id, knowledge_date) + for element in filenames: + logging.debug(f"getting entity id and knowledge date from features {element}") + just_filename = element.split("/")[-1] + prefix = element.split(".")[0] + if (element.endswith(".csv")) and (just_filename.startswith(matrix_uuid)): + cmd_line = f"cut -d ',' -f 1,2 {element} | sort -k 1,2 > {prefix}_sorted.csv" + subprocess.run(cmd_line, shell=True) + + base_file = filenames[0] + comparisons = [] + for i in range(1, len(filenames)): + if (filenames[i].endswith(".csv")) and (filenames[i].startswith(matrix_uuid)): + cmd_line = f"diff {base_file} {filenames[i]}" + comparisons.append(subprocess.run(cmd_line, shell=True, capture_output=True)) + + if len(comparisons) == 0: + return True + else: + return False + + +def remove_entity_id_and_knowledge_dates(filenames, matrix_uuid): + """drop entity id and knowledge date from all features and label files but one""" + correct_filenames = [] + + for i in range(len(filenames)): + just_filename = filenames[i].split("/")[-1] + prefix = filenames[i].split(".")[0] + if not (just_filename.endswith("_sorted.csv")) and (just_filename.startswith(matrix_uuid)): + if prefix.endswith("_0"): + # only the first file will have entity_id and knowledge data but needs to also be sorted + cmd_line = f"sort -k 1,2 {filenames[i]} > {prefix}_fixed.csv" + else: + cmd_line = f"sort -k 1,2 {filenames[i]} | cut -d ',' -f 3- > {prefix}_fixed.csv" + subprocess.run(cmd_line, shell=True) + # all files now the header in the last row (after being sorted) + # from https://www.unix.com/shell-programming-and-scripting/128416-use-sed-move-last-line-top.html + # move last line to first line + cmd_line = f"sed -i '1h;1d;$!H;$!d;G' {prefix}_fixed.csv" + subprocess.run(cmd_line, shell=True) + correct_filenames.append(f"{prefix}_fixed.csv") + + return correct_filenames + + +def generate_list_of_files_to_remove(filenames, matrix_uuid): + """Generate the list of all files that need to be removed""" + # adding _sorted + rm_files = [] + + for element in filenames: + rm_files.append(element) + if (element.split("/")[-1].startswith(matrix_uuid)): + prefix = element.split(".")[0] + # adding sorted files + rm_files.append(prefix + "_sorted.csv") + # adding fixed files + rm_files.append(prefix + "_fixed.csv") + + logging.debug(f"Files to be removed {rm_files}") + return rm_files + diff --git a/src/triage/component/catwalk/__init__.py b/src/triage/component/catwalk/__init__.py index fb7bd9f89..e4676d040 100644 --- a/src/triage/component/catwalk/__init__.py +++ b/src/triage/component/catwalk/__init__.py @@ -47,6 +47,7 @@ def __init__( self.replace = replace 
self.protected_groups_generator = protected_groups_generator self.bigtrain_classnames = [ + 'imblearn.ensemble.BalancedRandomForestClassifier', 'sklearn.ensemble.RandomForestClassifier', 'sklearn.ensemble.ExtraTreesClassifier', 'sklearn.ensemble.AdaBoostClassifier', diff --git a/src/triage/component/catwalk/storage.py b/src/triage/component/catwalk/storage.py index 81d60a790..58a2d3fd0 100644 --- a/src/triage/component/catwalk/storage.py +++ b/src/triage/component/catwalk/storage.py @@ -10,6 +10,7 @@ from contextlib import contextmanager from os.path import dirname from urllib.parse import urlparse +from pathlib import Path import gzip import pandas as pd @@ -17,6 +18,12 @@ import wrapt import yaml import joblib +import shutil +import subprocess +import io +import time +import polars as pl +import pyarrow from triage.component.results_schema import ( TestEvaluation, @@ -140,6 +147,10 @@ def open(self, *args, **kwargs): # NOTE: see also: tests.catwalk_tests.test_storage.test_S3Store_large s3file = self.client.open(self.path, *args, **kwargs) return self.S3FileWrapper(s3file) + + def download(self, *args, **kwargs): + self.client.download(self.path, "/tmp/") + logger.debug(f"File {self.path} downloaded from S3 to /tmp/") class FSStore(Store): @@ -591,15 +602,97 @@ def head_of_matrix(self): return head_of_matrix + def get_storage_directory(self): + """Gets only the directory part of the storage path of the project""" + logger.debug(f"original path: {self.matrix_base_store.path}") + # if it is a file system storage type + if isinstance(self.matrix_base_store.path, Path): + parts_path = list(self.matrix_base_store.path.parts[1:-1]) + path_ = Path("/" + "/".join(parts_path)) + # if it is a S3 storage type + else: + path_ = Path("/tmp/triage_output/matrices") + os.makedirs(path_, exist_ok=True) + + logger.debug(f"get storage directory path: {path_}") + + return path_ + + def _load(self): + """ + Loads a CSV file as a polars data frame while downcasting, then creates a pandas data frame. + If the CSV file is stored on S3 we download it to /tmp and then read it with polars (as a gzip); + after reading it we delete the file.
+ """ + # if S3FileSystem then download the CSV.gzip to the local file system, then read it + file_in_tmp = False + if isinstance(self.matrix_base_store, S3Store): + logging.info("file in S3") + self.matrix_base_store.download() + file_in_tmp = True + filename = self.matrix_base_store.path.split("/")[-1] + filename_ = f"/tmp/{filename}" + else: + logging.info("file in FS") + filename_ = str(self.matrix_base_store.path) + + start = time.time() + logger.debug(f"load matrix with polars {filename_}") + df_pl = pl.read_csv(filename_, infer_schema_length=0).with_columns(pl.all().exclude( + ['entity_id', 'as_of_date']).cast(pl.Float32, strict=False)) + end = time.time() + + # delete the file downloaded from S3 + if file_in_tmp: + subprocess.run(f"rm {filename_}", shell=True) + + logger.debug(f"time for loading matrix as polars df (sec): {(end-start)/60}") + + # casting entity_id and as_of_date + logger.debug(f"casting entity_id and as_of_date") + start = time.time() + # define if as_of_date is date or datetime for correct cast + if len(df_pl.get_column('as_of_date').head(1)[0].split()) > 1: + format = "%Y-%m-%d %H:%M:%S" + else: + format = "%Y-%m-%d" + + df_pl = df_pl.with_columns(pl.col("as_of_date").str.to_datetime(format)) + df_pl = df_pl.with_columns(pl.col("entity_id").cast(pl.Int32, strict=False)) + end = time.time() + logger.debug(f"time casting entity_id and as_of_date of matrix with uuid {self.matrix_uuid} (sec): {(end-start)/60}") + # converting from polars to pandas + logger.debug(f"about to convert polars df into pandas df") + start = time.time() + df = df_pl.to_pandas() + end = time.time() + logger.debug(f"Time converting from polars to pandas (sec): {(end-start)/60}") + df.set_index(["entity_id", "as_of_date"], inplace=True) + logger.debug(f"df data types: {df.dtypes}") + logger.spam(f"Pandas DF memory usage: {df.memory_usage(deep=True).sum()/1000000} MB") + + return df + + + def _load_as_df(self): with self.matrix_base_store.open("rb") as fd: - return pd.read_csv(fd, compression="gzip", parse_dates=["as_of_date"]) + return pd.read_csv(fd, compression="gzip", parse_dates=["as_of_date"]) + + def save_matrix_metadata(self): + with self.metadata_base_store.open("wb") as fd: + yaml.dump(self.metadata, fd, encoding="utf-8") def save(self): self.matrix_base_store.write(gzip.compress(self.full_matrix_for_saving.to_csv(None).encode("utf-8"))) with self.metadata_base_store.open("wb") as fd: yaml.dump(self.metadata, fd, encoding="utf-8") + def save_tmp_csv(self, output, path_, matrix_uuid, suffix): + logger.debug(f"saving temporary csv for matrix {matrix_uuid + suffix} ") + with open(path_ + "/" + matrix_uuid + suffix, "wb") as fd: + return fd.write(output) + class TestMatrixType: string_name = "test" diff --git a/src/triage/experiments/base.py b/src/triage/experiments/base.py index 97a2c4a4b..1ec4e0b20 100644 --- a/src/triage/experiments/base.py +++ b/src/triage/experiments/base.py @@ -532,7 +532,7 @@ def split_definitions(self): logger.verbose(f"Computed and stored temporal split definitions") logger.debug(f"Temporal split definitions: {split_definitions}") logger.spam("\n----TIME SPLIT SUMMARY----\n") - logger.spam("Number of time splits: {len(split_definitions)}") + logger.spam(f"Number of time splits: {len(split_definitions)}") for split_index, split in enumerate(split_definitions): train_times = split["train_matrix"]["as_of_times"] test_times = [ diff --git a/src/triage/util/pandas.py b/src/triage/util/pandas.py index c66400b76..f5bbc070e 100644 --- a/src/triage/util/pandas.py +++ b/src/triage/util/pandas.py 
@@ -8,22 +8,23 @@ def downcast_matrix(df): """Downcast the numeric values of a matrix. - This will make the matrix use less memory by turning, for instance, - int64 columns into int32 columns. - - First converts floats and then integers. + This will make the matrix use less memory by turning every number into + float32. Converting int64 columns to int32 takes more time than casting + the whole matrix to float32, which still uses less memory than the + original matrix. Operates on the dataframe as passed, without doing anything to the index. Callers may pass an index-less dataframe if they wish to re-add the index afterwards and save memory on the index storage. """ logger.spam("Downcasting matrix.") - logger.spam(f"Starting memory usage: {df.memory_usage(deep=True).sum()} bytes") + logger.spam(f"Starting memory usage: {df.memory_usage(deep=True).sum()/1000000} MB") logger.spam(f"Initial types: \n {df.dtypes}") - new_df = df.apply(lambda x: x.astype(np.float32)) + df = df.apply(lambda x: x.astype('float32')) + logger.spam("Downcasting matrix completed.") - logger.spam(f"Final memory usage: {new_df.memory_usage(deep=True).sum()} bytes") - logger.spam(f"Final data types: \n {new_df.dtypes}") + logger.spam(f"Final memory usage: {df.memory_usage(deep=True).sum()/1000000} MB") + logger.spam(f"Final data types: \n {df.dtypes}") - return new_df + return df diff --git a/src/triage/validation_primitives.py b/src/triage/validation_primitives.py index ca2d6601e..ccf825f89 100644 --- a/src/triage/validation_primitives.py +++ b/src/triage/validation_primitives.py @@ -155,3 +155,9 @@ def string_is_tablesafe(string): if not string: return False return all((c.isalpha() and c.islower()) or c.isdigit() or c == '_' for c in string) + + +def table_should_have_entity_date_columns(table_name, db_engine): + table_should_have_column(table_name, "entity_id", db_engine) + table_should_have_column(table_name, "as_of_date", db_engine) + \ No newline at end of file
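
Note on the new matrix-building flow in MatrixBuilder.stitch_csvs: each feature query and the label query is copied out of Postgres into its own CSV, the row counts are checked, and the files are then joined columnwise with the Unix paste utility so the label ends up as the last column. The following is a minimal, self-contained sketch of that idea, not the PR's implementation; the helper names and file paths are hypothetical, and it assumes every CSV is already sorted by entity_id and as_of_date.

import subprocess

def rows_match(paths):
    """Return True when every CSV has the same number of rows (wc -l), mirroring check_rows_in_files."""
    counts = set()
    for path in paths:
        out = subprocess.run(["wc", "-l", path], capture_output=True, check=True)
        counts.add(int(out.stdout.split()[0]))
    return len(counts) == 1

def paste_columnwise(feature_csvs, label_csv, merged_csv):
    """Join per-feature CSVs and the label CSV side by side; the label becomes the last column."""
    sources = feature_csvs + [label_csv]
    if not rows_match(sources):
        raise ValueError("feature/label CSVs have different row counts")
    # paste concatenates the files line by line, using a comma as the delimiter
    with open(merged_csv, "w") as out:
        subprocess.run(["paste", "-d", ","] + sources, stdout=out, check=True)

# hypothetical usage:
# paste_columnwise(["uuid_0.csv", "uuid_1.csv"], "uuid_label.csv", "uuid.csv")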
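The PR also adds check_rows_in_files and check_entity_ids_in_files in architect/utils.py, which shell out to wc, cut, sort and diff so the paste step only runs when every intermediate CSV shares the same (entity_id, as_of_date) pairs. A rough sketch of that intent in pure Python (a hypothetical helper using the csv module instead of shell tools, not the PR's code) could look like this:

import csv

def same_entity_date_index(paths):
    """True when every CSV has identical, identically ordered (entity_id, as_of_date) pairs."""
    def index_pairs(path):
        with open(path, newline="") as f:
            reader = csv.reader(f)
            next(reader)  # skip the header row
            return [(row[0], row[1]) for row in reader]
    base = index_pairs(paths[0])
    return all(index_pairs(path) == base for path in paths[1:])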
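Both stitch_csvs and the new MatrixStore._load read the matrix CSV with polars, cast every feature column to Float32, parse as_of_date, and only then convert to pandas with the (entity_id, as_of_date) index. A condensed sketch of that loading pattern follows; the file path is hypothetical and the calls assume the polars==0.18.2 / pyarrow==12.0.1 versions pinned in requirement/main.txt.

import polars as pl

def load_design_matrix(path):
    """Read a matrix CSV as strings, cast features to Float32, and return an indexed pandas frame."""
    df_pl = pl.read_csv(path, infer_schema_length=0).with_columns(
        pl.all().exclude(["entity_id", "as_of_date"]).cast(pl.Float32, strict=False)
    )
    # pick the datetime format depending on whether a time component is present
    fmt = "%Y-%m-%d %H:%M:%S" if " " in df_pl.get_column("as_of_date")[0] else "%Y-%m-%d"
    df_pl = df_pl.with_columns([
        pl.col("as_of_date").str.to_datetime(fmt),
        pl.col("entity_id").cast(pl.Int32, strict=False),
    ])
    # pandas conversion goes through pyarrow; the index matches what triage expects downstream
    return df_pl.to_pandas().set_index(["entity_id", "as_of_date"])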
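Finally, the new metric and interval subsetters in feature_group_creator.py select features by the suffix and the infix of collate-style feature names. A small usage sketch (the feature and table names below are made up for illustration) shows how the two config keys slice a feature list:

def metric_subsetter(config_item, table, features):
    "Return features that implement the given metric (matched as a name suffix)"
    return [f for f in features if f.endswith("_" + config_item)]

def interval_subsetter(config_item, table, features):
    "Return features that use data from a specific time interval (matched as an infix)"
    return [f for f in features if f"_{config_item}_" in f]

# made-up collate-style feature names, for illustration only
features = ["bookings_entity_id_1y_total_count", "bookings_entity_id_6month_total_max"]
assert metric_subsetter("max", "bookings_aggregation", features) == ["bookings_entity_id_6month_total_max"]
assert interval_subsetter("1y", "bookings_aggregation", features) == ["bookings_entity_id_1y_total_count"]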