In [1]:
from pyspark.sql import SparkSession
from soda.scan import Scan

In [2]:
spark = SparkSession \
    .builder \
    .appName("DQ") \
    .getOrCreate()

24/08/24 14:36:02 WARN Utils: Your hostname, ahmad-HP-ZBook-17 resolves to a loopback address: 127.0.1.1; using 192.168.1.35 instead (on interface wlo1)
24/08/24 14:36:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 14:36:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.option("header", True).option("inferSchema", True).csv("../batchs/sampled_data_1.csv").drop('index')
new_columns = [col.replace(" ", "_").replace("-", "_") for col in df.columns]
df = df.toDF(*new_columns)
df.show()

                                                                                

+-------------------+--------+--------------------+----------+-------------+------------------+--------+----+--------------+---+--------+------+----------+----------------+----------------+------------+-----+------------+----+--------+
|           Order_ID|    Date|              Status|Fulfilment|ORDERS_Channel|ship_service_level|Category|Size|Courier_Status|Qty|currency|Amount| ship_city|      ship_state|ship_postal_code|ship_country|  B2B|fulfilled_by| New|PendingS|
+-------------------+--------+--------------------+----------+-------------+------------------+--------+----+--------------+---+--------+------+----------+----------------+----------------+------------+-----+------------+----+--------+
|408-6820155-3521913|04-30-22|             Shipped|    Amazon|    Amazon.in|         Expedited| T-shirt| 3XL|       Shipped|  1|     INR| 751.0|    TEZPUR|           ASSAM|          784001|          IN|false|        NULL|NULL|    NULL|
|405-8114272-1687512|04-30-22|             Shipped|    

In [4]:
df.createOrReplaceTempView('ORDERS')

In [5]:
df.count()

25

In [6]:
df.columns

['Order_ID',
 'Date',
 'Status',
 'Fulfilment',
 'ORDERS_Channel',
 'ship_service_level',
 'Category',
 'Size',
 'Courier_Status',
 'Qty',
 'currency',
 'Amount',
 'ship_city',
 'ship_state',
 'ship_postal_code',
 'ship_country',
 'B2B',
 'fulfilled_by',
 'New',
 'PendingS']

In [7]:
checks = """
checks for ORDERS:
  - schema:
      fail:
        when required column missing:
          - Order_ID
          - Date
          - Qty
          - Amount

  - row_count > 0

  - duplicate_count(Order_ID, Category, Date, Amount):
      warn: when > 0

  - missing_percent(Order_ID):
      fail: when > 10%
      warn: when > 5%

  - missing_percent(Amount):
      warn: when > 10%

  - missing_percent(Qty):
      warn: when > 10%

  - min(Amount):
      warn: when < 0

  - min(Qty):
      warn: when < 0

  - invalid_percent(Date):
      fail: when > 10%
      valid format: date us
  
  - missing_percent(Status):
      warn: when > 10%
  
  - missing_percent(ORDERS_Channel):
      warn: when > 10%

  - missing_percent(ship_country):
      warn: when > 5%
"""
scan = Scan()
scan.set_scan_definition_name("test")
scan.set_data_source_name("Amazon")
scan.add_spark_session(spark, data_source_name="Amazon")
scan.add_sodacl_yaml_str(checks)
scan.execute()
# Check the Scan object for methods to inspect the scan result; the following prints all logs to console
print(scan.get_logs_text())

INFO   | Soda Core 3.3.18
INFO   | Using DefaultSampler
INFO   | Scan summary:
INFO   | 11/12 checks PASSED: 
INFO   |     ORDERS in Amazon
INFO   |       Schema Check [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |       duplicate_count(Order_ID, Category, Date, Amount) warn when > 0 [PASSED]
INFO   |       missing_percent(Order_ID) warn when > 5% fail when > 10% [PASSED]
INFO   |       min(Amount) warn when < 0 [PASSED]
INFO   |       missing_percent(Qty) warn when > 10% [PASSED]
INFO   |       min(Qty) warn when < 0 [PASSED]
INFO   |       invalid_percent(Date) fail when > 10% [PASSED]
INFO   |       missing_percent(Status) warn when > 10% [PASSED]
INFO   |       missing_percent(ORDERS_Channel) warn when > 10% [PASSED]
INFO   |       missing_percent(ship_country) warn when > 5% [PASSED]
INFO   | 1/12 checks WARNED: 
INFO   |     ORDERS in Amazon
INFO   |       missing_percent(Amount) warn when > 10% [WARNED]
INFO   |         check_value: 12.0
INFO   |         row_count: 25
I