## Create Input DataFrames

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


#### Opening Inventory

In [0]:
# Opening stock, one row per master medicine name.
opening_inventory = spark.createDataFrame(
    data=[
        ("Dolo 650", 1000),
        ("Azithromycin 500", 300),
        ("Pantoprazole 40", 500),
        ("Cetirizine 10", 400),
    ],
    schema=["medicine_master", "opening_stock"],
)


#### Purchase Register

In [0]:
# Purchase lines: date (plain ISO-formatted string), the medicine name as it
# appears on the invoice, and the quantity bought.
purchase_register = spark.createDataFrame(
    data=[
        ("2024-08-01", "Dolo 650", 500),
        ("2024-08-02", "Azithromycin 500", 200),
    ],
    schema=["date", "medicine_invoice", "qty_purchased"],
)


#### Sales Register (HMS)

In [0]:
# HMS bill lines. entered_medicine is the name as typed at billing and may be
# a variant of the master name (e.g. "Dolo" / "Dolo kind" for "Dolo 650").
sales_register = spark.createDataFrame(
    data=[
        ("B101", "Dolo", 10),
        ("B102", "Dolo 650", 15),
        ("B103", "Dolo kind", 5),
        ("B141", "Azithro 500", 6),
        ("B142", "Azithromycin", 4),
    ],
    schema=["bill_no", "entered_medicine", "qty_sold"],
)


#### Closing Inventory

In [0]:
# Physical closing count per master medicine. Only the medicines listed here
# have a count; Pantoprazole 40 and Cetirizine 10 have no closing row.
closing_inventory = spark.createDataFrame(
    data=[
        ("Dolo 650", 1440),
        ("Azithromycin 500", 480),
    ],
    schema=["medicine_master", "closing_stock"],
)


### Identify High-Risk Medicines (One-Time)

In [0]:
# One-time list of medicines singled out for closer auditing, each with the
# reason it was flagged.
high_risk_medicines = spark.createDataFrame(
    data=[
        ("Dolo 650", "High volume + name variations"),
        ("Azithromycin 500", "Similar abbreviations"),
        ("Pantoprazole 40", "Frequently prescribed"),
    ],
    schema=["medicine_master", "risk_reason"],
)


In [0]:
high_risk_medicines.show(n=20, truncate=True)  # print the high-risk list (Spark defaults made explicit)


### Daily Name-Variation Mapping (10 mins/day)

In [0]:
# Lookup from each entered name variant to its master medicine name. Matching
# against sales is by exact string equality on entered_medicine, so every
# spelling seen at billing needs its own row.
medicine_mapping = spark.createDataFrame(
    data=[
        ("Dolo", "Dolo 650"),
        ("Dolo 650", "Dolo 650"),
        ("Dolo kind", "Dolo 650"),
        ("Azithro 500", "Azithromycin 500"),
        ("Azithromycin", "Azithromycin 500"),
    ],
    schema=["entered_medicine", "medicine_master"],
)


#### Map Sales to Master Name

In [0]:
# Attach the master name to every bill line. The left join keeps bills whose
# entered name has no mapping row; their medicine_master is null.
sales_mapped = sales_register.join(
    medicine_mapping,
    on="entered_medicine",
    how="left",
)


#### Flag Unmapped Variants

In [0]:
# Bill lines whose entered name matched no mapping row; each one needs a new
# entry in medicine_mapping.
unmapped_variants = sales_mapped.where(F.col("medicine_master").isNull())
unmapped_variants.show()


### Daily Usage Reasonableness Check

#### Aggregate Sales

In [0]:
# Total quantity sold per master medicine. Any unmapped bill lines end up in
# a single group whose medicine_master is null.
sales_agg = sales_mapped.groupBy("medicine_master").agg(
    F.sum(F.col("qty_sold")).alias("total_sold")
)


#### Aggregate Purchases

In [0]:
# Total quantity purchased per medicine. The invoice name is renamed to
# medicine_master up front so the result joins directly with the other
# per-medicine tables.
purchase_agg = (
    purchase_register
    .withColumnRenamed("medicine_invoice", "medicine_master")
    .groupBy("medicine_master")
    .agg(F.sum(F.col("qty_purchased")).alias("total_purchased"))
)


#### Expected vs Actual Closing Stock

In [0]:
# Reconcile book stock against the physical closing count, per master medicine:
#   expected_closing = opening_stock + total_purchased - total_sold
#   difference       = closing_stock - expected_closing
#
# Full outer joins keep a medicine that appears in ANY input (e.g. purchased
# or sold but absent from the opening list). A left join anchored on
# opening_inventory would silently drop such medicines from the audit.
inventory_check = (
    opening_inventory
    .join(purchase_agg, "medicine_master", "full_outer")
    .join(sales_agg, "medicine_master", "full_outer")
    .join(closing_inventory, "medicine_master", "full_outer")
    # Unmapped sales (null master name) cannot be reconciled against stock;
    # they are listed separately in unmapped_variants.
    .filter(F.col("medicine_master").isNotNull())
    # Capture whether a physical count exists BEFORE nulls are replaced.
    # Otherwise a missing count is indistinguishable from a real count of 0.
    .withColumn("closing_counted", F.col("closing_stock").isNotNull())
    # Missing flows mean "none recorded". closing_stock is still zero-filled
    # so downstream checks keep their existing behaviour; consult
    # closing_counted before trusting its difference.
    .fillna(
        0,
        subset=["opening_stock", "total_purchased", "total_sold", "closing_stock"],
    )
    .withColumn(
        "expected_closing",
        F.col("opening_stock") + F.col("total_purchased") - F.col("total_sold"),
    )
    .withColumn(
        "difference",
        F.col("closing_stock") - F.col("expected_closing"),
    )
)


#### Flag Differences Greater Than ±2%

In [0]:
# A row needs manual review when |difference| exceeds 2% of expected_closing;
# all other rows are marked OK. A null comparison (e.g. a null difference)
# also falls through to OK, because when() only fires on true.
exceeds_tolerance = F.abs(F.col("difference")) > F.col("expected_closing") * 0.02
audit_flags = inventory_check.withColumn(
    "status",
    F.when(exceeds_tolerance, "REVIEW").otherwise("OK"),
)
audit_flags.show()


### Random Bill Spot Check (Daily)

In [0]:
# Pick 3 mapped bill lines at random for a manual spot check. F.rand() is
# unseeded, so a different sample is drawn on every run.
random_bills = sales_mapped.orderBy(F.rand()).limit(3)
random_bills.show()


### Weekly Error Pattern Tracking

In [0]:
# Log of audit errors found: date (ISO string), master medicine name, error
# category, and a Yes/No marker for whether it was a repeat.
error_log = spark.createDataFrame(
    data=[
        ("2024-08-01", "Dolo 650", "Name Variant", "Yes"),
        ("2024-08-03", "Dolo 650", "Wrong Quantity", "No"),
    ],
    schema=["date", "medicine", "error_type", "repeated"],
)


#### Weekly Decision Rule

In [0]:
# Keep (medicine, error_type) pairs that occur 3 or more times in error_log.
# NOTE(review): no date filter is applied, so this counts every row in
# error_log, not just the current week. Confirm the log is reset or filtered
# weekly upstream.
weekly_summary = (
    error_log.groupBy("medicine", "error_type")
    .agg(F.count("*").alias("count"))
    .where(F.col("count") >= 3)
)
weekly_summary.show()


### Doctor Escalation Rules 

In [0]:
# Escalate only rows already flagged REVIEW whose absolute difference is
# greater than 50.
needs_review = F.col("status") == "REVIEW"
large_gap = F.abs(F.col("difference")) > 50
doctor_escalation = audit_flags.where(needs_review & large_gap)
doctor_escalation.show()
