# Implementing Security and Compliance in MLOps with Airflow


# Tasks

1. Set Up Airflow and Enable RBAC

2. Implement Data Protection Measures

3. Secure Model Artifacts

4. Enable Audit Logging

5. Apply Data Retention Policies

6. Test and Verify the Pipeline

# Task 1: Set Up Airflow and Enable RBAC


### Enable RBAC



#### Open the airflow.cfg file and set

In [None]:
rbac = True


### Restart the Airflow webserver and scheduler.



# Steps to Create Roles and Assign Permissions in Airflow

## Create Roles and Assign Permissions:
1. Navigate to the **Security > Roles** section in the Airflow UI.
2. Create roles (e.g., `data_scientist`, `admin`, `auditor`).
3. Assign permissions to roles (e.g., `can_read`, `can_edit`, `can_delete`).

## Assign Users to Roles:
1. Go to the **Security > Users** section.
2. Create users and assign them to the desired roles.

# Task 2: Implement Data Protection Measures

## Encrypt Sensitive Data:
- Utilize the `cryptography` library to encrypt sensitive data (e.g., the target column) during the data ingestion process.

## Mask Sensitive Data:
- Replace sensitive data with placeholders during the preprocessing stage to ensure data privacy.

In [None]:

from datetime import datetime
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import joblib
import os
from cryptography.fernet import Fernet
import logging

# Ensure the necessary directories exist
os.makedirs('data', exist_ok=True)
os.makedirs('results', exist_ok=True)

# Step 1: Data Ingestion with Encryption
def data_ingestion():
    print("Starting data ingestion...")
    data = load_breast_cancer()
    df = pd.DataFrame(data.data, columns=data.feature_names)
    df['target'] = data.target

    # Encrypt sensitive data (e.g., target column)
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    df['target'] = df['target'].apply(lambda x: cipher_suite.encrypt(str(x).encode()))
    df.to_csv('data/encrypted_breast_cancer.csv', index=False)
    print("Data ingestion completed. Encrypted data saved to 'data/encrypted_breast_cancer.csv'.")

# Step 2: Data Preprocessing
def data_preprocessing():
    print("Starting data preprocessing...")
    df = pd.read_csv('data/encrypted_breast_cancer.csv')

    # Decrypt the target column
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    df['target'] = df['target'].apply(lambda x: int(cipher_suite.decrypt(eval(x)).decode()))

    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Save preprocessed data
    pd.DataFrame(X_train).to_csv('data/X_train.csv', index=False)
    pd.DataFrame(X_test).to_csv('data/X_test.csv', index=False)
    pd.DataFrame(y_train, columns=['target']).to_csv('data/y_train.csv', index=False)
    pd.DataFrame(y_test, columns=['target']).to_csv('data/y_test.csv', index=False)

    # Save the scaler
    joblib.dump(scaler, 'results/scaler.pkl')
    print("Data preprocessing completed. Preprocessed data and scaler saved.")


# Task 3: Secure Model Artifacts

## Encrypt Model Files:
- Encrypt the trained model before saving it to disk to ensure confidentiality and prevent unauthorized access.

## Sign Model Artifacts:
- Use digital signatures to validate the integrity of model artifacts and ensure they have not been tampered with.


In [None]:
# Step 3: Model Training with Encryption
def model_training():
    print("Starting model training...")
    X_train = pd.read_csv('data/X_train.csv')
    y_train = pd.read_csv('data/y_train.csv')

    # Convert y_train to a 1D array
    y_train = y_train.values.ravel()

    # Train the model
    model = LogisticRegression()
    model.fit(X_train, y_train)

    # Encrypt and save the trained model
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    joblib.dump(model, 'results/model.pkl')
    with open('results/model.pkl', 'rb') as f:
        model_data = f.read()
    encrypted_model = cipher_suite.encrypt(model_data)
    with open('results/encrypted_model.pkl', 'wb') as f:
        f.write(encrypted_model)
    print("Model training completed. Encrypted model saved to 'results/encrypted_model.pkl'.")

# Task 4: Enable Audit Logging

## Log Actions in the Pipeline:
- Use Python’s `logging` module to log actions and events in the ML pipeline for better traceability.




In [None]:
# Step 4: Model Evaluation with Logging
def model_evaluation():
    print("Starting model evaluation...")
    X_test = pd.read_csv('data/X_test.csv')
    y_test = pd.read_csv('data/y_test.csv')

    # Convert y_test to a 1D array
    y_test = y_test.values.ravel()

    # Load the model
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    with open('results/encrypted_model.pkl', 'rb') as f:
        encrypted_model = f.read()
    decrypted_model = cipher_suite.decrypt(encrypted_model)
    with open('results/decrypted_model.pkl', 'wb') as f:
        f.write(decrypted_model)
    model = joblib.load('results/decrypted_model.pkl')

    # Make predictions
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model Accuracy: {accuracy}")

    # Log the accuracy
    logging.basicConfig(filename='pipeline_audit.log', level=logging.INFO)
    logging.info(f"Model Accuracy: {accuracy}")

    # Save the accuracy result
    with open('results/accuracy.txt', 'w') as f:
        f.write(f"Accuracy: {accuracy}")
    print("Model evaluation completed. Accuracy saved to 'results/accuracy.txt'.")


# Task 5: Apply Data Retention Policies

## Delete Old Files:
- Implement a Python function to delete old files after a specified retention period to ensure compliance with data retention policies.

## Schedule the Cleanup Task:
- Add the cleanup task to the DAG and configure it to run periodically as part of the workflow.


In [None]:
import os
import time

def delete_old_files(directory, days):
    current_time = time.time()
    for file in os.listdir(directory):
        file_path = os.path.join(directory, file)
        if os.path.getmtime(file_path) < current_time - days * 86400:
            os.remove(file_path)
            print(f"Deleted {file_path}")

delete_old_files('data', days=30)  # Delete files older than 30 days

# Task 6: Test and Verify the Pipeline

## Trigger the DAG:
- Use the Airflow UI to manually trigger the DAG and monitor its execution in real-time.
## View Logs in the Airflow UI:
- Navigate to the **Browse > Task Logs** section in the Airflow UI to view detailed logs for each task.

## Verify Security and Compliance Measures:
- Review logs to confirm that:
  - Encryption is applied correctly.
  - Data masking is performed as expected.
  - Audit logging is capturing all necessary actions.


In [1]:
''' 

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import joblib
import os
from cryptography.fernet import Fernet
import logging

# Ensure the necessary directories exist
os.makedirs('data', exist_ok=True)
os.makedirs('results', exist_ok=True)

# Step 1: Data Ingestion with Encryption
def data_ingestion():
    print("Starting data ingestion...")
    data = load_breast_cancer()
    df = pd.DataFrame(data.data, columns=data.feature_names)
    df['target'] = data.target

    # Encrypt sensitive data (e.g., target column)
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    df['target'] = df['target'].apply(lambda x: cipher_suite.encrypt(str(x).encode()))
    df.to_csv('data/encrypted_breast_cancer.csv', index=False)
    print("Data ingestion completed. Encrypted data saved to 'data/encrypted_breast_cancer.csv'.")

# Step 2: Data Preprocessing
def data_preprocessing():
    print("Starting data preprocessing...")
    df = pd.read_csv('data/encrypted_breast_cancer.csv')

    # Decrypt the target column
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    df['target'] = df['target'].apply(lambda x: int(cipher_suite.decrypt(eval(x)).decode()))

    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Save preprocessed data
    pd.DataFrame(X_train).to_csv('data/X_train.csv', index=False)
    pd.DataFrame(X_test).to_csv('data/X_test.csv', index=False)
    pd.DataFrame(y_train, columns=['target']).to_csv('data/y_train.csv', index=False)
    pd.DataFrame(y_test, columns=['target']).to_csv('data/y_test.csv', index=False)

    # Save the scaler
    joblib.dump(scaler, 'results/scaler.pkl')
    print("Data preprocessing completed. Preprocessed data and scaler saved.")

# Step 3: Model Training with Encryption
def model_training():
    print("Starting model training...")
    X_train = pd.read_csv('data/X_train.csv')
    y_train = pd.read_csv('data/y_train.csv')

    # Convert y_train to a 1D array
    y_train = y_train.values.ravel()

    # Train the model
    model = LogisticRegression()
    model.fit(X_train, y_train)

    # Encrypt and save the trained model
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    joblib.dump(model, 'results/model.pkl')
    with open('results/model.pkl', 'rb') as f:
        model_data = f.read()
    encrypted_model = cipher_suite.encrypt(model_data)
    with open('results/encrypted_model.pkl', 'wb') as f:
        f.write(encrypted_model)
    print("Model training completed. Encrypted model saved to 'results/encrypted_model.pkl'.")

# Step 4: Model Evaluation with Logging
def model_evaluation():
    print("Starting model evaluation...")
    X_test = pd.read_csv('data/X_test.csv')
    y_test = pd.read_csv('data/y_test.csv')

    # Convert y_test to a 1D array
    y_test = y_test.values.ravel()

    # Load the model
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    with open('results/encrypted_model.pkl', 'rb') as f:
        encrypted_model = f.read()
    decrypted_model = cipher_suite.decrypt(encrypted_model)
    with open('results/decrypted_model.pkl', 'wb') as f:
        f.write(decrypted_model)
    model = joblib.load('results/decrypted_model.pkl')

    # Make predictions
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model Accuracy: {accuracy}")

    # Log the accuracy
    logging.basicConfig(filename='pipeline_audit.log', level=logging.INFO)
    logging.info(f"Model Accuracy: {accuracy}")

    # Save the accuracy result
    with open('results/accuracy.txt', 'w') as f:
        f.write(f"Accuracy: {accuracy}")
    print("Model evaluation completed. Accuracy saved to 'results/accuracy.txt'.")

# Define the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 10, 1),
    'retries': 1,
}

dag = DAG(
    'ml_pipeline_dag',
    default_args=default_args,
    description='End-to-End ML Pipeline with Security and Compliance',
    schedule_interval='@daily',  # Run the DAG daily
    catchup=False,  # Disable catchup to avoid backfilling
)

# Define tasks
ingestion_task = PythonOperator(
    task_id='data_ingestion',
    python_callable=data_ingestion,
    dag=dag,
)

preprocessing_task = PythonOperator(
    task_id='data_preprocessing',
    python_callable=data_preprocessing,
    dag=dag,
)

training_task = PythonOperator(
    task_id='model_training',
    python_callable=model_training,
    dag=dag,
)

evaluation_task = PythonOperator(
    task_id='model_evaluation',
    python_callable=model_evaluation,
    dag=dag,
)

# Define task dependencies
ingestion_task >> preprocessing_task >> training_task >> evaluation_task

'''

' from airflow import DAG\nfrom airflow.operators.python_operator import PythonOperator\nfrom datetime import datetime\nimport pandas as pd\nfrom sklearn.datasets import load_breast_cancer\nfrom sklearn.model_selection import train_test_split\nfrom sklearn.preprocessing import StandardScaler\nfrom sklearn.linear_model import LogisticRegression\nfrom sklearn.metrics import accuracy_score\nimport joblib\nimport os\nfrom cryptography.fernet import Fernet\nimport logging\n\n# Ensure the necessary directories exist\nos.makedirs(\'data\', exist_ok=True)\nos.makedirs(\'results\', exist_ok=True)\n\n# Step 1: Data Ingestion with Encryption\ndef data_ingestion():\n    print("Starting data ingestion...")\n    data = load_breast_cancer()\n    df = pd.DataFrame(data.data, columns=data.feature_names)\n    df[\'target\'] = data.target\n\n    # Encrypt sensitive data (e.g., target column)\n    key = Fernet.generate_key()\n    cipher_suite = Fernet(key)\n    df[\'target\'] = df[\'target\'].apply(lambda