In [2]:
import os

import joblib
import pandas as pd
import yaml

import numpy as np
from datetime import datetime

from geopy import distance
import json

from scipy.stats import stats
import re
from transliterate import translit

import warnings
warnings.filterwarnings("ignore")

In [3]:
config_path = '../config/params.yml'
config = yaml.load(open(config_path,  encoding="utf-8"), Loader=yaml.FullLoader)

preproc = config['preprocessing']
training = config['train']
evaluate = config['evaluate']


# Import

In [3]:
data_test = pd.read_csv(evaluate['predict_path'])

In [4]:
data_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 28 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   Название            998 non-null    object 
 1   Адрес               998 non-null    object 
 2   Метро               997 non-null    object 
 3   Время до метро      997 non-null    object 
 4   Общая площадь       998 non-null    object 
 5   Жилая площадь       774 non-null    object 
 6   Площадь кухни       828 non-null    object 
 7   Этаж                998 non-null    object 
 8   Год постройки       733 non-null    float64
 9   Тип жилья           998 non-null    object 
 10  Высота потолков     731 non-null    object 
 11  Санузел             845 non-null    object 
 12  Вид из окон         746 non-null    object 
 13  Ремонт              736 non-null    object 
 14  Строительная серия  526 non-null    object 
 15  Мусоропровод        339 non-null    object 
 16  Количес

# Preprocessing

In [5]:
"""
Программа: Предобработка данных
Версия: 1.0
"""
import json
import warnings

import numpy as np
import pandas as pd
from datetime import datetime

warnings.filterwarnings("ignore")


def replace_comma(
    data: pd.DataFrame, lst: list, inplace_true: bool = False
) -> pd.DataFrame:
    """
    Замена запятых на точки в столбцах и удаление лишних символов (\xa0)
    :param data: дата фрейм
    :param lst: список из названий колонок
    :param inplace_true: True - изменения сразу применятся к data, False - создается новый дата фрейм
    """
    data_new = data.copy()
    data_new[lst] = (
        data[lst]
        .apply(lambda x: x.str.split("\xa0").str[0].str.replace(",", ".").astype(float))
        .copy()
    )
    if inplace_true is True:
        data = data_new.copy()
        return data
    else:
        return data_new


def sum_obj(obj: pd.Series) -> pd.Series:
    """
    Удаление лишней информации из значений признака и преобразование в числовой тип.
    :param obj: Колонка из датасета.
    """

    temp_obj_1 = pd.to_numeric(obj.str.split(" ", expand=True)[0], errors="coerce")
    temp_obj_2 = pd.to_numeric(obj.str.split(" ", expand=True)[2], errors="coerce")
    temp = pd.concat([temp_obj_1, temp_obj_2], axis=1)
    temp = temp.sum(axis=1, min_count=1).astype("Int64")
    return temp


def ar_part(data: pd.DataFrame, col: str, main_col: str) -> pd.Series:
    """
    Заполнение пропусков в зависимости от отношения площади признака к общей площади.
    :param data: Датасет
    :param col: Название колонки площади необходимого признака.
    :param main_col: Название колонки общей площади.
    """
    part = round((data[col] / data[main_col]).median(), 2)
    col_new = data[col].fillna(data[main_col] * part)
    return col_new


def replace_group(
    data: pd.DataFrame, main_col: str, col_1: str, col_2: str
) -> pd.DataFrame:
    """
    Заполнение модой через groupby.
    :param data: Дата фрейм.
    :param main_col: Название колонки по которой идет группировка.
    :param col_1: Название колонки в которой происходит заполнение пустых значений.
    :param col_2: Название колонки в которой происходит заполнение пустых значений.
    :return: Датасет.
    """
    for col in data[[col_1, col_2]]:
        data[col] = data[col].fillna(
            data.groupby(main_col)[col].transform(lambda x: x.mode()[0])
        )
        data[col] = data[col].astype(int)
    data_new = data.copy()
    return data_new


def replace_elev(row: pd.Series) -> int:
    """
    Заполнение пропусков в признаке кол-во лифтов в зависимости от этажности здания согласно СНиП 31-01-2003
    :param row: строка дата фрейма.
    """

    if row["Количество лифтов"] == 0:
        floors = int(row["Этажность здания"])
        if floors < 6:
            return 0
        elif floors < 10:
            return 1
        elif floors < 20:
            return 2
        elif floors < 25:
            return 3
        else:
            return 4
    else:
        return row["Количество лифтов"]


def drop_rooms(data: pd.DataFrame, col: str) -> pd.Series:
    """
    Удаление лишних уникальных значений в признаке 'Кол-во комнат', квартиры с большим кол-вом комнат и апартаменты.
    :param data: Датасет
    :param col: Название колонки "кол-во комнат"
    """
    to_drop = (
        data[col]
        .loc[
            lambda x: (x == "Многокомнатная")
            | (x == "Многокомнатные")
            | (x == "Апартаменты")
        ]
        .index
    )
    data_new = data.drop(index=to_drop).copy()
    data_new[col][data_new[col] == "Студия,"] = 1
    data_new[col] = pd.to_numeric(data_new[col], errors="coerce")
    return data_new


def anomaly_drop(data: pd.DataFrame) -> pd.DataFrame:
    """Удаление аномалий в данных: Отрицательные значения и значения меньше 2 в "Высота потолков"
    :param data: Датасет
    """
    df_num = data.select_dtypes(include=np.number)
    drop_neg = []
    for i, j in df_num.iterrows():
        for k in j:
            if k < 0:
                drop_neg.append(i)
    drop_neg += data[data["Высота потолков"] < 2].index.tolist()
    df_fin = data.drop(index=drop_neg)
    df_fin = df_fin.reset_index(drop=True)
    return df_fin


def check_columns_evaluate(data: pd.DataFrame, unique_values_path: str) -> pd.DataFrame:
    """
    Проверка на наличие признаков из train и упорядочивание признаков согласно train
    :param data: датасет test
    :param unique_values_path: путь до списка с признаками train для сравнения
    :return: датасет test
    """
    with open(unique_values_path) as json_file:
        unique_values = json.load(json_file)

    column_sequence = unique_values.keys()

    assert set(column_sequence) == set(data.columns), "Разные признаки"
    return data[column_sequence]


def save_unique_train_data(
    data: pd.DataFrame, drop_columns: list, unique_values_path: str
) -> None:
    """
    Сохранение словаря с признаками и уникальными значениями
    :param drop_columns: список с признаками для удаления
    :param data: датасет
    :param unique_values_path: путь до файла со словарем
    :return: None
    """
    unique_df = data.drop(columns=drop_columns, axis=1)
    # создаем словарь с уникальными значениями для вывода в UI
    dict_unique = {key: unique_df[key].unique().tolist() for key in unique_df.columns}
    with open(unique_values_path, "w") as file:
        json.dump(dict_unique, file)


def pipeline_preprocess(
    data: pd.DataFrame, flg_evaluate: bool = True, **kwargs
) -> pd.DataFrame:
    """
    Пайплайн по предобработке данных.
    :param flg_evaluate: Флаг для evaluate.
    :param data: Датасет.
    :return: Датасет.
    """
    drop_list = [
        "Газоснабжение",
        "Год сдачи",
        "Дом",
        "Строительная серия",
        "Аварийность",
        "Подъезды",
    ]
    data = data.drop(columns=drop_list).copy()
    # Обработка данных.
    # Проверка идет ли предсказание.
    if not flg_evaluate:
        data = data.drop(data["Цена"].isna().loc[lambda x: x == True].index)

    data_clean = data.drop(
        data["Метро"].isna().loc[lambda x: x == True].index
    ).reset_index(drop=True)

    data_clean = replace_comma(
        data_clean,
        ["Жилая площадь", "Площадь кухни", "Общая площадь", "Высота потолков"],
    ).copy()

    data_clean["Время до метро"] = (
        data_clean["Время до метро"].str.split(" ").str[0].copy()
    )
    data_clean["Время до метро"] = pd.to_numeric(
        data_clean["Время до метро"], errors="coerce"
    )

    data_clean[["Санузел", "Количество лифтов", "Балкон/лоджия"]] = data_clean[
        ["Санузел", "Количество лифтов", "Балкон/лоджия"]
    ].apply(sum_obj)

    floor = data_clean["Этаж"].str.split(" из ", expand=True)
    data_clean["Этаж"] = floor[0]
    data_clean["Этажность здания"] = floor[1]
    data_clean["Этаж"] = pd.to_numeric(data_clean["Этаж"], errors="coerce")

    data_clean["Кол-во комнат"] = (
        data_clean["Название"]
        .str.split(" ", expand=True)[0]
        .str.split("-", expand=True)[0]
    )

    data_clean = drop_rooms(data_clean, "Кол-во комнат")
    data_clean = data_clean.drop(columns=["Название"])

    data_clean["Тип жилья"] = data_clean["Тип жилья"].str.split(" ").str[0]

    # Заполнение пропусков.
    for i in data_clean[["Жилая площадь", "Площадь кухни"]]:
        data_clean[i] = ar_part(data_clean, i, "Общая площадь")

    curr_year = datetime.now().year
    data_clean["Год постройки"] = (
        data_clean["Год постройки"].fillna(curr_year + 1).astype(int)
    )

    data_clean = replace_group(
        data=data_clean,
        main_col="Кол-во комнат",
        col_1="Санузел",
        col_2="Балкон/лоджия",
    ).copy()

    data_clean["Количество лифтов"].fillna(0, inplace=True)
    data_clean["Количество лифтов"] = data_clean["Количество лифтов"].astype(int)
    data_clean["Этажность здания"] = data_clean["Этажность здания"].astype(int)
    data_clean["Количество лифтов"] = data_clean.apply(
        lambda row: replace_elev(row), axis=1
    )

    data_clean["Высота потолков"] = data_clean["Высота потолков"].fillna(
        data_clean["Высота потолков"].median()
    )

    data_clean["Отделка"] = data_clean["Отделка"].fillna("Неизвестно")

    to_fill = data_clean[
        [
            "Отопление",
            "Вид из окон",
            "Ремонт",
            "Тип дома",
            "Тип перекрытий",
            "Парковка",
            "Мусоропровод",
        ]
    ]
    for column in to_fill:
        data_clean[column] = data_clean[column].fillna(data_clean[column].mode()[0])

    data_fin = anomaly_drop(data_clean)
    print(data_fin.info())
    # проверка dataset на совпадение с признаками из train
    # либо сохранение уникальных данных с признаками из train
    if flg_evaluate:
        data_fin = check_columns_evaluate(
            data=data_fin, unique_values_path=kwargs["unique_values_path"]
        )
    else:
        save_unique_train_data(
            data=data_fin,
            drop_columns=kwargs["drop_columns_unique"],
            unique_values_path=kwargs["unique_values_path"],
        )
    return data_fin


In [6]:
"""
Программа: Добавление новых признаков
Версия: 1.0
"""
import json

import pandas as pd
from geopy import distance, Yandex
from geopy.extra.rate_limiter import RateLimiter


def metro_point(data: pd.DataFrame, metro_path, flg_geocode: bool, api: str):
    """
    Поиск координат метро через геокодирование либо через json файл.
    :param api: API для поиска координат.
    :param flg_geocode: Флаг геокодирования.
    :param metro_path: Путь к словарю наименований метро.
    :param data: Датасет.
    :return:
    """
    if flg_geocode:
        geolocator = Yandex(api_key=api)
        geocode = RateLimiter(geolocator.geocode, min_delay_seconds=0.5)

        metro = dict()
        for met in data["Город и метро"].unique():
            metro[met] = geocode(met)
        data["metro"] = data["Город и метро"].map(metro)
        data["координаты метро"] = data["metro"].apply(
            lambda x: tuple(x.point) if x else None
        )
    else:
        with open(metro_path) as file:
            metro = json.load(file)
        data["координаты метро"] = data["Город и метро"].map(metro)
    data["широта метро"] = data["координаты метро"].apply(lambda x: x[0])
    data["долгота метро"] = data["координаты метро"].apply(lambda x: x[1])


def address_point(data: pd.DataFrame, address_path, flg_geocode: bool, api: str):
    """
    Поиск координат дома через геокодирование либо через json файл.
    :param api: API для поиска координат.
    :param flg_geocode: Флаг геокодирования
    :param address_path: путь к словарю адресов.
    :param data: Датасет.
    :return:
    """
    if flg_geocode:
        geolocator = Yandex(api_key=api)
        geocode = RateLimiter(geolocator.geocode, min_delay_seconds=0.5)

        address = dict()
        for i in range(0, data["Адрес"].nunique()):
            adr = data["Адрес"].unique()[i]
            address[adr] = geocode(adr, timeout=1)
        data["address"] = data["Адрес"].map(address)
        data["координаты дома"] = data["address"].apply(
            lambda x: tuple(x.point) if x else None
        )

    else:
        with open(address_path) as file:
            address = json.load(file)
        data["координаты дома"] = data["Адрес"].map(address)
    data["широта дома"] = data["координаты дома"].apply(lambda x: x[0])
    data["долгота дома"] = data["координаты дома"].apply(lambda x: x[1])


def get_distance(data: pd.DataFrame, lon_center: float, lat_center: float):
    """
    Добавление дистанции до метро и до центра.
    :param lat_center: Широта центра Москвы
    :param lon_center: долгота центра Москвы
    :param data: датасет
    :return:
    """
    data["Расстояние до центра"] = data[["широта дома", "долгота дома"]].apply(
        lambda x: distance.distance((x[0], x[1]), (lat_center, lon_center)).km, axis=1
    )
    data["Расстояние до метро"] = data[
        ["широта дома", "долгота дома", "широта метро", "долгота метро"]
    ].apply(lambda x: distance.distance((x[0], x[1]), (x[2], x[3])).km, axis=1)


def get_district(data: pd.DataFrame):
    """
    Добавление признака Округ.
    :param data: Датасет
    """
    districts = [
        "ЦАО",
        "ЮАО",
        "ЮЗАО",
        "ЮВАО",
        "ЗАО",
        "СВАО",
        "ВАО",
        "САО",
        "СЗАО",
        "НАО (Новомосковский)",
        "ЗелАО",
    ]
    data["district"] = data["Адрес"].str.split(", ", expand=True)[1]

    district = dict()
    for i in range(0, len(data["Метро"])):
        if i not in district.keys():
            if data["district"][i] in districts:
                district[data["Метро"][i]] = data["district"][i]
    data["Округ"] = data["Метро"].map(district)

    for ind, dis in enumerate(data["Округ"]):
        if dis not in districts:
            data["Округ"][ind] = "Неизвестно"

    data["Округ"] = data["Округ"].str.split(" ", expand=True)[0]


def removing_excess(data: pd.DataFrame, flg_geocode: bool) -> pd.DataFrame:
    """
    Удаление лишних колонок.
    :param flg_geocode: Флаг геокодирования.
    :param data: Датасет
    :return: новый датасет
    """
    data_clean = data.drop(
        ["Город и метро", "координаты метро", "координаты дома", "district"], axis=1
    )
    if flg_geocode:
        data_clean = data_clean.drop(["metro", "address"], axis=1)
    return data_clean


def pipeline_geo(
    data: pd.DataFrame,
    flg_parse: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """
    Пайплайн геокодирования.
    :param flg_parse: Включение геокодирования, в случае предсказания по введенным данным.
    :param data: Датасет
    :param kwargs:
    :return: новый датасет
    """
    data["Город и метро"] = "Москва, метро " + data["Метро"]
    metro_point(
        data=data,
        metro_path=kwargs["metro_path"],
        flg_geocode=flg_parse,
        api=kwargs["API_key"],
    )
    address_point(
        data=data,
        address_path=kwargs["address_path"],
        flg_geocode=flg_parse,
        api=kwargs["API_key"],
    )
    get_distance(
        data=data,
        lon_center=kwargs["lon_center"],
        lat_center=kwargs["lat_center"],
    )
    get_district(data)
    df = removing_excess(data, flg_geocode=flg_parse)
    return df


In [7]:
"""
Программа: Преобразование признаков.
Версия: 1.0
"""

import numpy as np
import pandas as pd
from scipy.stats import stats
import re
from transliterate import translit


def outliers(data: pd.DataFrame) -> pd.DataFrame:
    """
    Поиск и удаление выбросов по нижней границе.
    :param data: Датасет
    :return: Новый датасет
    """
    targ = data["Цена за квадрат лог"].copy()
    q1 = targ.quantile(q=0.25)
    iqr = stats.iqr(targ)
    lower_bound = q1 - (1.5 * iqr)
    df_clean = data[data["Цена за квадрат лог"] > lower_bound]
    df_clean.reset_index(drop=True, inplace=True)
    return df_clean


def translate(data: pd.DataFrame):
    """
    Перевод названий столбцов на английский язык.
    :param data: Датасет
    :return: новый датасет
    """
    cols_translit = [
        translit(x, language_code="ru", reversed=True).replace(" ", "_")
        for x in data.columns
    ]
    data.columns = cols_translit
    data_new = data.rename(columns=lambda x: re.sub("[^A-Za-z0-9_]+", "", x))
    return data_new


def drop_features(data: pd.DataFrame, drop_cols: list):
    """
    Удаление признаков и преобразование типа object в category
    :param drop_cols: Список с названиями колонок
    :param data: датасет
    :return: новый датасет
    """

    data_clean = data.drop(columns=drop_cols)
    cat_cols = data_clean.select_dtypes("object").columns
    data_clean[cat_cols] = data_clean[cat_cols].astype("category")
    return data_clean


def pipeline_features(data: pd.DataFrame, flg_evaluate: bool = True, **kwargs):
    """
    Пайплайн преобразования признаков.
    :param flg_evaluate:
    :param data: Датасет
    :param kwargs:
    :return:
    """
    if not flg_evaluate:
        # Логарифмирование целевого признака
        data["Цена за квадрат лог"] = np.log(data["Цена за квадрат"])

        # Работа с выбросами
        data = outliers(data)

        data = drop_features(data=data, drop_cols=kwargs["drop_columns_unique"])

    # Перевод русских названий
    data = translate(data)
    # Удаление колонок для обучения
    data = drop_features(data=data, drop_cols=kwargs["drop_columns"])

    return data


In [8]:
dataset = pipeline_preprocess(data=data_test, **training)   

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 949 entries, 0 to 948
Data columns (total 23 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   Адрес              949 non-null    object 
 1   Метро              949 non-null    object 
 2   Время до метро     949 non-null    int64  
 3   Общая площадь      949 non-null    float64
 4   Жилая площадь      949 non-null    float64
 5   Площадь кухни      949 non-null    float64
 6   Этаж               949 non-null    int64  
 7   Год постройки      949 non-null    int32  
 8   Тип жилья          949 non-null    object 
 9   Высота потолков    949 non-null    float64
 10  Санузел            949 non-null    int32  
 11  Вид из окон        949 non-null    object 
 12  Ремонт             949 non-null    object 
 13  Мусоропровод       949 non-null    object 
 14  Количество лифтов  949 non-null    int64  
 15  Тип дома           949 non-null    object 
 16  Тип перекрытий     949 non

In [9]:
dataset = pipeline_geo(data=dataset, flg_parse=False, **preproc)
dataset = pipeline_features(data=dataset, flg_evaluate=True, **training)

# Evaluate

In [10]:
model = joblib.load(os.path.join(training["model_path"]))
dataset['predict'] = np.exp(model.predict(dataset))

In [11]:
dataset[:5]

Unnamed: 0,Vremja_do_metro,Obschaja_ploschad,Zhilaja_ploschad,Ploschad_kuhni,Etazh,God_postrojki,Tip_zhilja,Vysota_potolkov,Sanuzel,Vid_iz_okon,...,Balkonlodzhija,Otdelka,Etazhnost_zdanija,Kolvo_komnat,shirota_doma,dolgota_doma,Rasstojanie_do_tsentra,Rasstojanie_do_metro,Okrug,predict
0,14,42.3,17.0,14.0,7,2008,Вторичка,2.8,1,Во двор,...,2,Неизвестно,17,1,55.872627,37.561643,13.765905,1.068552,СВАО,349191.957821
1,8,66.7,35.1,13.3,6,2024,Новостройка,2.8,2,На улицу,...,1,Чистовая,20,3,55.56136,37.598528,21.449414,1.611759,Неизвестно,203622.177003
2,9,64.9,41.2,11.682,4,1965,Вторичка,2.7,1,На улицу и двор,...,1,Неизвестно,12,2,55.782198,37.729745,7.530113,0.657166,ВАО,307711.158602
3,20,314.0,212.0,59.3,24,2010,Вторичка,3.0,4,На улицу,...,2,Неизвестно,25,5,55.819995,37.463305,12.351776,1.243216,СЗАО,371448.259886
4,7,87.0,49.59,15.66,3,2017,Вторичка,2.7,1,Во двор,...,1,Неизвестно,27,3,55.842772,37.371542,18.531579,0.673761,СЗАО,310081.839962
