## Overview
Temporal stores workflow history in `history_node`, an append-only table that contains all of the history data.
Temporal does delete rows from this table, but only to reclaim space.

## Generating a descriptor file for `temporal.api.history.v1.History` (array of `HistoryEvent`)
```shell
git clone https://github.com/temporalio/api.git
cd api
protoc -I . \
    temporal/api/history/v1/message.proto \
    -o descriptors.binpb \
    --include_imports \
    --include_source_info
```

The output file `descriptors.binpb` must be stored somewhere Spark can read it. In Databricks, DBFS is a convenient location: upload the file through the UI (click "Upload" in a folder of the file browser and select the file) or programmatically via the Databricks API.

## References
- https://github.com/temporalio/api/tree/v1.24.0
- https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html
- https://docs.gcp.databricks.com/structured-streaming/protocol-buffers.html
- https://buf.build/docs/reference/descriptors#generating-and-exchanging-descriptors


## `temporal.history_node_cdc`
- We store this in Databricks as a Delta table for performance only.
- The CDC records contain deletes. We ignore them because the table is append-only: deletes are an implementation detail of Temporal's garbage collection, useful only if we want to analyze how Temporal cleans up the database.
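Each CDC record is a Debezium change event: `value.before` and `value.after` hold the row image before and after the change, and `value.op` is Debezium's operation code (`c` create, `u` update, `d` delete, `r` snapshot read). A minimal pure-Python sketch of telling the record kinds apart and dropping the deletes, assuming records are plain dicts shaped like the `value` struct in the schema shown further down; `classify` and `live_rows` are hypothetical helpers, not part of the pipeline:

```python
from typing import Iterable

# Debezium change-event operation codes
DEBEZIUM_OPS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


def classify(value: dict) -> str:
    """Names the kind of change a Debezium event represents, based on its `op` code."""
    return DEBEZIUM_OPS.get(value.get("op"), "unknown")


def live_rows(values: Iterable[dict]) -> list[dict]:
    """Keeps the post-change row image of every non-delete event (mirrors `value.after IS NOT NULL`)."""
    return [v["after"] for v in values if v.get("after") is not None]


# Hypothetical change events shaped like the `value` struct of history_node_cdc
events = [
    {"op": "r", "before": None, "after": {"node_id": 1}},
    {"op": "c", "before": None, "after": {"node_id": 2}},
    {"op": "d", "before": {"node_id": 1}, "after": None},
]
print([classify(e) for e in events])  # ['snapshot read', 'create', 'delete']
print(live_rows(events))  # [{'node_id': 1}, {'node_id': 2}]
```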

In [None]:
# raw = spark.read.format("parquet").load("gs://path/to/history-cdc/").dropDuplicates()
# spark.sql("CREATE DATABASE IF NOT EXISTS temporal")

# raw.write.format("delta").mode("overwrite").saveAsTable("temporal.history_node_cdc")

The history node table from Debezium / Kafka CDC has the following schema:

In [None]:
# %sql
# CREATE TABLE spark_catalog.temporal.history_node_cdc (
#   key STRUCT <
#     shard_id: INT,
#     tree_id: BINARY,
#     branch_id: BINARY,
#     node_id: BIGINT,
#     txn_id: BIGINT >,
#   value STRUCT <
#     before: STRUCT <
#       shard_id: INT,
#       tree_id: BINARY,
#       branch_id: BINARY,
#       node_id: BIGINT,
#       txn_id: BIGINT,
#       data: BINARY,
#       data_encoding: STRING,
#       prev_txn_id: BIGINT >,
#     after: STRUCT <
#       shard_id: INT,
#       tree_id: BINARY,
#       branch_id: BINARY,
#       node_id: BIGINT,
#       txn_id: BIGINT,
#       data: BINARY,
#       data_encoding: STRING,
#       prev_txn_id: BIGINT >,
#     source: STRUCT <
#       version: STRING,
#       connector: STRING,
#       name: STRING,
#       ts_ms: BIGINT,
#       snapshot: STRING,
#       db: STRING,
#       sequence: STRING,
#       schema: STRING,
#       table: STRING,
#       txId: BIGINT,
#       lsn: BIGINT,
#       xmin: BIGINT >,
#     op: STRING,
#     ts_ms: BIGINT,
#     transaction: STRUCT <
#       id: STRING,
#       total_order: BIGINT,
#       data_collection_order: BIGINT > >,
#   offset BIGINT,
#   timestamp BIGINT,
#   _rescued_data STRING
# ) USING delta TBLPROPERTIES (
#   'delta.minReaderVersion' = '1',
#   'delta.minWriterVersion' = '2'
# )

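`COPY INTO` is a fast way to batch-load files in Databricks. It is idempotent: it keeps track of which source files it has already loaded, so rerunning this cell only ingests new files.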

In [None]:
%sql
CREATE TABLE IF NOT EXISTS `temporal`.`history_node_cdc`;
COPY INTO temporal.history_node_cdc 
FROM 'gs://path/to/history-node-cdc/'
FILEFORMAT = parquet
COPY_OPTIONS ('mergeSchema' = 'true')

Print out information about the CDC source data from Debezium/Kafka (optional).

In [None]:
from pyspark.sql.functions import col, count, when

cdc = spark.table("temporal.history_node_cdc")

# Compute every count in one pass over the table instead of four separate scans
stats = cdc.agg(
    count(when(col("value.after").isNull(), 1)).alias("n_deletes"),
    count(
        when(col("value.before").isNotNull() & col("value.after").isNotNull(), 1)
    ).alias("n_modifications"),
    count("*").alias("n_cdc_records"),
    count(when(col("value.after").isNotNull(), 1)).alias("n_table_rows"),
).first()

print(stats.asDict())

# Temporal History

Provides the `temporal.history` table, derived from Temporal's raw `history_node` table, with the following properties:
- All event log / history fields are at the top level of the table (decoded from their protobuf representation)
- All rows have `workflow_info`, which contains the workflow ID and other core identifying information about the workflow the row belongs to
- This is optimized for batch workloads. Structured Streaming does not support non-time-based window functions, so a streaming version would have to replace the window functions used here with self-joins.
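A streaming version would replace each `first(..., ignorenulls=True).over(...)` lookup with a join against the rows that carry the value. A pure-Python model of that idea for `workflow_info`: build a lookup of start-event attributes keyed by `(shard_id, tree_id)` (the build side of the self-join), then attach it to every event of the same tree. Rows are plain dicts, and `attach_workflow_info` is a hypothetical helper that assumes at most one start event per tree:

```python
def attach_workflow_info(rows: list[dict]) -> list[dict]:
    """Join-based model: gives every event the start-event attributes of its (shard_id, tree_id)."""
    # Build side of the self-join: start-event attributes keyed by tree
    # (assumes at most one start event per (shard_id, tree_id))
    started: dict[tuple, dict] = {
        (r["shard_id"], r["tree_id"]): r["workflow_execution_started_event_attributes"]
        for r in rows
        if r.get("workflow_execution_started_event_attributes") is not None
    }
    # Probe side, as a left join: events whose tree has no start event get None
    return [{**r, "workflow_info": started.get((r["shard_id"], r["tree_id"]))} for r in rows]


events = [
    {"shard_id": 1, "tree_id": b"t1", "event_id": 1,
     "workflow_execution_started_event_attributes": {"workflow_id": "wf-1"}},
    {"shard_id": 1, "tree_id": b"t1", "event_id": 2,
     "workflow_execution_started_event_attributes": None},
    {"shard_id": 1, "tree_id": b"t2", "event_id": 7,
     "workflow_execution_started_event_attributes": None},
]
for row in attach_workflow_info(events):
    print(row["tree_id"], row["event_id"], row["workflow_info"])
```

One difference worth knowing: the window version returns the first non-null value at or before each row in the window's `txn_id` ordering, while the join gives every row of the tree the same value.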

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.window import Window


# Get Temporal history_node table
input_df = (
    spark.read.table("temporal.history_node_cdc")
    .where(col("value.after").isNotNull())
    .select(col("value.after.*"))
    .dropDuplicates()
)

# Ordering is left to the window functions below, which define their own order
history_node_df = input_df  # .orderBy("tree_id", "branch_id", "node_id", "txn_id")


# Decode the data column and spread all columns from it on the same level as the history_node table columns
temporal_history_node_proto_descriptor_filepath = (
    "./descriptors.binpb"
)

history_node_exploded_proto = (
    input_df.withColumn(
        "proto",
        from_protobuf(
            input_df.data,
            "History",
            descFilePath=temporal_history_node_proto_descriptor_filepath,
            options={"recursive.fields.max.depth": "2"},
        ),
    )
    .select(
        # Primary key columns (in this order)
        "shard_id",
        "tree_id",
        "branch_id",
        "node_id",
        "txn_id",
        # Adds one row per event in the History.events array. The event is stored in the `entry` column and star-expanded in the next step
        explode("proto.events").alias("entry"),
        "prev_txn_id",
    )
    .select(
        # Repeat all fields from above
        "shard_id",
        "tree_id",
        "branch_id",
        "node_id",
        "txn_id",
        # Star expand the history entry, effectively adding a column per history event type to the table
        "entry.*",
        "prev_txn_id",
    )
)

# Adds a column workflow_info to each row, where workflow_info is the execution start event of each workflow
with_wf_info = (
    history_node_exploded_proto.withColumn(
        "workflow_info",
        first(
            history_node_exploded_proto.workflow_execution_started_event_attributes,
            ignorenulls=True,
        ).over(
            Window.partitionBy("shard_id", "tree_id").orderBy(
                -col("txn_id")
            )
        ),
    )
    .withColumn(
        "run_id",
        coalesce(
            first(
                col("workflow_task_failed_event_attributes.new_run_id"),
                ignorenulls=True,
            ).over(
                Window.partitionBy("shard_id", "tree_id", "branch_id").orderBy(
                    -col("txn_id")
                )
            ),
            col("workflow_info.original_execution_run_id"),
        ),
    )
    .withColumn("workflow_id", col("workflow_info.workflow_id"))
    .withColumn("workflow_type", col("workflow_info.workflow_type.name"))
    .withColumn(
        "parent_workflow_id", col("workflow_info.parent_workflow_execution.workflow_id")
    )
    .withColumn(
        "parent_workflow_run_id", col("workflow_info.parent_workflow_execution.run_id")
    )
    # .withColumn("run_id", col("workflow_info.original_execution_run_id"))
    .withColumn("first_execution_run_id", col("workflow_info.first_execution_run_id"))
    .withColumn(
        "prev_execution_run_id",
        coalesce(
            first(
                col("workflow_task_failed_event_attributes.base_run_id"),
                ignorenulls=True,
            ).over(
                Window.partitionBy("shard_id", "tree_id", "branch_id").orderBy(
                    -col("txn_id")
                )
            ),
            col("workflow_info.continued_execution_run_id"),
        ),
    )
    .withColumn(
        "task_queue",
        coalesce(
            col("workflow_info.task_queue.normal_name"),
            col("workflow_info.task_queue.name"),
        ),
    )
    # Select all columns in the order we want to view them in
    .select(
        "workflow_id",
        "run_id",
        "workflow_type",
        "event_time",
        "event_type",
        "parent_workflow_id",
        "parent_workflow_run_id",
        "first_execution_run_id",
        "prev_execution_run_id",
        "task_queue",
        "event_id",
        "workflow_info",
        "workflow_execution_started_event_attributes",
        "workflow_execution_completed_event_attributes",
        "workflow_execution_failed_event_attributes",
        "workflow_execution_timed_out_event_attributes",
        "workflow_task_scheduled_event_attributes",
        "workflow_task_started_event_attributes",
        "workflow_task_completed_event_attributes",
        "workflow_task_timed_out_event_attributes",
        "workflow_task_failed_event_attributes",
        "activity_task_scheduled_event_attributes",
        "activity_task_started_event_attributes",
        "activity_task_completed_event_attributes",
        "activity_task_failed_event_attributes",
        "activity_task_timed_out_event_attributes",
        "timer_started_event_attributes",
        "timer_fired_event_attributes",
        "activity_task_cancel_requested_event_attributes",
        "activity_task_canceled_event_attributes",
        "timer_canceled_event_attributes",
        "marker_recorded_event_attributes",
        "workflow_execution_signaled_event_attributes",
        "workflow_execution_terminated_event_attributes",
        "workflow_execution_cancel_requested_event_attributes",
        "workflow_execution_canceled_event_attributes",
        "request_cancel_external_workflow_execution_initiated_event_attributes",
        "request_cancel_external_workflow_execution_failed_event_attributes",
        "external_workflow_execution_cancel_requested_event_attributes",
        "workflow_execution_continued_as_new_event_attributes",
        "start_child_workflow_execution_initiated_event_attributes",
        "start_child_workflow_execution_failed_event_attributes",
        "child_workflow_execution_started_event_attributes",
        "child_workflow_execution_completed_event_attributes",
        "child_workflow_execution_failed_event_attributes",
        "child_workflow_execution_canceled_event_attributes",
        "child_workflow_execution_timed_out_event_attributes",
        "child_workflow_execution_terminated_event_attributes",
        "signal_external_workflow_execution_initiated_event_attributes",
        "signal_external_workflow_execution_failed_event_attributes",
        "external_workflow_execution_signaled_event_attributes",
        "upsert_workflow_search_attributes_event_attributes",
        "workflow_execution_update_accepted_event_attributes",
        "workflow_execution_update_rejected_event_attributes",
        "workflow_execution_update_completed_event_attributes",
        "workflow_properties_modified_externally_event_attributes",
        "activity_properties_modified_externally_event_attributes",
        "workflow_properties_modified_event_attributes",
        "shard_id",
        "tree_id",
        "branch_id",
        "node_id",
        "txn_id",
        # "prev_txn_id",
        "task_id",
        "version",
        "worker_may_ignore",
    )
)

## Invariants

Derivations require the following invariants about the base tables to hold:

For the base history node table in Databricks, we assume exactly one row per primary key that Temporal uses in Postgres: `"shard_id"` + `"tree_id"` + `"branch_id"` + `"node_id"` + `"txn_id"`.

For the decoded protobuf tables, we assume one row per `"shard_id"` + `"tree_id"` + `"branch_id"` + `"node_id"` + `"event_id"`, where `"event_id"` is a value from the decoded protobuf `History` array items. It is a monotonically increasing number that Temporal uses in its API to identify events within a workflow execution's history.

The code in the following cell expresses these invariants. It raises an `AssertionError` if any of our assumptions about the base data is violated, which stops the tables used in later computations from being created or updated.
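Each assertion in the next cell only reports that some key is duplicated. When one fails, it helps to see which keys are affected. A pure-Python model of the same group-and-count check that returns them; `duplicate_keys` is a hypothetical helper, and in PySpark the equivalent is to `show()` the grouped DataFrame instead of calling `.count()` on it:

```python
from collections import Counter
from typing import Iterable, Sequence


def duplicate_keys(rows: Iterable[dict], key: Sequence[str]) -> dict[tuple, int]:
    """Returns each key tuple that occurs more than once, with its count; empty means the key is unique."""
    counts = Counter(tuple(row[k] for k in key) for row in rows)
    return {k: n for k, n in counts.items() if n > 1}


history_node_primary_key = ["shard_id", "tree_id", "branch_id", "node_id", "txn_id"]
rows = [
    {"shard_id": 1, "tree_id": "t1", "branch_id": "b1", "node_id": 1, "txn_id": 10},
    {"shard_id": 1, "tree_id": "t1", "branch_id": "b1", "node_id": 1, "txn_id": 10},
    {"shard_id": 1, "tree_id": "t1", "branch_id": "b1", "node_id": 3, "txn_id": 11},
]
print(duplicate_keys(rows, history_node_primary_key))  # {(1, 't1', 'b1', 1, 10): 2}
```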

In [None]:
history_node_primary_key = ["shard_id", "tree_id", "branch_id", "node_id", "txn_id"]

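# Source data has one row per primary key
# (input_df holds only the `value.after` fields, so there is no `timestamp` column to order by;
# ordering before a groupBy would not change the counts anyway)
assert (
    input_df.groupBy(*history_node_primary_key)
    .agg(count("*").alias("ct"))
    .where(col("ct") > 1)
    .count()
    == 0
)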

# The exploded proto has one row per shard_id + tree_id + branch_id + node_id (from the source table) + event_id (from the decoded and exploded protobuf)
# (expected as long as Temporal only appends to the table)
assert(
    history_node_exploded_proto.groupBy(
        "shard_id", "tree_id", "branch_id", "node_id", "event_id"
    )
    .agg(count("event_id").alias("ct_event"))
    .where(col("ct_event") > 1)
    .count()
    == 0
)

# The table with workflow info should have no repeats for the same criteria
assert(
    with_wf_info.groupBy("shard_id", "tree_id", "branch_id", "node_id", "event_id")
    .agg(count("event_id").alias("ct_event"))
    .where(col("ct_event") > 1)
    .count()
    == 0
)

# Every workflow-execution-started event (event_type EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
# with non-null `workflow_execution_started_event_attributes`) has event_id = 1
assert (
    with_wf_info.where(
        (col("event_type") == "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED")
        & (col("workflow_execution_started_event_attributes").isNotNull())
        & (col("event_id") != 1)
    ).count()
    == 0
)

# All rows have workflow_id
assert(with_wf_info.where(col("workflow_id").isNull()).count() == 0)

## Drop Columns
- We can always add these back.
- We expect they are not needed by anything from here on.
- Every column we keep is needed by something downstream.

In [None]:
with_wf_info = with_wf_info.drop(
    # 100% missing
    "version",
    "worker_may_ignore",

    # Primary key attributes of the Postgres table
    # Used to derive ordering and uniqueness, don't expect to need them past this point. If we do we can add them back
    "shard_id",
    "tree_id",
    "branch_id",
    "node_id",
    "txn_id",
    "task_id",
    # "event_id"
)

## Drop Rows
- Ignore history of temporal system workflows

In [None]:
is_ignored_workflow_type = col("workflow_type").isin(
    [
        "temporal-sys-batch-workflow",
        "ExecuteBlobStoreCleanupCron",
        "RunScheduleSyncCron",
        "UpdateTimedOutWorkflowRuns",
        # Each entry needs a trailing comma; otherwise Python silently concatenates adjacent strings
        "temporalCloudAuthRotationWorkflow",
        "temporal-sys-history-scanner-workflow",
        "temporal-sys-tq-scanner-workflow",
    ]
)
with_wf_info = with_wf_info.where(~is_ignored_workflow_type)

## Save table

In [None]:
with_wf_info.write.format("delta").mode("overwrite").saveAsTable("temporal.history")

# Temporal Workflows

Produces a table `temporal.workflows` with the following properties:

- The set of all workflows, independent of executions. This simply means unique by `"workflow_id"`.
- Excludes child workflows.
- The status of the workflow, one of "RUNNING", "TERMINATED", "COMPLETED". This is derived from the latest execution that corresponds to the workflow_id.
- The time the workflow started.
- The time the workflow completed, if any.



## Running Workflows
Derives from `temporal.history` the running root workflows, meaning workflows that
1. have started events
2. have no parent
3. have a most recent event (taken across all runs with that workflow ID) that does not close the run, i.e. is not terminated, failed, completed, canceled, continued as new, or timed out
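The code in the next cell implements these conditions by taking each root workflow's most recent event and checking that it does not close the run. A pure-Python model of that rule over dict events, assuming comparable `event_time` values; `running_root_workflows` and `event` are hypothetical helpers. Because the latest event is taken across all runs of a workflow ID, a workflow that continued as new counts as running once its next run has started:

```python
from typing import Iterable

# Event types that close a run (same list as `is_terminal_event_type` in the next cell)
TERMINAL_EVENT_TYPES = {
    "EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW",
    "EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT",
}


def running_root_workflows(events: Iterable[dict]) -> set[str]:
    """Workflow IDs without a parent whose most recent event does not close the run."""
    latest: dict[str, dict] = {}
    for e in events:
        if e["parent_workflow_id"] is not None:
            continue  # child workflows are excluded
        seen = latest.get(e["workflow_id"])
        # Keep the latest event per workflow ID; on a tie the first event seen wins
        if seen is None or e["event_time"] > seen["event_time"]:
            latest[e["workflow_id"]] = e
    return {wf for wf, e in latest.items() if e["event_type"] not in TERMINAL_EVENT_TYPES}


def event(wf: str, t: int, event_type: str, parent: str = None) -> dict:
    return {"workflow_id": wf, "event_time": t, "event_type": event_type, "parent_workflow_id": parent}


events = [
    event("wf-done", 1, "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED"),
    event("wf-done", 2, "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED"),
    event("wf-busy", 1, "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED"),
    event("wf-busy", 3, "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED"),
    event("wf-next", 1, "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED"),
    event("wf-next", 2, "EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW"),
    event("wf-next", 3, "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED"),
    event("wf-child", 1, "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", parent="wf-busy"),
]
print(sorted(running_root_workflows(events)))  # ['wf-busy', 'wf-next']
```

Unlike this model, the Spark version keeps every event that ties for the latest `event_time`, which is why the invariant cell checks for duplicate workflow IDs.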

In [None]:
is_terminal_event_type = col("event_type").isin(
    [
        "EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED",
        "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
        "EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED",
        "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED",
        "EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW",
        "EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT",
    ]
)

wind = Window.partitionBy("workflow_id").orderBy(col("event_time").desc())
# a better name in this context might be execution or workflow execution to disambiguate from other uses of workflow
running_workflows = (
    # Inconsistent with the Temporal UI, which shows children and root workflows together
    with_wf_info.where(col("parent_workflow_id").isNull()) 
    .withColumn("latest_event_time", max(col("event_time")).over(wind))
    .where(col("event_time") == col("latest_event_time"))
    .where(~is_terminal_event_type)
)


### Invariants


In [None]:
# No duplicate workflow ids for running workflows
assert (
    running_workflows.dropDuplicates(["workflow_id"]).count()
    == running_workflows.count()
)

## Completed Workflows


In [None]:
completed_workflows = (
    with_wf_info
    .where(col("event_type") == "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED")
)

In [None]:
# TODO. move. maybe this is reusable
from pyspark.sql import DataFrame


def get_histories_for_runs(wfs: DataFrame, history: DataFrame) -> DataFrame:
    """Returns the full history of every run in `wfs`, ordered by workflow, run, and event."""
    return (
        wfs.select("workflow_id", "run_id")
        .alias("runs")
        .join(
            history.alias("history"),
            ["workflow_id", "run_id"],
            "inner",
        )
        .orderBy("workflow_id", "run_id", "event_id")
    )


completed_of_example_type = get_histories_for_runs(
    completed_workflows.where(col("workflow_type") == "example"), with_wf_info
)

## Summary


In [None]:
summary = {
    "running": running_workflows.count(),
    "completed": completed_workflows.count(),
}

print(summary)

In [None]:
event_type = [
    "EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED",
    "EVENT_TYPE_ACTIVITY_TASK_COMPLETED",
    "EVENT_TYPE_ACTIVITY_TASK_FAILED",
    "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
    "EVENT_TYPE_ACTIVITY_TASK_STARTED",
    "EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT",

    "EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED",
    "EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED",

    "EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED",

    "EVENT_TYPE_MARKER_RECORDED",
    
    "EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED",
    "EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED",
    
    "EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED",
    "EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED",
    
    "EVENT_TYPE_TIMER_CANCELED",
    "EVENT_TYPE_TIMER_FIRED",
    "EVENT_TYPE_TIMER_STARTED",
    
    "EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES",
    
    "EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW",
    "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT",

    
    "EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED",
    
    "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
    "EVENT_TYPE_WORKFLOW_TASK_FAILED",
    "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
    "EVENT_TYPE_WORKFLOW_TASK_STARTED",
    "EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT",
]

workflow_execution = [
    "EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW",
    "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED",
    "EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT"
]

timer_event_type = [
    "EVENT_TYPE_TIMER_CANCELED",
    "EVENT_TYPE_TIMER_FIRED",
    "EVENT_TYPE_TIMER_STARTED",
]

activity_event_type = [
    "EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED",
    "EVENT_TYPE_ACTIVITY_TASK_CANCELED",
    "EVENT_TYPE_ACTIVITY_TASK_COMPLETED",
    "EVENT_TYPE_ACTIVITY_TASK_FAILED",
    "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
    "EVENT_TYPE_ACTIVITY_TASK_STARTED",
    "EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT",
]

child_workflow_event_type = [
    "EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED",
    "EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED",
    "EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED",
    "EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED",
]

# Temporal Activities

Derives the table `temporal.activity_calls` (plus one table per activity type in the `activities` database) from the `temporal.history` table with the following properties:
- Matches activity input to activity success or failure
- Decodes the activity input / output / error payloads from binary payload data to JSON-encoded strings (`input`, `output`, `error` columns)
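The `.cast("string")` calls in the Activity History cell reinterpret the first payload's `data` bytes as UTF-8 text. That yields JSON only when the payload was written by Temporal's default JSON converter, which tags it with the metadata `encoding` value `json/plain`. A pure-Python sketch that makes this assumption explicit, with payloads modeled as dicts shaped like `temporal.api.common.v1.Payload` (a `metadata` map of bytes plus `data` bytes); `decode_payload` is hypothetical and handles only the two encodings it names:

```python
import json
from typing import Any


def decode_payload(payload: dict) -> Any:
    """Decodes one Temporal payload, assuming it was written by the default data converter."""
    encoding = payload.get("metadata", {}).get("encoding", b"").decode("utf-8")
    data: bytes = payload.get("data", b"")
    if encoding == "json/plain":
        return json.loads(data.decode("utf-8"))
    if encoding == "binary/null":
        return None  # the default converter stores None with empty data
    raise ValueError(f"unsupported payload encoding: {encoding!r}")


payload = {"metadata": {"encoding": b"json/plain"}, "data": b'{"amount": 42}'}
print(payload["data"].decode("utf-8"))  # {"amount": 42}  (roughly what .cast("string") yields)
print(decode_payload(payload))  # {'amount': 42}
```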


## Activity History

In [None]:
activity_history = (
    spark.table("temporal.history")
    .where(col("event_type").isin(activity_event_type))
    .withColumn(
        "input",
        col("activity_task_scheduled_event_attributes.input.payloads")[0]["data"].cast(
            "string"
        ),
    )
    .withColumn(
        "output",
        col("activity_task_completed_event_attributes.result.payloads")[0]["data"].cast(
            "string"
        ),
    )
    .withColumn(
        "error",
        # Provides any JSON data that was returned from an activity. There is often richer data like error messages and stacktraces in the raw column
        col(
            "activity_task_failed_event_attributes.failure.application_failure_info.details.payloads"
        )[0]["data"].cast("string"),
    )
    .select(
        "workflow_id",
        "run_id",
        "workflow_type",
        "event_time",
        "event_type",
        "parent_workflow_id",
        "parent_workflow_run_id",
        "first_execution_run_id",
        "prev_execution_run_id",
        "task_queue",
        "event_id",
        "workflow_info",
        "input",
        "output",
        "error",
        "activity_task_scheduled_event_attributes",
        "activity_task_started_event_attributes",
        "activity_task_completed_event_attributes",
        "activity_task_failed_event_attributes",
        "activity_task_timed_out_event_attributes",
        "activity_task_cancel_requested_event_attributes",
        "activity_task_canceled_event_attributes",
    )
)

## Activity Calls
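The next cell keys every activity event by the event ID of its ActivityTaskScheduled event, because only that event carries the activity name. A pure-Python model of that linking, assuming events are dicts holding the decoded attribute structs; `activity_event_id` and `group_activity_calls` are hypothetical helpers, and the sample event types are abbreviated:

```python
from collections import defaultdict
from typing import Iterable, Optional

# Attribute structs that point back at their ActivityTaskScheduled event via `scheduled_event_id`
LINKED_ATTRIBUTES = (
    "activity_task_started_event_attributes",
    "activity_task_completed_event_attributes",
    "activity_task_failed_event_attributes",
    "activity_task_timed_out_event_attributes",
)


def activity_event_id(event: dict) -> Optional[int]:
    """Event ID of the ActivityTaskScheduled event that this activity event belongs to."""
    if event.get("activity_task_scheduled_event_attributes") is not None:
        return event["event_id"]
    for field in LINKED_ATTRIBUTES:
        attrs = event.get(field)
        if attrs is not None:
            return attrs["scheduled_event_id"]
    return None


def group_activity_calls(events: Iterable[dict]) -> dict[tuple, list[str]]:
    """Groups event types by invocation key (workflow_id, run_id, activity_event_id)."""
    calls: dict[tuple, list[str]] = defaultdict(list)
    for e in events:
        aid = activity_event_id(e)
        if aid is None:
            continue  # unlinked events are dropped, like rows that find no match in the inner join
        calls[(e["workflow_id"], e["run_id"], aid)].append(e["event_type"])
    return dict(calls)


events = [
    {"workflow_id": "wf", "run_id": "r1", "event_id": 5, "event_type": "SCHEDULED",
     "activity_task_scheduled_event_attributes": {"activity_type": {"name": "charge"}}},
    {"workflow_id": "wf", "run_id": "r1", "event_id": 6, "event_type": "STARTED",
     "activity_task_started_event_attributes": {"scheduled_event_id": 5}},
    {"workflow_id": "wf", "run_id": "r1", "event_id": 7, "event_type": "COMPLETED",
     "activity_task_completed_event_attributes": {"scheduled_event_id": 5}},
]
print(group_activity_calls(events))  # {('wf', 'r1', 5): ['SCHEDULED', 'STARTED', 'COMPLETED']}
```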

In [None]:
# Temporal only puts the activity's name on the scheduled event 
# Most of this is just linking all other activity events with the activity name
# As a side effect we end up establishing the identity of an activity invocation, spreading an id
# globally unique by workflow_id+run_id+activity_scheduled_event_id across one or more rows
#
# In the second part we use this to form the activity_calls table. 
# This collapses the different activity events that make up an invocation into one row.

activity_name_event = (
    activity_history
    .withColumn(
        "activity_type",
        col("activity_task_scheduled_event_attributes.activity_type.name"),
    )
    .withColumn(
        "activity_task_queue",
        coalesce(
            col("activity_task_scheduled_event_attributes.task_queue.normal_name"),
            col("activity_task_scheduled_event_attributes.task_queue.name"),
        ),
    )
    .withColumnRenamed("task_queue", "workflow_task_queue")
    .where(col("activity_type").isNotNull())
    .select(
        "workflow_id",
        "run_id",
        col("event_id").alias("activity_event_id"),
        "activity_type",
        "activity_task_queue",
        "workflow_task_queue"
    )
)

with_activity_evt_id = activity_history.withColumn(
    "activity_event_id",
    coalesce(
        when(
            col("activity_task_scheduled_event_attributes").isNotNull(),
            col("event_id"),
        ),
        when(
            col("activity_task_started_event_attributes").isNotNull(),
            col("activity_task_started_event_attributes.scheduled_event_id"),
        ),
        when(
            col("activity_task_completed_event_attributes").isNotNull(),
            col("activity_task_completed_event_attributes.scheduled_event_id"),
        ),
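        when(
            col("activity_task_failed_event_attributes").isNotNull(),
            col("activity_task_failed_event_attributes.scheduled_event_id"),
        ),
        when(
            col("activity_task_timed_out_event_attributes").isNotNull(),
            col("activity_task_timed_out_event_attributes.scheduled_event_id"),
        ),
        # Link cancellation events too, so their attributes reach activity_calls
        when(
            col("activity_task_cancel_requested_event_attributes").isNotNull(),
            col("activity_task_cancel_requested_event_attributes.scheduled_event_id"),
        ),
        when(
            col("activity_task_canceled_event_attributes").isNotNull(),
            col("activity_task_canceled_event_attributes.scheduled_event_id"),
        ),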
    ),
)

activity_invocation_window = Window.partitionBy(
    "workflow_id", "run_id", "activity_event_id"
)

activity_calls = (
    with_activity_evt_id.join(
        activity_name_event, ["workflow_id", "run_id", "activity_event_id"]
    )
    .withColumn(
        "time_scheduled",
        first(
            when(
                col("event_type") == "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
                col("event_time"),
            ),
            ignorenulls=True,
        ).over(activity_invocation_window),
    )
    .withColumn(
        "time_started",
        first(
            when(
                col("event_type") == "EVENT_TYPE_ACTIVITY_TASK_STARTED",
                col("event_time"),
            ),
            ignorenulls=True,
        ).over(activity_invocation_window),
    )
    .withColumn(
        "time_completed",
        first(
            when(
                col("event_type").isin(
                    [
                        "EVENT_TYPE_ACTIVITY_TASK_COMPLETED",
                        "EVENT_TYPE_ACTIVITY_TASK_FAILED",
                        "EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT",
                    ]
                ),
                col("event_time"),
            ),
            ignorenulls=True,
        ).over(activity_invocation_window),
    )
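    # dropDuplicates below keeps an arbitrary row per invocation, so the scheduled
    # attributes must also be spread across the window, like the other attributes
    .withColumn(
        "activity_task_scheduled_event_attributes",
        first(col("activity_task_scheduled_event_attributes"), ignorenulls=True).over(
            activity_invocation_window
        ),
    )
    .withColumn(
        "activity_task_started_event_attributes",
        first(col("activity_task_started_event_attributes"), ignorenulls=True).over(
            activity_invocation_window
        ),
    )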
    .withColumn(
        "activity_task_completed_event_attributes",
        first(col("activity_task_completed_event_attributes"), ignorenulls=True).over(
            activity_invocation_window
        ),
    )
    .withColumn(
        "activity_task_failed_event_attributes",
        first(col("activity_task_failed_event_attributes"), ignorenulls=True).over(
            activity_invocation_window
        ),
    )
    .withColumn(
        "activity_task_timed_out_event_attributes",
        first(col("activity_task_timed_out_event_attributes"), ignorenulls=True).over(
            activity_invocation_window
        ),
    )
    .withColumn(
        "activity_task_cancel_requested_event_attributes",
        first(
            col("activity_task_cancel_requested_event_attributes"), ignorenulls=True
        ).over(activity_invocation_window),
    )
    .withColumn(
        "activity_task_canceled_event_attributes",
        first(col("activity_task_canceled_event_attributes"), ignorenulls=True).over(
            activity_invocation_window
        ),
    )
    .withColumn(
        "input",
        first(col("input"), ignorenulls=True).over(activity_invocation_window),
    )
    .withColumn(
        "output",
        first(col("output"), ignorenulls=True).over(activity_invocation_window),
    )
    .withColumn(
        "error",
        first(col("error"), ignorenulls=True).over(activity_invocation_window),
    )
    .dropDuplicates(["workflow_id", "run_id", "activity_event_id"])
    .select(
        "workflow_id",
        "run_id",
        "workflow_type",
        "activity_type",
        "time_scheduled",
        "time_started",
        "time_completed",
        "input",
        "output",
        "error",
        "parent_workflow_id",
        "parent_workflow_run_id",
        "first_execution_run_id",
        "prev_execution_run_id",
        "task_queue",
        "event_id",
        "workflow_info",
        "activity_task_scheduled_event_attributes",
        "activity_task_started_event_attributes",
        "activity_task_completed_event_attributes",
        "activity_task_failed_event_attributes",
        "activity_task_timed_out_event_attributes",
        "activity_task_cancel_requested_event_attributes",
        "activity_task_canceled_event_attributes",
    )
)

### Save Table

In [None]:
activity_calls.write.format("delta").mode("overwrite").option("overwriteSchema",True).saveAsTable("temporal.activity_calls")

## Activities Tables

In [None]:
# TODO. can we use temporal.activity_calls if we partition by activity_type?
(
    activity_calls.write.format("delta")
    .partitionBy("activity_type")
    .mode("overwrite")
    .saveAsTable("tmp_activity_calls_by_activity_type")
)

In [None]:
# for now we save to activities.{activity_type}
spark.sql("CREATE DATABASE IF NOT EXISTS activities")

In [None]:
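# Import generated Diachronic types Python Package
from diachronic.types.activities import schemas as activity_schemas
import multiprocessing
from threading import Thread
from queue import Empty, Queue
from pyspark.sql.functions import *

by_activity_type = spark.table("tmp_activity_calls_by_activity_type")


def run(f, q: Queue):
    # Use get_nowait() rather than checking q.empty() first: another thread can take the
    # last item between the check and the get, which would block this thread forever
    while True:
        try:
            activity_type, schema = q.get_nowait()
        except Empty:
            return
        try:
            f(activity_type, schema)
        finally:
            # Always mark the item done so q.join() cannot hang if f raises
            q.task_done()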


def fn(activity_type, schema):
    try:
        has_input = schema.get("input") is not None
        has_output = schema.get("output") is not None
        has_error = schema.get("error") is not None

        table = (
            by_activity_type.where(col("activity_type") == activity_type)
            .withColumn(
                "input", from_json("input", schema["input"]) if has_input else col("input")
            )
            .withColumn(
                "output",
                from_json("output", schema["output"]) if has_output else col("output"),
            )
            .withColumn(
                "error", from_json("error", schema["error"]) if has_error else col("error")
            )
        )
  
        table.write.format("delta").mode("overwrite").saveAsTable(
            f"activities.{activity_type}"
        )
    except Exception as e:
        print(f"Failed to write activities.{activity_type}:", e)


num_cores = multiprocessing.cpu_count()

q = Queue()

for (activity_type, schema) in activity_schemas.items():
    q.put((activity_type, schema))

for i in range(num_cores):
    # print("core", i)
    t = Thread(target=run, args=(fn, q), name=f"activity_tables-{i}")
    # t.daemon = True
    t.start()

q.join()
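The cell above fans the per-activity writes out over hand-managed threads and a shared `Queue`. The standard library's `concurrent.futures.ThreadPoolExecutor` expresses the same fan-out with less bookkeeping: the pool owns the threads, waits for them on exit, and hands each task's exception back through its future. A self-contained sketch in which `write_table` and `write_all` are hypothetical, with `write_table` standing in for the Spark write:

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed


def write_table(activity_type: str, schema: dict) -> str:
    """Hypothetical stand-in for the per-activity Spark write; returns the target table name."""
    if not activity_type:
        raise ValueError("empty activity type")
    return f"activities.{activity_type}"


def write_all(schemas: dict, max_workers: int = os.cpu_count() or 1):
    """Runs write_table for every (activity_type, schema) concurrently; returns (written, failed)."""
    written, failed = [], {}
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="activity_tables") as pool:
        futures = {pool.submit(write_table, name, schema): name for name, schema in schemas.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                written.append(future.result())
            except Exception as e:  # one failing activity type does not stop the others
                failed[name] = e
    return sorted(written), failed


written, failed = write_all({"charge": {}, "refund": {}, "": {}})
print(written)  # ['activities.charge', 'activities.refund']
print(failed)   # {'': ValueError('empty activity type')}
```

Threads (rather than processes) fit here because each task mostly waits on a Spark job, and Spark accepts job submissions from multiple driver threads.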

# Temporal Signals

Derives table `temporal.signals` from `temporal.history` with the following properties:

- The table contains only "workflow execution signaled" events. TODO: decide whether to include child workflow signals or any other signal events Temporal records.
- Each row has a column with the JSON-encoded `signal_payload`.
- From this table, one table per Embedded Insurance signal type is derived in the `signals` database, with the JSON-decoded `signal_payload` fields as top-level columns. Each table name is derived from `signal_type` via the function `signal_sql_name`.

In [None]:
from pyspark.sql.functions import col

with_wf_info = spark.table("temporal.history")

signals = (
    spark.table("temporal.history").where(
        # TODO. we may wish to include other signal types in this batch (child wfs may have special event_type for this, others)
        (col("event_type") == "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED")
    )
    .withColumn(
        "signal_type",
        col("workflow_execution_signaled_event_attributes.signal_name"),
    )
    .withColumn(
        "signal_payload",
        col("workflow_execution_signaled_event_attributes.input.payloads")[0][
            "data"
        ].cast("string"),
    )
    .select(
        "event_time",
        "signal_type",
        "signal_payload",
        "workflow_info",
        "workflow_id",
        "workflow_type",
        "parent_workflow_id",
        "parent_workflow_run_id",
        "run_id",
        "first_execution_run_id",
        "prev_execution_run_id",
        "task_queue",
        "event_id",
        # "event_type",
        # "workflow_execution_signaled_event_attributes",
    )
)

## Save table

In [None]:
signals.write.format("delta").mode("overwrite").option("mergeSchema","true").saveAsTable("temporal.signals")

## Signals Tables

Create 1 table per signal type.
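Each per-type table is the subset of `signals` rows with one `signal_type`, with the JSON payload parsed and its fields promoted to top-level columns. A pure-Python model of that routing, assuming rows are dicts; `split_signals` is hypothetical, the signal names are made up, and `signal_sql_name` mirrors the helper of the same name defined in a later cell. The model omits details such as the Spark version also keeping the parsed `payload` struct column:

```python
import json
from collections import defaultdict


def signal_sql_name(s: str) -> str:
    """Same normalization as the notebook helper: dots and hyphens become underscores."""
    return s.replace(".", "_").replace("-", "_")


def split_signals(rows: list[dict]) -> dict[str, list[dict]]:
    """Routes each signal row to signals.<name>, with its JSON payload fields as top-level keys."""
    tables: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        payload = json.loads(row["signal_payload"])
        rest = {k: v for k, v in row.items() if k not in ("signal_type", "signal_payload")}
        # Payload fields first, then the remaining columns
        tables[f"signals.{signal_sql_name(row['signal_type'])}"].append({**payload, **rest})
    return dict(tables)


rows = [
    {"signal_type": "policy.cancel-requested", "signal_payload": '{"reason": "moved"}', "workflow_id": "wf-1"},
    {"signal_type": "claim.filed", "signal_payload": '{"amount": 100}', "workflow_id": "wf-2"},
]
for table, table_rows in split_signals(rows).items():
    print(table, table_rows)
# signals.policy_cancel_requested [{'reason': 'moved', 'workflow_id': 'wf-1'}]
# signals.claim_filed [{'amount': 100, 'workflow_id': 'wf-2'}]
```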


In [None]:
def omit(m: dict, ks: list[str]):
    return {k: v for k, v in m.items() if k not in ks}

In [None]:
from diachronic.types.signals import schemas

# Helpers: signals
def signal_sql_name(s: str) -> str:
    """Normalizes signal names for usage as Spark SQL tables or columns."""
    return s.replace(".", "_").replace("-", "_")

In [None]:
generated_tables_map_fully_qualified = {k: f"signals.{signal_sql_name(k)}" for k in schemas.keys() }

generated_tables_map_fully_qualified

In [None]:
# `signals` above is the base table (every payload is still a JSON string). From it we derive
# one schematized table per signal type, with the payload fields expanded into columns:

from diachronic.types.signals import schemas
from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json


def create_signals_tables(
    schemas: dict[str, str], table_map: dict[str, str], base_df: DataFrame
) -> dict[str, DataFrame]:
    return {
        table_name: base_df.where(col("signal_type") == signal_type)
        .withColumn(
            "payload", from_json(base_df["signal_payload"], schema=schemas[signal_type])
        )
        .drop("signal_type", "signal_payload")
        .select("payload.*", "*")
        for signal_type, table_name in table_map.items()
    }


signal_tables = create_signals_tables(
    schemas, generated_tables_map_fully_qualified, signals
)


# for name, df in signal_tables.items():
#     display(df)

In [None]:
%sql
create database if not exists signals

### Save tables

In [None]:
for table_name, df in signal_tables.items():
    df.write.format("delta").mode("overwrite").option("overwriteSchema", True).saveAsTable(table_name)

In [None]:
# import threading
# for thread in threading.enumerate():
#     print(thread.name)