In [0]:
from azure.kusto.data import KustoConnectionStringBuilder, KustoClient

# ADX Cluster & Database
cluster = "<uri>"
database = "iot-table"

# Use your existing App Registration details
client_id = "<CLIENT_ID>"       # Application (Client) ID
client_secret = "<SECRET>"  # Client Secret Value
tenant_id = "<TENANT ID>"        # Directory (Tenant) ID

# Authenticate with ADX using Service Principal
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    cluster, client_id, client_secret, tenant_id
)
client = KustoClient(kcsb)

# Test Query to Check Access
response = client.execute(database, ".show tables")
print(response)


<azure.kusto.data.response.KustoResponseDataSetV1 object at 0x7f88045d79d0>


In [0]:
query = "iot_table_new1 | take 10"  # Fetch 10 latest sensor records

# Execute query
response = client.execute(database, query)

# Convert response to Pandas DataFrame
rows = response.primary_results[0]
df = pd.DataFrame(rows.raw_rows, columns=[col.column_name for col in rows.columns])

# Show first few rows
print(df.head())


            deviceId  ...             received_time
0  device10000181022  ...  2025-02-27T10:49:26.346Z
1  device10000181022  ...  2025-02-27T10:49:26.471Z
2  device10000181022  ...  2025-02-27T10:49:26.471Z
3  device10000181022  ...  2025-02-27T10:49:26.471Z
4  device10000181022  ...  2025-02-27T10:49:27.346Z

[5 rows x 6 columns]


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# Initialize Spark Session
spark = SparkSession.builder.appName("ADXtoDatabricks").getOrCreate()

# Convert Pandas to PySpark DataFrame
df_spark = spark.createDataFrame(df)

# Define a window to calculate moving average for last 5 readings
window_spec = Window.orderBy("received_time").rowsBetween(-5, 0)

# Compute rolling average of temperature
df_ml = df_spark.withColumn("rolling_avg_temp", avg(col("temperature")).over(window_spec))

# Show results
df_ml.select("deviceId", "temperature", "rolling_avg_temp").show(10)


+-----------------+-----------+------------------+
|         deviceId|temperature|  rolling_avg_temp|
+-----------------+-----------+------------------+
|device10000181022|      20.21|             20.21|
|device10000181022|      23.16|21.685000000000002|
|device10000181022|      27.64|             23.67|
|device10000181022|      28.23|24.810000000000002|
|device10000181022|      19.72|            23.792|
|device10000181022|      30.66|24.936666666666667|
|device10000181022|      24.25|             25.61|
|device10000181022|       27.4|26.316666666666666|
|device10000181022|       26.9| 26.19333333333333|
|device10000181022|      29.31|26.373333333333335|
+-----------------+-----------+------------------+



In [0]:
threshold = 5  # Define a temperature increase threshold (adjust as needed)

# Add a new column to flag temperature spikes
df_spikes = df_ml.withColumn("is_spike", col("temperature") > (col("rolling_avg_temp") + threshold))

# Show detected spikes
df_spikes.select("deviceId", "temperature", "rolling_avg_temp", "is_spike").show(10)


+-----------------+-----------+------------------+--------+
|         deviceId|temperature|  rolling_avg_temp|is_spike|
+-----------------+-----------+------------------+--------+
|device10000181022|      20.21|             20.21|   false|
|device10000181022|      23.16|21.685000000000002|   false|
|device10000181022|      27.64|             23.67|   false|
|device10000181022|      28.23|24.810000000000002|   false|
|device10000181022|      19.72|            23.792|   false|
|device10000181022|      30.66|24.936666666666667|    true|
|device10000181022|      24.25|             25.61|   false|
|device10000181022|       27.4|26.316666666666666|   false|
|device10000181022|       26.9| 26.19333333333333|   false|
|device10000181022|      29.31|26.373333333333335|   false|
+-----------------+-----------+------------------+--------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StringType, FloatType, TimestampType

# Initialize Spark Session
spark = SparkSession.builder.appName("ADX_Streaming_Polling").getOrCreate()

# Define Schema for IoT data
schema = StructType() \
    .add("deviceId", StringType()) \
    .add("temperature", FloatType()) \
    .add("humidity", FloatType()) \
    .add("status", StringType()) \
    .add("sent_time", TimestampType()) \
    .add("received_time", TimestampType())

# Read data from Azure Data Explorer (ADX) every 5 seconds
df_stream = spark.readStream \
    .format("com.microsoft.azure.kusto.spark.datasource") \
    .option("kustoCluster", "<URI>") \
    .option("kustoDatabase", "iot-table") \
    .option("kustoQuery", "iot_table_new1 | where received_time > ago(10s)") \
    .load()

# Display streaming data in Databricks
df_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()
