# Getting Started with Snowflake Openflow Kafka Connector

Companion code for the [Snowflake Openflow Kafka Connector Quickstart](https://quickstarts.snowflake.com/guide/getting_started_with_openflow_kafka_connector/index.html).

## Overview

This quickstart demonstrates how to build a real-time streaming pipeline from Apache Kafka to Snowflake using the Openflow Kafka Connector. You'll learn how to:

- Set up a Kafka topic for application log streaming
- Configure Snowflake objects (database, schema, tables, network rules)
- Deploy the Openflow SPCS runtime
- Configure the Kafka connector in Openflow Canvas
- Stream real-time logs from Kafka to Snowflake
- Perform powerful SQL analytics on streaming log data


## Database Setup
The following cell sets up all required Snowflake objects for the Kafka log streaming demo. Run it **before** configuring the Openflow connector.

In [None]:

USE ROLE ACCOUNTADMIN;

-- Step 1: Create Role and Database
-- ----------------------------------------------------------------------------

-- Create runtime role (reuse if coming from SPCS quickstart)
CREATE ROLE IF NOT EXISTS QUICKSTART_ROLE;

-- Create database for Kafka streaming data
CREATE DATABASE IF NOT EXISTS QUICKSTART_KAFKA_CONNECTOR_DB;

-- Create warehouse for data processing and queries
CREATE WAREHOUSE IF NOT EXISTS QUICKSTART_KAFKA_CONNECTOR_WH
  WAREHOUSE_SIZE = XSMALL
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;

-- Grant privileges to runtime role
GRANT OWNERSHIP ON DATABASE QUICKSTART_KAFKA_CONNECTOR_DB TO ROLE QUICKSTART_ROLE;
GRANT OWNERSHIP ON SCHEMA QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC TO ROLE QUICKSTART_ROLE;
GRANT USAGE ON WAREHOUSE QUICKSTART_KAFKA_CONNECTOR_WH TO ROLE QUICKSTART_ROLE;

-- Grant runtime role to Openflow admin
GRANT ROLE QUICKSTART_ROLE TO ROLE OPENFLOW_ADMIN;

## Create Network Rule

Create a network rule that allows the Openflow runtime to reach your Kafka cluster.

> **IMPORTANT**: Replace the contents of `VALUE_LIST` with your Kafka broker endpoint(s).
>
> This quickstart works with any Kafka service:
> - GCP Managed Kafka:    `34.123.45.67:9092` (public IP)
> - AWS MSK:              `b-1.mycluster.kafka.us-east-1.amazonaws.com:9092`
> - Confluent Cloud:      `pkc-xxxxx.us-east-1.aws.confluent.cloud:9092`
> - Azure Event Hubs:     `myeventhub.servicebus.windows.net:9093`
> - Self-hosted:          `kafka.mycompany.com:9092`
>
> **NOTE**: Ensure network connectivity and firewall rules allow Snowflake access.
> For multiple brokers (recommended), include all broker endpoints in `VALUE_LIST`.

In [None]:
USE ROLE QUICKSTART_ROLE;
USE DATABASE QUICKSTART_KAFKA_CONNECTOR_DB;

-- Create schema for network rules
CREATE SCHEMA IF NOT EXISTS QUICKSTART_KAFKA_CONNECTOR_DB.NETWORKS;

CREATE OR REPLACE NETWORK RULE QUICKSTART_KAFKA_CONNECTOR_DB.NETWORKS.kafka_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = (
    -- Use a wildcard for similar domain patterns, or
    -- add all the bootstrap servers
);
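
The `VALUE_LIST` above expects one quoted `host:port` entry per broker. As a rough sketch, the helper below renders a list of endpoints into that fragment and rejects entries without a numeric port. The function name `format_value_list` and the broker hostnames are placeholders for illustration and are not part of the quickstart.

```python
def format_value_list(endpoints):
    """Render host:port endpoints as a SQL VALUE_LIST fragment."""
    cleaned = []
    for endpoint in endpoints:
        host, sep, port = endpoint.strip().rpartition(":")
        # Each entry must look like host:port with a numeric port.
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {endpoint!r}")
        cleaned.append(f"'{host}:{port}'")
    return "VALUE_LIST = (\n    " + ",\n    ".join(cleaned) + "\n)"


# Placeholder brokers for illustration only; substitute your own endpoints.
brokers = ["kafka-1.mycompany.com:9092", "kafka-2.mycompany.com:9092"]
value_list_sql = format_value_list(brokers)
print(value_list_sql)
```

The printed fragment can be pasted in place of the empty `VALUE_LIST` in the cell above.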

In [None]:
DESC NETWORK RULE QUICKSTART_KAFKA_CONNECTOR_DB.NETWORKS.kafka_network_rule;

## Create External Access Integration (EAI)

Create an EAI that can be associated with the Openflow runtime.

In [None]:
USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION quickstart_kafka_connector_access
  ALLOWED_NETWORK_RULES = (
    QUICKSTART_KAFKA_CONNECTOR_DB.NETWORKS.kafka_network_rule
  )
  ENABLED = TRUE
  COMMENT = 'Openflow SPCS runtime access for Kafka connector';

-- Grant usage to Openflow admin role
GRANT USAGE ON INTEGRATION quickstart_kafka_connector_access TO ROLE OPENFLOW_ADMIN;
-- Grant usage to runtime role
GRANT USAGE ON INTEGRATION quickstart_kafka_connector_access TO ROLE QUICKSTART_ROLE;

In [None]:
DESCRIBE EXTERNAL ACCESS INTEGRATION quickstart_kafka_connector_access;

## Create Runtime
Create the quickstart runtime as described in the Quickstart guide. Once the connector is configured and running, use the remaining cells to verify the setup.

## Verify Ingestion

Let's verify ingestion from the Kafka topic into the `"APPLICATION-LOGS"` table.

>**IMPORTANT**:
>
> Ensure that you have triggered Apache Kafka stream ingestion by following the instructions in [Chapter 9](http://quickstarts.snowflake.com/guide/getting-started-with-openflow-kafka-connector/index.html?index=..%2F..index#9).

In [None]:
USE ROLE QUICKSTART_ROLE;
USE DATABASE QUICKSTART_KAFKA_CONNECTOR_DB;
USE SCHEMA PUBLIC;
USE WAREHOUSE QUICKSTART_KAFKA_CONNECTOR_WH;

In [None]:
SHOW TABLES;

### Verify Initial Schema
Let's verify the table that was created and the data that was ingested.

Expected: roughly 50 or more records, matching the messages sent to the `application-logs` topic.

In [None]:
SELECT COUNT(*) as TOTAL_RECORDS FROM "APPLICATION-LOGS";

Let's inspect the initial schema; it should have *11* columns.

In [None]:
SELECT 
  COLUMN_NAME, 
  DATA_TYPE
FROM QUICKSTART_KAFKA_CONNECTOR_DB.INFORMATION_SCHEMA.COLUMNS 
WHERE TABLE_NAME ILIKE 'APPLICATION-LOGS'
ORDER BY COLUMN_NAME;

In [None]:
# Convert SQL result to pandas DataFrame and extract column names
# (assumes the preceding SQL cell is named `base_schema`)
column_names_df = base_schema.to_pandas()['COLUMN_NAME'].sort_values()

# Display the sorted column names
print(f"Total Number of Columns:{len(column_names_df)}")
print(column_names_df.to_string(index=False))

#### Query Base Schema Data

In [None]:
SELECT 
  TIMESTAMP,
  LEVEL,
  SERVICE,
  MESSAGE,
  STATUS_CODE,
  DURATION_MS
FROM "APPLICATION-LOGS"
ORDER BY TIMESTAMP DESC
LIMIT 10;


In [None]:
SELECT 
  LEVEL,
  COUNT(*) as LOG_COUNT
FROM "APPLICATION-LOGS"
GROUP BY LEVEL
ORDER BY LOG_COUNT DESC;

In [None]:
SELECT 
  TIMESTAMP,
  SERVICE,
  MESSAGE,
  ERROR,
  STATUS_CODE
FROM "APPLICATION-LOGS"
WHERE ERROR IS NOT NULL
ORDER BY TIMESTAMP DESC
LIMIT 10;

In [None]:
SELECT 
  SERVICE,
  COUNT(*) as TOTAL_LOGS,
  SUM(CASE WHEN LEVEL = 'ERROR' THEN 1 ELSE 0 END) as ERROR_COUNT,
  SUM(CASE WHEN LEVEL = 'WARN' THEN 1 ELSE 0 END) as WARN_COUNT,
  ROUND(AVG(DURATION_MS), 2) as AVG_DURATION_MS
FROM "APPLICATION-LOGS"
GROUP BY SERVICE
ORDER BY ERROR_COUNT DESC;

### Verify Evolved Schema
Let's verify how the `"APPLICATION-LOGS"` table evolves when messages arrive with extra columns.

>**IMPORTANT**:
>
> Ensure that you have triggered Apache Kafka stream ingestion by following the instructions in [Chapter 10](http://quickstarts.snowflake.com/guide/getting-started-with-openflow-kafka-connector/index.html?index=..%2F..index#10).

Compare schemas: **new** columns have appeared, and the table should now have `11 (base) + 26 (evolved) = 37` columns.

In [None]:
SELECT 
  COLUMN_NAME, 
  DATA_TYPE
FROM QUICKSTART_KAFKA_CONNECTOR_DB.INFORMATION_SCHEMA.COLUMNS 
WHERE TABLE_NAME ILIKE 'APPLICATION-LOGS'
ORDER BY COLUMN_NAME;

In [None]:
# Convert SQL result to pandas DataFrame and extract column names
# (assumes the preceding SQL cell is named `evolved_columns`)
column_names_df = evolved_columns.to_pandas()['COLUMN_NAME'].sort_values()

# Display the sorted column names
print(f"Total Number of Columns:{len(column_names_df)}")
print(column_names_df.to_string(index=False))
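
To see exactly which columns schema evolution added, the two column listings can be compared as sets. The sketch below assumes you kept the column names from before and after evolution as plain Python lists. The helper name `new_columns` is hypothetical, and the sample lists are small illustrative subsets of the columns queried in this notebook, not the full 11 and 37.

```python
def new_columns(before, after):
    """Return the columns present after evolution but not before, sorted."""
    return sorted(set(after) - set(before))


# Illustrative subsets only; the real table has more columns than listed here.
base = ["TIMESTAMP", "LEVEL", "SERVICE", "MESSAGE"]
evolved = base + ["REGION", "TRACE_ID"]

added = new_columns(base, evolved)
print(f"New columns ({len(added)}): {added}")
```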


#### Query Evolved Schema Fields

In [None]:
SELECT 
  TIMESTAMP,
  SERVICE,
  MESSAGE,
  REGION,
  TRACE_ID
FROM "APPLICATION-LOGS"
WHERE REGION IS NOT NULL
ORDER BY TIMESTAMP DESC
LIMIT 10;

In [None]:
SELECT 
  TIMESTAMP,
  SERVICE,
  USER_ID,
  AUTH_METHOD,
  PROVIDER,
  REGION
FROM "APPLICATION-LOGS"
WHERE AUTH_METHOD IS NOT NULL
ORDER BY TIMESTAMP DESC;

In [None]:
SELECT 
  TIMESTAMP,
  SERVICE,
  AMOUNT,
  CURRENCY,
  PAYMENT_METHOD,
  USER_ID,
  REGION
FROM "APPLICATION-LOGS"
WHERE CURRENCY IS NOT NULL
ORDER BY TIMESTAMP DESC;

In [None]:
SELECT 
  TIMESTAMP,
  SERVICE,
  MESSAGE,
  FILE_SIZE_BYTES,
  CONTENT_TYPE,
  REGION
FROM "APPLICATION-LOGS"
WHERE FILE_SIZE_BYTES IS NOT NULL
ORDER BY TIMESTAMP DESC;

In [None]:
SELECT 
  TIMESTAMP,
  SERVICE,
  MESSAGE,
  MEMORY_PERCENT,
  AVAILABLE_MB,
  DISK_USAGE_PERCENT,
  AVAILABLE_GB,
  REGION
FROM "APPLICATION-LOGS"
WHERE MEMORY_PERCENT IS NOT NULL 
   OR DISK_USAGE_PERCENT IS NOT NULL
ORDER BY TIMESTAMP DESC;

## Analytics on Log Data
Now that logs are streaming into Snowflake, let's perform powerful analytics that would be difficult or expensive in traditional log platforms.

In [None]:
# Import required packages
import streamlit as st
import altair as alt
from snowflake.snowpark.context import get_active_session

# Get active session
session = get_active_session()

# Execute SQL query
sql = """
SELECT 
    LEVEL as LOG_LEVEL,
    COUNT(*) as EVENT_COUNT,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as PERCENTAGE
FROM "APPLICATION-LOGS"
GROUP BY LOG_LEVEL
ORDER BY EVENT_COUNT DESC
"""

# Convert results to DataFrame
df = session.sql(sql).to_pandas()

# Create visualization
st.header("Log Level Distribution")

# Create metrics for total events
total_events = df['EVENT_COUNT'].sum()
st.metric("Total Log Events", f"{total_events:,}")

# Create bar chart
chart = alt.Chart(df).mark_bar().encode(
    x=alt.X('LOG_LEVEL:N', title='Log Level'),
    y=alt.Y('EVENT_COUNT:Q', title='Number of Events'),
    color=alt.Color('LOG_LEVEL:N', legend=None),
    tooltip=[
        alt.Tooltip('LOG_LEVEL:N', title='Level'),
        alt.Tooltip('EVENT_COUNT:Q', title='Count'),
        alt.Tooltip('PERCENTAGE:Q', title='Percentage', format='.2f')
    ]
).properties(
    width=600,
    height=400
)

st.altair_chart(chart, use_container_width=True)

# Display data table
st.dataframe(df, hide_index=True)
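
The `PERCENTAGE` column in the query above divides each level's count by the grand total, which the window expression `SUM(COUNT(*)) OVER()` supplies. As a minimal sketch of the same arithmetic outside Snowflake, the pure-Python version below uses made-up sample data. The function name `level_distribution` is an assumption for illustration.

```python
from collections import Counter


def level_distribution(levels):
    """Count log levels and compute each level's share of the total in percent."""
    counts = Counter(levels)
    total = sum(counts.values())
    return [
        (level, count, round(count * 100.0 / total, 2))
        for level, count in counts.most_common()
    ]


# Made-up sample; not real output from the quickstart.
sample_levels = ["INFO", "INFO", "INFO", "INFO", "INFO", "WARN", "WARN", "ERROR"]
distribution = level_distribution(sample_levels)
for level, count, pct in distribution:
    print(level, count, pct)
```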


In [None]:
# Import required packages
import streamlit as st
import altair as alt
from snowflake.snowpark.context import get_active_session

# Get active session
session = get_active_session()

# Execute SQL query
sql = """
SELECT 
    MESSAGE as ERROR_MESSAGE,
    SERVICE,
    COUNT(*) as ERROR_COUNT
FROM "APPLICATION-LOGS"
WHERE LEVEL ILIKE '%ERROR%'
GROUP BY ERROR_MESSAGE, SERVICE
ORDER BY ERROR_COUNT DESC
LIMIT 10
"""

# Convert results to DataFrame
df = session.sql(sql).to_pandas()

# Create visualization
st.header("Top Error Messages by Service")

# Create summary metrics
total_errors = df['ERROR_COUNT'].sum()
unique_services = df['SERVICE'].nunique()
st.metric("Total Errors", total_errors)
st.metric("Affected Services", unique_services)

# Create bar chart
chart = alt.Chart(df).mark_bar().encode(
    y=alt.Y('ERROR_MESSAGE:N', 
            sort='-x',
            title='Error Message',
            axis=alt.Axis(labelLimit=250)),
    x=alt.X('ERROR_COUNT:Q', title='Number of Occurrences'),
    color=alt.Color('SERVICE:N', title='Service'),
    tooltip=[
        alt.Tooltip('ERROR_MESSAGE:N', title='Error'),
        alt.Tooltip('SERVICE:N', title='Service'),
        alt.Tooltip('ERROR_COUNT:Q', title='Count')
    ]
).properties(
    height=400
)

st.altair_chart(chart, use_container_width=True)

# Display detailed data
with st.expander("View Detailed Data"):
    st.dataframe(df, hide_index=True)


In [None]:
# Import required packages
import streamlit as st
import altair as alt
from snowflake.snowpark.context import get_active_session

# Get active session
session = get_active_session()

# Execute SQL query
sql = """
SELECT 
    SERVICE as SERVICE_NAME,
    COUNT(*) as TOTAL_EVENTS,
    SUM(CASE WHEN LEVEL ILIKE '%ERROR%' THEN 1 ELSE 0 END) as ERROR_COUNT,
    SUM(CASE WHEN LEVEL ILIKE '%WARN%' THEN 1 ELSE 0 END) as WARN_COUNT,
    ROUND(ERROR_COUNT * 100.0 / NULLIF(TOTAL_EVENTS, 0), 2) as ERROR_RATE_PCT
FROM "APPLICATION-LOGS"
GROUP BY SERVICE_NAME
ORDER BY ERROR_RATE_PCT DESC
"""

# Convert results to DataFrame
df = session.sql(sql).to_pandas()

# Create visualization
st.header("Service Health Overview")

# Create summary metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
    st.metric("Total Services", len(df))
with col2:
    st.metric("Total Events", df['TOTAL_EVENTS'].sum())
with col3:
    st.metric("Total Errors", df['ERROR_COUNT'].sum())
with col4:
    st.metric("Total Warnings", df['WARN_COUNT'].sum())

# Create main chart
chart = alt.Chart(df).mark_bar().encode(
    x=alt.X('SERVICE_NAME:N', title='Service'),
    y=alt.Y('TOTAL_EVENTS:Q', title='Number of Events'),
    color=alt.Color('ERROR_RATE_PCT:Q', 
                   scale=alt.Scale(scheme='redyellowgreen', reverse=True),
                   title='Error Rate (%)'),
    tooltip=[
        alt.Tooltip('SERVICE_NAME:N', title='Service'),
        alt.Tooltip('TOTAL_EVENTS:Q', title='Total Events'),
        alt.Tooltip('ERROR_COUNT:Q', title='Errors'),
        alt.Tooltip('WARN_COUNT:Q', title='Warnings'),
        alt.Tooltip('ERROR_RATE_PCT:Q', title='Error Rate %', format='.2f')
    ]
).properties(
    height=400
)

st.altair_chart(chart, use_container_width=True)

# Display detailed data
with st.expander("View Detailed Data"):
    st.dataframe(df, hide_index=True)


### Time-Series Analysis

In [None]:
# Import required packages
import streamlit as st
import altair as alt
from snowflake.snowpark.context import get_active_session

# Get active session
session = get_active_session()

# Execute SQL query
sql = """
SELECT 
    DATE_TRUNC('minute', TIMESTAMP::TIMESTAMP) as TIME_BUCKET,
    COUNT(*) as EVENTS_PER_MINUTE,
    SUM(CASE WHEN LEVEL ILIKE '%ERROR%' THEN 1 ELSE 0 END) as ERRORS_PER_MINUTE
FROM "APPLICATION-LOGS"
WHERE TIMESTAMP::TIMESTAMP >= DATEADD('hour', -1, CURRENT_TIMESTAMP())
GROUP BY TIME_BUCKET
ORDER BY TIME_BUCKET DESC
"""

# Convert results to DataFrame
df = session.sql(sql).to_pandas()

# Create visualization
st.header("Log Events Time Series Analysis")

# Create summary metrics
col1, col2, col3 = st.columns(3)
with col1:
    st.metric("Total Events", df['EVENTS_PER_MINUTE'].sum())
with col2:
    st.metric("Total Errors", df['ERRORS_PER_MINUTE'].sum())
with col3:
    error_rate = round(df['ERRORS_PER_MINUTE'].sum() * 100 / df['EVENTS_PER_MINUTE'].sum(), 2) if df['EVENTS_PER_MINUTE'].sum() > 0 else 0
    st.metric("Error Rate", f"{error_rate}%")

# Create time series chart
base = alt.Chart(df).encode(
    x=alt.X('TIME_BUCKET:T', title='Time')
)

events_line = base.mark_line(color='blue').encode(
    y=alt.Y('EVENTS_PER_MINUTE:Q', title='Events per Minute'),
    tooltip=[
        alt.Tooltip('TIME_BUCKET:T', title='Time'),
        alt.Tooltip('EVENTS_PER_MINUTE:Q', title='Events'),
        alt.Tooltip('ERRORS_PER_MINUTE:Q', title='Errors')
    ]
)

errors_line = base.mark_line(color='red').encode(
    y=alt.Y('ERRORS_PER_MINUTE:Q', title='Errors per Minute'),
    tooltip=[
        alt.Tooltip('TIME_BUCKET:T', title='Time'),
        alt.Tooltip('EVENTS_PER_MINUTE:Q', title='Events'),
        alt.Tooltip('ERRORS_PER_MINUTE:Q', title='Errors')
    ]
)

chart = alt.layer(events_line, errors_line).resolve_scale(
    y='independent'
).properties(
    height=400
)

st.altair_chart(chart, use_container_width=True)

# Display detailed data
with st.expander("View Detailed Data"):
    st.dataframe(df, hide_index=True)
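
The time-series query buckets events by truncating each timestamp to the minute and counts errors with a case-insensitive match on `LEVEL`. The sketch below mirrors that logic in plain Python on made-up events, which may help when checking the chart against a handful of known records. The function name `per_minute_counts` and the sample data are assumptions.

```python
from collections import defaultdict
from datetime import datetime


def per_minute_counts(events):
    """Group (timestamp, level) pairs into one-minute buckets."""
    buckets = defaultdict(lambda: {"events": 0, "errors": 0})
    for timestamp, level in events:
        # Equivalent of DATE_TRUNC('minute', ...): zero out seconds and below.
        bucket = timestamp.replace(second=0, microsecond=0)
        buckets[bucket]["events"] += 1
        # Case-insensitive substring match, like LEVEL ILIKE '%ERROR%'.
        if "error" in level.lower():
            buckets[bucket]["errors"] += 1
    # Newest bucket first, like ORDER BY TIME_BUCKET DESC.
    return dict(sorted(buckets.items(), reverse=True))


# Made-up sample events for illustration.
sample_events = [
    (datetime(2025, 1, 1, 12, 0, 5), "INFO"),
    (datetime(2025, 1, 1, 12, 0, 40), "ERROR"),
    (datetime(2025, 1, 1, 12, 1, 10), "WARN"),
]
minute_counts = per_minute_counts(sample_events)
for bucket, counts in minute_counts.items():
    print(bucket, counts)
```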


In [None]:
# Import required packages
import streamlit as st
import altair as alt
from snowflake.snowpark.context import get_active_session

# Get active session
session = get_active_session()

# Execute SQL query
sql = """
SELECT 
    HOUR(TIMESTAMP::TIMESTAMP) as HOUR_OF_DAY,
    COUNT(*) as EVENT_COUNT,
    AVG(DURATION_MS) as AVG_DURATION_MS
FROM "APPLICATION-LOGS"
WHERE TIMESTAMP::TIMESTAMP >= DATEADD('day', -1, CURRENT_TIMESTAMP())
GROUP BY HOUR_OF_DAY
ORDER BY HOUR_OF_DAY
"""

# Convert results to DataFrame
df = session.sql(sql).to_pandas()

# Create visualization
st.header("Hourly Event Distribution")

# Create summary metrics
col1, col2 = st.columns(2)
with col1:
    st.metric("Total Events", df['EVENT_COUNT'].sum())
with col2:
    # Unweighted mean of the hourly averages (not weighted by event count)
    avg_duration = round(df['AVG_DURATION_MS'].mean(), 2)
    st.metric("Average Duration (ms)", f"{avg_duration:.2f}")

# Create dual-axis chart
base = alt.Chart(df).encode(
    x=alt.X('HOUR_OF_DAY:Q', 
            axis=alt.Axis(title='Hour of Day', tickCount=24),
            scale=alt.Scale(domain=[0, 23]))
)

# Event count bars
bars = base.mark_bar().encode(
    y=alt.Y('EVENT_COUNT:Q', title='Event Count'),
    color=alt.value('#5276A7'),
    tooltip=[
        alt.Tooltip('HOUR_OF_DAY:Q', title='Hour'),
        alt.Tooltip('EVENT_COUNT:Q', title='Events'),
        alt.Tooltip('AVG_DURATION_MS:Q', title='Avg Duration (ms)', format='.2f')
    ]
)

# Average duration line
line = base.mark_line(color='red').encode(
    y=alt.Y('AVG_DURATION_MS:Q', title='Average Duration (ms)')
)

# Combine charts
chart = alt.layer(bars, line).resolve_scale(
    y='independent'
).properties(
    height=400
)

st.altair_chart(chart, use_container_width=True)

# Display detailed data
with st.expander("View Detailed Data"):
    st.dataframe(df, hide_index=True)
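
The "Average Duration (ms)" metric in this cell is the mean of the hourly averages, so a quiet hour counts as much as a busy one. If a per-event average is wanted instead, each hour can be weighted by its event count. The sketch below shows the difference on made-up rows. The function name `weighted_average` is hypothetical, and weighting by `EVENT_COUNT` is only approximate when some rows have a null `DURATION_MS`, since `AVG` skips nulls while `COUNT(*)` does not.

```python
def weighted_average(hourly_rows):
    """Average duration per event, given (event_count, avg_duration_ms) rows."""
    total_events = sum(count for count, _ in hourly_rows)
    if total_events == 0:
        return 0.0
    return sum(count * avg for count, avg in hourly_rows) / total_events


# Made-up hourly rows: (EVENT_COUNT, AVG_DURATION_MS).
rows = [(90, 100.0), (10, 500.0)]
unweighted = sum(avg for _, avg in rows) / len(rows)
weighted = weighted_average(rows)
print(unweighted, weighted)
```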


## Performance Analytics

In [None]:
SELECT
  SERVICE,
  REQUEST_ID,
  DURATION_MS,
  MESSAGE,
  TIMESTAMP AS REQUEST_TIME,
  LEVEL,
  STATUS_CODE
FROM
  QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC."APPLICATION-LOGS"
WHERE
  DURATION_MS IS NOT NULL
  AND DURATION_MS > 0
ORDER BY
  DURATION_MS DESC
LIMIT
  20;

In [None]:
SELECT
  SERVICE,
  COUNT(*) as TOTAL_REQUESTS,
  ROUND(AVG(DURATION_MS), 2) as AVG_DURATION_MS,
  MIN(DURATION_MS) as MIN_DURATION_MS,
  MAX(DURATION_MS) as MAX_DURATION_MS,
  SUM(
    CASE
      WHEN STATUS_CODE >= 400 THEN 1
      ELSE 0
    END
  ) as ERROR_COUNT,
  ROUND(
    AVG(
      CASE
        WHEN STATUS_CODE >= 400 THEN 1
        ELSE 0
      END
    ) * 100,
    2
  ) as ERROR_RATE_PCT
FROM
  QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC."APPLICATION-LOGS"
WHERE
  DURATION_MS IS NOT NULL
  AND DURATION_MS > 0
GROUP BY
  SERVICE
ORDER BY
  AVG_DURATION_MS DESC;

In [None]:
WITH ERROR_WINDOWS AS (
  SELECT
    SERVICE,
    CAST(TIMESTAMP AS TIMESTAMP) AS ERROR_TIME,
    REQUEST_ID,
    STATUS_CODE,
    DURATION_MS,
    DATE_TRUNC('MINUTE', CAST(TIMESTAMP AS TIMESTAMP)) AS TIME_WINDOW
    /* Bucket errors into 1-minute time windows */
  FROM
    QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC."APPLICATION-LOGS"
  WHERE
    LEVEL ILIKE '%ERROR%'
),
SERVICE_PAIRS AS (
  SELECT
    a.SERVICE AS SERVICE_A,
    b.SERVICE AS SERVICE_B,
    a.TIME_WINDOW,
    COUNT(DISTINCT a.REQUEST_ID) AS ERRORS_SERVICE_A,
    COUNT(DISTINCT b.REQUEST_ID) AS ERRORS_SERVICE_B,
    AVG(a.DURATION_MS) AS AVG_DURATION_A,
    AVG(b.DURATION_MS) AS AVG_DURATION_B
  FROM
    ERROR_WINDOWS AS a
    JOIN ERROR_WINDOWS AS b ON a.TIME_WINDOW = b.TIME_WINDOW
    AND a.SERVICE < b.SERVICE
    /* Avoid duplicate pairs */
  GROUP BY
    a.SERVICE,
    b.SERVICE,
    a.TIME_WINDOW
  HAVING
    ERRORS_SERVICE_A > 0
    AND ERRORS_SERVICE_B > 0
)
SELECT
  SERVICE_A,
  SERVICE_B,
  COUNT(*) AS CONCURRENT_ERROR_WINDOWS,
  SUM(ERRORS_SERVICE_A) AS TOTAL_ERRORS_A,
  SUM(ERRORS_SERVICE_B) AS TOTAL_ERRORS_B,
  ROUND(AVG(AVG_DURATION_A), 2) AS AVG_DURATION_A,
  ROUND(AVG(AVG_DURATION_B), 2) AS AVG_DURATION_B
FROM
  SERVICE_PAIRS
GROUP BY
  SERVICE_A,
  SERVICE_B
HAVING
  CONCURRENT_ERROR_WINDOWS > 1
ORDER BY
  CONCURRENT_ERROR_WINDOWS DESC,
  TOTAL_ERRORS_A + TOTAL_ERRORS_B DESC;
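
The query above self-joins error records on their time window, keeps each unordered service pair once, and reports pairs that failed together in more than one window. The sketch below restates that pairing logic in plain Python on made-up records and leaves out the duration and per-service error totals. The function name `concurrent_error_pairs` and the string window labels are assumptions for illustration.

```python
from collections import defaultdict
from itertools import combinations


def concurrent_error_pairs(errors, min_windows=2):
    """Count time windows in which two services both logged errors."""
    services_by_window = defaultdict(set)
    for window, service in errors:
        services_by_window[window].add(service)

    pair_windows = defaultdict(int)
    for services in services_by_window.values():
        # sorted() + combinations() yields each unordered pair once,
        # like the a.SERVICE < b.SERVICE join condition.
        for pair in combinations(sorted(services), 2):
            pair_windows[pair] += 1

    # Keep pairs seen together in at least min_windows windows
    # (the query's HAVING CONCURRENT_ERROR_WINDOWS > 1).
    return {pair: n for pair, n in pair_windows.items() if n >= min_windows}


# Made-up (minute window, service) error records.
sample_errors = [
    ("12:00", "auth-service"), ("12:00", "payment-service"),
    ("12:01", "auth-service"), ("12:01", "payment-service"),
    ("12:02", "auth-service"),
]
pairs = concurrent_error_pairs(sample_errors)
print(pairs)
```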

## Using Cortex Search
Enable natural language search over your log messages using Snowflake Cortex Search. This allows you to query logs using plain English through Snowflake Intelligence or programmatically.

In [None]:
USE ROLE QUICKSTART_ROLE;
USE DATABASE QUICKSTART_KAFKA_CONNECTOR_DB;
USE SCHEMA PUBLIC;
USE WAREHOUSE QUICKSTART_KAFKA_CONNECTOR_WH;

CREATE OR REPLACE CORTEX SEARCH SERVICE application_logs_search
  ON MESSAGE
  ATTRIBUTES LEVEL, SERVICE, ERROR, STATUS_CODE, MEMORY_PERCENT, DISK_USAGE_PERCENT, REGION
  WAREHOUSE = QUICKSTART_KAFKA_CONNECTOR_WH
  TARGET_LAG = '1 minute'
  AS (
    SELECT 
      MESSAGE,
      LEVEL,
      SERVICE,
      ERROR,
      STATUS_CODE,
      TIMESTAMP,
      REQUEST_ID,
      HOST,
      USER_ID,
      MEMORY_PERCENT::NUMBER as MEMORY_PERCENT,
      AVAILABLE_MB::NUMBER as AVAILABLE_MB,
      DISK_USAGE_PERCENT::NUMBER as DISK_USAGE_PERCENT,
      AVAILABLE_GB::NUMBER as AVAILABLE_GB,
      REGION
    FROM "APPLICATION-LOGS"
  );

In [None]:
SELECT
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    'QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.application_logs_search',
    '{
      "query": "authentication failed",
      "columns": ["MESSAGE", "LEVEL", "SERVICE", "TIMESTAMP", "ERROR"],
      "filter": {"@eq": {"LEVEL": "ERROR"}},
      "limit": 10
    }'
  ) as authentication_errors;

In [None]:
SELECT
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    'QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.application_logs_search',
    '{
      "query": "payment declined timeout",
      "columns": ["MESSAGE", "SERVICE", "STATUS_CODE", "TIMESTAMP", "REQUEST_ID"],
      "filter": {"@eq": {"SERVICE": "payment-service"}},
      "limit": 10
    }'
  );

In [None]:
SELECT
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    'QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.application_logs_search',
    '{
      "query": "database connection timeout",
      "columns": ["MESSAGE", "LEVEL", "SERVICE", "HOST", "TIMESTAMP"],
      "filter": {"@eq": {"LEVEL": "ERROR"}},
      "limit": 10
    }'
  );

### Multiple Filters

In [None]:
SELECT
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    'QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.application_logs_search',
    '{
      "query": "failed transaction",
      "columns": ["MESSAGE", "LEVEL", "SERVICE", "ERROR", "STATUS_CODE", "TIMESTAMP"],
      "filter": {
        "@and": [
          {"@eq": {"LEVEL": "ERROR"}},
          {"@or": [
            {"@eq": {"SERVICE": "payment-service"}},
            {"@eq": {"SERVICE": "auth-service"}}
          ]}
        ]
      },
      "limit": 20
    }'
  );

In [None]:
SELECT
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    'QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.application_logs_search',
    '{
      "query": "memory usage warning system resource",
      "columns": ["MESSAGE", "LEVEL", "SERVICE", "MEMORY_PERCENT", "AVAILABLE_MB", "REGION", "TIMESTAMP"],
      "filter": {
        "@and": [
          {"@eq": {"LEVEL": "WARN"}},
          {"@gte": {"MEMORY_PERCENT": 80}}
        ]
      },
      "limit": 15
    }'
  );

In [None]:
SELECT
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    'QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.application_logs_search',
    '{
      "query": "disk space running low storage",
      "columns": ["MESSAGE", "SERVICE", "DISK_USAGE_PERCENT", "AVAILABLE_GB", "REGION", "HOST", "TIMESTAMP"],
      "filter": {
        "@and": [
          {"@gte": {"DISK_USAGE_PERCENT": 85}},
          {"@eq": {"REGION": "us-west-2"}}
        ]
      },
      "limit": 10
    }'
  );
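
The search cells above pass their query, columns, filter, and limit as a hand-written JSON string. When filters are nested, building that string programmatically can reduce quoting mistakes. The sketch below assembles a request in the same shape as the cells above using only `json`. The helper names (`eq`, `gte`, `all_of`, `build_search_request`) are hypothetical and not part of any Snowflake API.

```python
import json


def eq(column, value):
    """Equality filter on one attribute column."""
    return {"@eq": {column: value}}


def gte(column, value):
    """Greater-than-or-equal filter on one numeric attribute column."""
    return {"@gte": {column: value}}


def all_of(*conditions):
    """Combine filters so that every condition must match."""
    return {"@and": list(conditions)}


def build_search_request(query, columns, filter_=None, limit=10):
    """Serialize a request in the JSON shape used by the cells above."""
    request = {"query": query, "columns": columns, "limit": limit}
    if filter_ is not None:
        request["filter"] = filter_
    return json.dumps(request)


payload = build_search_request(
    "disk space running low storage",
    ["MESSAGE", "SERVICE", "DISK_USAGE_PERCENT"],
    filter_=all_of(gte("DISK_USAGE_PERCENT", 85), eq("REGION", "us-west-2")),
)
print(payload)
```

The resulting string would take the place of the second argument to `SEARCH_PREVIEW`.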

## Cleanup
Clean up all Snowflake resources.

> **IMPORTANT**:
>
> - Stop the Openflow processors as described in [Chapter 14](https://quickstarts.snowflake.com/guide/getting-started-with-openflow-kafka-connector/index.html?index=..%2F..index#14).
> - Delete the Kafka topic and cluster if needed.

In [None]:
-- Use ACCOUNTADMIN to drop objects
USE ROLE ACCOUNTADMIN;

-- Drop Cortex Search service (if created)
DROP CORTEX SEARCH SERVICE IF EXISTS QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.application_logs_search;

-- Drop alerts (if created)
DROP ALERT IF EXISTS QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.HIGH_ERROR_RATE_ALERT;
DROP ALERT IF EXISTS QUICKSTART_KAFKA_CONNECTOR_DB.PUBLIC.CRITICAL_SERVICE_ALERT;

-- Drop database (this removes all tables and data)
DROP DATABASE IF EXISTS QUICKSTART_KAFKA_CONNECTOR_DB;

-- Drop warehouse
DROP WAREHOUSE IF EXISTS QUICKSTART_KAFKA_CONNECTOR_WH;

-- Drop external access integration
DROP EXTERNAL ACCESS INTEGRATION IF EXISTS quickstart_kafka_connector_access;

-- Note: We don't drop QUICKSTART_ROLE as it may be used by other quickstarts