# **Handle Missing Values Using Imputation**

In [1]:
import pandas as pd
from sklearn.impute import SimpleImputer

# Toy migration records: each column contains exactly one missing entry (None).
data = pd.DataFrame(
    {
        "hour_of_day": [1, 2, None, 4, 5],
        "day_of_week": [1, None, 3, 4, 5],
        "distance": [10, 20, 30, None, 50],
        "user_role": [1, 2, 1, 2, None],
        "migration_frequency": [5, 3, None, 2, 7],
    }
)

# Show the frame as created, gaps included.
print("DataFrame with missing values:", data, sep="\n")

# Fill every gap with the mean of its column.
# Other available strategies: 'median', 'most_frequent', 'constant'.
# NOTE(review): the mean of an ID-like column such as user_role is fractional
# (1.5 here); 'most_frequent' may be the better fit for such columns.
imputer = SimpleImputer(strategy="mean")

# fit_transform returns a plain array, so the column labels are re-attached.
imputed_values = imputer.fit_transform(data)
data_imputed = pd.DataFrame(imputed_values, columns=data.columns)

# Show the completed frame.
print("\nDataFrame after imputation:", data_imputed, sep="\n")

DataFrame with missing values:
   hour_of_day  day_of_week  distance  user_role  migration_frequency
0          1.0          1.0      10.0        1.0                  5.0
1          2.0          NaN      20.0        2.0                  3.0
2          NaN          3.0      30.0        1.0                  NaN
3          4.0          4.0       NaN        2.0                  2.0
4          5.0          5.0      50.0        NaN                  7.0

DataFrame after imputation:
   hour_of_day  day_of_week  distance  user_role  migration_frequency
0          1.0         1.00      10.0        1.0                 5.00
1          2.0         3.25      20.0        2.0                 3.00
2          3.0         3.00      30.0        1.0                 4.25
3          4.0         4.00      27.5        2.0                 2.00
4          5.0         5.00      50.0        1.5                 7.00


# **Isolation Forest model**

In [2]:
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# Assuming `data` is the DataFrame with feature-engineered migration data

data = pd.DataFrame({
    'hour_of_day': [1, 2, 3, 4, 5],
    'day_of_week': [1, 2, 3, 4, 5],
    'distance': [10, 20, 30, 40, 50],
    'user_role': [1, 2, 1, 2, 1],
    'migration_frequency': [5, 3, 6, 2, 7]
})

# Ensure `data` contains every column the model is trained on
required_columns = ['hour_of_day', 'day_of_week', 'distance', 'user_role', 'migration_frequency']
if not all(column in data.columns for column in required_columns):
    raise ValueError(f"DataFrame must contain the following columns: {required_columns}")

# Refuse to continue with gaps in the data: the scaler/model below cannot handle NaN
if data[required_columns].isnull().any().any():
    raise ValueError("Data contains missing values. Please handle them before proceeding.")

# Extract features
features = data[required_columns]

# Standardise each feature (zero mean, unit variance) before fitting
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

# Fit the Isolation Forest model.
# `contamination` is the expected share of anomalies; adjust it to your data.
# `random_state` is fixed because the forest is built from random splits:
# without a seed the flagged rows can differ from one run to the next.
model = IsolationForest(contamination=0.05, random_state=42)
model.fit(features_scaled)

# Predict anomalies
anomalies = model.predict(features_scaled)
data['anomaly'] = anomalies  # Anomalies will be marked as -1, normal instances as 1

# Filter suspicious activities
suspicious_migrations = data[data['anomaly'] == -1]
print(suspicious_migrations)

   hour_of_day  day_of_week  distance  user_role  migration_frequency  anomaly
4            5            5        50          1                    7       -1


Output shows:

*   **hour_of_day:** The hour of the day when the migration occurred (hour 5, i.e. 5 AM).
*   **day_of_week:** The day of the week when the migration occurred (5 = Friday, assuming 1 = Monday).
*   **distance:** The distance associated with the migration (50 units; the specific unit depends on your context).
*   **user_role:** The role of the user who initiated the migration (role ID 1).
*   **migration_frequency:** The frequency of migrations (7 times).

This row is flagged as a suspicious activity because its anomaly label is -1.

# 2. **One Class SVM model to detect Anomalies**     
A One-Class SVM is another option for anomaly detection in high-dimensional spaces, and it works by learning a decision function for outlier detection.


In [3]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM

# Small two-feature example frame
data = pd.DataFrame(
    {
        "feature1": [1, 2, 3, 4, 5],
        "feature2": [5, 4, 3, 2, 1],
    }
)

# Standardise both features before handing them to the SVM
scaler = StandardScaler()
features_scaled = scaler.fit_transform(data)

# Train the One-Class SVM and label the same rows it was trained on
model = OneClassSVM(nu=0.05, kernel="rbf", gamma="auto")
anomalies = model.fit(features_scaled).predict(features_scaled)

# Store the labels next to the features (-1 marks an anomaly)
data["anomaly"] = anomalies

# Keep only the rows labelled as anomalies and show them
is_anomaly = data["anomaly"] == -1
suspicious_migrations = data.loc[is_anomaly]
print(suspicious_migrations)

   feature1  feature2  anomaly
2         3         3       -1
4         5         1       -1


The rows shown above are the ones the model flagged as outliers in the data set.

# **Anomaly Scores**
Anomaly detection, also called outlier detection, is the process of finding patterns in any dataset that deviate significantly from the expected or 'normal behavior.'

# **Anomaly detection model**

In [5]:
# Anomaly detection model for data leakage detection

import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest

def detect_data_leakage(data, contamination=0.1):
    """
    Detects anomalies in data that might indicate data leakage using Isolation Forest.

    Args:
        data: A pandas DataFrame containing the data. Features should be numerical.
        contamination: The proportion of outliers expected in the data set
            (passed straight to IsolationForest).

    Returns:
        A pandas Series of anomaly scores that shares the index of `data`,
        where higher scores mean "more anomalous".
    """

    # Handle missing values (replace with mean for simplicity; consider more robust imputation)
    data = data.fillna(data.mean())

    # Use IsolationForest for anomaly detection
    model = IsolationForest(contamination=contamination, random_state=42)  # random_state for reproducibility
    model.fit(data)
    anomaly_scores = -model.decision_function(data)  # Invert scores for higher = more anomalous

    # Re-attach the caller's index: boolean masks built from the scores
    # (e.g. df[scores > threshold]) are aligned on index labels, so a fresh
    # 0..n-1 index would select the wrong rows for any non-default index.
    return pd.Series(anomaly_scores, index=data.index)


# Example usage (swap in your own data and features).
# Small demo frame whose last row is far away from the others.
data = {
    'feature1': [1, 6, 3, 4, 9, 100],
    'feature2': [6, 7, 5, 9, 10, 200],
}
df = pd.DataFrame(data)

# Score every row of the demo frame
anomaly_scores = detect_data_leakage(df)

# Show the scores
print("Anomaly Scores:", anomaly_scores, sep="\n")

# Rows scoring strictly above the 95th percentile are treated as potential leaks
threshold = anomaly_scores.quantile(0.95)  # top 5 %
print(f"\nThreshold for anomaly detection: {threshold}")

is_leak = anomaly_scores > threshold
potential_leaks = df[is_leak]
print("\nPotential Data Leaks:")
potential_leaks

Anomaly Scores:
0   -0.199608
1   -0.226077
2   -0.214904
3   -0.214341
4   -0.148439
5    0.148439
dtype: float64

Threshold for anomaly detection: 0.07421926298355744

Potential Data Leaks:


Unnamed: 0,feature1,feature2
5,100,200


# **ONE-CLASS SVM**

In [6]:
# One class SVM

import pandas as pd
import numpy as np
from sklearn.svm import OneClassSVM

def detect_data_leakage(data, contamination=0.1):
    """
    Detects anomalies in data that might indicate data leakage using One-Class SVM.

    Args:
        data: A pandas DataFrame containing the data. Features should be numerical.
        contamination: Passed to OneClassSVM as `nu`; intended as the proportion
            of outliers in the data set.

    Returns:
        A pandas Series of anomaly scores that shares the index of `data`,
        where higher scores mean "more anomalous".
    """

    # Handle missing values (replace with mean for simplicity; consider more robust imputation)
    data = data.fillna(data.mean())

    # Use OneClassSVM for anomaly detection
    # NOTE(review): the features are fed in unscaled; RBF-kernel SVMs are
    # sensitive to feature scale, so standardising first may be worth trying.
    model = OneClassSVM(nu=contamination, kernel='rbf', gamma='scale')  # 'scale' for automatic gamma
    model.fit(data)
    anomaly_scores = -model.decision_function(data)  # Invert scores for higher = more anomalous

    # Re-attach the caller's index: boolean masks built from the scores
    # (e.g. df[scores > threshold]) are aligned on index labels, so a fresh
    # 0..n-1 index would select the wrong rows for any non-default index.
    return pd.Series(anomaly_scores, index=data.index)


# Example usage (swap in your own data and features).
# Small demo frame whose last row is far away from the others.
data = {
    'feature1': [1, 5, 3, 4, 5, 100],
    'feature2': [6, 7, 8, 9, 10, 200],
}
df = pd.DataFrame(data)

# Score every row with the One-Class SVM detector
anomaly_scores = detect_data_leakage(df)

# Show the scores
print("Anomaly Scores:", anomaly_scores, sep="\n")

# Rows scoring strictly above the 95th percentile are treated as potential leaks
threshold = anomaly_scores.quantile(0.95)  # top 5 %
print(f"\nThreshold for anomaly detection: {threshold}")

above_threshold = anomaly_scores > threshold
potential_leaks = df[above_threshold]
print("\nPotential Data Leaks:")
potential_leaks

Anomaly Scores:
0    0.000240
1    0.000121
2   -0.000117
3   -0.000028
4    0.000240
5   -0.000480
dtype: float64

Threshold for anomaly detection: 0.00023999159397512793

Potential Data Leaks:


Unnamed: 0,feature1,feature2
4,5,10


# **permutation importance**

In [7]:
#Advance techniques: permutation importance

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.inspection import permutation_importance

# Simulate data with a data leak
np.random.seed(0)
n_samples = 1000
X = pd.DataFrame({'feature1': np.random.rand(n_samples),
                   'feature2': np.random.rand(n_samples)})
y = np.random.randint(0, 2, n_samples)

# Introduce a data leak: 'leak' is a copy of the target, so it perfectly predicts y
X['leak'] = y

# Split data (leak is in both training and testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a model (leak will be highly important)
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

# Evaluate the model on the probability of the positive class
y_pred = model.predict_proba(X_test)[:, 1]
auc = roc_auc_score(y_test, y_pred)
print(f"AUC score: {auc}")  # Expect AUC to be near-perfect due to the leak


# Permutation Importance to highlight the leak
result = permutation_importance(model, X_test, y_test, n_repeats=30, random_state=42)

# Print feature importances, most important first.
# The name column is sized from the longest feature name plus padding; a fixed
# width of 8 left no gap after 8-character names ("feature20.000").
name_width = max(len(str(column)) for column in X_test.columns) + 2
for i in result.importances_mean.argsort()[::-1]:
    print(f"{X_test.columns[i]:<{name_width}}"
          f"{result.importances_mean[i]:.3f}"
          f" +/- {result.importances_std[i]:.3f}")

# The 'leak' feature will have a very high importance score, indicative of data leakage.

AUC score: 1.0
leak    0.489 +/- 0.036
feature20.000 +/- 0.000
feature10.000 +/- 0.000


# **Encrypt a Certificate**

In [21]:


from google.colab import files
import os
from cryptography.fernet import Fernet

# Install cryptography library if not already installed
# !pip install cryptography  # Already installed in the provided code


def encrypt_file(filename, key):
    """Encrypt `filename` with Fernet and write the result next to it.

    The ciphertext is stored in a new file named `filename` + ".encrypted";
    the input file itself is only read. A confirmation line is printed and
    the name of the encrypted file is returned.
    """
    cipher = Fernet(key)

    # Read the whole input into memory as raw bytes.
    with open(filename, "rb") as source:
        plaintext = source.read()

    token = cipher.encrypt(plaintext)

    # Store the ciphertext alongside the input file.
    encrypted_filename = filename + ".encrypted"
    with open(encrypted_filename, "wb") as target:
        target.write(token)

    print(f"File '{filename}' encrypted successfully as '{encrypted_filename}'.")
    return encrypted_filename


def decrypt_file(filename, key):
    """Decrypts a file using Fernet.

    Args:
        filename: Path of the encrypted file.
        key: Fernet key that was used to encrypt the file.

    Returns:
        The path of the decrypted file, or None when reading, decrypting or
        writing fails (the error is printed, not raised).

    The output name is `filename` without its ".encrypted" suffix. When the
    name does not end with that suffix, ".decrypted" is appended instead so
    that the output name is never a blindly truncated version of the input.
    """
    encrypted_suffix = ".encrypted"
    f = Fernet(key)
    try:
        with open(filename, "rb") as file:
            encrypted_data = file.read()
        decrypted_data = f.decrypt(encrypted_data)

        # Only strip the suffix when it is really there.
        if filename.endswith(encrypted_suffix):
            decrypted_filename = filename[:-len(encrypted_suffix)]
        else:
            decrypted_filename = filename + ".decrypted"

        with open(decrypted_filename, "wb") as file:
            file.write(decrypted_data)
        print(f"File '{filename}' decrypted successfully as '{decrypted_filename}'.")
        return decrypted_filename
    except Exception as e:
        # Deliberate best effort: callers check for None instead of catching.
        print(f"Decryption failed: {e}")
        return None


# Ask the user for a file; the first uploaded name is the one that gets encrypted
uploaded = files.upload()
uploaded_names = list(uploaded.keys())
filename = uploaded_names[0]

# Create a fresh Fernet key for this file
key = Fernet.generate_key()

# Encrypt the upload and offer the encrypted copy for download
encrypted_file = encrypt_file(filename, key)
files.download(encrypted_file)

# The key is needed again for decryption, so it is shown once here.
# WARNING: this writes the secret key into the cell output; clear the output
# before sharing or committing the notebook.
print(f"Encryption key: {key.decode()}")


Saving Certificate.png to Certificate (1).png
File 'Certificate (1).png' encrypted successfully as 'Certificate (1).png.encrypted'.


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Encryption key: pc-l0lNjsyn8LQdq140QKBo_BAmkhwATjyn_bVhdExQ=


# **Decrypt that Certificate**

In [22]:
# prompt: download the original file from encrypted file upload from local computer

from google.colab import files
import os
from cryptography.fernet import Fernet

# Decrypt the file
from getpass import getpass

# Read the key without echoing it: input() would copy the secret key into the
# saved cell output. Surrounding whitespace from copy/paste is dropped.
key = getpass("Enter the encryption key: ").strip()
try:
    key = key.encode()  # Fernet expects the key as bytes
    # `filename` is the name of the file uploaded in the encryption cell above.
    decrypted_file = decrypt_file(filename + ".encrypted", key)

    # Download the decrypted file (decrypt_file returns None on failure)
    if decrypted_file:
        files.download(decrypted_file)
except Exception as e:
    print(f"An error occurred: {e}")

Enter the encryption key: pc-l0lNjsyn8LQdq140QKBo_BAmkhwATjyn_bVhdExQ=
File 'Certificate (1).png.encrypted' decrypted successfully as 'Certificate (1).png'.


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# **WaterMarking File**

In [23]:
from PIL import Image, ImageDraw, ImageFont
from google.colab import files
import os

def watermark_image(input_image_path, output_image_path, watermark_text,
                    font_path="/usr/share/fonts/truetype/liberation/LiberationSerif-Bold.ttf",
                    font_size=36, margin=10, color=(255, 255, 255), opacity=128):
    """Stamp a semi-transparent text watermark in the bottom-right corner.

    Args:
        input_image_path: Path of the image to watermark.
        output_image_path: Where the watermarked image is written.
        watermark_text: Text to draw.
        font_path: TrueType font file to use (adjust for your system).
        font_size: Font size passed to ImageFont.truetype.
        margin: Gap in pixels between the text and the right/bottom edges.
        color: (R, G, B) colour of the text.
        opacity: Alpha of the text, 0 (invisible) to 255 (opaque).

    Returns:
        `output_image_path` on success, or None when anything fails (the error
        is printed, not raised).
    """
    try:
        # Open the image; the context manager releases the file handle as soon
        # as the pixel data has been copied by convert().
        with Image.open(input_image_path) as source:
            has_alpha = source.mode in ("RGBA", "LA", "PA") or "transparency" in source.info
            base = source.convert("RGBA")

        # Choose a font (you might need to adjust the path)
        if not os.path.exists(font_path):
            raise FileNotFoundError(f"Font file not found: {font_path}")
        font = ImageFont.truetype(font_path, font_size)

        # getbbox is measured from the drawing origin, so its right/bottom
        # values already include the glyph offset; using them keeps the whole
        # text inside the margin. Clamp at 0 for text larger than the image.
        _, _, text_right, text_bottom = font.getbbox(watermark_text)
        x = max(base.width - text_right - margin, 0)
        y = max(base.height - text_bottom - margin, 0)

        # Drawing an RGBA colour directly on the image overwrites pixels rather
        # than blending them. Draw on a fully transparent layer instead and
        # alpha-composite it, which gives a genuinely see-through watermark.
        overlay = Image.new("RGBA", base.size, tuple(color) + (0,))
        ImageDraw.Draw(overlay).text((x, y), watermark_text, font=font,
                                     fill=tuple(color) + (opacity,))
        watermarked = Image.alpha_composite(base, overlay)

        # Formats such as JPEG cannot store alpha; only keep the alpha channel
        # when the source image had transparency of its own.
        if not has_alpha:
            watermarked = watermarked.convert("RGB")

        # Save the watermarked image
        watermarked.save(output_image_path)
        print(f"Watermarked image saved to {output_image_path}")
        return output_image_path
    except Exception as e:
        # Deliberate best effort: callers check for None instead of catching.
        print(f"An error occurred: {e}")
        return None

# Ask the user for an image; the first uploaded name is the one that is used
uploaded = files.upload()
uploaded_names = list(uploaded.keys())
image_filename = uploaded_names[0]

# Text to stamp on the image, and the name of the watermarked copy
watermark_text = "Priyanka"
output_filename = "watermarked_" + image_filename
watermarked_image_path = watermark_image(image_filename, output_filename, watermark_text)

# Offer the result for download only when watermarking returned a path
if watermarked_image_path:
    files.download(watermarked_image_path)

Saving Cust Detail.png to Cust Detail.png
Watermarked image saved to watermarked_Cust Detail.png


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>