<a href="https://colab.research.google.com/github/dyna478/Paz/blob/main/Arima_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
from pmdarima import auto_arima
import numpy as np

# Build the chronologically ordered transaction frame.
# The timestamp parsing and the sort are chained so that `df1` is defined
# before it is used and actually carries the parsed datetimes (previously the
# conversion was written into `df1` before it existed and then discarded when
# `df1` was rebuilt from `df`).
# NOTE(review): `df` is not created anywhere in this notebook; it must be
# loaded (with 'Timestamp' and 'Amount' columns) before this cell runs.
df1 = (
    df.assign(Timestamp=pd.to_datetime(df['Timestamp']))
      .sort_values('Timestamp')
)

# One row per calendar day ('D' frequency) holding the summed 'Amount'.
df_daily = (
    df1.groupby(pd.Grouper(key='Timestamp', freq='D'))
       .agg({'Amount': 'sum'})
       .reset_index()
)

# Stepwise, non-seasonal order search over p and q in 0..5; `d=None` leaves
# the differencing order to auto_arima. `trace=True` prints each candidate.
model = auto_arima(
    df_daily['Amount'],
    start_p=0,
    start_q=0,
    max_p=5,
    max_q=5,
    d=None,
    seasonal=False,
    trace=True,
    error_action='ignore',
    suppress_warnings=True,
    stepwise=True
)

print(model.summary())

In [None]:
from statsmodels.tsa.arima.model import ARIMA
# Fit a fixed-order ARIMA(1, 0, 3) on the daily totals and request a
# prediction for every row position of df_daily (0 .. len - 1).
model = ARIMA(df_daily['Amount'], order=(1, 0, 3))
model_fit = model.fit()
last_position = len(df_daily) - 1
predictions = model_fit.predict(start=0, end=last_position)

# Residuals are defined here as prediction minus observation.
errors = predictions - df_daily['Amount']

# Global band: mean residual plus/minus three standard deviations.
mu, sigma = np.mean(errors), np.std(errors)
band_half_width = 3 * sigma
threshold_upper = mu + band_half_width
threshold_lower = mu - band_half_width


In [None]:
# A row is labelled 1 when its residual falls strictly outside
# [threshold_lower, threshold_upper], otherwise 0.
outside_band = (errors > threshold_upper) | (errors < threshold_lower)
df_daily['Anomaly'] = np.where(outside_band, 1, 0)

# Keep only the flagged days for inspection.
anomalies = df_daily.loc[df_daily['Anomaly'] == 1]
print("Anomalies detected:")
print(anomalies)

In [None]:
from statsmodels.tsa.arima.model import ARIMA
# Length (in rows of df_daily, i.e. days) of the trailing window used for the
# rolling residual statistics.
# NOTE(review): `window_size` was referenced but never defined in the
# notebook, so this cell failed on a fresh kernel; 30 is a placeholder
# choice — tune it for the data.
window_size = 30

# Refit ARIMA(1, 0, 3) on the daily totals and take the prediction for every
# row position of df_daily.
model = ARIMA(df_daily['Amount'], order=(1, 0, 3))
model_fit = model.fit()
predictions = model_fit.predict(start=0, end=len(df_daily) - 1)

# Residuals are prediction minus observation.
errors = predictions - df_daily['Amount']

# Rolling band: trailing mean plus/minus two trailing standard deviations.
# `min_periods=1` lets the statistics be computed from the first row on.
rolling_mean = errors.rolling(window=window_size, min_periods=1).mean()
rolling_std = errors.rolling(window=window_size, min_periods=1).std()
threshold_upper = rolling_mean + 2 * rolling_std
threshold_lower = rolling_mean - 2 * rolling_std

In [None]:
# Relabel 'Anomaly' from the current threshold_lower / threshold_upper: a row
# gets 1 when its residual is strictly below the lower or strictly above the
# upper threshold, else 0. This overwrites any earlier 'Anomaly' column.
is_anomalous = (errors < threshold_lower) | (errors > threshold_upper)
df_daily['Anomaly'] = np.where(is_anomalous, 1, 0)

# Show the flagged days.
anomalies = df_daily[df_daily['Anomaly'].eq(1)]
print("Anomalies detected:")
print(anomalies)
# Second detector: ARIMA(1, 0, 1) on the same daily totals, flagging rows
# whose absolute residual exceeds three residual standard deviations.
amount_series = df_daily['Amount']
model = ARIMA(amount_series, order=(1, 0, 1))
model_fit = model.fit()
predictions = model_fit.predict(start=0, end=len(amount_series) - 1)
errors = predictions - amount_series
threshold = errors.std() * 3
beyond_threshold = errors.abs() > threshold
df_daily['Predicted_Anomaly'] = beyond_threshold.astype(int)


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

# Score the 'Predicted_Anomaly' flags against the 'Anomaly' labels.
# NOTE(review): both columns are derived from ARIMA residuals in this
# notebook, so 'Anomaly' is a reference labelling rather than independent
# ground truth — confirm that is intended.
y_true, y_pred = df_daily['Anomaly'], df_daily['Predicted_Anomaly']
precision, recall, f_score = (
    scorer(y_true, y_pred)
    for scorer in (precision_score, recall_score, f1_score)
)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F-Score: {f_score:.4f}")

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Pin the label order to [0, 1] so the matrix is always 2x2 and lines up with
# the fixed 'Normal' / 'Anomaly' tick labels, even when one of the classes
# does not occur in y_true or y_pred.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

# Rows are actual classes, columns are predicted classes.
fig, ax = plt.subplots(figsize=(6, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False,
            xticklabels=['Normal', 'Anomaly'],
            yticklabels=['Normal', 'Anomaly'],
            ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score

# ROC needs a score that grows with "how anomalous" a row is. The forecast
# level itself (what was passed before, marked as a placeholder) is not such
# a score, so the absolute residual of the current model is used instead —
# the same quantity the threshold rule compares against.
# The `typ` keyword was dropped from predict().
# NOTE(review): `typ` does not appear to be a documented argument of this
# results class — confirm against the installed statsmodels version.
# The name `y_probs` is kept for compatibility; the values are scores, not
# probabilities.
y_probs = (
    model_fit.predict(start=0, end=len(df_daily) - 1) - df_daily['Amount']
).abs()

fpr, tpr, thresholds = roc_curve(y_true, y_probs)
auc_score = roc_auc_score(y_true, y_probs)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc_score:.2f})')
# Diagonal reference line for a random classifier.
ax.plot([0, 1], [0, 1], 'k--', label='Random Guess')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve')
ax.legend()
plt.show()

In [None]:
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import precision_score, recall_score, f1_score

# Expanding-window evaluation: for each of 5 chronological splits, fit
# ARIMA(1, 0, 3) on the training rows, predict the test rows, flag residuals
# beyond 3 standard deviations, and score the flags against 'Anomaly'.
tscv = TimeSeriesSplit(n_splits=5)

precision_scores, recall_scores, f1_scores = [], [], []

# NOTE: the loop rebinds the notebook-level names `model`, `model_fit`,
# `predictions` and `errors`; after it finishes they hold the last fold's
# values.
for train_index, test_index in tscv.split(df_daily):
    train = df_daily.iloc[train_index]
    # Explicit copy: a column is added to `test` below, and writing to a
    # slice of df_daily would trigger pandas' SettingWithCopy warning.
    test = df_daily.iloc[test_index].copy()

    model = ARIMA(train['Amount'], order=(1, 0, 3))
    model_fit = model.fit()
    # Predict the index labels covered by this fold's test rows.
    predictions = model_fit.predict(start=test.index[0], end=test.index[-1])
    errors = predictions - test['Amount']
    threshold = 3 * errors.std()
    test['Predicted_Anomaly'] = (errors.abs() > threshold).astype(int)

    # zero_division=0 makes the "no positives in this fold" case an explicit
    # 0.0 instead of a warning.
    precision = precision_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)
    recall = recall_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)
    f1 = f1_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)
    precision_scores.append(precision)
    recall_scores.append(recall)
    f1_scores.append(f1)

# Print average metrics
print(f"Average Precision: {np.mean(precision_scores):.4f}")
print(f"Average Recall: {np.mean(recall_scores):.4f}")
print(f"Average F1-Score: {np.mean(f1_scores):.4f}")

# Fixing Issues

In [None]:
# Show how many rows carry each value in the reference and predicted columns.
for heading, column in (
    ("True Labels Distribution:", 'Anomaly'),
    ("Predicted Labels Distribution:", 'Predicted_Anomaly'),
):
    print(heading)
    print(df_daily[column].value_counts())

# Report missing predicted flags, then coerce them to 0 and restore int dtype.
predicted_flags = df_daily['Predicted_Anomaly']
print("Number of NaN values in Predicted_Anomaly:", predicted_flags.isna().sum())
df_daily['Predicted_Anomaly'] = predicted_flags.fillna(0).astype(int)
# After the fillna above no NaN remains in this column; the call still
# rebinds df_daily to the frame returned by dropna.
df_daily = df_daily.dropna(subset=['Predicted_Anomaly'])


In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import precision_score, recall_score, f1_score
# NOTE(review): `d` is kept only because it is a notebook-level name. The
# branch that prepended `d` NaNs to predictions/errors was removed: it could
# never run with d = 0, and predict() below is already asked for exactly
# len(df_daily) positions, so padding would have made the arrays too long.
d = 0

# Predictions for every row position of df_daily from the current
# `model_fit`, and the residuals (prediction minus observation).
predictions = model_fit.predict(start=0, end=len(df_daily) - 1)
errors = predictions - df_daily['Amount']

# Printed after the recomputation so the lengths describe the residuals that
# are actually thresholded below, not whatever an earlier cell left behind.
print("Length of errors:", len(errors))
print("Length of df_daily:", len(df_daily))

# Flag rows whose absolute residual exceeds two standard deviations.
threshold = 2 * errors.std()
df_daily['Predicted_Anomaly'] = (np.abs(errors) > threshold).astype(int)
# Guard against NaN introduced by index alignment during the assignment.
df_daily['Predicted_Anomaly'] = df_daily['Predicted_Anomaly'].fillna(0).astype(int)

# zero_division=0 makes the "nothing flagged" case an explicit 0.0.
precision = precision_score(y_true, df_daily['Predicted_Anomaly'], zero_division=0)
recall = recall_score(y_true, df_daily['Predicted_Anomaly'], zero_division=0)
f_score = f1_score(y_true, df_daily['Predicted_Anomaly'], zero_division=0)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F-Score: {f_score:.4f}")

In [None]:
# Report, then neutralise, non-finite residuals.
print("Number of NaN values in errors:", np.isnan(errors).sum())
print("Number of infinite values in errors:", np.isinf(errors).sum())

# Replace NaN and +/-inf with 0.0 while keeping `errors` a pandas Series.
# np.nan_to_num returned a bare ndarray, which dropped the index and made
# `.std()` below a population std (ddof=0) instead of the sample std
# (ddof=1) used for every other threshold in this notebook.
errors = pd.Series(errors).replace([np.inf, -np.inf], np.nan).fillna(0.0)

# Re-flag rows whose absolute residual exceeds two standard deviations.
threshold = 2 * errors.std()
df_daily['Predicted_Anomaly'] = (np.abs(errors) > threshold).astype(int)
In [None]:
# Rebuild the 2-sigma flags from the current residuals and score them
# against the stored reference labels in y_true.
threshold = errors.std() * 2
exceeds_threshold = np.abs(errors) > threshold
df_daily['Predicted_Anomaly'] = exceeds_threshold.astype(int)

predicted_flags = df_daily['Predicted_Anomaly']
precision = precision_score(y_true, predicted_flags)
recall = recall_score(y_true, predicted_flags)
f_score = f1_score(y_true, predicted_flags)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F-Score: {f_score:.4f}")

In [None]:
import matplotlib.pyplot as plt
# Overlay the observed and predicted daily amounts and mark the rows whose
# 'Predicted_Anomaly' flag is 1.
timestamps = df_daily['Timestamp']
flagged_rows = df_daily[df_daily['Predicted_Anomaly'] == 1]

plt.figure(figsize=(12, 6))
plt.plot(timestamps, df_daily['Amount'], label='Actual')
plt.plot(timestamps, predictions, label='Predicted')
plt.scatter(flagged_rows['Timestamp'], flagged_rows['Amount'],
            color='red', label='Predicted Anomalies')
plt.title('Actual vs Predicted Values with Anomalies')
plt.xlabel('Timestamp')
plt.ylabel('Amount')
plt.legend()
plt.show()

In [None]:
from sklearn.model_selection import train_test_split
# Chronological hold-out: the first 80% of rows train the model, the last
# 20% are scored (shuffle=False keeps the time order).
train, test = train_test_split(df_daily, test_size=0.2, shuffle=False)
# Explicit copy: a column is added to `test` below, and writing to a slice of
# df_daily would trigger pandas' SettingWithCopy warning.
test = test.copy()

model = ARIMA(train['Amount'], order=(1, 0, 3))
model_fit = model.fit()
# Predict the index labels covered by the hold-out rows.
predictions = model_fit.predict(start=test.index[0], end=test.index[-1])
errors = predictions - test['Amount']

# Flag hold-out rows whose absolute residual exceeds two standard deviations.
threshold = 2 * errors.std()
test['Predicted_Anomaly'] = (errors.abs() > threshold).astype(int)

# zero_division=0 makes the "no positives" case an explicit 0.0.
precision = precision_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)
recall = recall_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)
f_score = f1_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F-Score: {f_score:.4f}")

In [None]:
print("True Labels Distribution:")
print(df_daily['Anomaly'].value_counts())
print("Predicted Labels Distribution:")
print(df_daily['Predicted_Anomaly'].value_counts())

# Recompute predictions and residuals for every row of df_daily. The
# `errors` left by the preceding hold-out cell only covers the test rows, so
# assigning flags built from it filled the training rows with NaN and the
# metric calls below failed.
predictions = model_fit.predict(start=0, end=len(df_daily) - 1)
errors = predictions - df_daily['Amount']

# Flag rows whose absolute residual exceeds two standard deviations.
threshold = 2 * errors.std()
df_daily['Predicted_Anomaly'] = (np.abs(errors) > threshold).astype(int)

# zero_division=0 makes the "nothing flagged" case an explicit 0.0.
precision = precision_score(y_true, df_daily['Predicted_Anomaly'], zero_division=0)
recall = recall_score(y_true, df_daily['Predicted_Anomaly'], zero_division=0)
f_score = f1_score(y_true, df_daily['Predicted_Anomaly'], zero_division=0)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F-Score: {f_score:.4f}")
import matplotlib.pyplot as plt
# Overlay observed and predicted daily amounts and mark flagged rows.
flagged_rows = df_daily[df_daily['Predicted_Anomaly'] == 1]

plt.figure(figsize=(12, 6))
plt.plot(df_daily['Timestamp'], df_daily['Amount'], label='Actual')
# Plot predictions against the timestamps of the rows they belong to, looked
# up through the predictions' own index. `predictions` may cover only part of
# df_daily (e.g. a hold-out window), in which case plotting it against every
# timestamp fails with a length mismatch.
# NOTE(review): assumes `predictions` is a Series whose index labels exist in
# df_daily.index — confirm.
plt.plot(df_daily.loc[predictions.index, 'Timestamp'], predictions, label='Predicted')
plt.scatter(flagged_rows['Timestamp'], flagged_rows['Amount'],
            color='red', label='Predicted Anomalies')
plt.legend()
plt.title('Actual vs Predicted Values with Anomalies')
plt.xlabel('Timestamp')
plt.ylabel('Amount')
plt.show()
from sklearn.model_selection import train_test_split
# Chronological hold-out (first 80% train, last 20% test) scored with an
# ARIMA(1, 0, 1) model.
train, test = train_test_split(df_daily, test_size=0.2, shuffle=False)
# Explicit copy: a column is added to `test` below, and writing to a slice of
# df_daily would trigger pandas' SettingWithCopy warning.
test = test.copy()

model = ARIMA(train['Amount'], order=(1, 0, 1))
model_fit = model.fit()
# Predict the index labels covered by the hold-out rows.
predictions = model_fit.predict(start=test.index[0], end=test.index[-1])
errors = predictions - test['Amount']

# Flag hold-out rows whose absolute residual exceeds two standard deviations.
threshold = 2 * errors.std()
test['Predicted_Anomaly'] = (np.abs(errors) > threshold).astype(int)

# zero_division=0 makes the "no positives" case an explicit 0.0.
precision = precision_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)
recall = recall_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)
f_score = f1_score(test['Anomaly'], test['Predicted_Anomaly'], zero_division=0)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F-Score: {f_score:.4f}")