In [2]:
import polars as pl 
import os 
from typing import List, Dict, Set, Optional
import time

In [3]:
data_dir = "./data"

def load_data(data_dir: str) -> Dict[str, pl.DataFrame]:
    data = {}
    tables = [
            "ProductionUnit",
            "Specification",
            "SpecificationSerialRange",
            "CatalogTreeNode",
            "CatalogTreeNode_CatalogTreeNode",
            "VinLockCache",
            "ServicePartsPage",
            "Section",
            "Specification_ProductionUnit"
    ]

    for table in tables:
        file_path = os.path.join(data_dir, f"{table}.csv") # Используйте f-строку для удобства
        if os.path.exists(file_path):
            print(f"Loading {file_path}...") # Можно добавить лог для отладки
            # Use infer_schema_length=10000 to better detect column types
            data[table] = pl.scan_csv(file_path, infer_schema_length=10000)
        else:
            print(f"File not found: {file_path}") # Добавьте сообщение, если файл отсутствует


    return data 

data = load_data(data_dir)

Loading ./data\ProductionUnit.csv...
Loading ./data\Specification.csv...
Loading ./data\SpecificationSerialRange.csv...
Loading ./data\CatalogTreeNode.csv...
Loading ./data\CatalogTreeNode_CatalogTreeNode.csv...
Loading ./data\VinLockCache.csv...
Loading ./data\ServicePartsPage.csv...
Loading ./data\Section.csv...
Loading ./data\Specification_ProductionUnit.csv...


In [4]:

class PartsSearch:
   def __init__(self, data: Dict[str, pl.LazyFrame]):
      self.data = data
      self.catalog_tree_nodes = data["CatalogTreeNode"].collect()
      self.catalog_tree_relations = data["CatalogTreeNode_CatalogTreeNode"].collect()
      print(f"Loaded LazyFrames for {len(data)} tables")
      for name, lazy_df in data.items():
         row_count = lazy_df.select(pl.count()).collect().item()
         print(f"  - {name}: {row_count} rows")

   def find_specifications_by_production_unit(self, production_unit_id: str) -> pl.DataFrame:
      spec_ids = self.data["Specification_ProductionUnit"].filter(
          pl.col("ProductionUnitID") == production_unit_id
      ).select("SpecificationID").collect()
   
      if len(spec_ids) == 0:
         return pl.DataFrame()
      return self.data["Specification"].filter(
         pl.col("SpecificationID").is_in(spec_ids["SpecificationID"])
      ).collect()

   def find_specifications_by_serial_range(self, serial_number: str) -> pl.DataFrame:
      matching_ranges = self.data["SpecificationSerialRange"].filter(
          (pl.col("RangeStart") <= serial_number) & 
          (pl.col("RangeEnd") >= serial_number)
      ).select("SpecificationID").collect() # No

      if len(matching_ranges) == 0:
            return pl.DataFrame()
      return self.data["Specification"].filter(
         pl.col("SpecificationID").is_in(matching_ranges["SpecificationID"])
      ).collect()

   def find_catalog_nodes_by_serial(self, serial_number: str) -> pl.DataFrame:
      node_ids = self.data["VinLockCache"].filter(
          str(pl.col("SerialNumber")) == serial_number
      ).select("CatalogTreeNodeID").collect()
      
      if len(node_ids) == 0:
         return pl.DataFrame()
      return self.catalog_tree_nodes.filter(
         pl.col("CatalogTreeNodeID").is_in(node_ids["CatalogTreeNodeID"])
      )
   
   def find_sections_by_nodes(self, catalog_nodes: pl.DataFrame) -> pl.DataFrame:
      if len(catalog_nodes) == 0 or "SectionID" not in catalog_nodes.columns:
         return pl.DataFrame()
      section_ids = catalog_nodes.filter(
         pl.col("NodeType") == "Section"
      ).select("SectionID").collect()
      if len(section_ids) == 0:
         return pl.DataFrame()
      return self.data["Section"].filter(
         pl.col("SectionID").is_in(section_ids["SectionID"])
      ).collect()

   def find_service_parts_pages(self,
                                  serial_number: str, 
                                  catalog_nodes: pl.DataFrame) -> pl.DataFrame:  
      if len(catalog_nodes) == 0:
         return pl.DataFrame()
      
      direct_pages = catalog_nodes.filter(
         pl.col("NodeType") == "ServicePartsPage"
      )
      service_page_ids = set()
      if len(direct_pages) > 0 and "BaseObjectID" in direct_pages.columns:
         service_page_ids.update(direct_pages["BaseObjectID"].to_list())
      node_ids = catalog_nodes["CatalogTreeNodeID"].to_list()
      print(type(node_ids))
      child_pages = self._find_service_parts_pages_recursive(node_ids, set(node_ids))
      if len(child_pages) > 0 and "BaseObjectID" in child_pages.columns:
          service_page_ids.update(child_pages["BaseObjectID"].to_list())

      if not service_page_ids:
         return pl.DataFrame()
      
      return self.data["ServicePartsPage"].filter(
            pl.col("ServicePartsPageID").is_in(list(service_page_ids))
         ).collect()

   def _find_service_parts_pages_recursive(self,
                                          parent_ids: List[str],
                                          visited: Set[str]) -> pl.DataFrame:
      if not parent_ids:
            return pl.DataFrame()
      child_relations = self.catalog_tree_relations.filter(
            pl.col("Parents_CatalogTreeNodeID").is_in(parent_ids)
      )

      if len(child_relations) == 0:
            return pl.DataFrame()

      child_ids = [id for id in child_relations["Children_CatalogTreeNodeID"].to_list() if id not in visited]

      if not child_ids:
            return pl.DataFrame()

      visited.update(child_ids)

      child_nodes = self.catalog_tree_nodes.filter(
            pl.col("CatalogTreeNodeID").is_in(child_ids)
      )
        
      service_pages = child_nodes.filter(
            pl.col("NodeType") == "ServicePartsPage"
      )
          
      deeper_pages = self._find_service_parts_pages_recursive(child_ids, visited)
 
      if len(deeper_pages) > 0:
            if len(service_pages) > 0:
                  return pl.concat([service_pages, deeper_pages])
            return deeper_pages
      return service_pages

   def process_serial_number(self, serial_number: str) -> Dict:
      result = {
                "serialNumber": serial_number,
                "productionUnit": None,
                "specifications": [],
                "sections": [],
                "servicePartsPages": []
         }

      production_unit = self.data["ProductionUnit"].filter(
               pl.col("SerialNumber") == serial_number
         ).collect()

      if len(production_unit) == 0:
            return result
         
      result["productionUnit"] = production_unit[0].to_dict()
      production_unit_id = production_unit[0, "ProductionUnitID"]

      specifications = self.find_specifications_by_production_unit(production_unit_id)
 
      if len(specifications) == 0:
            specifications = self.find_specifications_by_serial_range(serial_number)

      if len(specifications) > 0:
            result["specifications"] = specifications.rows(named=True)
      catalog_nodes = self.find_catalog_nodes_by_serial(serial_number)
      sections = self.find_sections_by_nodes(catalog_nodes)
      if len(sections) > 0:
            result["sections"] = sections.rows(named=True)

      service_pages = self.find_service_parts_pages(serial_number, catalog_nodes)
      if len(service_pages) > 0:
                result["servicePartsPages"] = service_pages.rows(named=True)

      return result

In [5]:
search = PartsSearch(data)
serial_numbers = data["ProductionUnit"].select("SerialNumber").collect().to_series().to_list()
serial_numbers = serial_numbers[:1000] 
print(f"Processing {len(serial_numbers)} serial numbers...")


Loaded LazyFrames for 9 tables
  - ProductionUnit: 1039625 rows
  - Specification: 4491 rows
  - SpecificationSerialRange: 6586 rows


  row_count = lazy_df.select(pl.count()).collect().item()


  - CatalogTreeNode: 1221033 rows
  - CatalogTreeNode_CatalogTreeNode: 1223582 rows
  - VinLockCache: 66052 rows
  - ServicePartsPage: 271287 rows
  - Section: 24009 rows
  - Specification_ProductionUnit: 924458 rows
Processing 1000 serial numbers...


In [6]:
start_time = time.time()
results = []
for i, serial_number in enumerate(serial_numbers):
    result = search.process_serial_number(serial_number)
    results.append(result)

    if (i + 1) % 10 == 0:
        elapsed = time.time() - start_time
        print(f"Processed {i + 1}/{len(serial_numbers)} serial numbers ({elapsed:.2f} seconds)")
elapsed = time.time() - start_time
print(f"Finished processing {len(serial_numbers)} serial numbers in {elapsed:.2f} seconds")

Processed 10/1000 serial numbers (1.57 seconds)
Processed 20/1000 serial numbers (3.26 seconds)
Processed 30/1000 serial numbers (4.80 seconds)
Processed 40/1000 serial numbers (6.27 seconds)
Processed 50/1000 serial numbers (7.74 seconds)
Processed 60/1000 serial numbers (9.24 seconds)
Processed 70/1000 serial numbers (10.71 seconds)
Processed 80/1000 serial numbers (12.16 seconds)
Processed 90/1000 serial numbers (13.62 seconds)
Processed 100/1000 serial numbers (15.07 seconds)
Processed 110/1000 serial numbers (16.52 seconds)
Processed 120/1000 serial numbers (17.95 seconds)
Processed 130/1000 serial numbers (19.36 seconds)
Processed 140/1000 serial numbers (20.79 seconds)
Processed 150/1000 serial numbers (22.22 seconds)
Processed 160/1000 serial numbers (23.64 seconds)
Processed 170/1000 serial numbers (25.06 seconds)
Processed 180/1000 serial numbers (26.51 seconds)
Processed 190/1000 serial numbers (28.01 seconds)
Processed 200/1000 serial numbers (29.45 seconds)
Processed 210/1

In [7]:
summary_data = {
    "SerialNumber": [],
    "HasProductionUnit": [],
    "SpecificationCount": [],
    "SectionCount": [],
    "ServicePartsPageCount": []
    }

for result in results:
    summary_data["SerialNumber"].append(result["serialNumber"])
    summary_data["HasProductionUnit"].append(result["productionUnit"] is not None)
    summary_data["SpecificationCount"].append(len(result["specifications"]))
    summary_data["SectionCount"].append(len(result["sections"]))
    summary_data["ServicePartsPageCount"].append(len(result["servicePartsPages"]))

summary_df = pl.DataFrame(summary_data)
summary_df.head(10)

SerialNumber,HasProductionUnit,SpecificationCount,SectionCount,ServicePartsPageCount
str,bool,i64,i64,i64
"""2201439""",True,13,0,0
"""807250""",True,1,0,0
"""2276814""",True,6,0,0
"""966370""",True,1,0,0
"""2333032""",True,7,0,0
"""810230""",True,1,0,0
"""338637""",True,1,0,0
"""485420""",True,2,0,0
"""892076""",True,1,0,0
"""2341207""",True,40,0,0


In [9]:
summary_df.write_csv("parts_search_summary.csv")
print("Summary data exported to parts_search_summary.csv")
details = {
"specifications": [],
"sections": [],
"servicePartsPages": []
}

for result in results:
    serial_number = result["serialNumber"]
    for spec in result["specifications"]:
        spec["SerialNumber"] = serial_number
        details["specifications"].append(spec)

    for section in result["sections"]:
        section["SerialNumber"] = serial_number
        details["sections"].append(section)

    for page in result["servicePartsPages"]:
        page["SerialNumber"] = serial_number
        details["servicePartsPages"].append(page)

for key, items in details.items():
    if items:
        pl.DataFrame(items).write_csv(f"parts_search_{key}.csv")
        print(f"Exported {len(items)} {key} to parts_search_{key}.csv")
   

Summary data exported to parts_search_summary.csv
Exported 426 specifications to parts_search_specifications.csv


In [9]:
from typing import Dict, Any 

def extract_string_from_series_object(item: Any) -> str | None:
    """
    Извлекает строку из элемента колонки типа Object,
    который, предположительно, содержит Polars Series со строкой внутри.
    """
    # Проверяем, является ли элемент Polars Series и не пуст ли он
    if isinstance(item, pl.Series) and len(item) > 0:
        # Пытаемся получить значение из внутренней Series.
        # Предполагаем, что внутренняя Series содержит только один элемент (как в вашем head() выводе).
        inner_value = item[0]
        # Преобразуем полученное значение в строку, если оно не None
        return str(inner_value) if inner_value is not None else None
    elif item is None:
        # Обрабатываем явные значения None
        return None
    else:
        # Обрабатываем любые другие неожиданные типы данных -
        # можно вернуть None, попробовать str(), или вызвать ошибку
        # print(f"Warning: Unexpected item type in Object column: {type(item)}, value: {item}") # Можно добавить лог
        try:
             return str(item) # Попытка стандартного преобразования
        except Exception:
             return None
        
comprehensive_data = []
for result in results:
    serial_number = result["serialNumber"]
    production_unit = result["productionUnit"]
    if not result["servicePartsPages"] and not result["specifications"]:
        row = {
                "SerialNumber": serial_number,
                "ProductionUnitID": production_unit["ProductionUnitID"] if production_unit else None,
                "SpecificationID": None,
                "SectionID": None,
                "ServicePartsPageID": None
        }
        comprehensive_data.append(row)
        continue

    for page in result["servicePartsPages"]:
        for spec in result["specifications"]:
            for section in result["sections"] if result["sections"] else [None]:
                row = {
                      "SerialNumber": serial_number,
                        "ProductionUnitID": production_unit["ProductionUnitID"] if production_unit else None,
                        "SpecificationID": spec["SpecificationID"],
                        "SectionID": section["SectionID"] if section else None,
                        "ServicePartsPageID": page["ServicePartsPageID"]
                    }
                comprehensive_data.append(row)
        
        if result["specifications"] and not result["servicePartsPages"]:
            for spec in result["specifications"]:
                row = {
                    "SerialNumber": serial_number,
                    "ProductionUnitID": production_unit["ProductionUnitID"] if production_unit else None,
                    "SpecificationID": spec["SpecificationID"],
                    "SectionID": None,
                    "ServicePartsPageID": None
                }
                comprehensive_data.append(row)
    
    if comprehensive_data:
        # Убедитесь, что comprehensive_data - это список словарей,
        # где значение для ProductionUnitID - это сама строка UUID, а не Series
        # Если comprehensive_data создается таким образом, что внутри попадают Series,
        # то вам нужно исправить ЭТО место.
        # Например:
        # comprehensive_data = []
        # for ...: # Ваш цикл сбора данных
        #    row = { ... 'ProductionUnitID': actual_uuid_string, ... } # Здесь должна быть строка, а не Series
        #    comprehensive_data.append(row)


        comprehensive_df = pl.DataFrame(comprehensive_data)

        # print("Original Schema:")
        # print(comprehensive_df.schema)

        # --- ПРИМЕНЯЕМ ИСПРАВЛЕНИЕ К КОЛОНКЕ 'ProductionUnitID' ---
        try:
            # Используем map_elements для применения функции к каждому элементу колонки
            # Указываем return_dtype=pl.String, так как функция возвращает строку или None
            comprehensive_df = comprehensive_df.with_columns(
                pl.col("ProductionUnitID").map_elements(
                    extract_string_from_series_object, return_dtype=pl.String
                ).alias("ProductionUnitID") # Сохраняем исходное имя колонки
            )
            # print("\nSchema after processing 'ProductionUnitID':")
            # print(comprehensive_df.schema)

            # --- УДАЛИТЕ ЭТУ СТРОКУ! --- Вы повторно создавали DataFrame, отбрасывая исправления.
            # comprehensive_df = pl.DataFrame(comprehensive_data,) # !!! УДАЛИТЬ !!!
            # --- КОНЕЦ УДАЛЕНИЯ ---

            # --- ПРИМЕНЯЕМ ПРЕОБРАЗОВАНИЕ ТИПОВ ДЛЯ ОСТАЛЬНЫХ OBJECT-КОЛОНОК, ЕСЛИ ЕСТЬ ---
            # Пройдемся по всем колонкам после обработки ProductionUnitID
            for col_name, col_dtype in comprehensive_df.schema.items():
                if col_dtype == pl.Object:
                    # print(f"Attempting to cast remaining object column '{col_name}' to String.")
                    # # Попытка стандартного приведения для других object-колонок, если они есть
                    comprehensive_df = comprehensive_df.with_columns(
                        pl.col(col_name).cast(pl.String).alias(col_name) # Приведение и сохранение имени
                    )
                    # print(f"Column '{col_name}' schema after cast attempt: {comprehensive_df.schema[col_name]}")


            # --- ТЕПЕРЬ ЗАПИСЫВАЕМ В CSV ---
            # print("\nAttempting to write to CSV...")
            comprehensive_df.write_csv("C:/Users/user/Desktop/parts_search_comprehensive.csv")
            # print(f"Exported comprehensive dataset with {len(comprehensive_df)} rows to parts_search_comprehensive.csv")
            # print(comprehensive_df.head(10))

        except Exception as e:
            print(f"\nAn error occurred during processing or writing: {e}")
            print("Please review the data source, the extraction function, and column names.")
