In [None]:
# Import python packages
# NOTE(review): `pd`, `sql_expr` and `datetime` are not referenced in the cells
# visible here -- confirm whether they are still needed before removing them.
import streamlit as st
import pandas as pd
from snowflake.snowpark.functions import col, min as sf_min, max as sf_max, datediff, sql_expr, to_timestamp_ntz
from datetime import datetime
from snowflake.snowpark.context import get_active_session
# Grab the active Snowpark session once; the later cells pass this handle to
# the pipeline function instead of creating their own connection.
session = get_active_session()


In [None]:
def filter_ips_with_long_activity(
    session,
    source_table: str,
    target_table: str,
    stage_path: str,
    min_span_days: int = 30,
):
    """Persist and unload the IPs whose activity spans more than ``min_span_days`` days.

    Steps:
      1. Read ``source_table`` and convert its ``"timestamp"`` column to a
         TIMESTAMP_NTZ column.
      2. Per ``"ip"``, compute the earliest and latest converted timestamp and
         the day difference between them (``datediff("day", ...)``).
      3. Keep only IPs whose difference is strictly greater than
         ``min_span_days``.
      4. Overwrite ``target_table`` with the result.
      5. Unload ``target_table`` to ``stage_path`` as Parquet, partitioned by
         the ``'MM'`` formatting of ``MAX_TS``.

    Args:
        session: Snowpark session used to read, write and run the COPY command.
        source_table: Name of the table holding the raw events; it must expose
            the quoted lower-case columns ``"ip"`` and ``"timestamp"``.
        target_table: Name of the table that is overwritten with the result.
        stage_path: Stage location (for example ``"@ext_stage"``) that receives
            the Parquet files.
        min_span_days: Exclusive lower bound, in days, on the span between the
            first and last event of an IP. Defaults to 30.

    Returns:
        A Snowpark DataFrame over ``target_table`` with the columns ``"ip"``,
        ``MIN_TS``, ``MAX_TS`` and ``DIFF_DAYS``. It reads the materialized
        table, so downstream actions (``show``, charts) do not re-run the
        aggregation over ``source_table``.

    Note:
        ``target_table`` and ``stage_path`` are interpolated verbatim into the
        COPY statement; pass only trusted identifiers.
    """
    events = session.table(source_table)

    # Dividing by 1e9 before TO_TIMESTAMP_NTZ suggests the raw column stores
    # epoch nanoseconds -- TODO confirm against the source schema.
    events_with_ts = events.with_column(
        "ts_converted", to_timestamp_ntz(col('"timestamp"') / 1e9)
    )

    long_lived_ips = (
        events_with_ts.group_by(col('"ip"'))
        .agg(
            sf_min("ts_converted").alias("min_ts"),
            sf_max("ts_converted").alias("max_ts"),
        )
        .with_column("diff_days", datediff("day", col("min_ts"), col("max_ts")))
        .filter(col("diff_days") > min_span_days)
        # Explicit select pins the column order of the persisted table.
        .select(col('"ip"'), "min_ts", "max_ts", "diff_days")
    )

    # Write result to Snowflake table (replaces any previous content).
    long_lived_ips.write.mode("overwrite").save_as_table(target_table)

    # Unload to the stage as Parquet. The former commented-out
    # "OVERWRITE = TRUE" line was dropped from the SQL text: per the Snowflake
    # COPY INTO <location> docs that option is not supported with PARTITION BY.
    # NOTE(review): the partition key is the month only, so rows from different
    # years share a partition, and without OVERWRITE a re-run presumably leaves
    # earlier files in the stage -- confirm both are intended.
    session.sql(f"""
        COPY INTO '{stage_path}'
        FROM {target_table}
        PARTITION BY (TO_CHAR("MAX_TS",'MM'))
        FILE_FORMAT = (TYPE = PARQUET)
         """).collect()

    # Hand back the materialized table rather than the lazy aggregation plan,
    # so callers see exactly what was written and do not recompute it.
    return session.table(target_table)

In [None]:
# Pipeline configuration: input table, output table and unload stage.
SOURCE_TABLE = "my_events"
TARGET_TABLE = "output_results"
STAGE_PATH = "@ext_stage"

# Run the pipeline on the active session and keep the returned DataFrame for
# the display call below and for the chart cell.
result_df = filter_ips_with_long_activity(
    session=session,
    source_table=SOURCE_TABLE,
    target_table=TARGET_TABLE,
    stage_path=STAGE_PATH,
)

# Display the resulting rows.
result_df.show()

In [None]:
# Plot DIFF_DAYS against MIN_TS for the rows held in result_df.
st.line_chart(
    data=result_df,
    x="MIN_TS",
    y="DIFF_DAYS",
)