In [32]:
# findspark must be initialised before the pyspark imports in the next cell.
# NOTE(review): presumably this relies on SPARK_HOME (or findspark's default
# search) to locate the local Spark installation — confirm for your environment.
import findspark
findspark.init()

In [33]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

In [34]:
# Create (or reuse) the Spark entry points used by the rest of the notebook.
# SparkContext.getOrCreate keeps this cell re-runnable: a bare SparkContext(...)
# raises "Cannot run multiple SparkContexts at once" when the cell is executed a
# second time in the same kernel.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)
# NOTE(review): the session below attaches to the context created above, so
# master('local') does not change the master of that already-running context —
# confirm which master is actually intended.
spark = SparkSession.builder.master('local').getOrCreate()

In [35]:
# Load the raw alert events; no schema is supplied, so Spark infers it from the file.
alerts_df = spark.read.format('json').load('alerts.json')

In [36]:
# Inspect the column names and types of the freshly loaded data.
alerts_df.printSchema()

root
 |-- alert_id: string (nullable = true)
 |-- entitled_assets: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- event_source_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- instance_id: string (nullable = true)
 |-- serial: string (nullable = true)



In [37]:
# Preview a single row to see what the values look like.
alerts_df.show(1)

+-----------+------------------+-------------------+-------------------+--------------------+----------+
|   alert_id|   entitled_assets|  event_source_type|         event_time|         instance_id|    serial|
+-----------+------------------+-------------------+-------------------+--------------------+----------+
|ArbwAO2m4Oa|[qnTrje2, WT87cRS]|healthchecker_alert|2019-06-07 20:50:41|ypJdMVE8XfRgbq8E2...|cyN-1QeXWm|
+-----------+------------------+-------------------+-------------------+--------------------+----------+
only showing top 1 row



In [38]:
# Row count of the raw input, as a baseline for the filters applied below.
alerts_df.count()

10000

In [39]:
# Parse the textual event_time into a real timestamp column so later cells can
# compare and order events chronologically. The original string column is kept.
alerts_df = alerts_df.withColumn(
    colName="event_timestamp",
    col=f.to_timestamp(f.col("event_time"), format="yyyy-MM-dd HH:mm:ss"),
)
alerts_df.printSchema()
alerts_df.show(n=1)

root
 |-- alert_id: string (nullable = true)
 |-- entitled_assets: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- event_source_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- instance_id: string (nullable = true)
 |-- serial: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)

+-----------+------------------+-------------------+-------------------+--------------------+----------+-------------------+
|   alert_id|   entitled_assets|  event_source_type|         event_time|         instance_id|    serial|    event_timestamp|
+-----------+------------------+-------------------+-------------------+--------------------+----------+-------------------+
|ArbwAO2m4Oa|[qnTrje2, WT87cRS]|healthchecker_alert|2019-06-07 20:50:41|ypJdMVE8XfRgbq8E2...|cyN-1QeXWm|2019-06-07 20:50:41|
+-----------+------------------+-------------------+-------------------+--------------------+----------+-------------------+
only showing 

In [40]:
# Keep only events inside the reporting window; both bounds are inclusive.
alerts_df = alerts_df.filter(
    f.col('event_timestamp').between('2019-06-10 00:00:00', '2019-06-19 00:00:00')
)
print(alerts_df.count())
alerts_df.show(1)

3130
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+
|   alert_id|     entitled_assets|  event_source_type|         event_time|         instance_id|    serial|    event_timestamp|
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+
|1AlMxb4jeZ8|[VrE3Mr8, Z0SBQ7L...|healthchecker_alert|2019-06-11 12:46:18|Aygpbf1YPEpvn1CQj...|Ei3-SNCq7e|2019-06-11 12:46:18|
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+
only showing top 1 row



In [41]:
# Restrict the data set to events whose source type is 'healthchecker_alert'.
alerts_df = alerts_df.where(f.col('event_source_type') == 'healthchecker_alert')
print(alerts_df.count())
alerts_df.show(1)

1062
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+
|   alert_id|     entitled_assets|  event_source_type|         event_time|         instance_id|    serial|    event_timestamp|
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+
|1AlMxb4jeZ8|[VrE3Mr8, Z0SBQ7L...|healthchecker_alert|2019-06-11 12:46:18|Aygpbf1YPEpvn1CQj...|Ei3-SNCq7e|2019-06-11 12:46:18|
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+
only showing top 1 row



In [42]:
# Derive the calendar day of each event directly from the timestamp (kept as a
# midnight timestamp, the same column type the cells below display) instead of
# splitting the timestamp's implicit string representation.
first_alerts_df = alerts_df.withColumn(
    'date', f.to_date(f.col('event_timestamp')).cast('timestamp'))

# Keep exactly one row, the earliest, per (alert_id, date).
# row_number() is used rather than rank(): rank() assigns 1 to every row tied on
# the earliest event_timestamp, so ties would produce more than one "first" alert.
# instance_id and serial serve only as tie-breakers to make the pick repeatable.
window = Window.partitionBy('alert_id', 'date').orderBy(
    f.col('event_timestamp').asc(),
    f.col('instance_id').asc(),
    f.col('serial').asc(),
)
first_alerts_df = (
    first_alerts_df
    .withColumn('rank', f.row_number().over(window))
    .filter(f.col('rank') == 1)
    .drop('rank')
)

# temp_df = alerts_df.groupBy('alert_id', 'date').agg(f.min('event_timestamp').alias('event_timestamp'))

In [43]:
# Preview the per-day first alerts and check how many rows survived the window filter.
first_alerts_df.show(5)
first_alerts_df.count()

+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+-------------------+
|   alert_id|     entitled_assets|  event_source_type|         event_time|         instance_id|    serial|    event_timestamp|               date|
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+-------------------+
|XGo0nQHCZzR|[23EOWVe, 6qIJC7R...|healthchecker_alert|2019-06-16 09:49:19|a0xSU2hkczxtj0AoI...|znS-hwMYoM|2019-06-16 09:49:19|2019-06-16 00:00:00|
|S2byDm68oNU|  [yX3zmwh, uBwVGky]|healthchecker_alert|2019-06-17 09:04:18|soSZ8ncvdQ5wwFVXJ...|MRQ-YS2Z7W|2019-06-17 09:04:18|2019-06-17 00:00:00|
|W03mt1Aibkm|[9XSEE9j, fUFO9bL...|healthchecker_alert|2019-06-11 16:10:43|BV0DVoTnydpyY66MY...|Srn-ixyDps|2019-06-11 16:10:43|2019-06-11 00:00:00|
|YgCI2UmvDhe|  [PG9M2zW, 56N9LN5]|healthchecker_alert|2019-06-17 10:38:41|BiLVjErIIEgwFKS28...|iiG-PBDH7w|2019-06-17 1

629

In [44]:
# Fan out to one row per entitled asset: each element of the entitled_assets
# array becomes its own row in the new asset_id column.
first_alerts_df = first_alerts_df.withColumn(
    'asset_id',
    f.explode(f.col('entitled_assets')),
)

In [45]:
# Preview the exploded rows; the count is now one row per (alert, asset) pair.
first_alerts_df.show(5)
first_alerts_df.count()

+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+-------------------+--------+
|   alert_id|     entitled_assets|  event_source_type|         event_time|         instance_id|    serial|    event_timestamp|               date|asset_id|
+-----------+--------------------+-------------------+-------------------+--------------------+----------+-------------------+-------------------+--------+
|XGo0nQHCZzR|[23EOWVe, 6qIJC7R...|healthchecker_alert|2019-06-16 09:49:19|a0xSU2hkczxtj0AoI...|znS-hwMYoM|2019-06-16 09:49:19|2019-06-16 00:00:00| 23EOWVe|
|XGo0nQHCZzR|[23EOWVe, 6qIJC7R...|healthchecker_alert|2019-06-16 09:49:19|a0xSU2hkczxtj0AoI...|znS-hwMYoM|2019-06-16 09:49:19|2019-06-16 00:00:00| 6qIJC7R|
|XGo0nQHCZzR|[23EOWVe, 6qIJC7R...|healthchecker_alert|2019-06-16 09:49:19|a0xSU2hkczxtj0AoI...|znS-hwMYoM|2019-06-16 09:49:19|2019-06-16 00:00:00| nTxy7lO|
|S2byDm68oNU|  [yX3zmwh, uBwVGky]|healthchecker_alert|2019-06-17

1896

In [46]:
# Replace event_time with just the time-of-day part of the event.
# date_format renders it straight from the timestamp; the previous approach split
# the timestamp's implicit string cast on ' ', which depends on how Spark happens
# to render timestamps as text.
first_alerts_df = first_alerts_df.withColumn(
    'event_time', f.date_format(f.col('event_timestamp'), 'HH:mm:ss'))
# Keep only the columns needed in the exported file.
first_alerts_df = first_alerts_df.select('alert_id', 'asset_id', 'date', 'event_time')
# first_alerts_df = first_alerts_df.select('alert_id', f.col("event_timestamp").alias('event_time'), 'asset_id', 'date')

In [47]:
# Final check of the export layout; the row count should match the exploded frame.
first_alerts_df.show(5)
first_alerts_df.count()

+-----------+--------+-------------------+----------+
|   alert_id|asset_id|               date|event_time|
+-----------+--------+-------------------+----------+
|XGo0nQHCZzR| 23EOWVe|2019-06-16 00:00:00|  09:49:19|
|XGo0nQHCZzR| 6qIJC7R|2019-06-16 00:00:00|  09:49:19|
|XGo0nQHCZzR| nTxy7lO|2019-06-16 00:00:00|  09:49:19|
|S2byDm68oNU| yX3zmwh|2019-06-17 00:00:00|  09:04:18|
|S2byDm68oNU| uBwVGky|2019-06-17 00:00:00|  09:04:18|
+-----------+--------+-------------------+----------+
only showing top 5 rows



1896

In [48]:
# Export the result as a single CSV part file with a header row.
# NOTE: Spark writes "alerts.csv" as a directory that contains the part file.
# - "csv" is Spark's built-in source; "com.databricks.spark.csv" is the name of
#   the legacy external package.
# - mode("overwrite") keeps the notebook re-runnable: with the default save mode
#   the write fails as soon as the output path already exists.
first_alerts_df.repartition(1).write.format("csv").mode("overwrite").option("header", "true").save("alerts.csv")
#first_alerts_df.write.format("com.databricks.spark.csv").option("header", "true").save("alerts.csv")

In [49]:
# Shut down the SparkContext now that the export has been written.
sc.stop()