In [3]:
import os
import json
import random

import pandas as pd
import numpy as np

from typing import Tuple

In [4]:
def set_all_seeds(seed=42):
    """
    Seed the random number generators used in this notebook.

    Seeds Python's `random` module and NumPy's global RNG with `seed`, and
    stores `str(seed)` in the PYTHONHASHSEED environment variable.

    NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
    setting it here presumably affects child processes only - confirm.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_rng in (random.seed, np.random.seed):
        seed_rng(seed)

# Seed every RNG once, using the notebook-wide default seed.
DEFAULT_RANDOM_SEED = 21
set_all_seeds(DEFAULT_RANDOM_SEED)

In [7]:
# Locations of the input CSV files (relative paths).
TRAIN_DATASET_PATH = "data/train.csv"
TEST_DATASET_PATH = "data/test.csv"

In [None]:
# Load the train and test tables; both files are read as windows-1251 text.
df, test = (
    pd.read_csv(csv_path, encoding='windows-1251')
    for csv_path in (TRAIN_DATASET_PATH, TEST_DATASET_PATH)
)

In [1]:
# region_type = np.unique(df.region_type.dropna())

# keys_to_types = {}
# second_stage = {}
# for r_t in region_type:
#     data = df[df.region_type == r_t].address.apply(lambda x: x.split("; ")[0].replace('"', '').split(" ")[0])
#     data1 = df[df.region_type == r_t].address.apply(lambda x: x.split("; ")[0].replace('"', '').split(" ")[1])
#     u = np.unique(data)
#     u1 = np.unique(data1)
#     for key in u:
#         if key in keys_to_types:
#             for key1 in u1:
#                 if key1 in second_stage:
#                     print(key)
#                     print(key1)
#                     print(r_t)
#                 else:
#                     second_stage[key1] = r_t
#         else:
#             keys_to_types[key] = r_t
#     # print(u)
#     # print(u1)
#     # print(r_t)
#     # print("-"*80)

In [4]:
class address_model():
    """
    Rule-based parser for address strings.

    An address is a single string of seven fields separated by "; ":
    region, municipality, settlement, location, street, house, source.
    For the first five fields the leading word(s) are looked up in JSON
    dictionaries (plus a few hard-coded special cases) to obtain a type, and
    the remaining words are returned as the name.
    """

    def __init__(
        self,
        region_to_type_json_path       : str,
        region_types_json_path         : str,
        
        municipality_to_type_json_path : str,
        municipality_types_json_path   : str,
        
        settlement_to_type_json_path   : str,
        settlement_types_json_path     : str,
        
        location_to_type_json_path     : str,
        location_types_json_path       : str,
        
        street_to_type_json_path       : str,
        street_types_json_path         : str,
        
        settlements_for_poselenie      : str
        
    ):
        """
        Load every lookup table from disk.

        Each argument is the path of a JSON file. For every address level the
        `*_to_type` mapping and the `*_types` mapping are loaded; the keys
        (and, for settlement/location/street, the unique values) of the
        `*_to_type` mappings are cached as NumPy arrays.
        `settlements_for_poselenie` is the path of a JSON collection of
        settlement names that is consulted in `get_settlement_type`.

        NOTE(review): files are opened with the platform default text
        encoding - confirm this matches how the JSON files were written.
        """
        self.region_to_type = self._load_json(region_to_type_json_path)
        self.region_to_type_keys = np.array(list(self.region_to_type.keys()))
        self.region_types = self._load_json(region_types_json_path)

        self.municipality_to_type = self._load_json(municipality_to_type_json_path)
        self.municipality_to_type_keys = np.array(list(self.municipality_to_type.keys()))
        self.municipality_types = self._load_json(municipality_types_json_path)

        self.settlement_to_type = self._load_json(settlement_to_type_json_path)
        self.settlement_to_type_keys = np.array(list(self.settlement_to_type.keys()))
        self.settlement_to_type_values = np.unique(list(self.settlement_to_type.values()))
        self.settlement_types = self._load_json(settlement_types_json_path)

        self.location_to_type = self._load_json(location_to_type_json_path)
        self.location_to_type_keys = np.array(list(self.location_to_type.keys()))
        self.location_to_type_values = np.unique(list(self.location_to_type.values()))
        self.location_types = self._load_json(location_types_json_path)

        self.street_to_type = self._load_json(street_to_type_json_path)
        self.street_to_type_keys = np.array(list(self.street_to_type.keys()))
        self.street_to_type_values = np.unique(list(self.street_to_type.values()))
        self.street_types = self._load_json(street_types_json_path)

        # Element-wise version of the similarity ratio for array inputs.
        self.lev_ratio = np.vectorize(self.levenshtein_distance_ratio)

        self.settlements_for_poselenie = self._load_json(settlements_for_poselenie)

    @staticmethod
    def _load_json(path : str):
        """Parse one JSON file and return its content; the file is closed on exit."""
        with open(path, 'r') as f:
            return json.load(f)

    @staticmethod
    def _token(tokens, index : int):
        """Return tokens[index], or None when the field has too few words."""
        return tokens[index] if index < len(tokens) else None

    def levenshtein_distance_ratio(self, string1, string2) -> float:
        """
        Returns the Levenshtein distance ratio between two strings string1 and
        string2, a float in the range [0.0, 1.0]. 1.0 meaning the strings are
        identical. See https://en.wikipedia.org/wiki/Levenshtein_distance

        The ratio is (len1 + len2 - distance) / (len1 + len2), where insert,
        delete and substitute each cost 1. Two empty strings are identical and
        yield 1.0 (this also avoids a division by zero).
        """
        total_len = len(string1) + len(string2)
        if total_len == 0:
            return 1.0

        size_x = len(string1) + 1
        size_y = len(string2) + 1
        matrix = np.zeros((size_x, size_y), dtype=float)

        # First column / row: distance from a prefix to the empty string.
        matrix[:, 0] = np.arange(size_x)
        matrix[0, :] = np.arange(size_y)

        for x in range(1, size_x):
            for y in range(1, size_y):
                substitution = 0 if string1[x - 1] == string2[y - 1] else 1
                matrix[x][y] = min(
                    matrix[x - 1][y] + 1,
                    matrix[x - 1][y - 1] + substitution,
                    matrix[x][y - 1] + 1
                )
        return (total_len - matrix[size_x - 1][size_y - 1]) / total_len

    # ------------------------------------------------------------------
    # region columns prediction
    # ------------------------------------------------------------------
    def region_forward(self, region_data : str) -> Tuple[str, str]:
        """Split the region field into (region name, region type)."""
        region_type, start = self.get_region_type(region_data)
        region = " ".join(region_data.split(' ')[start:])
        return region, region_type

    def get_region_type(self, region_data : str) -> Tuple[str, int]:
        """
        Look up the first word of the region field in `self.region_types`.

        Returns (type, number_of_words_consumed). When the first word is not
        a key, or its entry is not a plain string, an error line is printed
        and ('', 0) is returned so the whole field is kept as the name.
        """
        region_0 = region_data.split(" ")[0]

        region_types = self.region_types
        if region_0 in region_types and isinstance(region_types[region_0], str):
            return region_types[region_0], 1

        print(f'error region : {region_data}')
        return '', 0

    # ------------------------------------------------------------------
    # municipality columns prediction
    # ------------------------------------------------------------------
    def municipality_forward(self, municipality_data : str) -> Tuple[str, str]:
        """
        Split the municipality field into (municipality name, type).

        A field of at most one character is treated as empty.
        """
        if len(municipality_data) <= 1:
            return '', ''
        municipality_type, start = self.get_municipality_type(municipality_data)
        municipality = " ".join(municipality_data.split(' ')[start:])
        return municipality, municipality_type

    def get_municipality_type(self, municipality_data : str) -> Tuple[str, int]:
        """
        Look up the first word of the field in `self.municipality_types`.

        Returns (type, number_of_words_consumed). Unknown first words, or
        entries that are not plain strings, print an error line and give
        ('', 0).
        """
        municipality_0 = municipality_data.split(" ")[0]

        municipality_types = self.municipality_types
        if municipality_0 in municipality_types and isinstance(municipality_types[municipality_0], str):
            # Hard-coded override for this one place name.
            if "Красная Горбатка" in municipality_data:
                return "поселок", 1
            return municipality_types[municipality_0], 1

        print(f'error municipality : {municipality_data}')
        return '', 0

    # ------------------------------------------------------------------
    # settlement columns prediction
    # ------------------------------------------------------------------
    def settlement_forward(self, settlement_data : str, source : int, region : str, municipality : str) -> Tuple[str, str]:
        """
        Split the settlement field into (settlement name, settlement type).

        A field of at most one character is treated as empty.
        """
        if len(settlement_data) <= 1:
            return '', ''
        settlement_type, start = self.get_settlement_type(settlement_data, source, region, municipality)
        settlement = " ".join(settlement_data.split(' ')[start:])
        return settlement, settlement_type

    def get_settlement_type(self, settlement_data : str, source : int, region : str, municipality : str) -> Tuple[str, int]:
        """
        Determine the settlement type from the leading words of the field.

        Hard-coded multi-word prefixes are checked first, then the
        Moscow-specific "поселение" rule, then `self.settlement_types`.
        Returns (type, number_of_words_consumed); ('', 0) with a printed
        error line when nothing matches.
        """
        tokens = settlement_data.split(" ")
        set_0 = tokens[0]
        # Missing words are None so the comparisons below simply fail.
        set_1 = self._token(tokens, 1)
        set_2 = self._token(tokens, 2)

        if set_0 == 'п.' and set_1 == 'ж/д' and set_2 in ('ст', 'ст.'):
            return 'поселок при железнодорожной станции', 3
        if set_0 == "железнодорожный" and set_1 == "остановочный":
            return "железнодорожный остановочный пункт", 3
        if set_0 in ('жд', 'ж/д') and set_1 in ('станция', 'ст', 'ст.'):
            return "железнодорожная станция", 2
        if set_0 == 'ж/д' and set_1 in ('оп', 'о.п.'):
            return 'ж/д останов. (обгонный) пункт', 2

        # Moscow, no municipality, source 1: names listed in
        # settlements_for_poselenie are typed as "поселение".
        if (set_0 in ("пос", "пос.", "п", "п.")
                and region == "Москва"
                and municipality == ""
                and source == 1
                and " ".join(tokens[1:]) in self.settlements_for_poselenie):
            return "поселение", 1

        settlement_types = self.settlement_types
        if set_0 in settlement_types:
            entry = settlement_types[set_0]
            if isinstance(entry, str):
                # "жилой район" may be followed by an abbreviation that is
                # part of the type, not of the name.
                if entry == 'жилой район' and set_1 in ('р-н', 'р-н.'):
                    return entry, 2
                return entry, 1
            # Nested entry: the second word selects the type; otherwise the
            # entry stored under the first word itself is the default.
            if set_1 in entry:
                return entry[set_1], 2
            return entry[set_0], 1

        if 'казарма' in settlement_data and 'км' in settlement_data:
            return 'населенный пункт', 1

        print(f'error settlement : {settlement_data}')
        return '', 0

    # ------------------------------------------------------------------
    # location columns prediction
    # ------------------------------------------------------------------
    def location_forward(self, location_data : str, source : int) -> Tuple[str, str]:
        """
        Split the location field into (location name, location type).

        A field of at most one character is treated as empty.
        """
        if len(location_data) <= 1:
            return '', ''
        location_type, start = self.get_location_type(location_data, source)
        location = " ".join(location_data.split(' ')[start:])
        return location, location_type

    def get_location_type(self, location_data : str, source : int) -> Tuple[str, int]:
        """
        Determine the location type from the leading words of the field.

        Hard-coded prefixes (some depending on `source`) are checked first,
        then `self.location_types`. Returns (type, number_of_words_consumed);
        ('', 0) with a printed error line when nothing matches.
        """
        tokens = location_data.split(" ")
        location_0 = tokens[0]
        # Missing second word is None so the comparisons below simply fail.
        location_1 = self._token(tokens, 1)

        if location_0 == "дачный" and location_1 == "населенный":
            return "дачный населенный пункт", 3
        if location_0 == "площадь" and location_1 == "Мира":
            return "площадь", 1
        if location_0 == "садовое" and location_1 == "товарищество":
            return "садовое товарищество", 2
        if location_0 == "территория" and location_1 == "снт":
            if source == 1:
                return "территория снт", 2
            return "территория", 1
        if location_0 == "территория" and location_1 == "тсн":
            if source == 1:
                return "территория тсн", 2
            # NOTE(review): consumes 2 words here but 1 in the "снт" branch
            # above for the same type - confirm this asymmetry is intended.
            return "территория", 2
        if location_1 == "б-ка" or location_1 == "будка":
            return "железнодорожная будка", 2
        if location_0 == "м.":
            if source == 1:
                return "массив", 1
            return "местечко", 1
        if location_0 in ('жд', 'ж/д') and location_1 in ('станция', 'ст', 'ст.'):
            return "железнодорожная станция", 2
        # First word and second word are compared separately; comparing the
        # same word to both strings can never match.
        if location_0 == "промышленная" and location_1 == "зона":
            return "промышленная зона", 2

        location_types = self.location_types
        if location_0 in location_types:
            entry = location_types[location_0]
            if isinstance(entry, str):
                if entry == "жилой район" and location_1 in ('р-н', 'р-н.'):
                    return entry, 2
                return entry, 1
            if location_1 in entry:
                return entry[location_1], 2
            if location_0 in ('нп', 'нп.'):
                # For these prefixes the type is selected by the source id.
                return entry[str(source)], 1
            return entry[location_0], 1

        print(f'error location : {location_data}')
        return '', 0

    # ------------------------------------------------------------------
    # street columns prediction
    # ------------------------------------------------------------------
    def street_forward(self, street_data : str) -> Tuple[str, str]:
        """
        Split the street field into (street name, street type).

        A field of at most one character is treated as empty.
        """
        if len(street_data) <= 1:
            return '', ''
        street_type, start = self.get_street_type(street_data)
        street = " ".join(street_data.split(' ')[start:])
        return street, street_type

    def get_street_type(self, street_data : str) -> Tuple[str, int]:
        """
        Look up the leading word(s) of the street field in `self.street_types`.

        Returns (type, number_of_words_consumed); ('', 0) with a printed
        error line when nothing matches.
        """
        tokens = street_data.split(" ")
        street_0 = tokens[0]
        street_1 = self._token(tokens, 1)

        street_types = self.street_types
        if street_0 in street_types:
            entry = street_types[street_0]
            if isinstance(entry, str):
                return entry, 1
            if street_1 in entry:
                return entry[street_1], 2
            return entry[street_0], 1

        # Hard-coded special case for a "сзд ... Главная" field.
        if street_0 == "сзд" and tokens[-1] == "Главная":
            return "съезд", 3
        print(f'error street : {street_data}')
        return '', 0

    # ------------------------------------------------------------------
    # house prediction
    # ------------------------------------------------------------------
    def house_forward(self, house_data : str) -> str:
        """Return the house field unchanged (an empty field stays '')."""
        if len(house_data) == 0:
            return ''
        return house_data

    # ------------------------------------------------------------------
    # source prediction
    # ------------------------------------------------------------------
    def source_forward(self, source_data : str) -> int:
        """Convert the source field to an int."""
        return int(source_data)

    def forward(self, adress : str) -> Tuple[str, int]:
        """
        Parse one full address string.

        The string is split on "; " and must contain at least seven fields
        (an IndexError is raised otherwise). Returns a 12-tuple:
        (region, region_type, municipality, municipality_type,
         settlement, settlement_type, location, location_type,
         street, street_type, house, source).
        """
        data = adress.split("; ")
        region_data       = data[0]
        municipality_data = data[1]
        settlement_data   = data[2]
        location_data     = data[3]
        street_data       = data[4]
        house_data        = data[5]
        source_data       = data[6]

        # source, region and municipality are parsed first because the
        # settlement rules depend on them.
        source                          = self.source_forward(source_data)
        region, region_type             = self.region_forward(region_data)
        municipality, municipality_type = self.municipality_forward(municipality_data)
        settlement, settlement_type     = self.settlement_forward(settlement_data, source, region, municipality)
        location, location_type         = self.location_forward(location_data, source)
        street, street_type             = self.street_forward(street_data)
        house                           = self.house_forward(house_data)

        return (region, region_type,
                municipality, municipality_type,
                settlement, settlement_type,
                location, location_type,
                street, street_type,
                house, source)

In [5]:
# Build the parser from the JSON lookup tables stored under data/.
model = address_model(
    region_to_type_json_path='data/region_to_type.json',
    region_types_json_path='data/region_types.json',
    municipality_to_type_json_path='data/municipality_to_type.json',
    municipality_types_json_path='data/municipality_types.json',
    settlement_to_type_json_path='data/settlement_to_type.json',
    settlement_types_json_path='data/settlement_types.json',
    location_to_type_json_path='data/location_to_type.json',
    location_types_json_path='data/location_types.json',
    street_to_type_json_path='data/street_to_type.json',
    street_types_json_path='data/street_types.json',
    settlements_for_poselenie='data/settlements_for_poselenie.json',
)

In [93]:
%%time
addresses = test.address[:2000000]
preds_1 = []
for i, address in enumerate(addresses):
    pred = model.forward(address)
    preds_1.append(pred)
    
addresses = test.address[2000000:4000000]
preds_2 = []
for i, address in enumerate(addresses):
    pred = model.forward(address)
    preds_2.append(pred)
    
addresses = test.address[4000000:]
preds_3 = []
for i, address in enumerate(addresses):
    pred = model.forward(address)
    preds_3.append(pred)
    
cols = df.drop(['address'], axis=1).columns

data_1 = np.array(preds_1).T
my_1 = pd.DataFrame(data_1, cols)
my_1 = my_1.T

data_2 = np.array(preds_2).T
my_2 = pd.DataFrame(data_2, cols)
my_2 = my_2.T

data_3 = np.array(preds_3).T
my_3 = pd.DataFrame(data_3, cols)
my_3 = my_3.T

test_pred = pd.concat([my_1, my_2, my_3])

assert len(test) == len(test_pred)

test_pred.to_csv(r"data/submission_pt6.csv", index=False, encoding='windows-1251')