# End-to-End Data Cleaning Pipeline with Raha and Baran (Minimal and Sequential)
We build an end-to-end data cleaning pipeline with our configuration-free error detection and correction systems, Raha and Baran.

In [1]:
import pandas
import IPython.display
import os 
import logging
import daskraha as raha
import daskraha.dask_version.dataset_parallel as dp


## Error Detection with Raha

### 1. Instantiating the Detection Class
We first instantiate the `DetectionParallel` class and set the labeling budget and verbosity.

In [2]:
app_1 = raha.dask_version.detection_parallel.DetectionParallel()
# How many tuples would you label?
app_1.LABELING_BUDGET = 20
# Would you like to see the logs?
app_1.VERBOSE = True

### 2. Instantiating the Dataset
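We next describe the dataset (name, dirty file, clean file), start a local Dask cluster with one worker per logical CPU, and initialize the dataset object.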

In [3]:
dataset_dictionary = {
    "name": "flights",
    "path": "../datasets/flights/dirty.csv",
    "clean_path": "../datasets/flights/clean.csv"
}

dataset_par = dp.DatasetParallel(dataset_dictionary)
shared_df = app_1.initialize_dataframe(dataset_dictionary["path"])

if app_1.VERBOSE:
    print("Starting Cluster...")
client = app_1.start_dask_cluster(
    num_workers=os.cpu_count(), logging_level=logging.ERROR
)
client.run(app_1.init_workers)
dataset_par, differences_dict = app_1.initialize_dataset(dataset_dictionary)


Starting Cluster...
7 cells are detected by ["OD", ["histogram", "0.3", "0.1"]]
7 cells are detected by ["OD", ["histogram", "0.1", "0.1"]]
7 cells are detected by ["OD", ["histogram", "0.3", "0.3"]]
7 cells are detected by ["OD", ["histogram", "0.1", "0.3"]]
7 cells are detected by ["OD", ["histogram", "0.1", "0.5"]]
7 cells are detected by ["OD", ["histogram", "0.3", "0.5"]]
7 cells are detected by ["OD", ["histogram", "0.1", "0.9"]]
7 cells are detected by ["OD", ["histogram", "0.1", "0.7"]]
7 cells are detected by ["OD", ["histogram", "0.5", "0.7"]]
7 cells are detected by ["OD", ["histogram", "0.5", "0.1"]]
7 cells are detected by ["OD", ["histogram", "0.3", "0.7"]]
7 cells are detected by ["OD", ["histogram", "0.7", "0.1"]]
7 cells are detected by ["OD", ["histogram", "0.5", "0.3"]]
7 cells are detected by ["OD", ["histogram", "0.5", "0.5"]]
7 cells are detected by ["OD", ["histogram", "0.3", "0.9"]]
7 cells are detected by ["OD", ["histogram", "0.5", "0.9"]]
7 cells are detected
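
To make the role of the dirty and clean files concrete, the following minimal sketch compares two small tables cell by cell and records where they differ. It is not the implementation behind `initialize_dataset`; the helper names `read_cells` and `actual_errors` and the two inline tables are hypothetical and only illustrate the idea.

```python
import csv
import io


def read_cells(csv_text):
    """Parse CSV text into a header and a list of rows (all values kept as strings)."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    return rows[0], rows[1:]


def actual_errors(dirty_rows, clean_rows):
    """Map every (row, column) cell whose dirty value differs from the clean one to the clean value."""
    errors = {}
    for i, (dirty_row, clean_row) in enumerate(zip(dirty_rows, clean_rows)):
        for j, (dirty_value, clean_value) in enumerate(zip(dirty_row, clean_row)):
            if dirty_value != clean_value:
                errors[(i, j)] = clean_value
    return errors


# Tiny made-up tables standing in for dirty.csv and clean.csv.
dirty_text = "flight,dep_time\nAA-1,7:10 a.m.\nUA-2,\n"
clean_text = "flight,dep_time\nAA-1,7:10 a.m.\nUA-2,9:30 p.m.\n"

_, dirty_rows = read_cells(dirty_text)
_, clean_rows = read_cells(clean_text)
errors = actual_errors(dirty_rows, clean_rows)
print(errors)
```

Keeping all values as strings avoids spurious differences caused by type inference, for example `7` versus `7.0`.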

### 3. Running Error Detection Strategies
Raha runs all error detection strategies, or only the promising ones, on the dataset. This step can take a while because every selected strategy has to process the whole dataset.

In [4]:
app_1.run_strategies(dataset_par)

Raha strategy metadata generation(parallel): 0.08550477027893066
Raha running all strategies total time(parallel): 21.516072511672974


[{'name': '["OD", ["histogram", "0.1", "0.1"]]',
  'output_col_0': [(0, 0),
   (1, 0),
   (2, 0),
   ...
   (77, 0),
   (78, 0
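
A strategy, as used here, takes the dataset and returns a set of suspicious `(row, column)` cells. The sketch below shows one simple frequency-based outlier rule of that shape. It is an assumption for illustration, not the algorithm behind the `["OD", ["histogram", ...]]` strategies, and the function `rare_value_cells` and its single threshold `min_share` are hypothetical.

```python
from collections import Counter


def rare_value_cells(column_values, column_index, min_share):
    """Return the (row, column) cells whose value makes up less than min_share of the column."""
    counts = Counter(column_values)
    total = len(column_values)
    return {
        (row, column_index)
        for row, value in enumerate(column_values)
        if counts[value] / total < min_share
    }


# Made-up column: "a a" occurs once in ten values, so its share is 0.1.
source_column = ["aa", "aa", "aa", "aa", "ua", "ua", "ua", "ua", "a a", "ua"]

for min_share in (0.05, 0.2):
    flagged = rare_value_cells(source_column, 0, min_share)
    print(min_share, sorted(flagged))
```

Running the same rule with several thresholds yields several strategies, each with its own set of flagged cells.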

### 4. Generating Features
Raha then generates a feature vector for each data cell based on the output of error detection strategies. 

In [5]:
app_1.generate_features(dataset_par)

Generate Features(parallel): 0.1339869499206543


[['2e133-f-r-c0',
  '2e133-f-r-c1',
  '2e133-f-r-c2',
  '2e133-f-r-c3',
  '2e133-f-r-c4',
  '2e133-f-r-c5',
  '2e133-f-r-c6']]
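
One simple way to picture such a feature vector is a binary vector with one entry per strategy, where entry k is 1 if strategy k flagged the cell. The sketch below builds these vectors for a single column. It is a simplified illustration under that assumption; `build_feature_vectors` and the three strategy outputs are hypothetical.

```python
def build_feature_vectors(strategy_outputs, n_rows, column_index):
    """Build one binary vector per cell of a column; entry k is 1 if strategy k flagged the cell."""
    return [
        [1 if (row, column_index) in flagged else 0 for flagged in strategy_outputs]
        for row in range(n_rows)
    ]


strategy_outputs = [
    {(0, 0), (2, 0)},  # cells flagged by a first hypothetical strategy
    {(2, 0)},          # cells flagged by a second one
    set(),             # a strategy that flagged nothing
]

features = build_feature_vectors(strategy_outputs, n_rows=4, column_index=0)
for row, vector in enumerate(features):
    print(row, vector)
```

Cells that many strategies agree on end up with similar vectors, which is what the later clustering and classification steps rely on.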

### 5. Building Clusters
Raha next builds a hierarchical clustering model for our clustering-based sampling approach.

In [6]:
app_1.build_clusters(dataset_par)

Build clusters (parallel): 0.41680097579956055


[[0,
  {2: {},
   3: {},
   ...
   21: {}},
  {2: {},
   3: {},
   ...
   21: {}}],
 [1,
  {2: {},
   3: {},
   ...
   21: {}},
  {2: {},
   3: {},
   ...
   21: {}}],
 [2,
  {2: {},
   3: {},
   ...
   13: {},
   14: 
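
To illustrate what hierarchical clustering does with the feature vectors of one column, here is a small pure-Python sketch of agglomerative clustering with average linkage and Hamming distance. The linkage, the distance, and the function names are assumptions chosen for readability; the notebook does not show which ones Raha uses.

```python
def hamming(u, v):
    """Number of positions in which two equally long vectors differ."""
    return sum(a != b for a, b in zip(u, v))


def agglomerative_clusters(vectors, k):
    """Repeatedly merge the two closest clusters (average linkage) until k clusters remain."""
    clusters = [[i] for i in range(len(vectors))]

    def linkage(c1, c2):
        total = sum(hamming(vectors[i], vectors[j]) for i in c1 for j in c2)
        return total / (len(c1) * len(c2))

    while len(clusters) > k:
        a, b = min(
            ((x, y) for x in range(len(clusters)) for y in range(x + 1, len(clusters))),
            key=lambda pair: linkage(clusters[pair[0]], clusters[pair[1]]),
        )
        clusters[a] = clusters[a] + clusters[b]
        del clusters[b]
    return clusters


# Made-up binary feature vectors for the five cells of one column.
vectors = [[1, 1, 0], [1, 1, 0], [0, 0, 0], [0, 0, 1], [1, 1, 1]]
clusters = agglomerative_clusters(vectors, k=2)
print(clusters)
```

Because the merges are nested, stopping the same procedure at different values of `k` gives coarser or finer groupings of the same cells.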

### 6. Interactive Tuple Sampling and Labeling
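Raha then samples tuples one at a time until the labeling budget is exhausted. Every data cell of a sampled tuple has to be labeled, either automatically from the ground truth (when a clean version of the dataset is available) or by hand.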

In [7]:
while len(dataset_par.labeled_tuples) < app_1.LABELING_BUDGET:
    app_1.sample_tuple(dataset_par)
    if dataset_par.has_ground_truth:
        # Compute the differences dictionary (if not already available)
        differences_dict = dataset_par.get_actual_errors_dictionary()
        # Load the clean dataframe
        clean_dataframe = dataset_par.read_csv_dataframe(dataset_par.clean_path)

        app_1.label_with_ground_truth(dataset_par, differences_dict, clean_dataframe)
        # app_1.label_with_ground_truth(dataset_par)
    else:
        print("Label the dirty cells in the following sampled tuple.")
        sampled_tuple = pandas.DataFrame(data=[dataset_par.dataframe.iloc[dataset_par.sampled_tuple, :]], columns=dataset_par.dataframe.columns)
        IPython.display.display(sampled_tuple)
        for j in range(dataset_par.dataframe.shape[1]):
            cell = (dataset_par.sampled_tuple, j)
            value = dataset_par.dataframe.iloc[cell]
            correction = input("What is the correction for value '{}'? Type in the same value if it is not erroneous.\n".format(value))
            user_label = 1 if value != correction else 0
            dataset_par.labeled_cells[cell] = [user_label, correction]
        dataset_par.labeled_tuples[dataset_par.sampled_tuple] = 1

Tuple 2141 is sampled
Tuple 2141 is labeled.
Tuple 1714 is sampled
Tuple 1714 is labeled.
Tuple 74 is sampled
Tuple 74 is labeled.
Tuple 313 is sampled
Tuple 313 is labeled.
Tuple 2350 is sampled
Tuple 2350 is labeled.
Tuple 1156 is sampled
Tuple 1156 is labeled.
Tuple 1898 is sampled
Tuple 1898 is labeled.
Tuple 1711 is sampled
Tuple 1711 is labeled.
Tuple 879 is sampled
Tuple 879 is labeled.
Tuple 565 is sampled
Tuple 565 is labeled.
Tuple 1108 is sampled
Tuple 1108 is labeled.
Tuple 234 is sampled
Tuple 234 is labeled.
Tuple 217 is sampled
Tuple 217 is labeled.
Tuple 1064 is sampled
Tuple 1064 is labeled.
Tuple 2145 is sampled
Tuple 2145 is labeled.
Tuple 528 is sampled
Tuple 528 is labeled.
Tuple 1932 is sampled
Tuple 1932 is labeled.
Tuple 145 is sampled
Tuple 145 is labeled.
Tuple 1454 is sampled
Tuple 1454 is labeled.
Tuple 1731 is sampled
Tuple 1731 is labeled.
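
The idea behind clustering-based sampling can be sketched as follows: prefer the tuple whose cells fall into the most clusters that have no labeled cell yet. This greedy rule is a simplified stand-in and an assumption on our side; `sample_tuple` may well use a different scoring. The function `pick_tuple` and the cluster assignment are hypothetical.

```python
def pick_tuple(cluster_of_cell, labeled_clusters, n_rows, n_cols):
    """Return the row whose cells touch the most clusters that have no label yet."""

    def unlabeled_clusters_touched(row):
        return sum(
            1
            for col in range(n_cols)
            if cluster_of_cell[(row, col)] not in labeled_clusters
        )

    return max(range(n_rows), key=unlabeled_clusters_touched)


# Hypothetical cluster assignment for a 3 x 2 table; a cluster is named (column, id).
cluster_of_cell = {
    (0, 0): (0, "a"), (0, 1): (1, "a"),
    (1, 0): (0, "a"), (1, 1): (1, "b"),
    (2, 0): (0, "b"), (2, 1): (1, "b"),
}
labeled_clusters = {(0, "a"), (1, "a")}  # clusters covered once row 0 is labeled

sampled = pick_tuple(cluster_of_cell, labeled_clusters, n_rows=3, n_cols=2)
print("Tuple {} is sampled".format(sampled))
```

Row 2 wins here because both of its cells lie in clusters that no label has reached so far.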


### 7. Propagating User Labels
Raha then propagates each user label through its cluster.

In [8]:
app_1.propagate_labels(dataset_par)

The number of labeled data cells increased from 140 to 140.
Propagating labels (parallel): 0.00013327598571777344
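
Label propagation can be pictured like this: if all labeled cells of a cluster agree, the remaining cells of that cluster inherit the label. The sketch below implements this rule for a handful of made-up cells. The agreement rule and the name `propagate` are assumptions for illustration, not necessarily what `propagate_labels` does.

```python
def propagate(labeled_cells, cluster_of_cell):
    """Copy a cluster's label to its unlabeled cells when all labels in the cluster agree."""
    labels_per_cluster = {}
    for cell, label in labeled_cells.items():
        labels_per_cluster.setdefault(cluster_of_cell[cell], set()).add(label)

    extended = dict(labeled_cells)
    for cell, cluster in cluster_of_cell.items():
        labels = labels_per_cluster.get(cluster, set())
        if cell not in extended and len(labels) == 1:
            extended[cell] = next(iter(labels))
    return extended


# Five cells of one column in three hypothetical clusters; 1 = error, 0 = clean.
cluster_of_cell = {(0, 0): "c1", (1, 0): "c1", (2, 0): "c2", (3, 0): "c2", (4, 0): "c3"}
labeled_cells = {(0, 0): 1, (2, 0): 0}

extended = propagate(labeled_cells, cluster_of_cell)
print("The number of labeled data cells increased from {} to {}.".format(
    len(labeled_cells), len(extended)))
```

Cell `(4, 0)` stays unlabeled because its cluster contains no labeled cell.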


### 8. Predicting Labels of Data Cells
Raha then trains one classifier per data column and applies it to predict the labels of the remaining data cells.

In [9]:
app_1.predict_labels(dataset_par)

Predict (parallel): 0.12921667098999023
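
The per-column setup can be sketched with any classifier. To keep the example free of dependencies, a 1-nearest-neighbor rule stands in below for whatever classifier `predict_labels` actually trains. The function `predict_column`, the feature vectors, and the labels are hypothetical.

```python
def predict_column(features, labels):
    """Predict a 0/1 label for every row of one column with a 1-nearest-neighbor rule.

    features: one feature vector per row; labels: {row: 0 or 1} for the labeled rows.
    """

    def distance(u, v):
        return sum((a - b) ** 2 for a, b in zip(u, v))

    predictions = {}
    for row, vector in enumerate(features):
        if row in labels:
            predictions[row] = labels[row]  # keep the known label
        else:
            nearest = min(labels, key=lambda r: distance(features[r], vector))
            predictions[row] = labels[nearest]
    return predictions


# One independent model per column: made-up features and labels for two columns.
features_by_column = {
    0: [[1, 1, 0], [0, 0, 0], [1, 1, 1], [0, 0, 1]],
    1: [[0], [1], [1], [0]],
}
labels_by_column = {0: {0: 1, 1: 0}, 1: {0: 0, 1: 1}}

detected_cells = set()
for col, features in features_by_column.items():
    for row, label in predict_column(features, labels_by_column[col]).items():
        if label == 1:
            detected_cells.add((row, col))
print(sorted(detected_cells))
```

Training per column keeps the models independent, so a strategy that is informative in one column does not influence the predictions in another.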


### 9. Storing Results
Raha can also store the error detection results.

In [10]:
app_1.store_results(dataset_par)

The results are stored in ../datasets/flights/raha-baran-results-flights/error-detection/detection.dataset.


### 10. Evaluating the Error Detection Task
We can finally evaluate our error detection task.

In [11]:
p, r, f = dataset_par.get_data_cleaning_evaluation(dataset_par.detected_cells)[:3]
print("Raha's performance on {}:\nPrecision = {:.2f}\nRecall = {:.2f}\nF1 = {:.2f}".format(dataset_par.name, p, r, f))

Raha's performance on flights:
Precision = 0.86
Recall = 0.79
F1 = 0.83
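
For reference, precision, recall, and F1 for error detection can be computed from two sets of cells, as in the sketch below. The function `detection_scores` and the example sets are hypothetical; the sketch only mirrors the standard definitions and is not the body of `get_data_cleaning_evaluation`.

```python
def detection_scores(detected_cells, actual_error_cells):
    """Return (precision, recall, f1) for a set of detected cells against the true error cells."""
    true_positives = len(detected_cells & actual_error_cells)
    precision = true_positives / len(detected_cells) if detected_cells else 0.0
    recall = true_positives / len(actual_error_cells) if actual_error_cells else 0.0
    if precision + recall == 0:
        return precision, recall, 0.0
    return precision, recall, 2 * precision * recall / (precision + recall)


# Made-up example: two of the four detected cells are real errors, one real error is missed.
detected = {(0, 0), (1, 1), (2, 1), (3, 0)}
actual = {(0, 0), (1, 1), (4, 1)}

p, r, f = detection_scores(detected, actual)
print("Precision = {:.2f}\nRecall = {:.2f}\nF1 = {:.2f}".format(p, r, f))
```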


## Error Correction with Baran

### 1. Instantiating the Correction Class
We first instantiate the `CorrectionParallel` class and set the labeling budget and verbosity.

In [12]:
app_2 = raha.dask_version.correction_parallel.CorrectionParallel()

# How many tuples would you label?
app_2.LABELING_BUDGET = 20

# Would you like to see the logs?
app_2.VERBOSE = True

### 2. Initializing the Dataset Object
We next initialize the dataset object.

In [13]:
dataset_par2 = app_2.initialize_dataset(dataset_par)
# dataset_par2.dataframe.head()

### 3. Initializing the Error Corrector Models
Baran initializes the error corrector models.

In [14]:
app_2.initialize_models(dataset_par2)

The error corrector models are initialized.


### 4. Interactive Tuple Sampling, Labeling, Model Updating, Feature Generation, and Correction Prediction
Baran then iteratively samples a tuple, and we label the data cells of each sampled tuple. It then updates the models accordingly and generates a feature vector for each pair of a data error and a correction candidate. Finally, it trains and applies a classifier to each data column to predict the final correction of each data error. Since we already labeled tuples for Raha, we reuse those labeled tuples here instead of labeling new ones.

In [15]:
# while len(d.labeled_tuples) < app_2.LABELING_BUDGET:
#     app_2.sample_tuple(d)
#     if d.has_ground_truth:
#         app_2.label_with_ground_truth(d)
#     else:
#         print("Label the dirty cells in the following sampled tuple.")
#         sampled_tuple = pandas.DataFrame(data=[d.dataframe.iloc[d.sampled_tuple, :]], columns=d.dataframe.columns)
#         IPython.display.display(sampled_tuple)
#         for j in range(d.dataframe.shape[1]):
#             cell = (d.sampled_tuple, j)
#             value = d.dataframe.iloc[cell]
#             correction = input("What is the correction for value '{}'? Type in the same value if it is not erroneous.\n".format(value))
#             user_label = 1 if value != correction else 0
#             d.labeled_cells[cell] = [user_label, correction]
#         d.labeled_tuples[d.sampled_tuple] = 1
#     app_2.update_models(d)
#     app_2.predict_corrections(d)


for si in dataset_par2.labeled_tuples:
    dataset_par2.sampled_tuple = si
    
    # run Pipeline, so that column_prediction_futures gets assigned
    # app_2.run(dataset_par2)
    
    app_2.update_models(dataset_par2)

    # Prevent AttributeError: 'DatasetParallel' object has no attribute 'column_prediction_futures'
    if not hasattr(dataset_par2, 'column_prediction_futures'):
        dataset_par2.column_prediction_futures = []
    
    app_2.predict_corrections(dataset_par2)


The error corrector models are updated with new labeled tuple 2141.
The error corrector models are updated with new labeled tuple 1714.
The error corrector models are updated with new labeled tuple 74.
The error corrector models are updated with new labeled tuple 313.
The error corrector models are updated with new labeled tuple 2350.
The error corrector models are updated with new labeled tuple 1156.
The error corrector models are updated with new labeled tuple 1898.
The error corrector models are updated with new labeled tuple 1711.
The error corrector models are updated with new labeled tuple 879.
The error corrector models are updated with new labeled tuple 565.
The error corrector models are updated with new labeled tuple 1108.
The error corrector models are updated with new labeled tuple 234.
The error corrector models are updated with new labeled tuple 217.
The error corrector models are updated with new labeled tuple 1064.
The error corrector models are updated with new labeled
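
To give an intuition for what "updating a corrector model with a labeled tuple" can mean, the sketch below keeps a very simple value-based model: it counts which correction was given for which erroneous value and turns the counts into scores for correction candidates. This is an assumption for illustration only; the class `ValueModel` is hypothetical and much simpler than the models behind `update_models`.

```python
from collections import Counter, defaultdict


class ValueModel:
    """Remember which corrections were given for which erroneous values."""

    def __init__(self):
        self.counts = defaultdict(Counter)

    def update(self, old_value, new_value):
        """Record one labeled pair (erroneous value, correction)."""
        self.counts[old_value][new_value] += 1

    def candidates(self, old_value):
        """Return {correction candidate: share of the labels seen for old_value}."""
        seen = self.counts.get(old_value, Counter())
        total = sum(seen.values())
        return {new: n / total for new, n in seen.items()}


model = ValueModel()
labeled_pairs = [
    ("7:10am", "7:10 a.m."),
    ("7:10am", "7:10 a.m."),
    ("7:10am", "7:10 p.m."),
    ("N/A", ""),
]
for old_value, new_value in labeled_pairs:
    model.update(old_value, new_value)

cands = model.candidates("7:10am")
best = max(cands, key=cands.get)
print(cands, "->", best)
```

Scores like these can serve as one feature of an (error, candidate) pair; a per-column classifier then decides which candidate to apply.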

### 5. Storing Results
Baran can also store the error correction results.

In [18]:
app_2.store_results(dataset_par2)

The results are stored in ../datasets/flights/raha-baran-results-flights/error-correction/correction.dataset.


### 6. Evaluating the Error Correction Task
We can finally evaluate our error correction task.

In [19]:
p, r, f = dataset_par2.get_data_cleaning_evaluation(dataset_par2.corrected_cells)[-3:]
print("Baran's performance on {}:\nPrecision = {:.2f}\nRecall = {:.2f}\nF1 = {:.2f}".format(dataset_par2.name, p, r, f))

Baran's performance on flights:
Precision = 0.00
Recall = 0.00
F1 = 0.00
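
When reading these numbers, it helps to recall how correction scores are typically defined: a corrected cell counts as a hit only if the new value equals the clean value. The sketch below follows that definition; `correction_scores` and the example dictionaries are hypothetical and not taken from `get_data_cleaning_evaluation`. In this sketch, an empty set of corrected cells yields 0.00 for all three scores, so inspecting `len(dataset_par2.corrected_cells)` is a reasonable first check; the output above alone does not tell us whether that is what happened in this run.

```python
def correction_scores(corrected_cells, actual_errors):
    """Return (precision, recall, f1); both arguments map (row, column) to a value."""
    hits = sum(
        1 for cell, value in corrected_cells.items() if actual_errors.get(cell) == value
    )
    precision = hits / len(corrected_cells) if corrected_cells else 0.0
    recall = hits / len(actual_errors) if actual_errors else 0.0
    if precision + recall == 0:
        return precision, recall, 0.0
    return precision, recall, 2 * precision * recall / (precision + recall)


# Made-up example: three real errors, two corrections attempted, one of them right.
actual = {(0, 1): "a", (1, 1): "b", (2, 0): "c"}
corrected = {(0, 1): "a", (1, 1): "x"}

p, r, f = correction_scores(corrected, actual)
print("Precision = {:.2f}\nRecall = {:.2f}\nF1 = {:.2f}".format(p, r, f))

# With no corrected cells at all, every score is zero.
print(correction_scores({}, actual))
```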
