# Melusine + Arta's Workshop

## Business Context &#128221;

You are working for an insurance company.

Customers are contacting your company for two main reasons:

* **Subscriptions:** subscribe, modify or close insurance policies.
* **Claims:** initiate or follow the claim process of a covered loss (car, housing, etc.).

Many employees work on processing client emails through a custom user interface (i.e., a business application).

## Data Science Context &#x1F575;

You are a Data Scientist in charge of optimizing the email pre-processing (i.e., before being processed by a human).

To that end, your goal is to automate:

* Setting the *priority* of an email: low, medium or high.
* Routing an email to one of two target business units: claims or subscriptions.
* Replying automatically to thank-you emails.

Two open-source packages must be used:

* [Melusine](https://github.com/MAIF/melusine): Framework for automatic email processing.

> Use: create a detection pipeline and use it to identify relevant patterns in the emails.

* [Arta](https://github.com/MAIF/arta): A Python Rules Engine

> Use: apply rules to translate the detection outputs into clear actions or extra data needed by the business application.


First, let's make sure those packages are installed in your *python virtual environment*.

In [None]:
!pip install 'melusine>=3.2' arta

## Dataset

During this workshop, you will use a demo dataset containing a variety of emails in French and in English.
In the first part, you can explore the dataset, then you will use **Melusine** and **Arta** to qualify (i.e., pre-process) the emails.


Let's import the dataset and take a sneak peek.

In [None]:
import pandas as pd
df = pd.read_json("workshop_email_data.json")
df.head()

## Melusine: a processing pipeline

### Email Preprocessing

**Melusine** is a Python framework to automate email processing.

One of its key features is the default *preprocessing pipeline*, which does the following:

1. **Cleaning**: remove undesired characters, replace non-breaking spaces, handle specific combinations (e.g., "œ" => "oe").
1. **Email Segmentation**: separate an email conversation with multiple replies and transfers into a list of individual messages.
1. **Message Tagging**: tag the different parts of the message (typically to get rid of unwanted footers and signature text).

These steps can be edited through configuration files or class inheritance but for the sake of this workshop we will stick with the default preprocessing pipeline.
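
To get a feel for what these three steps do, here is a toy version written with the standard library only. It is a sketch under stated assumptions, not Melusine's implementation: the `TRANSITION` pattern, the keyword rules and the `HELLO` tag name are made up for illustration.

```python
import re

# Hypothetical pattern marking the line where a quoted reply starts
TRANSITION = re.compile(r"^(?:Le .+ a écrit :|On .+ wrote:)$", re.MULTILINE)


def clean(text: str) -> str:
    """Replace non-breaking spaces and expand the 'œ' ligature."""
    return text.replace("\u00a0", " ").replace("œ", "oe")


def segment(text: str) -> list[str]:
    """Split a conversation into individual messages, most recent first."""
    return [part.strip() for part in TRANSITION.split(text) if part.strip()]


def tag(message: str) -> list[tuple[str, str]]:
    """Tag each non-empty line with toy keyword rules."""
    tags = []
    for line in message.splitlines():
        line = line.strip()
        if not line:
            continue
        lowered = line.lower()
        if lowered.startswith(("bonjour", "hello")):
            tags.append(("HELLO", line))
        elif lowered.startswith(("merci", "thank")):
            tags.append(("THANKS", line))
        else:
            tags.append(("BODY", line))
    return tags


raw = (
    "Bonjour,\nVoici mon attestation.\nMerci\n"
    "Le 10/02/2023, Jean a écrit :\n"
    "Bonjour,\nMerci d'envoyer le document."
)
messages = segment(clean(raw))
print(tag(messages[0]))
```

The real pipeline relies on much richer patterns, but the data flow is the same: clean the text, split it into messages, then label the parts of each message.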

Let's import and run this pipeline on our data.

In [None]:
from melusine.pipeline import MelusinePipeline


pipe = MelusinePipeline.from_config("preprocessing_pipeline")

In [None]:
preprocessed_df = pipe.transform(df)

# Look at last columns
preprocessed_df.head()

####  &#x1F449; Your turn &#x1F477;

Inspect the message corpus:

* Print the first and second messages of the dataset.
* Print all messages and their associated tags.
* Extract the part tagged as "BODY" of any message of your choice.


In [None]:
# 1st message
print(preprocessed_df.iloc[0])

In [None]:
# 2nd message's body
print(preprocessed_df.iloc[1]["det_clean_last_body"])

In [None]:
# Get and print the last message of the second email's conversation (check the tags on the left)
second_mail = preprocessed_df.iloc[1]
last_msg = second_mail["messages"][0]
print(last_msg)

In [None]:
# Print its tags
last_msg.tags

In [None]:
# Extract the BODY tag
body_tags = [elem for elem in last_msg.tags if elem["refined_tag"] == "BODY"]
for tag_data in body_tags:
    print(f"Tag: {tag_data['refined_tag']} ==text==> {tag_data['base_text']}")

### The MelusineRegex

The `MelusineRegex` is a convenient class to simplify detections using **regexes**. Main features are:

* Name "regex patterns" for readable and explainable results.
* Easily define patterns to be ignored and patterns that prevent matching (forget complex and unreadable negative lookbehind mechanisms).
* Keep matching and non-matching examples close to the regex definition to avoid losing track of what the regex does.

We will provide a working regex for this workshop: `EmergencyRegex`. This one will be used later to detect urgent requests in the emails.

*N.B: You can check the code of the class [MelusineRegex](https://github.com/MAIF/melusine/blob/29815b578ba852b7c79116a926ef0a0e3bd0e1d5/melusine/base.py#L314), in particular docstrings and methods' type hints.*
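
One rough way to picture how the three pattern families interact, using only the standard `re` module. This is a simplified sketch, not Melusine's code: the `NEUTRAL` and `NEGATIVE` patterns and the `toy_match` helper are hypothetical and exist only for this illustration.

```python
import re

POSITIVE = {
    "urgent": r"urgen(t|ce)|emergency",
    "asap": r"as soon as possible",
}
NEUTRAL = {"department": r"service des urgences"}  # hypothetical pattern
NEGATIVE = {"not_urgent": r"(pas|not) urgent"}  # hypothetical pattern


def toy_match(text: str) -> tuple[bool, list[str]]:
    """Return (match result, names of the positive patterns that matched)."""
    flags = re.IGNORECASE
    # 1. Neutral patterns: blank out the matched text so it is ignored
    for pattern in NEUTRAL.values():
        text = re.sub(pattern, " ", text, flags=flags)
    # 2. Negative patterns: any match prevents the global match
    if any(re.search(pattern, text, flags) for pattern in NEGATIVE.values()):
        return False, []
    # 3. Positive patterns: at least one has to match
    hits = [name for name, pattern in POSITIVE.items() if re.search(pattern, text, flags)]
    return bool(hits), hits


print(toy_match("Call me as soon as possible"))
print(toy_match("Je travaille au service des urgences"))
print(toy_match("Ce n'est pas urgent"))
```

Naming each pattern is what makes the result explainable: the second element of the tuple says which pattern triggered the match.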

In [None]:
from melusine.base import MelusineRegex


class EmergencyRegex(MelusineRegex):
    
    @property
    def positive(self):
        """At least one of these patterns has to match for a global match of the MelusineRegex"""
        return {
            "regex_1": r"urgen(t|ce)|emergency",
            "regex_2": r"as soon as possible",
            "regex_3": r"(d[èe]s que|le plus vite|aussi vite que) possible",
        }
    
    @property
    def neutral(self):
        """Text matched by these patterns will be ignored (no influence on the global match of the MelusineRegex)"""
        return None
    
    @property
    def negative(self):
        """Any matching pattern will prevent the global match of the MelusineRegex"""
        return r"urgences"
    
    @property
    def match_list(self):
        """Text examples that MUST be matched by the MelusineRegex"""
        return [
            "C'est urgent",
            "Ceci est une URGENCE",
            "Call me as soon as possible",
        ]
    
    @property
    def no_match_list(self):
        """Text examples that MUST NOT be matched by the MelusineRegex"""
        return [
            "Je travaille dans le service des urgences",
        ]

In [None]:
# Instantiate your new MelusineRegex
reg = EmergencyRegex()

####  &#x1F449; Your turn &#x1F477;

Make sure that the `match_list` and `no_match_list` are consistent with the defined regexes (the `positive` and `negative` properties).

If they are consistent, nothing happens; otherwise, the test method raises an error.

In [None]:
reg.test()

### The MelusineDetector

Business requests may become more and more complex over time.

E.g., "Could you handle this weird edge case when the email is sent from an autonomous vehicle?"

The objective of the `MelusineDetector` is to standardise how *detections* are performed on emails and to limit the technical debt that builds up over the life of an application.

The key elements of a `MelusineDetector` are:

1. **Declaration of input and output columns**: missing columns in the input DataFrame can be identified early on, and you can keep track of which detector created a column.
1. **pre_detect()**: processing to be done prior to detection. Typically, assemble the text of interest.
1. **detect()**: perform the core detection using regexes, machine learning models or heuristics.
1. **post_detect()**: combine the detection outputs to produce the final detector result.

You can find more details in the Melusine documentation on [MelusineDetector tutorials](https://maif.github.io/melusine/tutorials/05a_MelusineDetectors).
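
The three-step structure can be pictured with plain functions applied to a dictionary standing in for a DataFrame row. This is only a minimal sketch of the idea, not how Melusine implements it: `run_detector`, the `body` key and the keyword test are hypothetical.

```python
def pre_detect(row: dict) -> dict:
    """Assemble the text of interest."""
    row["effective_text"] = (row["header"] + "\n" + row["body"]).lower()
    return row


def detect(row: dict) -> dict:
    """Core detection (a naive keyword test in this sketch)."""
    row["has_keyword"] = "urgent" in row["effective_text"]
    return row


def post_detect(row: dict) -> dict:
    """Turn the intermediate output into the final result."""
    row["toy_result"] = row.pop("has_keyword")
    return row


def run_detector(row: dict, required=("header", "body")) -> dict:
    """Check the declared inputs, then chain the three steps."""
    missing = [column for column in required if column not in row]
    if missing:
        raise ValueError(f"Missing input columns: {missing}")
    for step in (pre_detect, detect, post_detect):
        row = step(row)
    return row


print(run_detector({"header": "URGENT: claim", "body": "Hello"})["toy_result"])
```

Checking the declared inputs up front means a missing column fails immediately with a clear message instead of surfacing as an obscure error deep inside a detection step.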

In [None]:
# Install unidecode, needed by the detector
! pip install unidecode

In [None]:
from melusine.base import MelusineDetector
from unidecode import unidecode


class EmergencyDetector(MelusineDetector):
    """A detector to detect emergency in emails"""

    def __init__(self, name="emergency_detector"):
        self.regex = EmergencyRegex()

    @property
    def input_columns(self):
        """Input columns required by the detector"""
        return ["messages", "header"]

    @property
    def output_columns(self):
        """Output columns created by the detector"""
        return ["emergency_detector_result"]

    def pre_detect(self, row, debug_mode=False):
        """Data transformations prior to the core detection"""
        last_body = row["messages"][0].extract_text(target_tags=["BODY", "THANKS"])
        row["effective_text"] = row["header"] + "\n" + unidecode(last_body).lower()
        return row

    def detect(self, row, debug_mode=False):
        """Core detection method"""
        match_data = self.regex(row["effective_text"])
        row["emergency_detector_result"] = match_data["match_result"]
        
        if debug_mode:
            row[self.DEBUG_DICT_COL] = match_data
            
        return row
    
    def post_detect(self, row, debug_mode=False):
        """Data transformations posterior to the core detection"""
        return row

In [None]:
emergency_detector = EmergencyDetector()

####  &#x1F449; Your turn &#x1F477;

Run the detector on the email corpus and check out the emergency detector's result (`emergency_detector_result` column).

In [None]:
df_emergency = emergency_detector.transform(preprocessed_df)
df_emergency[["det_clean_last_body", "emergency_detector_result"]][:5]

Now, implement a `ThanksRegex` and integrate it into a `ThanksDetector`.

In [None]:
class ThanksRegex(MelusineRegex):

    @property
    def positive(self):
        return {
            "thanks_fr": r"\b(re)?mercie?\b",
            "thanks_en": r"\bth(an|na)ks?( you)?\b",
        }

    @property
    def neutral(self):
        return None

    @property
    def negative(self):
        return {
            "negative_thanks_fr": r"merci de",
        }

    @property
    def match_list(self):
        return [
            "Merci beaucoup",
            "Je vous remercie",
            "THANK YOU SO MUCH !",
        ]

    @property
    def no_match_list(self):
        return [
            "Merci de m'envoyer les document rapidement",
        ]

In [None]:
class ThanksDetector(MelusineDetector):

    def __init__(self, name="thanks_detector"):
        self.regex = ThanksRegex()

    @property
    def input_columns(self):
        return ["messages", "header"]

    @property
    def output_columns(self):
        return ["thanks_detector_result"]

    def pre_detect(self, row, debug_mode=False):
        last_body = row["messages"][0].extract_text(target_tags=["BODY", "THANKS"])
        row["effective_text"] = row["header"] + "\n" + unidecode(last_body).lower()
        return row

    def detect(self, row, debug_mode=False):
        # Use the text assembled in pre_detect()
        match_data = self.regex(row["effective_text"])
        row["thanks_detector_result"] = match_data["match_result"]

        if debug_mode:
            row[self.DEBUG_DICT_COL] = match_data

        return row

    def post_detect(self, row, debug_mode=False):
        return row

In [None]:
thanks_detector = ThanksDetector()

Now, implement a `RoutingRegex` and integrate it into a `RoutingDetector`.

In [None]:
class RoutingRegex(MelusineRegex):

    @property
    def positive(self):
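        return {
            # Accept accented and unaccented spellings so that the raw
            # examples in match_list are matched too
            "contrat": r"\b(contrat|r[ée]sil|souscri|attestation)",
            "sinistre": r"\b(sinistre|accident|facture|r[ée]par[ea]|claim)",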
        }

    @property
    def neutral(self):
        return None

    @property
    def negative(self):
        return None

    @property
    def match_list(self):
        return [
            "Je souhaite souscrire à un contrat d'assurance",
            "Suite à mon sinistre du 10/02/2023",
            "Ma voiture est en réparation",
        ]

    @property
    def no_match_list(self):
        return [
            "Vous avez gagné 1000000€, cliquez ici pour les recevoir.",
        ]

In [None]:
class RoutingDetector(MelusineDetector):

    def __init__(self, name="routing_detector"):
        self.regex = RoutingRegex()

    @property
    def input_columns(self):
        return ["body"]

    @property
    def output_columns(self):
        return ["routing_detector_result"]

    def pre_detect(self, row, debug_mode=False):
        row["tmp_clean_body"] = unidecode(row["body"]).lower()
        return row

    def detect(self, row, debug_mode=False):
        match_data = self.regex(row["tmp_clean_body"])
        row["routing_detector_match_data"] = match_data

        if debug_mode:
            row[self.DEBUG_DICT_COL] = match_data

        return row

    def post_detect(self, row, debug_mode=False):
        positive_match_data = row["routing_detector_match_data"]["positive_match_data"]
        if ("sinistre" in positive_match_data) and ("contrat" in positive_match_data):
            routing = "Autre"
        elif "sinistre" in positive_match_data:
            routing = "Sinistre"
        elif "contrat" in positive_match_data:
            routing = "Contrat"
        else:
            routing = "Autre"

        row["routing_detector_result"] = routing

        return row

In [None]:
routing_detector = RoutingDetector()

### The MelusinePipeline

The `MelusinePipeline` object (inheriting from the sklearn Pipeline object) lets you assemble all the detectors into a pipeline that can be executed with a simple `transform()` method. It also brings features like:
   * input columns check (raises an error if mandatory input columns are missing)
   * pipeline load from (YAML) configuration files.

Let's assemble the pipeline with the detectors we have created.

In [None]:
from melusine.pipeline import MelusinePipeline

pipe = MelusinePipeline([
    ("emergency_detector", emergency_detector),
    ("thanks_detector", thanks_detector),
    ("routing_detector", routing_detector),
])

Finally, let's run the pipeline on our DataFrame:

In [None]:
df_transformed = pipe.transform(preprocessed_df)

## Arta: a rules engine

### Use Case

Thanks to the previous **Melusine** pipeline, we are able to process emails and get the following extra details:

* Is it an urgent email?
* Is it about claims or subscriptions?
* Is it a thank you email?

But this new data is not enough. Remember, for each email we need to define a *priority* and decide whether an *automatic reply* is needed.

After talking with the people in charge of handling customer emails, we defined the following business rules:

#### Email Priority

1. If an email is urgent, regardless of the business unit (claims or subscriptions), the priority should be set to "high".
2. If an email is a thank-you email about claims, the priority should be set to "low".
3. Other cases should be set to "medium".

#### Automatic Reply

1. If the email is about subscriptions and is a thank-you email, you should send the automatic reply whose id is "THANK_U_4_YOUR_THANK_U".
2. Do nothing for other cases.
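
Written as plain Python, these rules fit in a few lines, which gives a handy reference to compare the rules engine results against. This is a sketch: the function names are hypothetical, and the upper-case labels ("HIGH", "CLAIMS", etc.) are assumed to follow the conventions used in the rest of this workshop.

```python
def expected_priority(emergency: bool, thank_you: bool, business: str) -> str:
    """Apply the priority rules in order; the first matching rule wins."""
    if emergency:
        return "HIGH"
    if thank_you and business == "CLAIMS":
        return "LOW"
    return "MEDIUM"


def expected_auto_reply(thank_you: bool, business: str):
    """Return the id of the reply to send, or None when nothing is sent."""
    if thank_you and business == "SUBSCRIPTIONS":
        return "THANK_U_4_YOUR_THANK_U"
    return None


print(expected_priority(emergency=True, thank_you=True, business="CLAIMS"))
print(expected_auto_reply(thank_you=True, business="SUBSCRIPTIONS"))
```

Hard-coding the rules works for two small groups, but every change then requires a code release. Moving them to configuration is what a rules engine is for.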


**Arta**'s goal is to simplify *rule definition* and *rule execution* in order to produce the expected outputs (priority and automatic reply here).

To do that, we simply need to:

1. Prepare the input data.
1. Define the business rules.
1. Implement the needed functions.
1. Use the rules engine to apply the rules.

### Input Data

In **Arta**, `input_data` is the data fed to the rules engine. In other words, it is the data on which the rules are applied.

It must be a [mapping](https://docs.python.org/3/glossary.html#term-mapping) object (i.e., a dictionary like object).

This dictionary could be an example:

```python
input_data = {
    "id": 42,
    "cleaned_body": "Hi, I tried to reach you but unfortunately blah blah...", 
    "business": "CLAIMS",
    "emergency": True, 
    "thank_you": False,
}
```
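
If you are unsure whether an object qualifies as a mapping, the standard library offers a quick check. The `is_valid_input` helper below is hypothetical and not part of Arta; it only illustrates what "mapping" means in Python terms.

```python
from collections.abc import Mapping
from types import MappingProxyType


def is_valid_input(candidate) -> bool:
    """Return True if the object implements the mapping protocol."""
    return isinstance(candidate, Mapping)


input_data = {"id": 42, "business": "CLAIMS", "emergency": True, "thank_you": False}

print(is_valid_input(input_data))  # a plain dict
print(is_valid_input(MappingProxyType(input_data)))  # a read-only view of a dict
print(is_valid_input(list(input_data.items())))  # a list of pairs
```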

First, we are going to develop a function that maps the Melusine pipeline's output to Arta's input data.

####  &#x1F449; Your turn &#x1F477;

Implement a function to convert a line (i.e., a `pandas.Series`) of the `DataFrame` referenced in the `df_transformed` variable into a Python dictionary with the same schema as above.

In [None]:
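def convert_to_dict(row):
    line = row.to_dict()
    # Map the routing labels to business units; "OTHER" is an assumed
    # fallback label for emails that match neither business unit
    business_by_routing = {"Sinistre": "CLAIMS", "Contrat": "SUBSCRIPTIONS"}
    result = {}
    result["id"] = row.name
    result["cleaned_body"] = line["det_clean_last_body"]
    result["business"] = business_by_routing.get(line["routing_detector_result"], "OTHER")
    result["emergency"] = line["emergency_detector_result"]
    result["thank_you"] = line["thanks_detector_result"]
    return result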

In [None]:
# Test
input_data = convert_to_dict(df_transformed.iloc[0])
print(input_data)

### Rule Group

In **Arta**, a **rule group** groups different **rules** with a common output (generally, but not always).

In our case, we could define 2 groups:

1. `priority`
1. `auto_reply`

**Rule groups** are made of **rules** and they are bundled into a **rule set**. 
All of these are defined in a configuration file (i.e., *YAML file*) as you can see in the [documentation](https://maif.github.io/arta/how_to/#rule-set-and-rule-group).

####  &#x1F449; Your turn &#x1F477;

Define the rules above using the following template. You can name the **action functions** as you want; you will implement them later.

*Hint: use [simple conditions](https://maif.github.io/arta/how_to/#simple-condition).*

Template:

```yaml
---
rules:
  default_rule_set:
    priority:
# Complete below
        

actions_source_modules:
  - assets.actions
```

Solution:

```yaml
---
rules:
  default_rule_set:
    priority:
      HIGH:
        simple_condition: input.emergency==True
        action: set_priority
        action_parameters:
          value: HIGH
      LOW:
        simple_condition: input.thank_you==True and input.business=="CLAIMS"
        action: set_priority
        action_parameters:
          value: LOW     
      MEDIUM:
        simple_condition: null
        action: set_priority
        action_parameters:
          value: MEDIUM
    auto_reply:
      THANK_YOU:
        simple_condition: input.thank_you==True and input.business=="SUBSCRIPTIONS"
        action: send_reply
        action_parameters:
          reply_id: THANK_U_4_YOUR_THANK_U

actions_source_modules:
  - assets.actions
```

When you are done, copy and paste these rules into a new YAML file in the `./assets` directory, e.g., `./assets/rules.yaml` (the file name is up to you).

### Action Functions

A *rule* is made of *conditions* and one *action*. When conditions are validated (i.e., True) the *action* is executed.

In fact, *actions* are just regular Python functions, called **action functions**.

If you go back to the YAML definition of your rules, you can see:

```yaml
        action: set_priority
        action_parameters:
          value: MEDIUM
```

* `action:` is the name of your action function.
* `action_parameters:` are the names and the values of the arguments passed to the function when executed.
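
Conceptually, executing a rule comes down to looking up a function by name and calling it with the configured parameters. The snippet below sketches that idea only; it is not Arta's internal code. The `ACTIONS` registry and the `execute` helper are hypothetical, and forwarding `input_data` as an extra keyword argument is an assumption made here to show why action functions accept `**kwargs`.

```python
def set_priority(value, **kwargs):
    """Example action function: extra keyword arguments are ignored."""
    return {"priority": value}


# Hypothetical registry mapping action names to Python functions
ACTIONS = {"set_priority": set_priority}

# The YAML fragment above, once parsed into a dictionary
rule = {"action": "set_priority", "action_parameters": {"value": "MEDIUM"}}


def execute(rule: dict, input_data: dict):
    """Call the action named in the rule with its configured parameters."""
    action_function = ACTIONS[rule["action"]]
    return action_function(**rule["action_parameters"], input_data=input_data)


print(execute(rule, {"emergency": False}))
```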

####  &#x1F449; Your turn &#x1F477;

Look at your rules and list the action functions that you need to implement:

* ...
* ...

Now, implement them in the Python module `assets/actions.py`, which is the module declared under `actions_source_modules` in the rules file.

*Hint: look at the following [documentation page](https://maif.github.io/arta/a_simple_example/#actions).*

In [None]:
def set_priority(value, **kwargs):
    return {"priority": value}


def send_reply(reply_id, **kwargs):
    # Fake rest api call 
    print(f"The following email model was automatically sent: {reply_id}")

### Rule Engine

Last but not least, we need to apply the rules to our data: this is the purpose of the rules engine.

####  &#x1F449; Your turn &#x1F477;

Instantiate a rules engine and apply the rules to the **Melusine** pipeline results for one email.

*Hint: [Usage documentation](https://maif.github.io/arta/how_to/#usage).*

In [None]:
# Rules engine instantiation
from arta import RulesEngine


eng = RulesEngine(config_path="./assets")

In [None]:
# Apply rules on one email
input_data = convert_to_dict(df_transformed.iloc[0])

results = eng.apply_rules(input_data)
print(results)

####  &#x1F449; Your turn &#x1F477;

Apply on all emails and print their results.

In [None]:
# Function to apply rules
def apply_rules(row, rules_engine):
    input_data = convert_to_dict(row)
    result = rules_engine.apply_rules(input_data)
    return {
        "priority": result["priority"]["priority"],
        "auto_reply": result["auto_reply"],
    }

# Run on all emails
df_transformed[["priority", "auto_reply"]] = df_transformed.apply(apply_rules, axis=1, rules_engine=eng, result_type='expand')

Compare the obtained results with expected ones.

In [None]:
df_delta = df_transformed[df_transformed["priority"] != df_transformed["expected_priority"]]
i = 0
# Use positional access: the filtered index may not start at 0
print(f"Obtained priority: {df_delta['priority'].iloc[i]}")
print(f"Expected priority: {df_delta['expected_priority'].iloc[i]}")
print(df_delta.iloc[i])

## End

Now we can feed the employees' business application with the emails and the extra information it needs. Automatic replies and prioritization will speed up email processing.