# 3W dataset's General Presentation

This is a general presentation of the 3W dataset, to the best of its authors' knowledge, the first realistic and public dataset with rare undesirable real events in oil wells that can be readily used as a benchmark dataset for development of machine learning techniques related to inherent difficulties of actual data.

For more information about the theory behind this dataset, refer to the paper **A Realistic and Public Dataset with Rare Undesirable Real Events in Oil Wells** published in the **Journal of Petroleum Science and Engineering** (link [here](https://doi.org/10.1016/j.petrol.2019.106223)).

# 1. Introduction

This Jupyter Notebook presents the 3W dataset in a general way. For this, some tables, graphs, and statistics are presented.

# 2. Imports and Configurations

In [1]:
import sys
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pathlib
import pickle

from river import stream, feature_extraction as fx, compose, stats, preprocessing, tree, metrics, evaluate
from river import linear_model, optim, drift, anomaly, utils, ensemble

import sklearn.model_selection
from sklearn.preprocessing import StandardScaler

%matplotlib inline
%config InlineBackend.figure_format = 'svg'

In [2]:
import river
river.__version__

'0.12.1'

# 3. Instances' Structure

Below, all 3W dataset's instances are loaded and the first one of each knowledge source (real, simulated and hand-drawn) is partially displayed.

In [3]:
class d3w():
    '''
    Class for managing Petrobras 3W dataset
    '''
    def __init__(self, path3w):
        self.path3w = path3w
        self.df = self.__load_df()
        return

    def __load_df(self):

        d = dict()
        d['origin'] = []
        d['label'] = []
        d['path'] = []
        d['nlines'] = []
        for i in pathlib.Path(self.path3w).iterdir():
            if i.stem.isnumeric():
                print(i)
                label = int(i.stem)
                for fp in i.iterdir():
                    # Considers only csv files
                    if fp.suffix == ".csv":

                        if (fp.stem.startswith("SIMULATED")):
                            d['origin'].append('S')
                        elif fp.stem.startswith("DRAWN"):
                            d['origin'].append('D')
                        else:
                            d['origin'].append('R')
                        
                        d['label'].append(label)
                        d['path'].append(fp)
                        d['nlines'].append(self.file_len(fp)-1)
                        
        return pd.DataFrame(d)
    
    def split(self, real=True, simul=True, drawn=True, test_size=0.2, val_size=0.1, sample_n=None):
        
        tmp0_df = self.get_df(real, simul, drawn)
        
        if sample_n is not None:
            N = len(tmp0_df.index)
            if N > sample_n:
                ds_list = []
                for i, ni in tmp0_df.groupby('label').count().nlines.items():
                    ns = ni*sample_n//N
                    ds_list.append(tmp0_df[tmp0_df.label == i].sample(n=ns, random_state=200560))
                tmp0_df = pd.concat(ds_list)            
        
        tmp_df, test_df = sklearn.model_selection.train_test_split(tmp0_df, 
                                                        test_size=test_size, 
                                                        random_state=200560, 
                                                        shuffle=True, 
                                                        stratify=tmp0_df['label'])
        
        if val_size == 0:
            print('Instances Train: {}  Test: {}'.format(len(tmp_df.index), 
                                                         len(test_df.index)))
            return tmp_df.reset_index(drop=True),\
                   test_df.reset_index(drop=True)
        
        train_df, val_df = sklearn.model_selection.train_test_split(tmp_df, test_size=val_size, 
                                                        random_state=200560, 
                                                        shuffle=True, 
                                                        stratify=tmp_df['label'])
        print('Instances Train: {}  Test: {}  Validation: {}'.format(len(train_df.index), 
                                                                     len(test_df.index), 
                                                                     len(val_df.index)))
        
        return train_df.reset_index(drop=True),\
               test_df.reset_index(drop=True),\
               val_df.reset_index(drop=True)
    
    def file_len(self, filename):
        j = 0
        with open(filename) as f:
            for i, x in enumerate(f):
                if x.strip() == '':
                    j += 1
        return i + 1 - j
    
    def get_df(self, real=True, simul=True, drawn=True):
        sel = []
        if real:
            sel.append('R')
        if simul:
            sel.append('S')
        if drawn:
            sel.append('D')
        if sel:
            return self.df[self.df['origin'].isin(sel)].drop(columns=['origin']).reset_index(drop=True)
    
    @property
    def all(self):
        return self.df.drop(columns=['origin'])
    @property
    def real(self):
        return self.df[self.df['origin']=='R'].drop(columns=['origin']).reset_index(drop=True)
    @property
    def simul(self):
        return self.df[self.df['origin']=='S'].drop(columns=['origin']).reset_index(drop=True)
    @property
    def drawn(self):
        return self.df[self.df['origin']=='D'].drop(columns=['origin']).reset_index(drop=True)


In [4]:
if pathlib.Path('dset_river.pkl').exists():
  with open('dset_river.pkl', 'rb') as f:
    dset = pickle.load(f)
else:
    dset = d3w('../dataset')
    with open('dset_river.pkl', 'wb') as f:
      pickle.dump(dset, f)

In [5]:
flist = ['P-PDG', 'P-TPT', 'T-TPT', 'P-MON-CKP', 'T-JUS-CKP', 'P-JUS-CKGL', 'T-JUS-CKGL', 'QGL']
categories=[[0,1,2,3,4,5,6,7,8,101,102,103,104,105,106,107,108]]

train_df, test_df, val_df = dset.split(real=True, simul=True, drawn=True, test_size=0.1, val_size=0.2, sample_n=300)

Instances Train: 213  Test: 30  Validation: 54


Each instance is stored in a CSV file and loaded into a pandas DataFrame. Each observation is stored in a line in the CSV file and loaded as a line in the pandas DataFrame. The first line of each CSV file contains a header with column identifiers. Each column of CSV files stores the following type of information:

* **timestamp**: observations timestamps loaded into pandas DataFrame as its index;
* **P-PDG**: pressure variable at the Permanent Downhole Gauge (PDG);
* **P-TPT**: pressure variable at the Temperature and Pressure Transducer (TPT);
* **T-TPT**: temperature variable at the Temperature and Pressure Transducer (TPT);
* **P-MON-CKP**: pressure variable upstream of the production choke (CKP);
* **T-JUS-CKP**: temperature variable downstream of the production choke (CKP);
* **P-JUS-CKGL**: pressure variable upstream of the gas lift choke (CKGL);
* **T-JUS-CKGL**: temperature variable upstream of the gas lift choke (CKGL);
* **QGL**: gas lift flow rate;
* **class**: observations labels associated with three types of periods (normal, fault transient, and faulty steady state).

Other information are also loaded into each pandas Dataframe:

* **label**: instance label (event type);
* **well**: well name. Hand-drawn and simulated instances have fixed names. Real instances have names masked with incremental id;
* **id**: instance identifier. Hand-drawn and simulated instances have incremental id. Each real instance has an id generated from its first timestamp.

More information about these variables can be obtained from the following publicly available documents:

* ***Option in Portuguese***: R.E.V. Vargas. Base de dados e benchmarks para prognóstico de anomalias em sistemas de elevação de petróleo. Universidade Federal do Espírito Santo. Doctoral thesis. 2019. https://github.com/ricardovvargas/3w_dataset/raw/master/docs/doctoral_thesis_ricardo_vargas.pdf.
* ***Option in English***: B.G. Carvalho. Evaluating machine learning techniques for detection of flow instability events in offshore oil wells. Universidade Federal do Espírito Santo. Master's degree dissertation. 2021. https://github.com/ricardovvargas/3w_dataset/raw/master/docs/master_degree_dissertation_bruno_carvalho.pdf.

In [None]:
dset.all

In [6]:
class CustomGen():
    
    def __init__(self, dset, flist, target, scaleb=False, out_time=False, ifileb=False):
        self.dset = dset
        self.flist = flist
        self.target = target
        self.scaleb = scaleb
        self.out_time = out_time
        self.ifileb = ifileb
        return
    
    def iter(self, max_n=None):
        n = 0
        for ifile, p in enumerate(self.dset['path']):
            df = pd.read_csv(p, parse_dates=["timestamp"])

            if np.any(df[self.target].isna()):
                df[self.target] = df[self.target].fillna(method='ffill')
            df[self.target] = df[self.target].astype(int) #.apply(str)

            for f in self.flist:
                nas = np.sum(df[f].isna())
                if nas > 0:
                    if nas < len(df.index) * 0.2:
                        df[f] = df[f].fillna(method='ffill')
                    else:
                        df[f] = 0
            
            if self.scaleb:
                np.seterr(divide='ignore', invalid='ignore')
                dfd = self.scale(df[self.flist]).to_dict(orient='records')
                np.seterr(divide='warn', invalid='warn')
            else:
                dfd = df[self.flist].to_dict(orient='records')
            for i, x in enumerate(dfd):
                #assert isinstance(df.iloc[i][self.target], np.int32), 'class not an integer!'
                result = []
                if self.ifileb:
                    result.append(ifile)
                if self.out_time:
                    result. append(df.iloc[i]['timestamp'])
                yield result + [x, df.iloc[i][self.target]]
                n += 1
                if max_n is not None and n > max_n:
                    break
            if max_n is not None and n > max_n:
                break

        return
    
    def scale(self, df):
        scale = StandardScaler()
        return pd.DataFrame(scale.fit_transform(df[self.flist]), columns=self.flist, index=df.index)

    def plot(self, ifile):
        
        ds = pd.read_csv(self.dset['path'][ifile], parse_dates=["timestamp"])
        fig, axs = plt.subplots(nrows=len(self.flist)+1, figsize=(10, 12), sharex=True)
        
        fig.suptitle(self.dset['path'][ifile])

        for i, vs in enumerate(self.flist):
            axs[i].plot(ds.timestamp[::60], ds[vs][::60])
            axs[i].set_ylabel(vs)
        
        id = np.argsort(ds[self.target])
        t = [ds.timestamp[i] for i in id][::60]
        y = [str(ds[self.target][i]) for i in id][::60]
        
        axs[i+1].scatter(t, y, marker='.')
        
        axs[i+1].set_ylabel(self.target)
        
        axs[i+1].set_xlabel('Date')
        
        for ax in axs:
            ax.grid(visible=True)

        plt.show()    


In [7]:
gen = CustomGen(dset.all, flist, 'class', scaleb=True, out_time=True, ifileb=True)
n = 0
for i, time, x, t in gen.iter():
    print(i, time, x, t)
    n += 1
    if n > 3:
        break

0 2017-02-01 02:02:07 {'P-PDG': 0.0, 'P-TPT': 1.726355710248412, 'T-TPT': 1.6877002420865492, 'P-MON-CKP': 0.16879165859933157, 'T-JUS-CKP': 0.8731632851640091, 'P-JUS-CKGL': -1.7323051228495974, 'T-JUS-CKGL': 0.0, 'QGL': 0.0} 0
0 2017-02-01 02:02:08 {'P-PDG': 0.0, 'P-TPT': 1.7211893794340596, 'T-TPT': 1.6877002420865492, 'P-MON-CKP': 0.21591556364734954, 'T-JUS-CKP': 0.8541981839009655, 'P-JUS-CKGL': -1.7313436073904254, 'T-JUS-CKGL': 0.0, 'QGL': 0.0} 0
0 2017-02-01 02:02:09 {'P-PDG': 0.0, 'P-TPT': 1.7160230486197072, 'T-TPT': 1.6877002420865492, 'P-MON-CKP': 0.2630394686953675, 'T-JUS-CKP': 0.8352572420026301, 'P-JUS-CKGL': -1.7313436073904254, 'T-JUS-CKGL': 0.0, 'QGL': 0.0} 0
0 2017-02-01 02:02:10 {'P-PDG': 0.0, 'P-TPT': 1.7108567178053549, 'T-TPT': 1.6877002420865492, 'P-MON-CKP': 0.3101633737433855, 'T-JUS-CKP': 0.8163163001042605, 'P-JUS-CKGL': -1.7313436073904254, 'T-JUS-CKGL': 0.0, 'QGL': 0.0} 0


In [None]:
gen.plot(100)

In [None]:
i = 0
for t, r in df[flist].iterrows():
    print(t, r.to_dict())
    i += 1
    if i > 5:
        break

In [18]:

scaler = preprocessing.StandardScaler()
#scaler = preprocessing.RobustScaler()

rmean = utils.Rolling(stats.Mean(), window_size=900)
rSEM = utils.Rolling(stats.SEM(ddof=1), window_size=900)
rMax = stats.RollingMax(window_size=900)
rMin = stats.RollingMin(window_size=900)
rtree = tree.HoeffdingTreeClassifier()
arfc = ensemble.AdaptiveRandomForestClassifier(seed=200560, leaf_prediction="nba", max_depth=4)
#loss = optim.losses.MultiClassLoss()
loss = optim.losses.CrossEntropy()
#rlr = linear_model.LogisticRegression(optimizer=optim.Adam(.01), loss=loss)
#metric = metrics.Accuracy()
#metric = metrics.BalancedAccuracy()
metric = metrics.ClassificationReport()

model = compose.TransformerUnion()
for f in flist:
    model += fx.Agg(on=f, by = None, how=rmean)
    model += fx.Agg(on=f, by = None, how=rSEM)
    model += fx.Agg(on=f, by = None, how=rMax)
    model += fx.Agg(on=f, by = None, how=rMin)
#model |= compose.Discard(flist[0])
#model |= scaler
#model |= arfc


In [None]:
print(model.debug_one(next(gen.iter())[0]))

In [None]:
gen = CustomGen(train_df, flist, 'class', scaleb=True, out_time=False)
evaluate.progressive_val_score(dataset=gen.iter(max_n=None), model=model, metric=metric, delay=1800, print_every=2*3600)

In [29]:
import queue
q = queue.Queue()

delay = 3600
window_size = 900

rmean = utils.Rolling(stats.Mean(), window_size=window_size)
rSEM = utils.Rolling(stats.SEM(ddof=1), window_size=window_size)
rMax = stats.RollingMax(window_size=window_size)
rMin = stats.RollingMin(window_size=window_size)
model = compose.TransformerUnion()
for f in flist:
    model += fx.Agg(on=f, by = None, how=rmean)
    model += fx.Agg(on=f, by = None, how=rSEM)
    model += fx.Agg(on=f, by = None, how=rMax)
    model += fx.Agg(on=f, by = None, how=rMin)

arfc = ensemble.AdaptiveRandomForestClassifier(seed=200560, leaf_prediction="nba")
report = metrics.ClassificationReport()

gen = CustomGen(train_df, flist, 'class', scaleb=True, out_time=False, ifileb=True)

lastfile = 0
ys_real = []
ys_pred = []
j = 0
for i, (ifile, x, y) in enumerate(gen.iter(max_n=None)):
    if ifile != lastfile:
        model = model.clone()
        lastfile = ifile
        j = 0
        print(i, 'new file', ifile)
    xt = model.transform_one(x)
    if j >= window_size:
        if i > delay+window_size+1:
            try:
                y_pred = arfc.predict_one(xt)
                if y_pred is not None:
                    ys_pred.append(y_pred)
                    ys_real.append(y)
                    report = report.update(y, y_pred)
                else:
                    print('Predicts None', i, j)
            except:
                print(i, ifile, 'Error predict')
        q.put((xt, y))
        if i > delay+window_size:
            xd, yd = q.get()
            try:
                arfc.learn_one(xd, yd)
            except:
                print(i, ifile, 'Error learn')
    j += 1
print(report)    

88799 new file 1
115798 new file 2
122958 new file 3
197357 new file 4
226656 new file 5
315456 new file 6
342455 new file 7
360372 new file 8
367506 new file 9
396805 new file 10
403981 new file 11
414650 new file 12
443949 new file 13
470948 new file 14
488705 new file 15
506683 new file 16
517350 new file 17
546649 new file 18
564294 new file 19
582260 new file 20
623438 new file 21
641313 new file 22
648432 new file 23
666361 new file 24
695660 new file 25
713630 new file 26
742929 new file 27
772228 new file 28
799227 new file 29
806369 new file 30
816893 new file 31
823955 new file 32
841858 new file 33
852396 new file 34
881695 new file 35
888863 new file 36
906579 new file 37
924354 new file 38
984353 new file 39
1041352 new file 40
1167615 new file 41
1196914 new file 42
1204035 new file 43
1211169 new file 44
1228977 new file 45
1255976 new file 46
1344776 new file 47
1362712 new file 48
1392011 new file 49
1399179 new file 50
1417138 new file 51
1446437 new file 52
1473436 n

KeyboardInterrupt: 

In [30]:
gen_test = CustomGen(test_df, flist, 'class', scaleb=True, out_time=False, ifileb=True)
report = metrics.ClassificationReport()

lastfile = 0
ys_real = []
ys_pred = []
j = 0
for i, (ifile, x, y) in enumerate(gen_test.iter(max_n=250000)):
    if ifile != lastfile:
        model = model.clone()
        lastfile = ifile
        j = 0
        print(i, 'new file', ifile)
    xt = model.transform_one(x)
    if j >= window_size:
        try:
            y_pred = arfc.predict_one(xt)
            if y_pred is not None:
                ys_pred.append(y_pred)
                ys_real.append(y)
                report = report.update(y, y_pred)
            else:
                print('Predicts None', i, j)
        except:
            print(i, ifile, 'Error predict')
    j += 1
print(report)   

26999 new file 1
56298 new file 2
83297 new file 3
112596 new file 4
141895 new file 5
149063 new file 6
156197 new file 7
185496 new file 8
203444 new file 9
210635 new file 10
           Precision   Recall    F1       Support  
                                                   
       0       0.00%     0.00%    0.00%     24837  
       4       0.00%     0.00%    0.00%     18793  
       5      40.19%   100.00%   57.34%     96496  
       6       0.00%     0.00%    0.00%     35998  
     101       0.00%     0.00%    0.00%     32477  
     105       0.00%     0.00%    0.00%       171  
     106       0.00%     0.00%    0.00%       144  
                                                   
   Macro       5.74%    14.29%    8.19%            
   Micro      40.19%    40.19%   40.19%            
Weighted      16.15%    40.19%   23.04%            

                  40.19% accuracy                  


In [31]:
np.unique(ys_pred, return_counts=True)

(array([5]), array([240101], dtype=int64))

In [None]:
model = model.clone()
evaluate.progressive_val_score(
    model = model,
    dataset = stream.iter_pandas(df_clean[flist], df_clean['class']),
    metric = metrics.Accuracy(),
    delay = 0, 
    print_every = 900)

## Drift test

In [None]:
# Auxiliary function to plot the data
def plot_data(stream, sts, warnings=None, drifts=None):
    fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(12, 6), sharex=True)
    ax1.grid; ax2.grid
    ax1.set_ylabel('Stream'); ax2.set_ylabel('Stat')
    fig.suptitle('Drift test')
    ax1.plot(stream, label='Stream')
    ax2.plot(sts, label='stats')
    if drifts is not None:
        for drift_detected in drifts:
            ax2.axvline(stream.index[drift_detected], color='red', linewidth=0.5)
    if warnings is not None:
        for warning_detected in warnings:
            ax2.axvline(stream.index[warning_detected], color='green', linewidth=0.25)
    plt.show()

In [None]:
df = tk.load_instance(real_instances[real_instances.label==3].iloc[0])
df_clean = df[flist+['class']].dropna()

#sem = stats.RollingSEM(ddof=1, window_size=30)
sem = stats.SEM()

f = 'P-TPT'
#drift_detector = drift.ADWIN(delta=0.002)
drift_detector = drift.DDM(min_num_instances=600)
warnings = []
drifts = []
sems = []

for i, val in enumerate(df_clean[f]):
    sems.append(sem.update(val).get())
    in_drift, in_warning = drift_detector.update(sems[-1])   # Data is processed one sample at a time
    if in_warning:
        # The drift detector indicates after each sample if there is a drift in the data
        #print(f'Change detected at index {i}')
        warnings.append(i)
#        drift_detector.reset()   # As a best practice, we reset the detector
    if in_drift:
        # The drift detector indicates after each sample if there is a drift in the data
        print(f'Change detected at index {i}, {x}')
        drifts.append(i)
#        drift_detector.reset()   # As a best practice, we reset the detector

plot_data(df_clean[f], pd.Series(sems, index=df_clean.index), warnings=warnings, drifts=drifts)

In [None]:
sems

## Anomaly detector

In [None]:
df = tk.load_instance(real_instances[real_instances.label==2].iloc[0])
df_clean = df[flist+['class']].dropna()

f = 'T-TPT'

detector = anomaly.GaussianScorer(window_size=None, grace_period=300)

anomalies = []

for i, val in enumerate(df_clean[f]):
    anomalies.append(detector.score_one(None, val))
    detector = detector.learn_one(None, val)
anomalies = pd.Series(anomalies, index=df_clean.index)

fig, (ax1, ax2, ax3) = plt.subplots(nrows=3, ncols=1, figsize=(12, 6), sharex=True)
ax1.grid
ax2.grid
ax3.grid
ax1.plot(df_clean[f], label='Stream', color='blue')
ax2.plot(anomalies, label='Anomalies prob', color='red')
ax2.axhline(0.95, color='green')
ax3.plot(df_clean['class'], label='class', color='black')

plt.tight_layout
plt.show()