In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, abs, to_timestamp, unix_timestamp, when
import yaml


In [0]:
# Location of the YAML file that drives every threshold, weight and path below.
CONFIG_PATH = "/Volumes/workspace/default/frontrunningproject/configuration/alert_config.yaml"

# wholetext=True reads the file as a single row, so the YAML text is used
# verbatim instead of being split into one row per line and re-joined.
yaml_df = spark.read.text(CONFIG_PATH, wholetext=True)
yaml_text = "".join(row.value for row in yaml_df.collect())
config = yaml.safe_load(yaml_text)

# Fail fast with a readable message rather than a TypeError / KeyError in a
# later cell when the file is empty or a required section is missing.
if not isinstance(config, dict):
    raise ValueError(
        f"{CONFIG_PATH} must contain a YAML mapping, got {type(config).__name__}"
    )
_missing_sections = [
    section
    for section in ("thresholds", "risk_weights", "paths", "write")
    if section not in config
]
if _missing_sections:
    raise KeyError(
        f"{CONFIG_PATH} is missing section(s): {', '.join(_missing_sections)}"
    )

# Extract thresholds
pip_threshold = config["thresholds"]["pip_deviation_threshold"]
latency_threshold = config["thresholds"]["latency_threshold_ms"]
slippage_threshold = config["thresholds"]["slippage_threshold_pips"]
pre_trade_window = config["thresholds"]["pre_trade_lookback_seconds"]

# Weights applied to each 0/1 alert flag when computing risk_score
weights = config["risk_weights"]

# Paths
rfq_path = config["paths"]["rfq_input"]
trades_path = config["paths"]["trades_input"]
benchmark_path = config["paths"]["benchmark_input"]
output_path = config["paths"]["output_path"]


In [0]:
def _read_csv_with_header(path):
    """Read a CSV with a header row, letting Spark infer the column types."""
    return (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(path)
    )


rfq_df = _read_csv_with_header(rfq_path)
trades_df = _read_csv_with_header(trades_path)
# NOTE(review): benchmark_df is not referenced by any later visible cell (the
# benchmark alert compares against r.market_mid). Confirm whether this read is
# still needed, since inferSchema makes Spark scan the file up front.
benchmark_df = _read_csv_with_header(benchmark_path)


In [0]:
# Both feeds carry their event time in the same textual layout; parse each into
# a proper timestamp column in place.
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"

rfq_df = rfq_df.withColumn("rfq_time", to_timestamp(col("rfq_time"), TIMESTAMP_FORMAT))
trades_df = trades_df.withColumn("trade_time", to_timestamp(col("trade_time"), TIMESTAMP_FORMAT))


In [0]:
# Short aliases so later cells can disambiguate columns present in both feeds
# (dealer, pair) as r.<col> / t.<col>.
r, t = rfq_df.alias("r"), trades_df.alias("t")


In [0]:
# Pair every RFQ with trades by the same dealer in the same currency pair whose
# trade_time is strictly before rfq_time and at most `pre_trade_window` seconds
# earlier (compared at one-second resolution via unix_timestamp). A left join
# keeps RFQs with no such trade, padding the trade columns with nulls.
rfq_trade_matches = r.join(
    t,
    (col("r.dealer") == col("t.dealer")) &
    (col("r.pair") == col("t.pair")) &
    (unix_timestamp(col("r.rfq_time")) > unix_timestamp(col("t.trade_time"))) &
    (unix_timestamp(col("r.rfq_time")) - unix_timestamp(col("t.trade_time")) <= pre_trade_window),
    "left"
)

# A range join fans out: an RFQ with k matching trades would appear k times,
# duplicating it in final_df and inflating every count in the summary. Keep one
# row per RFQ -- its most recent matching trade, or the single null-padded row
# when nothing matched. Partitioning on all RFQ columns avoids assuming rfq_id
# is unique and non-null (exact duplicate RFQ rows do collapse to one).
_one_row_per_rfq = (
    Window
    .partitionBy(*[col(f"r.`{name}`") for name in r.columns])
    .orderBy(col("t.trade_time").desc())
)
pre_trade_df = (
    rfq_trade_matches
    .withColumn("_trade_rank", F.row_number().over(_one_row_per_rfq))
    .filter(col("_trade_rank") == 1)
    .drop("_trade_rank")
)


In [0]:
# An RFQ is flagged when the left join found a matching trade, i.e. the trade
# side of the row is populated (0/1 integer flag).
has_prior_trade = col("t.trade_time").isNotNull()
alerts_df = pre_trade_df.withColumn("pre_trade_alert", has_prior_trade.cast("int"))


In [0]:
# Scale price differences into pips: pairs whose symbol ends in "JPY" are
# multiplied by 100, all others by 10000. Reused by the execution-quality cell.
pip_multiplier = when(col("r.pair").like("%JPY"), 100).otherwise(10000)

# Distance of the dealer's quote from the market mid, in pips.
quote_deviation_pips = F.abs(col("r.dealer_quote") - col("r.market_mid")) * pip_multiplier
alerts_df = alerts_df.withColumn(
    "benchmark_alert",
    (quote_deviation_pips > pip_threshold).cast("int"),
)


In [0]:
# Flag RFQs whose recorded latency exceeds the configured ceiling (0/1 flag).
is_slow_response = col("r.latency_ms") > latency_threshold
alerts_df = alerts_df.withColumn("latency_alert", is_slow_response.cast("int"))


In [0]:
# Distance of the client's execution price from the market mid, in pips,
# using the pip_multiplier defined in the benchmark cell.
exec_slippage_pips = F.abs(col("r.client_exec_price") - col("r.market_mid")) * pip_multiplier
alerts_df = alerts_df.withColumn(
    "execution_quality_alert",
    (exec_slippage_pips > slippage_threshold).cast("int"),
)


In [0]:
# Weighted sum of the four 0/1 alert flags; the weights come from the
# `risk_weights` section of the YAML config.
_score_terms = [
    col("pre_trade_alert") * weights["pre_trade"],
    col("benchmark_alert") * weights["benchmark"],
    col("latency_alert") * weights["latency"],
    col("execution_quality_alert") * weights["execution_quality"],
]
risk_score_expr = _score_terms[0]
for _term in _score_terms[1:]:
    risk_score_expr = risk_score_expr + _term

alerts_df = alerts_df.withColumn("risk_score", risk_score_expr)


In [0]:
# Labels for the checks that fired, in a fixed order. A flag that is not 1
# yields a null, which is filtered out; if nothing fired the list becomes the
# single marker NO_RISK.
_reason_by_flag = [
    ("pre_trade_alert", "PRE_TRADE"),
    ("benchmark_alert", "BENCHMARK_DEVIATION"),
    ("latency_alert", "LATENCY"),
    ("execution_quality_alert", "EXECUTION_QUALITY"),
]
_fired_reasons = F.filter(
    F.array(*[when(col(flag) == 1, F.lit(label)) for flag, label in _reason_by_flag]),
    lambda reason: reason.isNotNull(),
)
alerts_df = alerts_df.withColumn(
    "risk_reasons",
    when(F.size(_fired_reasons) == 0, F.array(F.lit("NO_RISK"))).otherwise(_fired_reasons),
)


In [0]:
# Map the numeric score to a label: >= 5 HIGH, >= 3 MEDIUM, otherwise LOW
# (a null score also falls through to LOW).
HIGH_RISK_MIN_SCORE = 5
MEDIUM_RISK_MIN_SCORE = 3

severity_expr = (
    when(col("risk_score") >= HIGH_RISK_MIN_SCORE, "HIGH")
    .when(col("risk_score") >= MEDIUM_RISK_MIN_SCORE, "MEDIUM")
    .otherwise("LOW")
)
alerts_df = alerts_df.withColumn("risk_severity", severity_expr)


In [0]:
# Report columns: RFQ identifiers (un-qualified from the r alias) followed by
# the individual flags and the aggregated score, label and reasons.
_rfq_identity_cols = ["rfq_id", "client", "dealer", "pair"]
_alert_output_cols = [
    "pre_trade_alert",
    "benchmark_alert",
    "latency_alert",
    "execution_quality_alert",
    "risk_score",
    "risk_severity",
    "risk_reasons",
]
final_df = alerts_df.select(
    *[col(f"r.{name}").alias(name) for name in _rfq_identity_cols],
    *_alert_output_cols,
)

display(final_df)


rfq_id,client,dealer,pair,pre_trade_alert,benchmark_alert,latency_alert,execution_quality_alert,risk_score,risk_severity,risk_reasons
RFQ0004,CLT004,DLR01,AUDUSD,0,0,0,0,0,LOW,List(NO_RISK)
RFQ0005,CLT005,DLR02,EURUSD,1,0,1,0,4,MEDIUM,"List(PRE_TRADE, LATENCY)"
RFQ0006,CLT006,DLR03,USDJPY,0,0,0,0,0,LOW,List(NO_RISK)
RFQ0007,CLT007,DLR01,GBPUSD,0,0,0,0,0,LOW,List(NO_RISK)
RFQ0008,CLT008,DLR02,AUDUSD,0,0,1,0,1,LOW,List(LATENCY)
RFQ0009,CLT009,DLR03,EURUSD,0,0,0,0,0,LOW,List(NO_RISK)
RFQ0010,CLT010,DLR01,USDJPY,1,0,1,0,4,MEDIUM,"List(PRE_TRADE, LATENCY)"
RFQ0011,CLT011,DLR02,GBPUSD,0,0,0,0,0,LOW,List(NO_RISK)
RFQ0012,CLT012,DLR03,AUDUSD,0,0,1,0,1,LOW,List(LATENCY)
RFQ0016,CLT016,DLR01,AUDUSD,0,0,0,0,0,LOW,List(NO_RISK)


In [0]:
# Create the output directory before the export cells run; the cell displays
# the call's return value.
# NOTE(review): Spark's DataFrameWriter normally creates missing output
# directories itself, so this is likely redundant -- confirm before removing.
dbutils.fs.mkdirs(output_path)


True

In [0]:
# CSV cannot hold an array column, so for the CSV export only, flatten
# risk_reasons into a pipe-delimited string (e.g. "PRE_TRADE|LATENCY").
csv_df = final_df.withColumn("risk_reasons", F.concat_ws("|", "risk_reasons"))


In [0]:
# Optional CSV export, enabled by config["write"]["csv"]. coalesce(1) yields a
# single part file, overwriting any previous run.
if config["write"]["csv"]:
    csv_out_dir = f"{output_path}/rfq_alerts_csv"
    csv_df.coalesce(1).write.csv(csv_out_dir, mode="overwrite", header=True)


In [0]:
# Optional JSON export, enabled by config["write"]["json"]; risk_reasons is
# kept as a native array here.
if config["write"]["json"]:
    json_out_dir = f"{output_path}/rfq_alerts_json"
    final_df.write.json(json_out_dir, mode="overwrite")


In [0]:
# One-row summary: how many rows raised each alert type, plus how many rows
# were labelled HIGH severity. Unlike the detail exports, this is written
# unconditionally.
_count_specs = [
    ("pre_trade_alert", "pre_trade_count"),
    ("benchmark_alert", "benchmark_count"),
    ("latency_alert", "latency_count"),
    ("execution_quality_alert", "execution_quality_count"),
]
_alert_counts = [F.sum(flag).alias(out_name) for flag, out_name in _count_specs]
_high_risk_count = F.sum(when(col("risk_severity") == "HIGH", 1).otherwise(0)).alias("high_risk_rfqs")

summary_df = final_df.agg(*_alert_counts, _high_risk_count)

summary_df.write.json(f"{output_path}/summary", mode="overwrite")

display(summary_df)


pre_trade_count,benchmark_count,latency_count,execution_quality_count,high_risk_rfqs
16,4,16,4,4


In [0]:
# List the entries the export cells produced under output_path (shown as the
# cell's output).
dbutils.fs.ls(output_path)


[FileInfo(path='dbfs:/Volumes/workspace/default/frontrunningproject/output/rfq_alerts_csv/', name='rfq_alerts_csv/', size=0, modificationTime=1766647469142),
 FileInfo(path='dbfs:/Volumes/workspace/default/frontrunningproject/output/rfq_alerts_json/', name='rfq_alerts_json/', size=0, modificationTime=1766647469142),
 FileInfo(path='dbfs:/Volumes/workspace/default/frontrunningproject/output/summary/', name='summary/', size=0, modificationTime=1766647469142)]

In [0]:
# Read the CSV export back to eyeball the round trip; without inferSchema every
# column comes back as a string.
df = spark.read.csv(f"{output_path}/rfq_alerts_csv", header=True)
display(df)

rfq_id,client,dealer,pair,pre_trade_alert,benchmark_alert,latency_alert,execution_quality_alert,risk_score,risk_severity,risk_reasons
RFQ0004,CLT004,DLR01,AUDUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0005,CLT005,DLR02,EURUSD,1,0,1,0,4,MEDIUM,PRE_TRADE|LATENCY
RFQ0006,CLT006,DLR03,USDJPY,0,0,0,0,0,LOW,NO_RISK
RFQ0007,CLT007,DLR01,GBPUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0008,CLT008,DLR02,AUDUSD,0,0,1,0,1,LOW,LATENCY
RFQ0009,CLT009,DLR03,EURUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0010,CLT010,DLR01,USDJPY,1,0,1,0,4,MEDIUM,PRE_TRADE|LATENCY
RFQ0011,CLT011,DLR02,GBPUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0012,CLT012,DLR03,AUDUSD,0,0,1,0,1,LOW,LATENCY
RFQ0016,CLT016,DLR01,AUDUSD,0,0,0,0,0,LOW,NO_RISK


In [0]:
# Read the JSON export back to eyeball the round trip. JSON records carry their
# own field names, so no "header" option is passed (that option is for CSV).
df = spark.read.json(f"{output_path}/rfq_alerts_json")
display(df)

benchmark_alert,client,dealer,execution_quality_alert,latency_alert,pair,pre_trade_alert,rfq_id,risk_reasons,risk_score,risk_severity
0,CLT004,DLR01,0,0,AUDUSD,0,RFQ0004,List(NO_RISK),0,LOW
0,CLT005,DLR02,0,1,EURUSD,1,RFQ0005,"List(PRE_TRADE, LATENCY)",4,MEDIUM
0,CLT006,DLR03,0,0,USDJPY,0,RFQ0006,List(NO_RISK),0,LOW
0,CLT007,DLR01,0,0,GBPUSD,0,RFQ0007,List(NO_RISK),0,LOW
0,CLT008,DLR02,0,1,AUDUSD,0,RFQ0008,List(LATENCY),1,LOW
0,CLT009,DLR03,0,0,EURUSD,0,RFQ0009,List(NO_RISK),0,LOW
0,CLT010,DLR01,0,1,USDJPY,1,RFQ0010,"List(PRE_TRADE, LATENCY)",4,MEDIUM
0,CLT011,DLR02,0,0,GBPUSD,0,RFQ0011,List(NO_RISK),0,LOW
0,CLT012,DLR03,0,1,AUDUSD,0,RFQ0012,List(LATENCY),1,LOW
0,CLT016,DLR01,0,0,AUDUSD,0,RFQ0016,List(NO_RISK),0,LOW


In [0]:
# Read the summary export back for a visual check.
summary_outfile_df = spark.read.format("json").load(f"{output_path}/summary")
display(summary_outfile_df)

benchmark_count,execution_quality_count,high_risk_rfqs,latency_count,pre_trade_count
4,4,4,16,16


In [0]:
# Consolidated check of every export: the directory listing plus each output
# read back from disk. The listing is wrapped in display(); as a bare statement
# that is not the cell's last expression, its result was silently discarded.
# NOTE(review): this cell repeats the individual read-back cells above.
display(dbutils.fs.ls(output_path))

display(spark.read.option("header", True).csv(f"{output_path}/rfq_alerts_csv"))
display(spark.read.json(f"{output_path}/rfq_alerts_json"))
display(spark.read.json(f"{output_path}/summary"))


rfq_id,client,dealer,pair,pre_trade_alert,benchmark_alert,latency_alert,execution_quality_alert,risk_score,risk_severity,risk_reasons
RFQ0004,CLT004,DLR01,AUDUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0005,CLT005,DLR02,EURUSD,1,0,1,0,4,MEDIUM,PRE_TRADE|LATENCY
RFQ0006,CLT006,DLR03,USDJPY,0,0,0,0,0,LOW,NO_RISK
RFQ0007,CLT007,DLR01,GBPUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0008,CLT008,DLR02,AUDUSD,0,0,1,0,1,LOW,LATENCY
RFQ0009,CLT009,DLR03,EURUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0010,CLT010,DLR01,USDJPY,1,0,1,0,4,MEDIUM,PRE_TRADE|LATENCY
RFQ0011,CLT011,DLR02,GBPUSD,0,0,0,0,0,LOW,NO_RISK
RFQ0012,CLT012,DLR03,AUDUSD,0,0,1,0,1,LOW,LATENCY
RFQ0016,CLT016,DLR01,AUDUSD,0,0,0,0,0,LOW,NO_RISK


benchmark_alert,client,dealer,execution_quality_alert,latency_alert,pair,pre_trade_alert,rfq_id,risk_reasons,risk_score,risk_severity
0,CLT004,DLR01,0,0,AUDUSD,0,RFQ0004,List(NO_RISK),0,LOW
0,CLT005,DLR02,0,1,EURUSD,1,RFQ0005,"List(PRE_TRADE, LATENCY)",4,MEDIUM
0,CLT006,DLR03,0,0,USDJPY,0,RFQ0006,List(NO_RISK),0,LOW
0,CLT007,DLR01,0,0,GBPUSD,0,RFQ0007,List(NO_RISK),0,LOW
0,CLT008,DLR02,0,1,AUDUSD,0,RFQ0008,List(LATENCY),1,LOW
0,CLT009,DLR03,0,0,EURUSD,0,RFQ0009,List(NO_RISK),0,LOW
0,CLT010,DLR01,0,1,USDJPY,1,RFQ0010,"List(PRE_TRADE, LATENCY)",4,MEDIUM
0,CLT011,DLR02,0,0,GBPUSD,0,RFQ0011,List(NO_RISK),0,LOW
0,CLT012,DLR03,0,1,AUDUSD,0,RFQ0012,List(LATENCY),1,LOW
0,CLT016,DLR01,0,0,AUDUSD,0,RFQ0016,List(NO_RISK),0,LOW


benchmark_count,execution_quality_count,high_risk_rfqs,latency_count,pre_trade_count
4,4,4,16,16
