In [109]:
import logging
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import *

Defaulting to user installation because normal site-packages is not writeable


In [110]:
spark = (SparkSession.builder.appName("itoss-ai")
         .config("spark.jars","./postgresql-42.7.3.jar")
         .getOrCreate())

In [111]:
jdbcUrl = "jdbc:postgresql://localhost:5432/itossconfig"
connection_properties = {
    "driver": "org.postgresql.Driver",
    "user": "itoss",
    "password": "admin"
}
query = "(select c.*, w.name as ctWorkgroup, l.name as ctLocation from ct c \
         inner join workgroup w on c.workgroup_id = w.id \
         inner join location l on c.location_id = l.id \
         inner join ct_type ct on c.type_id = ct.id \
) as subquery"  
ctDf = spark.read.jdbc(url=jdbcUrl,table=query,properties=connection_properties)
ctDf.printSchema()




root
 |-- id: long (nullable = true)
 |-- created_by: string (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- last_modified_by: string (nullable = true)
 |-- last_modified_date: timestamp (nullable = true)
 |-- attributes: string (nullable = true)
 |-- key: string (nullable = true)
 |-- environment: string (nullable = true)
 |-- name: string (nullable = true)
 |-- old_password: string (nullable = true)
 |-- state: string (nullable = true)
 |-- collector_id: long (nullable = true)
 |-- company_id: long (nullable = true)
 |-- contact_id: long (nullable = true)
 |-- location_id: long (nullable = true)
 |-- monitoring_profile_id: long (nullable = true)
 |-- support_user_id: long (nullable = true)
 |-- type_id: long (nullable = true)
 |-- workgroup_id: long (nullable = true)
 |-- instrumentation_parameter_values: string (nullable = true)
 |-- old_crypted_property_values: string (nullable = true)
 |-- integration_id: string (nullable = true)
 |-- ctworkgroup: string (nu

In [112]:
queryStatusDelta = "(select * from ct_status_delta) as subquery"  
ctStatusDeltaDf = spark.read.jdbc(url=jdbcUrl,table=queryStatusDelta,properties=connection_properties)
ctStatusDeltaDf.printSchema()


root
 |-- ct_id: long (nullable = true)
 |-- status: string (nullable = true)
 |-- last_status_change: timestamp (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [131]:
from pyspark.sql.functions import col, count, lit, when, current_timestamp, expr, collect_list, struct
from pyspark.sql.types import StringType, IntegerType
import json

# 1. Filter the ct_status_delta table to include only records within the last hour
one_hour_ago = current_timestamp() - expr('INTERVAL 2 HOUR')
filtered_status_delta = ctStatusDeltaDf.filter(col("last_status_change") >= one_hour_ago)
filtered_status_delta.show()

+------+------+--------------------+--------------------+
| ct_id|status|  last_status_change|           timestamp|
+------+------+--------------------+--------------------+
|200148|    up|2024-07-01 17:06:...|2024-07-01 17:06:...|
|200148|  down|2024-07-01 17:06:...|2024-07-01 17:06:...|
|200147|  down|2024-07-01 17:06:...|2024-07-01 17:06:...|
|200147|    up|2024-07-01 17:07:...|2024-07-01 17:07:...|
|200148|  down|2024-07-01 18:19:...|2024-07-01 18:19:...|
|200147|  down|2024-07-01 18:19:...|2024-07-01 18:19:...|
|200146|  down|2024-07-01 18:19:...|2024-07-01 18:19:...|
|200145|  down|2024-07-01 18:19:...|2024-07-01 18:19:...|
+------+------+--------------------+--------------------+



In [133]:
from pyspark.sql.functions import row_number, lag, sum as spark_sum
from pyspark.sql import Window

windowSpec = Window.partitionBy("ct_id").orderBy(filtered_status_delta.last_status_change)
statusDeltaDf = filtered_status_delta.withColumn("prev_status", lag("status").over(windowSpec))

df_trend = statusDeltaDf.withColumn(
    "change",
    when((col("status") == "up") & (col("prev_status") == "down"), "improving")
    .when((col("status") == "down") & (col("prev_status") == "up"), "worsening")
    .otherwise("no change")
)

# Aggregate changes to determine the trend for each ct_id
trend_summary = df_trend.groupBy().agg(
    spark_sum(when(col("change") == "improving", 1).otherwise(0)).alias("improving_count"),
    spark_sum(when(col("change") == "worsening", 1).otherwise(0)).alias("worsening_count")
)

# Determine the final trend
final_trend = trend_summary.withColumn(
    "trend",
    when(col("improving_count") > col("worsening_count"), "Improving")
    .when(col("improving_count") < col("worsening_count"), "Worsening")
    .otherwise("No change")
)
df_trend.show(truncate=False)
final_trend.show()

df_trend.show(truncate=False)


+------+------+--------------------------+--------------------------+-----------+---------+
|ct_id |status|last_status_change        |timestamp                 |prev_status|change   |
+------+------+--------------------------+--------------------------+-----------+---------+
|200145|down  |2024-07-01 18:19:08.770353|2024-07-01 18:19:08.770353|NULL       |no change|
|200146|down  |2024-07-01 18:19:08.759216|2024-07-01 18:19:08.759216|NULL       |no change|
|200147|down  |2024-07-01 17:06:56.60581 |2024-07-01 17:06:56.60581 |NULL       |no change|
|200147|up    |2024-07-01 17:07:08.670188|2024-07-01 17:07:08.670188|down       |improving|
|200147|down  |2024-07-01 18:19:08.748226|2024-07-01 18:19:08.748226|up         |worsening|
|200148|up    |2024-07-01 17:06:00.494579|2024-07-01 17:06:00.494579|NULL       |no change|
|200148|down  |2024-07-01 17:06:27.129998|2024-07-01 17:06:27.129998|up         |worsening|
|200148|down  |2024-07-01 18:19:08.736937|2024-07-01 18:19:08.736937|down       

In [144]:

df_last_status_count = statusDeltaDf.withColumn("row_num", row_number().over(windowSpec)).filter(col("row_num") == 1).drop("row_num").groupBy("status").count()

df_last_status_count.show(truncate=False)

+------+-----+
|status|count|
+------+-----+
|down  |3    |
|up    |1    |
+------+-----+



TypeError: 'Column' object is not callable

In [115]:

# 2. Join the filtered ct_status_delta table with the ct table
joinedDf = ctDf.join(statusDeltaDf, ctDf.id == statusDeltaDf.ct_id, "inner")

# 3. Calculate the counts for different statuses and environments within the last hour
statusCounts = joinedDf.groupBy("status").agg(count("*").alias("count_last_hour"))
#envCounts = joinedDf.groupBy("environment").agg(count("*").alias("count_last_hour"))

# Calculate total counts in the ct table
total_statusCounts = ctDf.groupBy("status").agg(count("*").alias("total_count"))
#total_envCounts = ctDf.groupBy("environment").agg(count("*").alias("total_count"))

statusCounts.show(5)
#envCounts.show(5)
total_statusCounts.show(5)
#total_envCounts.show(5)



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `status` cannot be resolved. Did you mean one of the following? [`state`, `name`, `id`, `key`, `type_id`].;
'Aggregate ['status], ['status, count(1) AS total_count#6578L]
+- Relation [id#6364L,created_by#6365,creation_date#6366,last_modified_by#6367,last_modified_date#6368,attributes#6369,key#6370,environment#6371,name#6372,old_password#6373,state#6374,collector_id#6375L,company_id#6376L,contact_id#6377L,location_id#6378L,monitoring_profile_id#6379L,support_user_id#6380L,type_id#6381L,workgroup_id#6382L,instrumentation_parameter_values#6383,old_crypted_property_values#6384,integration_id#6385,ctworkgroup#6386,ctlocation#6387] JDBCRelation((select c.*, w.name as ctWorkgroup, l.name as ctLocation from ct c          inner join workgroup w on c.workgroup_id = w.id          inner join location l on c.location_id = l.id          inner join ct_type ct on c.type_id = ct.id ) as subquery) [numPartitions=1]


In [None]:

# 4. Calculate quantity and percentage variations
statusCounts = statusCounts.join(total_statusCounts, "ctStatus") \
    .withColumn("quantityVariation", col("count_last_hour") - col("total_count")) \
    .withColumn("percentageVariation", col("quantityVariation") / col("total_count") * 100)
statusCounts.printSchema()
statusCounts.show(5)


In [None]:

#envCounts = envCounts.join(total_envCounts, "environment") \
#    .withColumn("quantityVariation", col("count_last_hour") - col("total_count")) \
#    .withColumn("percentageVariation", col("quantityVariation") / col("total_count") * 100)

#envCounts.printSchema()
#envCounts.show(5)

In [None]:

# 5. Calculate the trend
statusCounts = statusCounts.withColumn("trend", when(col("count_last_hour") > col("total_count"), "up")
                                      .when(col("count_last_hour") < col("total_count"), "down")
                                      .otherwise("stable"))

#envCounts = envCounts.withColumn("trend", when(col("count_last_hour") > col("total_count"), "up")
 #                                 .when(col("count_last_hour") < col("total_count"), "down")
 #                                 .otherwise("stable"))

statusCounts.printSchema()
statusCounts.show(5)

#envCounts.printSchema()
#envCounts.show(5)
