In [None]:
create database if not exists IP_ALERT;
create schema if not exists IP_ALERT.LOGINS;
create table if not exists IP_ALERT.LOGINS.USER_EMAIL_SETTINGS (EMAIL STRING);

In [None]:
create view if not exists ip_alert.logins.last_login_with_check as (
WITH last_login_per_user AS (
  SELECT
    USER_NAME,
    CLIENT_IP,
    EVENT_TIMESTAMP,
    ROW_NUMBER() OVER (PARTITION BY USER_NAME ORDER BY EVENT_TIMESTAMP DESC) AS rn
  FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY
),
all_logins_excluding_last AS (
  SELECT
    USER_NAME,
    CLIENT_IP,
    EVENT_TIMESTAMP
  FROM last_login_per_user
  WHERE rn > 1 AND event_timestamp < dateadd('minute', -15, current_timestamp())
),
last_login_with_check AS (
  SELECT
    l.USER_NAME,
    l.CLIENT_IP AS LAST_LOGIN_IP,
    EVENT_TIMESTAMP as LAST_LOGIN_TIMESTAMP,
    CASE
      WHEN EXISTS (
        SELECT 1
        FROM all_logins_excluding_last a
        WHERE a.USER_NAME = l.USER_NAME AND a.CLIENT_IP = l.CLIENT_IP
      ) THEN 'YES'
      ELSE 'NO'
    END AS LAST_IP_USED_BEFORE
  FROM last_login_per_user l
  WHERE l.rn = 1 AND USER_NAME != 'SNOWFLAKE'
)
SELECT * FROM last_login_with_check);

In [None]:
CREATE OR REPLACE PROCEDURE ip_alert.logins.detect_new_ip_logins()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'run'
PACKAGES = ('snowflake-snowpark-python', 'pandas')
AS
$$
def run(session):
    df = session.table("ip_alert.logins.last_login_with_check") \
                .filter("LAST_IP_USED_BEFORE = 'NO'") \
                .to_pandas()

    if df.empty:
        return "No new IPs flagged."

    email_row = session.sql("SELECT email FROM ip_alert.logins.user_email_settings LIMIT 1").collect()
    if email_row:
        email = email_row[0]["EMAIL"]
        body = "\n".join(
                f"{row['USER_NAME']:<12} {row['LAST_LOGIN_IP']:<15} {row['LAST_LOGIN_TIMESTAMP']}"
                for _, row in df.iterrows())
        q = "CALL SYSTEM$SEND_EMAIL('email_alerts_int', '"+email+"', 'New Login IP detected', '"+body+"')"
        session.sql(q).collect()
    

    return "new IPs flagged."
$$;

In [None]:
import streamlit as st
import snowflake.snowpark as snowpark

session = snowpark.Session.builder.getOrCreate()

st.title("🔐 Login IP Monitor")

email = st.text_input("Your email for alerts")
if st.button("Save Email"):
    session.sql("DELETE FROM ip_alert.logins.user_email_settings").collect()
    session.sql(f"INSERT INTO ip_alert.logins.user_email_settings VALUES ('{email}')").collect()
    q = f"""CREATE OR REPLACE NOTIFICATION INTEGRATION email_alerts_int
           TYPE = EMAIL
           ENABLED = TRUE
           ALLOWED_RECIPIENTS = ('{email}');
           """
    session.sql(q).collect()
    st.success("Email saved!")

if st.button("Run Now"):
    result = session.sql("CALL ip_alert.logins.detect_new_ip_logins()").collect()
    st.info(result[0][0])
    df = session.table("ip_alert.logins.last_login_with_check") \
                .filter("LAST_IP_USED_BEFORE = 'NO'") \
                .to_pandas()
    st.dataframe(df, use_container_width=True)

if st.button('Create Task'):
    num_of_mins = st.text_input("Every X minutes:")
    q = f"""CREATE OR REPLACE TASK ip_alert.logins.check_new_ip_logins_task
            SCHEDULE = '{num_of_mins} MINUTE'
            AS
            CALL ip_alert.logins.detect_new_ip_logins();
            """
    session.sql(q).collect()
    q = 'ALTER TASK ip_alert.logins.check_new_ip_logins_task RESUME'
    session.sql(q).collect()
    st.success('task created')
    # q = 'describe task ip_alert.logins.check_new_ip_logins_task'
    # st.success(session.sql(q).collect())
        