In [13]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder, StandardScaler
from sklearn.feature_selection import mutual_info_classif, RFE, SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from category_encoders import HashingEncoder, CountEncoder
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import skew, kurtosis
from sklearn.base import BaseEstimator, TransformerMixin

class DatetimeConvertCyclical(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.time_periods = {
            'second': 24 * 60 * 60,
            'minute': 24 * 60,
            'hour': 24,
            'day': 30,
            'dayofweek': 7,
            'month': 12
        }

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        for period, value in self.time_periods.items():
            X[period] = getattr(X['timestamp'].dt, period)
            X[f'sin_{period}'] = np.sin(2 * np.pi * X[period] / value)
            X[f'cos_{period}'] = np.cos(2 * np.pi * X[period] / value)
            X.drop(period, axis=1, inplace=True)
        return X

class DataProcessor:
    def __init__(self):
        self.encoders = {}
        self.scaler = StandardScaler()

    @staticmethod
    def _convert_time_columns(data):
        data['start_time'] = pd.to_datetime(data['start_time'], format='%Y%m%d%H%M%S')
        data['end_time'] = pd.to_datetime(data['end_time'], format='%Y%m%d%H%M%S')
        data['timestamp'] = data['start_time']
        return data

    def _extract_time_features(self, data):
        cyclical_transformer = DatetimeConvertCyclical()
        data = cyclical_transformer.transform(data)
        data['start_hour'] = data['start_time'].dt.hour
        data['start_dayofweek'] = data['start_time'].dt.dayofweek
        data['is_weekend'] = data['start_dayofweek'].apply(lambda x: 1 if x >= 5 else 0)
        data['is_working_hour'] = data['start_hour'].apply(lambda x: 1 if 9 <= x <= 18 else 0)
        return data

    def _encode_feature(self, data, feature, encoding_method, fit=True):
        if encoding_method == 'onehot':
            encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore') if fit else self.encoders.get(feature)
            if fit:
                encoded = encoder.fit_transform(data[[feature]])
                self.encoders[feature] = encoder
            else:
                encoded = encoder.transform(data[[feature]])
            encoded_df = pd.DataFrame(encoded, columns=[f"{feature}_{cat}" for cat in encoder.categories_[0]])
            data = pd.concat([data, encoded_df], axis=1).drop(columns=[feature])
        elif encoding_method == 'label':
            encoder = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1) if fit else self.encoders.get(feature)
            if fit:
                data[feature] = encoder.fit_transform(data[[feature]])
                self.encoders[feature] = encoder
            else:
                data[feature] = encoder.transform(data[[feature]])
        elif encoding_method == 'hash':
            encoder = HashingEncoder() if fit else self.encoders.get(feature)
            if fit:
                data = pd.concat([data, encoder.fit_transform(data[[feature]])], axis=1)
                self.encoders[feature] = encoder
            else:
                data = pd.concat([data, encoder.transform(data[[feature]])], axis=1)
        elif encoding_method == 'count':
            encoder = CountEncoder() if fit else self.encoders.get(feature)
            if fit:
                data[feature] = encoder.fit_transform(data[[feature]])
                self.encoders[feature] = encoder
            else:
                data[feature] = encoder.transform(data[[feature]]) if encoder else data[feature].map(data[feature].value_counts())
        elif encoding_method == 'frequency':
            freq = data[feature].value_counts() / len(data) if fit else self.encoders.get(feature, data[feature].value_counts() / len(data))
            data[feature] = data[feature].map(freq).fillna(0)
            if fit:
                self.encoders[feature] = freq
        elif encoding_method == 'labelcount':
            encoder = CountEncoder(normalize=True) if fit else self.encoders.get(feature)
            if fit:
                data[feature] = encoder.fit_transform(data[[feature]])
                self.encoders[feature] = encoder
            else:
                data[feature] = encoder.transform(data[[feature]])
        else:
            raise ValueError(f"Unknown encoding method: {encoding_method}")
        return data

    def _encode_categorical_features(self, data, encoding_config, fit=True):
        for feature, encoding_method in encoding_config.items():
            data = self._encode_feature(data, feature, encoding_method, fit)
        return data

    @staticmethod
    def _statistical_features(df, feature):
        agg_funcs = ['sum', 'mean', 'max', 'min', 'std', 'var', 'median', 'nunique', 'size', 'count']
        agg_funcs += [
            ('skew', lambda x: skew(x) if len(x) > 1 else -1),
            ('kurt', lambda x: kurtosis(x) if len(x) > 1 else -1),
            ('quantile_25', lambda x: x.quantile(0.25)),
            ('quantile_75', lambda x: x.quantile(0.75)),
            ('mode', lambda x: x.mode().iloc[0] if not x.mode().empty else -1)
        ]
        return df.groupby('msisdn')[feature].agg(agg_funcs).add_prefix(f'{feature}_')

    @staticmethod
    def _aggregate_features(data, features, agg_func):
        user_aggregated_data = pd.DataFrame()
        for feature in features:
            feature_stats = agg_func(data, feature)
            if user_aggregated_data.empty:
                user_aggregated_data = feature_stats
            else:
                user_aggregated_data = user_aggregated_data.join(feature_stats, how='outer')
        return user_aggregated_data

    def _aggregate_numerical_features(self, data, numerical_features):
        return self._aggregate_features(data, numerical_features, self._statistical_features)

    @staticmethod
    def _aggregate_location_features(data, location_features):
        return data.groupby('msisdn')[location_features].agg(['nunique', 'count']).add_prefix('_')

    @staticmethod
    def _aggregate_categorical_frequencies(data, categorical_features, user_aggregated_data):
        for feature in categorical_features:
            frequency = data.groupby(['msisdn', feature]).size().unstack(fill_value=0)
            normalized_frequency = frequency.div(frequency.sum(axis=1), axis=0)
            normalized_frequency.columns = [f"{feature}_{col}_freq" for col in normalized_frequency.columns]
            user_aggregated_data = user_aggregated_data.join(normalized_frequency, how='left')
        return user_aggregated_data

    @staticmethod
    def _aggregate_differential_features(data):
        data['call_duration_diff'] = data.groupby('msisdn')['call_duration'].diff().fillna(0)
        data['call_duration_diff2'] = data.groupby('msisdn')['call_duration'].diff(2).fillna(0)

        diff_agg_funcs = {
            'call_duration_diff': ['mean', 'std'],
            'call_duration_diff2': ['mean', 'std']
        }

        diff_aggregated_data = data.groupby('msisdn').agg(diff_agg_funcs)
        diff_aggregated_data.columns = ['_'.join(map(str, col)).strip() for col in diff_aggregated_data.columns.values]

        return diff_aggregated_data

    def _extract_advanced_features(self, data):
        call_stats = data.groupby('msisdn').agg({
            'msisdn': ['count', 'nunique']
        })
        call_stats.columns = ['_'.join(col).strip() for col in call_stats.columns.values]
        call_stats['avg_calls_per_person'] = call_stats['msisdn_count'] / call_stats['msisdn_nunique']

        most_common_city = data.groupby('msisdn')['phone1_loc_city'].agg(lambda x: x.mode().iloc[0] if not x.mode().empty else -1)
        most_common_county = data.groupby('msisdn')['phone1_loc_province'].agg(lambda x: x.mode().iloc[0] if not x.mode().empty else -1)

        data['date'] = data['start_time'].dt.date
        daily_stats = data.groupby(['msisdn', 'date']).agg({
            'start_hour': ['min', 'max'],
            'call_duration': 'count'
        })
        daily_stats.columns = ['_'.join(col).strip() for col in daily_stats.columns.values]
        daily_stats.reset_index(inplace=True)

        daily_stats_agg = daily_stats.groupby('msisdn').agg({
            'start_hour_min': ['mean', 'std'],
            'start_hour_max': ['mean', 'std'],
            'call_duration_count': ['mean', 'std']
        })
        daily_stats_agg.columns = ['_'.join(col).strip() for col in daily_stats_agg.columns.values]

        advanced_features = pd.concat([call_stats, most_common_city, most_common_county, daily_stats_agg], axis=1)
        advanced_features.columns = [f"advanced_{col}" for col in advanced_features.columns]

        return advanced_features

    @staticmethod
    def _binary_operations(user_aggregated_data):
        if 'cfee_sum' in user_aggregated_data.columns and 'lfee_sum' in user_aggregated_data.columns:
            user_aggregated_data['cfee_lfee_sum'] = user_aggregated_data['cfee_sum'] + user_aggregated_data['lfee_sum']
            user_aggregated_data['cfee_lfee_diff'] = user_aggregated_data['cfee_sum'] - user_aggregated_data['lfee_sum']
            user_aggregated_data['cfee_lfee_prod'] = user_aggregated_data['cfee_sum'] * user_aggregated_data['lfee_sum']
            user_aggregated_data['cfee_lfee_ratio'] = user_aggregated_data['cfee_sum'] / (user_aggregated_data['lfee_sum'] + 1e-6)
        return user_aggregated_data

    def _add_ranking_features(self, data, features):
        for feature in features:
            data[f'{feature}_rank'] = data.groupby('msisdn')[feature].rank(method='average')
            data[f'{feature}_dense_rank'] = data.groupby('msisdn')[feature].rank(method='dense')
        return data

    def preprocess_and_aggregate(self, data, label_data=None, is_validation=False, fit_columns=None, encoding_config=None):
        data = self._convert_time_columns(data)
        data = self._extract_time_features(data)

        suspect_types = {3, 5, 6, 9, 11, 12, 17}
        data['is_suspect'] = data['phone1_type'].apply(lambda x: 1 if x in suspect_types else 0)

        categorical_features = [
            'call_event', 'roam_type', 'long_type1', 'ismultimedia', 'home_area_code',
            'visit_area_code', 'called_home_code', 'called_code', 'a_serv_type',
            'a_product_id', 'phone1_type', 'phone2_type', 'phone1_loc_city',
            'phone1_loc_province', 'phone2_loc_city', 'phone2_loc_province',
            'is_weekend', 'is_working_hour'
        ]
        location_features = [
            'home_area_code', 'visit_area_code', 'called_home_code', 'called_code',
            'phone1_loc_city', 'phone1_loc_province', 'phone2_loc_city', 'phone2_loc_province'
        ]
        numerical_features = ['call_duration', 'cfee', 'lfee', 'start_hour', 'start_dayofweek']

        data = self._encode_categorical_features(data, encoding_config, fit=not is_validation)

        # Include the new features in the aggregation
        all_numerical_features = numerical_features + [f'sin_{period}' for period in ['hour', 'dayofweek']] + [f'cos_{period}' for period in ['hour', 'dayofweek']]
        
        user_aggregated_data = self._aggregate_numerical_features(data, all_numerical_features)
        user_aggregated_data = user_aggregated_data.join(self._aggregate_location_features(data, location_features), how='outer')
        user_aggregated_data = self._aggregate_categorical_frequencies(data, categorical_features, user_aggregated_data)
        user_aggregated_data = user_aggregated_data.join(self._aggregate_differential_features(data), how='left')

        user_aggregated_data = self._binary_operations(user_aggregated_data)
        advanced_features = self._extract_advanced_features(data)
        user_aggregated_data = user_aggregated_data.join(advanced_features, how='left')

        ranking_features = ['call_duration', 'cfee', 'lfee'] + [f'sin_{period}' for period in ['hour', 'dayofweek']] + [f'cos_{period}' for period in ['hour', 'dayofweek']]
        user_aggregated_data = self._add_ranking_features(user_aggregated_data, ranking_features)

        user_aggregated_data.fillna(-1, inplace=True)
        user_aggregated_data.reset_index(inplace=True)

        if not is_validation and label_data is not None:
            user_aggregated_data = user_aggregated_data.merge(label_data, on='msisdn', how='left')

        if not is_validation:
            numerical_features = [col for col in user_aggregated_data.columns if col not in ['msisdn', 'is_sa']]
            user_aggregated_data[numerical_features] = self.scaler.fit_transform(user_aggregated_data[numerical_features])
            return user_aggregated_data, numerical_features, self.scaler
        else:
            user_aggregated_data[fit_columns] = self.scaler.transform(user_aggregated_data[fit_columns])
            return user_aggregated_data

def feature_selection(train_data, label_column='is_sa', k=20):
    X = train_data.drop(columns=['msisdn', label_column])
    y = train_data[label_column]

    def get_top_features(selector):
        selector.fit(X, y)
        return X.columns[selector.get_support()].tolist()

    xgb_model = XGBClassifier(n_jobs=-1)
    xgb_selected_features = get_top_features(xgb_model)

    mutual_info_selected_features = get_top_features(SelectKBest(mutual_info_classif, k=k))

    rfe_model = RFE(LogisticRegression(max_iter=1000), n_features_to_select=k)
    rfe_selected_features = get_top_features(rfe_model)

    info_gain_selected_features = get_top_features(SelectKBest(f_classif, k=k))

    corr_coef_selected_features = X.corrwith(y).abs().nlargest(k).index.tolist()

    all_selected_features = (
        xgb_selected_features + mutual_info_selected_features +
        rfe_selected_features + info_gain_selected_features +
        corr_coef_selected_features + rf_selected_features
    )

    combined_features = pd.Series(all_selected_features).value_counts().nlargest(k).index.tolist()

    return combined_features

# Encoding configuration with Count Encoding for location features
encoding_config = {
    'call_event': 'label',
    'other_party': 'label',
    'ismultimedia': 'label',
    'home_area_code': 'frequency',
    'visit_area_code': 'frequency',
    'called_home_code': 'frequency',
    'called_code': 'frequency',
    'a_serv_type': 'onehot',
    'long_type1': 'label',
    'roam_type': 'label',
    'a_product_id': 'count',
    'phone1_type': 'label',
    'phone2_type': 'label',
    'phone1_loc_city': 'frequency',
    'phone1_loc_province': 'frequency',
    'phone2_loc_city': 'frequency',
    'phone2_loc_province': 'frequency',
    'is_weekend': 'label',
    'is_working_hour': 'label'
}


In [14]:
# 读取数据
train_set_res = pd.read_csv(
    '/home/hwxu/Projects/Competition/Telecom/Input/raw/train.csv', low_memory=False)
train_set_ans = pd.read_csv(
    '/home/hwxu/Projects/Competition/Telecom/Input/raw/labels.csv', low_memory=False)
validation_set_res = pd.read_csv(
    '/home/hwxu/Projects/Competition/Telecom/Input/raw/val.csv', low_memory=False)

In [15]:

# 实例化数据处理器
data_processor = DataProcessor()

# 处理训练集
train_data, fit_columns, scaler = data_processor.preprocess_and_aggregate(
    train_set_res, train_set_ans, is_validation=False, encoding_config=encoding_config)

print(f"num features: {len(fit_columns)}")

# 选择特征
n_features_to_keep = 30
selected_features = feature_selection(train_data, k=n_features_to_keep)
train_data = train_data[['msisdn'] + selected_features + ['is_sa']]

print(f"num selected features: {len(selected_features)}")

# 处理验证集
validation_data = data_processor.preprocess_and_aggregate(
    validation_set_res, is_validation=True, fit_columns=fit_columns, encoding_config=encoding_config)

# 选择特征（根据训练集选择的特征）
validation_data = validation_data[['msisdn'] + selected_features]

# 输出处理后的训练集和验证集
train_data.to_csv(
    f'/home/hwxu/Projects/Competition/Telecom/Input/processed/train{n_features_to_keep}.csv', index=False)
validation_data.to_csv(
    f'/home/hwxu/Projects/Competition/Telecom/Input/processed/val{n_features_to_keep}.csv', index=False)



  ('skew', lambda x: skew(x) if len(x) > 1 else -1),
  ('kurt', lambda x: kurtosis(x) if len(x) > 1 else -1),
  ('skew', lambda x: skew(x) if len(x) > 1 else -1),
  ('kurt', lambda x: kurtosis(x) if len(x) > 1 else -1),
  ('skew', lambda x: skew(x) if len(x) > 1 else -1),
  ('kurt', lambda x: kurtosis(x) if len(x) > 1 else -1),


KeyError: 'Column not found: call_duration'

In [None]:
train_data.shape, validation_data.shape