# Pipeline for the 3W dataset

This notebook implements an ML pipeline for the problem posed by the Petrobras 3W dataset.

It uses the TensorFlow Extended (TFX) library.

Author: Marcus Carr

### Nomenclature

**instance** of an **event**: corresponds to one .csv file

**sample**: each timestep within a .csv file
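
To make the two terms concrete, here is a minimal sketch that treats an in-memory CSV as one instance and counts its samples. The column names and values are illustrative assumptions, not copied from an actual 3W file.

```python
import csv
import io

# Hypothetical contents of one instance (one .csv file); all values are made up.
instance_csv = io.StringIO(
    "timestamp,P-TPT,T-TPT,class\n"
    "2017-05-24 03:00:00,100.0,50.0,0\n"
    "2017-05-24 03:00:01,100.5,50.1,0\n"
    "2017-05-24 03:00:02,101.0,50.2,0\n"
)

# Each row after the header is one sample (one timestep).
samples = list(csv.DictReader(instance_csv))
num_samples = len(samples)
print(f"1 instance with {num_samples} samples")
```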

### Project structure

Since I do not yet know how everything will fit together once the TFX modules are implemented, I am keeping a single main module for now.

Later, we can assess whether it would be better to split the code into separate modules.

### Setting up variables

These will define our pipeline.

In [1]:
import raw_data_acquisition as rda
import raw_data_inspector as rdi
from constants import utils, config
import models

# Set default logging level.
from absl import logging
logging.set_verbosity(logging.DEBUG)

Acquire the data!

In [2]:
rda.acquire_dataset_if_needed() # 17min48s (local) -> 1m55s (server)

INFO:absl:Directory with the biggest version: /home/ubuntu/lemi_3w/data/dataset_converted_v10101
INFO:absl:Version: 10101
INFO:absl:Latest local version is 10101
INFO:absl:Going to fetch config file from $https://raw.githubusercontent.com/petrobras/3W/main/dataset/dataset.ini
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2786  100  2786    0     0   4418      0 --:--:-- --:--:-- --:--:--  4415
INFO:absl:Latest online version is 10101
INFO:absl:Found existing converted data with dataset version of 10101
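
The log above suggests that the acquisition step picks the local directory with the highest version and compares it with the online version. The following is a minimal sketch of that idea, not the actual `raw_data_acquisition` code: the function names are hypothetical, the directory naming scheme is inferred from the log, and the online version is taken as a plain integer because the notebook does not show how `dataset.ini` is parsed.

```python
import re
import tempfile
from pathlib import Path
from typing import Optional, Tuple

# Assumed naming scheme, inferred from the log output.
VERSION_PATTERN = re.compile(r"^dataset_converted_v(\d+)$")


def find_latest_converted_dir(data_dir: Path) -> Optional[Tuple[Path, int]]:
    """Return (path, version) of the highest-versioned directory, or None."""
    candidates = []
    for child in data_dir.iterdir():
        match = VERSION_PATTERN.match(child.name)
        if child.is_dir() and match:
            candidates.append((int(match.group(1)), child))
    if not candidates:
        return None
    version, path = max(candidates, key=lambda item: item[0])
    return path, version


def needs_download(local_version: Optional[int], online_version: int) -> bool:
    """Download only when nothing is cached or the online dataset is newer."""
    return local_version is None or online_version > local_version


# Usage with a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "dataset_converted_v10003").mkdir()
    (root / "dataset_converted_v10101").mkdir()
    latest_path, latest_version = find_latest_converted_dir(root)
    latest_name = latest_path.name

print(latest_name, latest_version, needs_download(latest_version, 10101))
```

Anchoring the pattern at the end of the name keeps derived directories (for example, ones with a suffix after the version) out of the version comparison.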


In [3]:
latest_converted_data_path, latest_converted_data_version = rda.get_latest_local_converted_data_version(config.DIR_PROJECT_DATA)
print(f"Data version is: {latest_converted_data_version}")
print(f"Size of directory with converted files is: {utils.get_directory_size(latest_converted_data_path)/(1024**3):.3f} GB")

INFO:absl:Directory with the biggest version: /home/ubuntu/lemi_3w/data/dataset_converted_v10101
INFO:absl:Version: 10101


Data version is: 10101
Size of directory with converted files is: 1.263 GB
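
`utils.get_directory_size` is a project helper whose source is not shown here. A minimal sketch of how such a helper could work, assuming it simply sums the sizes of all files below a directory (the name `directory_size_bytes` is hypothetical):

```python
import tempfile
from pathlib import Path


def directory_size_bytes(directory: Path) -> int:
    """Sum the sizes of all regular files below `directory`, recursively."""
    return sum(p.stat().st_size for p in directory.rglob("*") if p.is_file())


# Usage with a throwaway directory containing two small files.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    (root / "a.csv").write_bytes(b"x" * 1024)
    (root / "sub" / "b.csv").write_bytes(b"y" * 2048)
    size_bytes = directory_size_bytes(root)

size_gb = size_bytes / (1024**3)
print(f"{size_bytes} bytes = {size_gb:.6f} GB")
```

Dividing by `1024**3` strictly yields gibibytes (GiB), which the notebook labels as GB.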


In [4]:
inspector = rdi.RawDataInspector(
    latest_converted_data_path,
    config.PATH_DATA_INSPECTOR_CACHE,
    True
)

metadata_all_data = inspector.get_metadata_table()
metadata_all_data

hash_id,class_type,source,well_id,path,timestamp,file_size,num_timesteps
74203bb,NORMAL,REAL,1.0,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,2017-05-24 03:00:00,491413,17885
9fbd6f9,NORMAL,REAL,2.0,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,2017-08-09 06:00:00,520149,17933
28804c5,NORMAL,REAL,6.0,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,2017-05-08 09:00:31,349159,17970
42afe91,NORMAL,REAL,8.0,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,2017-07-01 14:01:35,251878,17799
fa71d94,NORMAL,REAL,6.0,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,2017-08-23 19:00:00,279736,17949
...,...,...,...,...,...,...,...
ea66cf6,SEVERE_SLUGGING,SIMULATED,,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,NaT,2315902,61999
34f032a,SEVERE_SLUGGING,SIMULATED,,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,NaT,2259537,61999
876a969,SEVERE_SLUGGING,REAL,14.0,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,2017-09-25 06:00:42,1005714,17959
deac7ec,SEVERE_SLUGGING,SIMULATED,,/home/ubuntu/lemi_3w/data/dataset_converted_v1...,NaT,2045157,61999


In [5]:
%%script false --no-raise-error
# A cell magic must be the first line of the cell; this one skips the cell without raising an error.
import pathlib
inspector_test = rdi.RawDataInspector(
    config.DIR_PROJECT_DATA / "teset_dataset_converted_v10003",
    config.DIR_PROJECT_CACHE / "random_inspector_cache.parquet",
    False
)


In [6]:
from raw_data_manager import models
import pandas as pd


def get_table_by_anomaly_source(data_all: pd.DataFrame) -> pd.DataFrame:
    """Count events per class, broken down by source (real, simulated, hand-drawn)."""
    anomaly = []
    real_count = []
    simul_count = []
    drawn_count = []
    soma = []

    for anomaly_type in models.EventClassType:
        anomaly.append(anomaly_type.name)
        
        real_count.append(
            len(data_all[(data_all['class_type'] == anomaly_type.name) & (data_all['source'] == 'REAL')]))
        simul_count.append(
            len(data_all[(data_all['class_type'] == anomaly_type.name) & (data_all['source'] == 'SIMULATED')]))
        drawn_count.append(
            len(data_all[(data_all['class_type'] == anomaly_type.name) & (data_all['source'] == 'HAND_DRAWN')]))
        soma.append(
            len(data_all[data_all['class_type'] == anomaly_type.name]))

    anomaly.append('Total')
    real_count.append(sum(real_count))
    simul_count.append(sum(simul_count))
    drawn_count.append(sum(drawn_count))
    soma.append(sum(soma))

    data = {
        'anomaly': anomaly,
        'real_count': real_count,
        'simul_count': simul_count,
        'drawn_count': drawn_count,
        'soma' : soma,
    }

    # Create the DataFrame
    df_source = pd.DataFrame(data)
    df_source.set_index('anomaly', inplace=True)
    return df_source

In [7]:
get_table_by_anomaly_source(metadata_all_data)

anomaly,real_count,simul_count,drawn_count,soma
NORMAL,594,0,0,594
ABRUPT_INCREASE_BSW,5,114,10,129
SPURIOUS_CLOSURE_DHSV,22,16,0,38
SEVERE_SLUGGING,32,74,0,106
FLOW_INSTABILITY,344,0,0,344
RAPID_PRODUCTIVITY_LOSS,11,439,0,450
QUICK_RESTRICTION_PCK,6,215,0,221
SCALING_IN_PCK,5,0,10,15
HYDRATE_IN_PRODUCTION_LINE,0,81,0,81
Total,1019,939,20,1978
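
The same kind of class-by-source count can also be obtained with `pandas.crosstab`. The sketch below uses a tiny made-up metadata table, not the real 3W metadata, and is offered as an alternative to the loop-based function rather than a drop-in replacement.

```python
import pandas as pd

# Tiny made-up metadata table with the same two columns used by the function.
toy_metadata = pd.DataFrame(
    {
        "class_type": ["NORMAL", "NORMAL", "SEVERE_SLUGGING", "SEVERE_SLUGGING", "SCALING_IN_PCK"],
        "source": ["REAL", "REAL", "REAL", "SIMULATED", "HAND_DRAWN"],
    }
)

# Rows are classes, columns are sources; margins add a "Total" row and column.
counts = pd.crosstab(
    toy_metadata["class_type"],
    toy_metadata["source"],
    margins=True,
    margins_name="Total",
)
print(counts)
```

Unlike the loop over `models.EventClassType`, `crosstab` only lists classes and sources that actually occur in the data and sorts them by label, so a class with no events would be absent instead of showing a row of zeros.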


### Splitting data

In [10]:
import raw_data_splitter as rds

splitter = rds.RawDataSplitter(metadata_all_data, latest_converted_data_version)
split_train_dir, split_test_dir = splitter.stratefy_split_of_data(config.DIR_PROJECT_DATA, test_size=0.20)

DEBUG:absl:size of train data: 1582 --- size of test data: 396
DEBUG:absl:train path /home/ubuntu/lemi_3w/data/dataset_converted_v10101_split-20_source-all_class-all_well-all_train --- test path /home/ubuntu/lemi_3w/data/dataset_converted_v10101_split-20_source-all_class-all_well-all_test
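
How `RawDataSplitter` performs the split internally is not shown in this notebook. As a rough illustration of what a stratified split does, here is a minimal standard-library sketch that stratifies by class label only. The function name, seed, and toy labels are assumptions, and the real splitter may stratify on other fields or rely on a library routine.

```python
import random
from collections import defaultdict


def stratified_split(labels, test_size=0.20, seed=42):
    """Split item indices into train/test, keeping class proportions."""
    by_class = defaultdict(list)
    for index, label in enumerate(labels):
        by_class[label].append(index)

    rng = random.Random(seed)
    train, test = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        # Take the same fraction from every class.
        n_test = round(len(indices) * test_size)
        test.extend(indices[:n_test])
        train.extend(indices[n_test:])
    return sorted(train), sorted(test)


# Made-up labels: 50 events of one class and 10 of another.
labels = ["NORMAL"] * 50 + ["SEVERE_SLUGGING"] * 10
train_idx, test_idx = stratified_split(labels, test_size=0.20)
print(len(train_idx), len(test_idx))
```

Splitting each class separately is what keeps rare classes represented in both partitions; a plain random split over all events could leave a small class entirely in one of them.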



In [18]:
train_metadata = rdi.RawDataInspector(
    split_train_dir,
    config.DIR_PROJECT_CACHE / "train_metadata.parquet",
    False
)

test_metadata = rdi.RawDataInspector(
    split_test_dir,
    config.DIR_PROJECT_CACHE / "test_metadata.parquet",
    False
)

INFO:absl:Processing 475 events of class type 0.
INFO:absl:Processing 275 events of class type 4.
INFO:absl:Processing 360 events of class type 5.
INFO:absl:Processing 103 events of class type 1.
INFO:absl:Processing 12 events of class type 7.
INFO:absl:Processing 177 events of class type 6.
INFO:absl:Processing 65 events of class type 8.
INFO:absl:Processing 31 events of class type 2.
INFO:absl:Processing 84 events of class type 3.

INFO:absl:Processing 119 events of class type 0.
INFO:absl:Processing 69 events of class type 4.
INFO:absl:Processing 90 events of class type 5.
INFO:absl:Processing 26 events of class type 1.
INFO:absl:Processing 3 events of class type 7.
INFO:absl:Processing 44 events of class type 6.
INFO:absl:Processing 16 events of class type 8.
INFO:absl:Processing 7 events of class type 2.
INFO:absl:Processing 22 events of class type 3.

In [19]:
get_table_by_anomaly_source(train_metadata.get_metadata_table())

anomaly,real_count,simul_count,drawn_count,soma
NORMAL,475,0,0,475
ABRUPT_INCREASE_BSW,4,91,8,103
SPURIOUS_CLOSURE_DHSV,18,13,0,31
SEVERE_SLUGGING,25,59,0,84
FLOW_INSTABILITY,275,0,0,275
RAPID_PRODUCTIVITY_LOSS,9,351,0,360
QUICK_RESTRICTION_PCK,5,172,0,177
SCALING_IN_PCK,4,0,8,12
HYDRATE_IN_PRODUCTION_LINE,0,65,0,65
Total,815,751,16,1582


In [20]:
get_table_by_anomaly_source(test_metadata.get_metadata_table())

anomaly,real_count,simul_count,drawn_count,soma
NORMAL,119,0,0,119
ABRUPT_INCREASE_BSW,1,23,2,26
SPURIOUS_CLOSURE_DHSV,4,3,0,7
SEVERE_SLUGGING,7,15,0,22
FLOW_INSTABILITY,69,0,0,69
RAPID_PRODUCTIVITY_LOSS,2,88,0,90
QUICK_RESTRICTION_PCK,1,43,0,44
SCALING_IN_PCK,1,0,2,3
HYDRATE_IN_PRODUCTION_LINE,0,16,0,16
Total,204,188,4,396
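
A quick consistency check on the split can be written directly from the three tables above. The sketch below copies the per-class totals (the `soma` column) by hand, verifies that train plus test equals the full count for every class, and computes the test fraction per class. The dictionary names are chosen here for illustration.

```python
# Per-class event counts ("soma" column) copied from the three tables above.
all_counts = {
    "NORMAL": 594, "ABRUPT_INCREASE_BSW": 129, "SPURIOUS_CLOSURE_DHSV": 38,
    "SEVERE_SLUGGING": 106, "FLOW_INSTABILITY": 344, "RAPID_PRODUCTIVITY_LOSS": 450,
    "QUICK_RESTRICTION_PCK": 221, "SCALING_IN_PCK": 15, "HYDRATE_IN_PRODUCTION_LINE": 81,
}
train_counts = {
    "NORMAL": 475, "ABRUPT_INCREASE_BSW": 103, "SPURIOUS_CLOSURE_DHSV": 31,
    "SEVERE_SLUGGING": 84, "FLOW_INSTABILITY": 275, "RAPID_PRODUCTIVITY_LOSS": 360,
    "QUICK_RESTRICTION_PCK": 177, "SCALING_IN_PCK": 12, "HYDRATE_IN_PRODUCTION_LINE": 65,
}
test_counts = {
    "NORMAL": 119, "ABRUPT_INCREASE_BSW": 26, "SPURIOUS_CLOSURE_DHSV": 7,
    "SEVERE_SLUGGING": 22, "FLOW_INSTABILITY": 69, "RAPID_PRODUCTIVITY_LOSS": 90,
    "QUICK_RESTRICTION_PCK": 44, "SCALING_IN_PCK": 3, "HYDRATE_IN_PRODUCTION_LINE": 16,
}

for name, total in all_counts.items():
    # No event may be lost or duplicated by the split.
    assert train_counts[name] + test_counts[name] == total, name

# Share of each class that ended up in the test partition.
test_fractions = {name: test_counts[name] / total for name, total in all_counts.items()}
overall_fraction = sum(test_counts.values()) / sum(all_counts.values())

for name, fraction in test_fractions.items():
    print(f"{name}: {fraction:.3f}")
print(f"overall: {overall_fraction:.4f}")
```

Every class lands close to the requested `test_size=0.20`; small classes such as `SPURIOUS_CLOSURE_DHSV` deviate the most because their counts can only be split in whole events.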


In [None]:
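import models

# Numeric values for a hand-picked subset of event classes.
# (Named `selected_classes` to avoid shadowing the built-in `list`.)
selected_classes = [models.EventClassType.ABRUPT_INCREASE_BSW, models.EventClassType.FLOW_INSTABILITY]
selected_numbers = [class_type.value for class_type in selected_classes]

# Numeric values for every event class.
numbers = [class_type.value for class_type in models.EventClassType]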

"-".join(map(str, numbers))

In [None]:
len(models.EventClassType)
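
The definition of `models.EventClassType` is not shown in this notebook. The stand-in below is an assumption: its member values are inferred by matching the "class type N" counts in the logs to the per-class totals in the tables (for example, 475 events for class type 0 and for `NORMAL`). It illustrates what the two cells above compute; `class_tag` is a hypothetical helper name.

```python
from enum import Enum


class EventClassType(Enum):
    """Stand-in for models.EventClassType; the real definition is not shown."""
    NORMAL = 0
    ABRUPT_INCREASE_BSW = 1
    SPURIOUS_CLOSURE_DHSV = 2
    SEVERE_SLUGGING = 3
    FLOW_INSTABILITY = 4
    RAPID_PRODUCTIVITY_LOSS = 5
    QUICK_RESTRICTION_PCK = 6
    SCALING_IN_PCK = 7
    HYDRATE_IN_PRODUCTION_LINE = 8


def class_tag(class_types):
    """Join the numeric class values with dashes."""
    return "-".join(str(class_type.value) for class_type in class_types)


subset_tag = class_tag([EventClassType.ABRUPT_INCREASE_BSW, EventClassType.FLOW_INSTABILITY])
full_tag = class_tag(EventClassType)
print(subset_tag, full_tag, len(EventClassType))
```

Iterating over an `Enum` class yields its members in definition order, which is why the full tag comes out sorted here.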