<a href="https://colab.research.google.com/github/MikhailBuimov/pyspark/blob/main/laser_cutting_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [48]:
import pyspark
import pandas as pd
import re
from datetime import datetime, time, timedelta

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Imputer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql import DataFrame
from pyspark.sql.functions import udf

In [49]:
# Инициализация SparkSession
spark = (
    SparkSession.builder
    .appName("Laser_cutting")
    .master("local")  # Запуск Spark на локальной машине
    .getOrCreate()
)

In [50]:
df = spark.read.option("header", True).csv('/content/drive/MyDrive/datasets/laser_cutting.csv')

In [51]:
df.show()

+--------------------+--------------------+--------+----------+----------+------+-----+----------+------------+
|        Наименование|            Материал|Операция|      hash|     Время|Ширина|Длина|Длина реза|Кол-во резов|
+--------------------+--------------------+--------+----------+----------+------+-----+----------+------------+
|             Круг 40| Лист х/к т. 3,0 мм.|      ЛР|42af1b9fc3|00:00:03.5|    40|   40|       126|           1|
|             Круг 40| Лист х/к т. 3,0 мм.|      ЛР|42af1b9fc3|00:00:03.5|    40|   40|       126|           1|
|             Круг 40| Лист х/к т. 3,0 мм.|      ЛР|42af1b9fc3|00:00:03.5|    40|   40|       126|           1|
|             Круг 40| Лист х/к т. 3,0 мм.|      ЛР|42af1b9fc3|00:00:03.5|    40|   40|       126|           1|
|             Круг 40| Лист х/к т. 3,0 мм.|      ЛР|42af1b9fc3|00:00:03.5|    40|   40|       126|           1|
|             Круг 40| Лист х/к т. 3,0 мм.|      ЛР|42af1b9fc3|00:00:03.5|    40|   40|       126|      

Переименование колонок

In [52]:
df = (
    df.withColumnRenamed('Материал', 'Material')
      .withColumnRenamed('Время', 'Time')
      .withColumnRenamed('Ширина', 'Width')
      .withColumnRenamed('Длина', 'Length')
      .withColumnRenamed('Длина реза', 'Cut_length')
      .withColumnRenamed('Кол-во резов', 'Cut_quantity')
      .withColumnRenamed('Операция', 'Operation')
      .withColumnRenamed('Наименование', 'Part_name')
)

In [53]:
df = df.dropDuplicates(['hash'])


In [54]:
df.show(4)

+--------------------+--------------------+---------+---------+----------+-----+------+----------+------------+
|           Part_name|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|
+--------------------+--------------------+---------+---------+----------+-----+------+----------+------------+
|     B6-04-001 Рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f4072|00:00:41.3|  195|   105|      1292|          12|
|45-1.010.06 Перек...|Труба проф. 30*30...|       ТР|00288c85d|00:00:39.1|  539|   116|       464|           6|
|Основание ТЯИЛ.73...| Лист х/к т. 1,0 мм.|       ЛР|00449d28b|00:00:20.9|  156|   156|      1190|          28|
|        ЗТ Основание| Лист х/к т. 3,0 мм.|       ЛР|004ac5d67|00:00:08.5|  146|   174|       914|           8|
+--------------------+--------------------+---------+---------+----------+-----+------+----------+------------+
only showing top 4 rows



In [55]:
@udf(T.ArrayType(T.StringType()))
def cut_keywords(name):
    stop_words = {'и', 'в', 'на', 'с', 'под', 'за',
              'для', 'по', 'от', 'до', 'над',
              'через', 'у', 'о', 'об', 'при',
              'из', 'а', 'но', 'или', 'xx', 'хх',
              'мм', 'ст', 'шт', 'тр', 'дл',
              "шир", "выс", "пр"}

    if not name:
        return []
    # Удаляем нетекстовые символы и числа
    name = re.sub(r'[^А-Яа-яЁё\s]', '', name)
    # Разбиваем строку на слова
    words = name.strip().split()
    # Фильтруем слова
    keywords = [word.lower() for word in words if word.lower() not in stop_words and len(word) > 1]
    return keywords

In [56]:
df = df.withColumn("Part_name_lower", F.lower(F.col("Part_name")))  # Приведение к нижнему регистру
df = df.withColumn("keywords_list", cut_keywords(F.col("Part_name_lower")))  # Применение UDF
df = df.withColumn("keyword", F.explode(F.col("keywords_list")))  # Разворачиваем список ключевых слов

In [57]:
df.show(20)

+--------------------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+--------------------+-----------+
|           Part_name|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|       keywords_list|    keyword|
+--------------------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+--------------------+-----------+
|     B6-04-001 Рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f4072|00:00:41.3|  195|   105|      1292|          12|     b6-04-001 рамка|             [рамка]|      рамка|
|45-1.010.06 Перек...|Труба проф. 30*30...|       ТР|00288c85d|00:00:39.1|  539|   116|       464|           6|45-1.010.06 перек...|       [перекладина]|перекладина|
|Основание ТЯИЛ.73...| Лист х/к т. 1,0 мм.|       ЛР|00449d28b|00:00:20.9|  156|   156|      1190|          28|основание тяил.73...|   [основание, тяил]|  основание|
|Осн

In [58]:
keyword_counts = df.groupBy("keyword").count().withColumnRenamed("count", "keyword_count")

df = df.join(keyword_counts, on="keyword", how="left")

# Группируем по изначальным строкам и выбираем минимальное значение count
keywords_count_min = df.groupBy("Part_name").agg(F.min("keyword_count").alias("keywords_counts"))

# Присоединяем результат к оригинальному DataFrame
df = df.join(keywords_count_min, on="Part_name", how="left")

# Вывод результата
df.show(30)

+--------------------+------------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+--------------------+-------------+---------------+
|           Part_name|     keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|       keywords_list|keyword_count|keywords_counts|
+--------------------+------------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+--------------------+-------------+---------------+
|     B6-04-001 Рамка|       рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f4072|00:00:41.3|  195|   105|      1292|          12|     b6-04-001 рамка|             [рамка]|           75|             75|
|45-1.010.06 Перек...| перекладина|Труба проф. 30*30...|       ТР|00288c85d|00:00:39.1|  539|   116|       464|           6|45-1.010.06 перек...|       [перекладина]|         1300|           1300|
|Основание ТЯИЛ

In [59]:
df = df.withColumn("Material_lower_strip", F.lower(F.col("Material")))  # Приведение к нижнему регистру и разбивка по пробелу

In [60]:
@udf(T.StringType())
def get_material_mark(name):
    name = name.strip()
    materials_dict = {
        ('эл/св', 'труба', 'проф'): 'Сталь',
        ('нерж', 'aisi', 'нерж.'): 'Нержавейка',
        ('09г2с',): '09Г2С',
        ('д16', 'дюраль', "амг2", "амг3", "амг5"): 'Алюминий',
        ('65г',): "65Г",
        ('латунь', 'л63'): 'Латунь',
        ('оцинк',): 'Оцинковка',
        ('cor-ten',): 'CORTEN',
        ('медь',): 'медь',
        ('бронза',): 'бронза',
        ('hardox',): 'Hardox',
        ('60с2а',): '60С2А',
        ('титан',): 'Титан'
    }

    for key in materials_dict:
        if any(k in name for k in key):
            return (materials_dict[key])
    return 'Сталь'


In [61]:
df = df.withColumn("Material_mark", get_material_mark(F.col("Material_lower_strip")))  # Применение UDF


In [62]:
df.show(4)

+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+-------------+
|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|    keywords_list|keyword_count|keywords_counts|Material_lower_strip|Material_mark|
+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+-------------+
|     B6-04-001 Рамка|      рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f4072|00:00:41.3|  195|   105|      1292|          12|     b6-04-001 рамка|          [рамка]|           75|             75| лист г/к т. 4,0 мм.|        Сталь|
|45-1.010.06 Перек...|перекладина|Труба проф. 30*30...|       ТР|00288c85d|00:00:39.1|  

In [63]:
@udf(T.StringType())
def extract_thickness(material):
    try:
        material = material.lower().replace(',', '.').split()
        thickness = None
        for word in material:
            # Проверка на наличие формата типа "60*30*2,0"
            if '*' in word:
                thickness = float(word.split('*')[-1])
                break
            # Проверка на наличие числа с точкой или без нее
            elif all(char.isdigit() or char == '.' for char in word):
                # Исключаем очевидные ошибки, например "304" или "60*30"
                if '.' in word or len(word) < 3:
                    thickness = float(word)
                    break
        if thickness is not None and thickness > 0 and thickness < 20:
            return str(thickness) # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Вот тут перевожу в строку, потому что с типами непонятно
        else:
            return 'Ошибка'
    except:
        return 'Ошибка'

Тут вернул все строками, потому что непонятно с типами данных как работать
(в pyspark-е внутри вроде какие то свои типы на scala и они неконнектятся. И там какие то StructType-ы, и вообще функция может только что то одно возвращать(у меня было строка или инт))

In [64]:
df = df.withColumn("Thickness", extract_thickness(F.col("Material_lower_strip")))  # Применение UDF

In [65]:
df.show(3)

+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+-------------+---------+
|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|    keywords_list|keyword_count|keywords_counts|Material_lower_strip|Material_mark|Thickness|
+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+-------------+---------+
|     B6-04-001 Рамка|      рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f4072|00:00:41.3|  195|   105|      1292|          12|     b6-04-001 рамка|          [рамка]|           75|             75| лист г/к т. 4,0 мм.|        Сталь|      4.0|
|45-1.010.06 Перек...|перекладина|Труба проф. 30

In [66]:
df = df.where(df['Thickness'] != 'Ошибка')

In [67]:
@udf(T.IntegerType())
def convert_to_seconds(time_str):
    # Убедимся, что time_str является строкой
    if not isinstance(time_str, str):
        try:
            time_str = str(time_str)
        except Exception as e:
            print(f"Ошибка конвертации: {e}")
            return None

    # Попытка парсинга строки времени с миллисекундами
    try:
        if '.' in time_str:
            time_obj = datetime.strptime(time_str, "%H:%M:%S.%f")
        else:
            time_obj = datetime.strptime(time_str, "%H:%M:%S")
    except ValueError as e:
        ##print(f"Ошибка парсинга: {e}")
        return None

    # Преобразование в timedelta
    delta = timedelta(hours=time_obj.hour, minutes=time_obj.minute, seconds=time_obj.second, microseconds=time_obj.microsecond)

    # Получение общего количества секунд и округление до целого числа
    total_seconds = round(delta.total_seconds())

    return int(total_seconds)

In [68]:
df = df.withColumn("Time_converted", convert_to_seconds(F.col("Time")))  # Применение UDF

In [69]:
df.show(3)

+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+-------------+---------+--------------+
|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|    keywords_list|keyword_count|keywords_counts|Material_lower_strip|Material_mark|Thickness|Time_converted|
+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+-------------+---------+--------------+
|     B6-04-001 Рамка|      рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f4072|00:00:41.3|  195|   105|      1292|          12|     b6-04-001 рамка|          [рамка]|           75|             75| лист г/к т. 4,0 мм.|        Сталь|      4.0|    

In [70]:
df = df.where(df['Thickness'] != 0)

In [71]:
def get_avg_group_speed(data):
    # Вычисляем среднее время и среднюю длину реза для каждой группы и округляем результаты
    avg_values = data.groupBy(['Thickness', 'Material_mark']) \
        .agg(
            F.round(F.mean('Time_converted')).alias('avg_group_time'),
            F.round(F.mean('Cut_length')).alias('avg_group_length')
        )

    # Соединяем результаты с исходным DataFrame
    result = data.join(avg_values, on=['Thickness', 'Material_mark'], how='left')

    # Вычисляем среднюю скорость и округляем результат
    result = result.withColumn('avg_group_speed', F.round(result['avg_group_length'] / result['avg_group_time']))

    return result

In [72]:
df = get_avg_group_speed(df)

In [73]:
df.show(3)

+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+
|Thickness|Material_mark|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|    keywords_list|keyword_count|keywords_counts|Material_lower_strip|Time_converted|avg_group_time|avg_group_length|avg_group_speed|
+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+
|      4.0|        Сталь|     B6-04-001 Рамка|      рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f407

In [74]:
def sheet_size_filter(data):
    list_hor = (
        (data['Operation'] == 'ЛР') &
        (data['Length'] > 0) & (data['Length'] < 3000) &
        (data['Width'] > 0) & (data['Width'] < 1500)
    )

    list_vert = (
        (data['Operation'] == 'ЛР') &
        (data['Length'] > 0) & (data['Length'] < 1500) &
        (data['Width'] > 0) & (data['Width'] < 3000)
    )

    tube_hor = (
        (data['Operation'] == 'ТР') &
        (data['Length'] > 0) & (data['Length'] < 6000) &
        (data['Width'] > 0) & (data['Width'] < 1000)
    )

    tube_vert = (
        (data['Operation'] == 'ТР') &
        (data['Length'] > 0) & (data['Length'] < 1000) &
        (data['Width'] > 0) & (data['Width'] < 6000)
    )

    # Применяем фильтрацию
    data_filtered = data.filter(list_hor | list_vert | tube_hor | tube_vert)

    return data_filtered

In [75]:
df = sheet_size_filter(df)

In [76]:
df.show(3)

+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+
|Thickness|Material_mark|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|    keywords_list|keyword_count|keywords_counts|Material_lower_strip|Time_converted|avg_group_time|avg_group_length|avg_group_speed|
+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+
|      4.0|        Сталь|     B6-04-001 Рамка|      рамка| Лист г/к т. 4,0 мм.|       ЛР|0022f407

In [77]:
def add_one_hot(data: DataFrame, name_column: str) -> DataFrame:
    # Создаем строковый индексатор для преобразования категориальной переменной в числовую
    string_indexer = StringIndexer(inputCol=name_column, outputCol=name_column + "_index")

    # Создаем one-hot энкодер для созданного числового столбца
    one_hot_encoder = OneHotEncoder(inputCols=[name_column + "_index"], outputCols=[name_column + "_ohe"])

    # Объединяем оба шага в пайплайн
    pipeline = Pipeline(stages=[string_indexer, one_hot_encoder])

    # Обучение пайплайна и преобразование данных
    model = pipeline.fit(data)
    transformed_data = model.transform(data)

    # Убираем ненужные столбцы, если необходимо
    transformed_data = transformed_data.drop(name_column + "_index")

    return transformed_data

In [78]:
df = add_one_hot(df, 'Material_mark')
df.show()

+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+--------------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+-----------------+
|Thickness|Material_mark|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|       keywords_list|keyword_count|keywords_counts|Material_lower_strip|Time_converted|avg_group_time|avg_group_length|avg_group_speed|Material_mark_ohe|
+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+--------------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+-----------------+
|      4.0|        Сталь|     B6-0

Формат результата, который вы получили — это Sparse Vector, который используется в Spark для эффективного хранения векторов с большим количеством нулевых значений. В вашем случае: (11,[0],[1.0]) означает следующее:


    

11: длина вектора (количество возможных категорий, включая нулевые значения).

    

[0]: индексы ненулевых элементов в векторе, в данном случае 0 указывает на первый элемент.

    

[1.0]: значения ненулевых элементов в векторе, здесь 1.0 указывает, что первый элемент активен (в данном случае соответствует категории).

In [79]:
#Колонки, используемые для заполнения пропусков
columns=['Width', 'Length', 'Cut_length', 'Cut_quantity',
         'keywords_counts', 'Thickness', 'Time',
         'avg_group_speed','Material_mark_ohe']

Похоже на pyspark нету knn inputer и надо писать его самому???)))

Чтобы адаптировать вашу функцию, использующую pandas и sklearn, на PySpark, необходимо учесть, что в Spark отсутствуют некоторые функции, такие как StandardScaler и KNNImputer напрямую. Однако мы можем добиться аналогичного результата, используя библиотеки, встроенные в PySpark.

В PySpark для масштабирования данных можно использовать StandardScaler, а для заполнения пропусков — Imputer. Однако, для реализации алгоритма KNN необходимо будет применить другие подходы, так как стандартный KNN, как функция, не доступен в библиотеке Spark. Вместо KNNImputer вы можете использовать Imputer для простого заполнения пропусков.

In [80]:
def fill_nan(data: DataFrame, columns: list, fill_col: str) -> DataFrame:
    # Преобразуем только числовые столбцы
    numeric_columns = [col for col in columns if data.schema[col].dataType.typeName() in ['double', 'float', 'int', 'long']]

    # Если в numeric_columns не осталось столбцов, выдаем предупреждение
    if not numeric_columns:
        print("Нет доступных числовых столбцов для обработки.")
        return data

    # Создаем VectorAssembler для преобразования столбцов в вектор
    assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")

    # Масштабируем данные
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

    # Создаем Imputer для заполнения пропусков
    imputer = Imputer(inputCols=numeric_columns, outputCols=numeric_columns).setStrategy("mean")

    # Создаем Pipeline
    pipeline = Pipeline(stages=[assembler, scaler, imputer])

    # Применение пайплайна
    model = pipeline.fit(data)
    transformed_data = model.transform(data)

    # Удаляем временные столбцы
    transformed_data = transformed_data.drop("features", "scaled_features")

    return transformed_data

In [81]:
df = fill_nan(df, columns, 'keywords_counts')
df.show(5)

+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+-----------------+
|Thickness|Material_mark|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|    keywords_list|keyword_count|keywords_counts|Material_lower_strip|Time_converted|avg_group_time|avg_group_length|avg_group_speed|Material_mark_ohe|
+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+-----------------+
|      4.0|        Сталь|     B6-04-001 Рам

In [82]:
df.describe().show()

+-------+-----------------+-------------+--------------------+-------+----------------+---------+---------+-------------+------------------+------------------+------------------+------------------+--------------------+------------------+-----------------+--------------------+-----------------+-----------------+------------------+------------------+
|summary|        Thickness|Material_mark|           Part_name|keyword|        Material|Operation|     hash|         Time|             Width|            Length|        Cut_length|      Cut_quantity|     Part_name_lower|     keyword_count|  keywords_counts|Material_lower_strip|   Time_converted|   avg_group_time|  avg_group_length|   avg_group_speed|
+-------+-----------------+-------------+--------------------+-------+----------------+---------+---------+-------------+------------------+------------------+------------------+------------------+--------------------+------------------+-----------------+--------------------+-----------------+----

In [83]:
null_counts = df.select([F.sum(F.col(column).isNull().cast("int")).alias(column) for column in df.columns]).collect()

# Печатаем количество null-значений для каждого столбца
for column, count in zip(df.columns, null_counts[0]):
    print(f"{column}: {count}")

Thickness: 0
Material_mark: 0
Part_name: 0
keyword: 0
Material: 0
Operation: 0
hash: 0
Time: 0
Width: 0
Length: 0
Cut_length: 0
Cut_quantity: 0
Part_name_lower: 0
keywords_list: 0
keyword_count: 0
keywords_counts: 0
Material_lower_strip: 0
Time_converted: 3
avg_group_time: 0
avg_group_length: 0
avg_group_speed: 0
Material_mark_ohe: 0


In [84]:
df.where(df['Time_converted'].isNull()).show()

+---------+-------------+--------------------+--------+-------------------+---------+----------+--------------------+-----+------+----------+------------+--------------------+----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+-----------------+
|Thickness|Material_mark|           Part_name| keyword|           Material|Operation|      hash|                Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|   keywords_list|keyword_count|keywords_counts|Material_lower_strip|Time_converted|avg_group_time|avg_group_length|avg_group_speed|Material_mark_ohe|
+---------+-------------+--------------------+--------+-------------------+---------+----------+--------------------+-----+------+----------+------------+--------------------+----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+-----------------+
|      5.0|        Сталь|

In [85]:
df = df.filter(df["Time_converted"].isNotNull())

In [88]:
df = df.withColumn("density", F.round(
 (F.col("Cut_length") * F.col("Cut_quantity"))
 /
 (F.col("Width") * F.col("Length"))
 ))

In [89]:
df = df.withColumn("L/W", F.round(
 F.col("length") / F.col("Width")
 ))

In [91]:
# Создаем цепочку условий with when
operation_num_column = F.when(df["Operation"] == "ЛР", 1) \
    .when(df["Operation"] == "ТР", 2) \
    .otherwise(None)

# Добавляем новый столбец
df = df.withColumn("operation_num", operation_num_column)

In [92]:
df = df.filter(~((df["Cut_length"] > 15000) & (df["Time"] < 500)))

In [93]:
df.show(3)

+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+-----------------+-------+---+-------------+
|Thickness|Material_mark|           Part_name|    keyword|            Material|Operation|     hash|      Time|Width|Length|Cut_length|Cut_quantity|     Part_name_lower|    keywords_list|keyword_count|keywords_counts|Material_lower_strip|Time_converted|avg_group_time|avg_group_length|avg_group_speed|Material_mark_ohe|density|L/W|operation_num|
+---------+-------------+--------------------+-----------+--------------------+---------+---------+----------+-----+------+----------+------------+--------------------+-----------------+-------------+---------------+--------------------+--------------+--------------+----------------+---------------+----------