<a href="https://colab.research.google.com/github/YaninaK/predictive-maintenance/blob/main/01_Read_clean_and_resample_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Хакатон ЛИДЕРЫ ЦИФРОВОЙ ТРАНСФОРМАЦИИ 2023

## Северсталь. Модель раннего обнаружения неисправностей промышленного оборудования
[Задача 15](https://leaders2023.innoagency.ru/task_15) 


## Read, clean and resample data

### [Source data on Google Drive](https://drive.google.com/file/d/1jrbfHULbZuCnwJQwNllQUFlCGpR_lHDc/view?usp=sharing)

In [1]:
# Attach Google Drive: the raw datasets archive and the output folder used
# later in this notebook both live under /content/drive.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [2]:
!git clone https://github.com/YaninaK/predictive-maintenance.git -q
!pip install -r predictive-maintenance/requirements_Colab.txt -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
%%bash
# Unpack the raw datasets archive from Google Drive and move its contents
# (the "Датасеты" folder inside the zip) into the repo's data/01_raw folder.
# set -e: stop at the first failing step instead of moving or removing
# files after a failed extraction.
set -e
ZIP=/content/drive/MyDrive/ML_projects/02_Predictive_maintenance/data/01_raw/datasets.zip
DATA=/content/predictive-maintenance/data
unzip -o "$ZIP" -d "$DATA/"   # -o: overwrite without an interactive prompt on re-runs
mkdir -p "$DATA/01_raw"       # mv needs an existing target directory
mv "$DATA/Датасеты/"* "$DATA/01_raw"
rmdir "$DATA/Датасеты"

Archive:  /content/drive/MyDrive/ML_projects/02_Predictive_maintenance/data/01_raw/datasets.zip
   creating: /content/predictive-maintenance/data/Датасеты/
 extracting: /content/predictive-maintenance/data/Датасеты/X_test.parquet  
 extracting: /content/predictive-maintenance/data/Датасеты/X_train.parquet  
 extracting: /content/predictive-maintenance/data/Датасеты/messages.xlsx  
 extracting: /content/predictive-maintenance/data/Датасеты/test_intervals.xlsx  
 extracting: /content/predictive-maintenance/data/Датасеты/y_train.parquet  


In [4]:
# Work from the repo root: later cells read "data/01_raw/..." relative to it
# and add os.getcwd() (and its src/predictive_maintenance) to sys.path.
%cd /content/predictive-maintenance

/content/predictive-maintenance


In [5]:
import sys
import os

# Make the project importable: the repo root, and src/predictive_maintenance so
# that `data.make_dataset` / `data.resample_dataset` resolve in the next cell.
# The membership check keeps re-running this cell from appending duplicates.
for _path in (os.getcwd(), os.path.join(os.getcwd(), "src", "predictive_maintenance")):
    if _path not in sys.path:
        sys.path.append(_path)

In [6]:
import pandas as pd
import numpy as np
import pyspark
import pyspark.sql.functions as F

from pprint import pprint

from data.make_dataset import (
    load_data, 
    get_new_X_column_names,    
    rename_columns,      
)
from data.resample_dataset import (    
    save_resampled_X,
    save_resampled_y_train, 
)

In [7]:
import warnings
warnings.filterwarnings('ignore')

In [8]:
# Application name and web-UI port for the Spark session created in the next cell.
app_name, spark_ui_port = 'data_preprocessing', 4041

In [9]:
# Start (or reuse, via getOrCreate) the Spark session used for loading,
# cleaning and resampling, configured with app_name and spark_ui_port.
spark = pyspark.sql.SparkSession.builder.appName(app_name).config(
    "spark.ui.port", spark_ui_port
).getOrCreate()

In [10]:
# Project root on Google Drive and the sub-folder for intermediate outputs.
# Both keep a trailing "/" because later cells join paths by plain string
# concatenation (PATH + FOLDER + file name).
PATH = "/content/drive/MyDrive/ML_projects/02_Predictive_maintenance/"
FOLDER = "data/02_intermediate/"

### 1. Data ingestion

In [11]:
X_train, y_train, messages, unified_tech_places = load_data()

# Put both Spark frames on 4 partitions.
X_train, y_train = X_train.repartition(4), y_train.repartition(4)

print(f'X_train.shape = ({X_train.count()}, {len(X_train.columns)})')
print(f'y_train.shape = ({y_train.count()}, {len(y_train.columns)})')

X_train.shape = (9335034, 97)
y_train.shape = (9335034, 176)


In [12]:
# Report the size of unified_tech_places and preview its first two rows.
print(f'unified_tech_places.shape = {unified_tech_places.shape}\n')
unified_tech_places.head(2)

unified_tech_places.shape = (175, 3)



Unnamed: 0,equipment,description,unified_name
0,9,9_ЗАПОРНАЯ АРМАТУРА ЭКСГАУСТЕРА №9,ЗАПОРНАЯ АРМАТУРА ЭКСГАУСТЕРА №
1,9,9_МАСЛОСТАНЦИЯ ЖИДКОЙ СМАЗКИ ЭКСГ_ №9,МАСЛОСТАНЦИЯ ЖИДКОЙ СМАЗКИ ЭКСГ_ №


In [13]:
# Report the size of the messages table and preview its first two rows.
print(f'messages.shape = {messages.shape}\n')
messages.head(2)

messages.shape = (981, 11)



Unnamed: 0,МАШИНА,ИМЯ_МАШИНЫ,ТЕХ_МЕСТО,НАЗВАНИЕ_ТЕХ_МЕСТА,ВИД_СООБЩЕНИЯ,ОПИСАНИЕ,ДАТА_НАЧАЛА_НЕИСПРАВНОСТИ,ДАТА_УСТРАНЕНИЯ_НЕИСПРАВНОСТИ,ТЕКСТ_ГРУППЫ_КОДОВ,equipment,unified_name
390,AA2/006-006,ЭКСГАУСТЕР А/М №9,AA2/006-006-002-008,ЗАПОРНАЯ АРМАТУРА ЭКСГАУСТЕРА №9,M3,неисправен двигатель,2019-01-21 00:00:00,2019-02-25,,9,ЗАПОРНАЯ АРМАТУРА ЭКСГАУСТЕРА №
391,CH-AGP-AG2/011-005,ЭКСГАУСТЕР А/М №9,CH-AGP-AG2/011-005-002,МАСЛОСТАНЦИЯ ЖИДКОЙ СМАЗКИ ЭКСГ. №9,M3,неисправен двигатель,2019-01-21 12:26:08,2019-02-25,,9,МАСЛОСТАНЦИЯ ЖИДКОЙ СМАЗКИ ЭКСГ_ №


In [14]:
# Load the test sensor data. Parquet files carry their own schema, so the
# CSV-only reader options `header` / `inferSchema` are not needed here.
X_test = spark.read.parquet("data/01_raw/X_test.parquet")

# Rename columns with the project helpers. Presumably this yields the same
# names as X_train from load_data (both show 97 columns) — verify if changed.
X_cols = get_new_X_column_names(X_test)
X_test = rename_columns(X_test, X_cols)
X_test = X_test.repartition(4)

print(f'X_test.shape = ({X_test.count()}, {len(X_test.columns)})')

X_test.shape = (4008961, 97)


In [15]:
# Read the test intervals (first sheet column used as the index) and order
# them by that index.
test_intervals = (
    pd.read_excel("data/01_raw/" + "test_intervals.xlsx", index_col=0)
    .sort_index()
)

print(f'test_intervals.shape = {test_intervals.shape}')
test_intervals.head(2)

test_intervals.shape = (189, 4)


Unnamed: 0,start,finish,machine,tm
0,2022-01-07 09:05:16,2022-01-07 14:05:15,,
1,2022-02-25 03:44:52,2022-02-25 08:15:03,,


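### 2. Clean data

Negative sensor readings are clipped to 0, and pressure (`ДАВЛЕНИЕ`) readings are additionally capped at 800. The first (non-sensor) column is left untouched.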

In [16]:
def clean_data(X, lower_bound=0, pressure_cap=800):
    """Clip implausible sensor readings in a Spark DataFrame.

    Every column except the first is clipped from below at ``lower_bound``
    (negative readings become 0 by default). Columns whose name has
    'ДАВЛЕНИЕ' (pressure) at character positions 2..9 are additionally capped
    from above at ``pressure_cap``. That is presumably a two-character prefix
    such as '9_' — verify against the column renaming. Nulls pass through
    unchanged.

    All columns are rewritten in ONE ``select``. The previous version made one
    ``withColumn`` call per column inside a loop, and each call adds a
    projection to the plan. With ~100 columns this bloats the query plan,
    which the PySpark docs warn can hurt performance or even overflow the stack.

    Args:
        X: Spark DataFrame. Its first column is kept as-is (presumably the
            timestamp — TODO confirm against load_data / rename_columns).
        lower_bound: values below this are replaced by it. Default 0.
        pressure_cap: pressure values above this are replaced by it.
            Default 800.

    Returns:
        A new DataFrame with the same columns in the same order.
    """
    names = X.schema.names
    if len(names) < 2:
        return X  # only the untouched first column (or nothing) -> no-op

    def _clip(var):
        # Build one CASE WHEN expression per column: floor first, then the
        # pressure cap, otherwise keep the original value.
        col = F.col(var)
        expr = F.when(col < lower_bound, lower_bound)
        if var[2:10] == 'ДАВЛЕНИЕ':
            expr = expr.when(col > pressure_cap, pressure_cap)
        return expr.otherwise(col).alias(var)

    return X.select(F.col(names[0]), *(_clip(var) for var in names[1:]))

In [17]:
# Apply the same clipping to the train and test sensor frames.
X_train, X_test = clean_data(X_train), clean_data(X_test)

### 3. Resample data

Resample `y_train`, `X_train` and `X_test` to a 1-hour period and save the results under `PATH`.

In [18]:
save = True  # toggle: set to False to skip resampling and writing
if save:
    t = 60 * 60  # resampling period in seconds (1 hour)
    save_resampled_y_train(y_train, path=PATH, period=t)
    for _frame, _prefix in ((X_train, "X_train"), (X_test, "X_test")):
        save_resampled_X(_frame, prefix=_prefix, path=PATH, period=t)



### 4. Save messages and unified_tech_places

In [19]:
save = True  # toggle: set to False to skip writing these tables
if save:
    out_dir = PATH + FOLDER
    # Make sure the intermediate folder exists (e.g. on a fresh Drive) before
    # pandas writes the Parquet files into it.
    os.makedirs(out_dir, exist_ok=True)
    messages.to_parquet(out_dir + "messages_unified.parquet")
    unified_tech_places.to_parquet(out_dir + "unified_tech_places.parquet")