#### Parameters To Be Passed
- File path / Table Name
- `checks.yml` path
- `configuration.yml` path
- Name of the view to be created (should be the same as the one specified in `checks.yml`)
- Data source to refer to (should be the same as the one defined in `configuration.yml`)

In [1]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Optional: uncomment the two lines below to get DEBUG-level Python logging while troubleshooting.
# import logging
# logging.basicConfig(level=logging.DEBUG)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 16:54:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Relative paths to the two CSV inputs read by the following cell.
# NOTE(review): "supestore" looks like a misspelling of "superstore" — confirm against the
# actual file names on disk before renaming anything.
orders_p = "data/supestore_orders.csv"
returned_orders_p = "data/supestore_returns.csv"

In [3]:
# Both inputs are read the same way: pipe-delimited, first row is the header,
# and Spark infers the column types from the data.
csv_read_options = {"header": True, "inferSchema": True, "sep": "|"}

df_orders = spark.read.csv(orders_p, **csv_read_options)
df_returned_orders = spark.read.csv(returned_orders_p, **csv_read_options)

In [4]:
# Preview the first two rows of the orders data as a quick sanity check on the load.
df_orders.show(2)

+------+--------------+----------+----------+------------+-----------+-------------+--------+-------------+---------+--------+-----------+------+---------------+---------+------------+--------------------+-----------------+--------+--------+------------------+
|row_id|      order_id|order_date| ship_date|   ship_mode|customer_id|customer_name| segment|      country|     city|   state|postal_code|region|     product_id| category|sub_category|        product_name|            sales|quantity|discount|            profit|
+------+--------------+----------+----------+------------+-----------+-------------+--------+-------------+---------+--------+-----------+------+---------------+---------+------------+--------------------+-----------------+--------+--------+------------------+
|     1|CA-2018-152156|2018-11-08|2018-11-07|Second Class|   CG-12520|  Claire Gute|Consumer|United States|Henderson|Kentucky|    42420.0| South|FUR-BO-10001798|Furniture|   Bookcases|Bush Somerset Col...|           2

In [5]:
# Preview the first two rows of the returns data as a quick sanity check on the load.
df_returned_orders.show(2)

+--------+--------------+
|returned|      order_id|
+--------+--------------+
|     Yes|CA-2016-100762|
|     Yes|CA-2016-100762|
+--------+--------------+
only showing top 2 rows



In [6]:
# Register the orders DataFrame as the temp view "orders_vw". Per the parameter notes at the
# top, this name should be the same as the one specified in the checks file.
# NOTE(review): df_returned_orders is not registered as a view in the cells visible here —
# confirm whether it is meant to be scanned as well.
df_orders.createOrReplaceTempView("orders_vw")

In [7]:
# Paths to the Soda inputs: the configuration file and the SodaCL checks file
# that are handed to the scan in the next cell.
config_p = "configuration.yml"
checks_p = "checks/orders.yml"

In [8]:
from soda.scan import Scan

# Data source name used for this scan; per the notes at the top it should be the same
# as the one defined in configuration.yml.
soda_data_source = "spark_df"

# Build the scan: select the data source, load the configuration and the SodaCL checks,
# then attach the Spark session under that same data source name.
scan = Scan()
scan.set_data_source_name(data_source_name=soda_data_source)
scan.add_configuration_yaml_file(file_path=config_p)
scan.add_sodacl_yaml_file(file_path=checks_p)
scan.add_spark_session(spark, data_source_name=soda_data_source)

In [9]:
# Run the configured checks. The call's return value is rendered as the cell output
# (2 in the saved run, where 3 of the 7 checks failed).
# NOTE(review): presumably a non-zero value signals failed checks — confirm in the Soda docs.
scan.execute()

CodeCache: size=131072Kb used=31067Kb max_used=31079Kb free=100004Kb
 bounds [0x000000010f1e8000, 0x0000000111068000, 0x00000001171e8000]
 total_blobs=11383 nmethods=10425 adapters=866
 compilation: disabled (not enough contiguous free space left)




2

In [10]:
# Print the scan log (pass/fail summary per check); print() keeps the multi-line text readable.
print(scan.get_logs_text())

INFO   | Soda Core 3.3.18
INFO   | Scan summary:
INFO   | 4/7 checks PASSED: 
INFO   |     orders_vw in spark_df
INFO   |       missing_count(row_id) = 0 [PASSED]
INFO   |       duplicate_count(row_id) = 0 [PASSED]
INFO   |       missing_count(order_id) = 0 [PASSED]
INFO   |       missing_count(order_date) = 0 [PASSED]
INFO   | 3/7 checks FAILED: 
INFO   |     orders_vw in spark_df
INFO   |       row_count > 10000 [FAILED]
INFO   |         check_value: 9993
INFO   |       count_order_date_after_ship_date = 0 [FAILED]
INFO   |         check_value: 1.0
INFO   |       Schema Check [FAILED]
INFO   |         fail_missing_column_names = [year]
INFO   |         schema_measured = [row_id int, order_id string, order_date date, ship_date date, ship_mode string, customer_id string, customer_name string, segment string, country string, city string, state string, postal_code double, region string, product_id string, category string, sub_category string, product_name string, sales double, quantity i

In [11]:
# Display the scan results as a dict (timestamps, hasErrors/hasFailures flags, metrics, ...).
scan.get_scan_results()

{'definitionName': None,
 'defaultDataSource': 'spark_df',
 'dataTimestamp': '2024-08-24T11:24:52+00:00',
 'scanStartTimestamp': '2024-08-24T11:24:52+00:00',
 'scanEndTimestamp': '2024-08-24T11:24:53+00:00',
 'hasErrors': False,
 'hasFailures': True,
 'metrics': [{'identity': 'metric-spark_df-orders_vw-schema',
   'metricName': 'schema',
   'dataSourceName': 'spark_df',
   'tableName': 'orders_vw',
   'partitionName': None,
   'columnName': None,
   'value': [{'columnName': 'row_id', 'sourceDataType': 'int'},
    {'columnName': 'order_id', 'sourceDataType': 'string'},
    {'columnName': 'order_date', 'sourceDataType': 'date'},
    {'columnName': 'ship_date', 'sourceDataType': 'date'},
    {'columnName': 'ship_mode', 'sourceDataType': 'string'},
    {'columnName': 'customer_id', 'sourceDataType': 'string'},
    {'columnName': 'customer_name', 'sourceDataType': 'string'},
    {'columnName': 'segment', 'sourceDataType': 'string'},
    {'columnName': 'country', 'sourceDataType': 'string'},