# Функции

In [None]:
def rule_project_type(t):
  if t == 'Типовой':
    return "типовому"
  elif t == 'Индивидуальный':
    return "индивидуальному"
  return t

def rule_round(t):
  if np.isnan(t):
    return 'null'
  return round(t)

def rule_binary(t):
  return "есть" if t else "отсутствует"

def rule_no(t):
  return "не " if t else ""

def rule_name(t):
  return "не " if not t=="Жилой дом" else ""

def rule_point(t):
  return (t.x, t.y)

rules = {
        'project_type': rule_project_type,
        'building_area': rule_round,
        'living_area': rule_round,
        'storeys_count': rule_round,
        'resident_number': rule_round,
        'population_balanced': rule_round,
        'central_heating': rule_binary,
        'central_hot_water': rule_binary,
        'central_water': rule_binary,
        'central_electricity': rule_binary,
        'central_gas': rule_binary,
        'refusechute': rule_binary,
        'failure': rule_no,
        'lift_count': rule_round,
        'name': rule_name,
        'building_year': rule_round,
        'geometry': rule_point
    }

In [None]:
def clean_data(data):
  return data.dropna(subset=['address'])

In [None]:
import numpy as np

def process_target(target_name, data_row, rules):
  if data_row[target_name] is not None:
    target_value = data_row[target_name]
    if target_name in rules:
      target_value = rules[target_name](target_value)
  else:
    target_value = 'null'
  return target_value

In [None]:
from shapely.geometry import Point

def process_true_target(target_name, data_row):
  target_value = data_row[target_name]
  if target_value is None:
    return 'null'
  if isinstance(target_value, float) and np.isnan(target_value):
    return'null'
  if isinstance(target_value, Point):
    return (target_value.x, target_value.y)
  return target_value

In [None]:
def generate_query_and_response(query_template, response_template, target_name, data_row, rules):
  building_id = data_row['id']
  address = data_row['address']
  query = query_template.format(address=address)
  target_value = process_target(target_name, data_row, rules)

  if target_value == 'null':
    response = f'Для дома по адресу "{address}" данная информация отсутствует'
  else:
    response = response_template.format(address=data_row['address'], target=target_value)
  return building_id, query, response, process_true_target(target_name, data_row)

# Обзор датасетов

In [None]:
import pandas as pd

# шаблоны вопросов

templates = pd.read_csv('templates.tsv', sep='\t')
templates.head(5)

Unnamed: 0,query_template,target_name,response_template
0,Какой идентификатор физического объекта у дома...,id,"Идентификатор дома по адресу ""{address}"" – {ta..."
1,"В каком районе находится дом по адресу ""{addre...",administrative_unit,"Дом по адресу ""{address}"" находится в районе ""..."
2,В каком муниципальном образовании находится до...,municipality,"Дом по адресу ""{address}"" находится в муниципа..."
3,"Какой тип проекта застройки дома по адресу ""{a...",project_type,"Дом по адресу ""{address}"" построен по {target}..."
4,"Какая площадь основания здания по адресу ""{add...",building_area,"Площадь основания здания по адресу ""{address}""..."


In [None]:
# загружаем и чистим данные от пустых адресов

import geopandas as gpd

filepath = 'spb_buildings.geojson'
# filepath = '/content/buildings_part_0.geojson'
data = gpd.read_file(filepath)
print(len(data))

cleaned_data = clean_data(data)
print(len(cleaned_data))

89163
78172


In [None]:
data_row = cleaned_data.iloc[0]
query_template, target_name, response_template = templates.iloc[-1]

generate_query_and_response(query_template, response_template, target_name, data_row, rules)

(112224,
 'Каковы геометрические координаты дома по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14"?',
 'Геометрические координаты дома по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14" – (30.410393, 59.929312).',
 (30.410393, 59.929312))

In [None]:
# проверяем, что для каждого шаблона генерируется правильный ответ

data_row = data.iloc[0]
for index, row in templates.iterrows():
  query_template = row['query_template']
  target = row['target_name']
  response_template = row['response_template']
  try:
    print(generate_query_and_response(query_template, response_template, target, data_row, rules))
  except KeyError:
    print(f'Ошибка в запросе {target}')


(112224, 'Какой идентификатор физического объекта у дома по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14"?', 'Идентификатор дома по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14" – 112224.', 112224)
(112224, 'В каком районе находится дом по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14"?', 'Дом по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14" находится в районе "Красногвардейский".', 'Красногвардейский')
(112224, 'В каком муниципальном образовании находится дом по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14"?', 'Дом по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14" находится в муниципальном образовании Малая Охта.', 'Малая Охта')
(112224, 'Какой тип проекта застройки дома по адресу "Санкт-Петербург, Новочеркасский проспект, 41, Заневский проспект, 14"?', 'Дом по адресу "Санкт-Петербург, Новочеркасск

# Генерируем датасет и разбиваем на батчи

In [None]:
import random

def calculate_range(n_batch, batch_size=10):
    start = n_batch * batch_size
    end = start + batch_size
    return start, end

def generate_dataset(data, templates, rules, n_batch, batch_size=10):
  # data_rows = random.sample(range(len(data)), n)
  start, end = calculate_range(n_batch, batch_size)
  data_rows = range(start, end)
  dataset = {}
  for data_id in data_rows:
    data_row = data.iloc[data_id]
    for index, row in templates.iterrows():
      key = f'{data_id}_{index}'
      query_template = row['query_template']
      response_template = row['response_template']
      target = row['target_name']
      building_id, query, response, target_value = generate_query_and_response(query_template, response_template, target, data_row, rules)
      dataset[key] = {
          'building_id': building_id,
          'query': query,
          'response': response,
          'target_value': target_value
      }
  return dataset

In [None]:
import json

def numpy_to_python(o):
    """ Конвертирует объекты NumPy в стандартные типы Python для сериализации в JSON. """
    if isinstance(o, np.float64):
        return float(o)
    elif isinstance(o, np.integer):
        return int(o)
    return o

def save_to_json(dataset, n_batch):
  with open(f'datasets/data_{n_batch}.json', 'w', encoding='utf-8') as f:
    json.dump(dataset, f, ensure_ascii=False, indent=4, default=numpy_to_python)

In [None]:
import os

os.mkdir('buildings')
os.mkdir('datasets')

In [None]:
batch_size = 10   # количество домов в файле
total_parts = len(cleaned_data) // batch_size
for n in range(total_parts):
  dataset = generate_dataset(cleaned_data, templates, rules, n_batch=n, batch_size=batch_size)
  save_to_json(dataset, n)
  start, end = calculate_range(n)
  building_chunk = cleaned_data.iloc[start:end]
  building_chunk.to_file(f"buildings/buildings_part_{n}.geojson", driver='GeoJSON')

In [None]:
# пишем в zip

import zipfile
import os

def zipdir(path, ziph):
    for root, dirs, files in os.walk(path):
        for file in files:
            relative_path = os.path.relpath(os.path.join(root, file), os.path.join(path, '..'))
            ziph.write(os.path.join(root, file), arcname=relative_path)

def create_zip_archive(folder_paths, output_zip_file):
    with zipfile.ZipFile(output_zip_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for folder_path in folder_paths:
            zipdir(folder_path, zipf)
    print(f"Created zip archive '{output_zip_file}' containing the folders.")

folder_paths = ['buildings', 'datasets']  # Замените на свои пути к папкам
output_zip_file = f'data_{batch_size}.zip'
create_zip_archive(folder_paths, output_zip_file)


Created zip archive 'data_10.zip' containing the folders.


In [None]:
import shutil

shutil.rmtree('datasets')
shutil.rmtree('buildings')