# Pandas and Dask Tutorial

## Importing the libraries

In [1]:
import dask.datasets
import pandas as pd
from soda.scan import Scan

## Create Artifical Pandas and Dask Dataframes

In [2]:
# Load timeseries data from dask datasets
df_timeseries = dask.datasets.timeseries().reset_index()
df_timeseries["email"] = "a@soda.io"

df_timeseries.head()

Unnamed: 0,timestamp,name,id,x,y,email
0,2000-01-01 00:00:00,Kevin,993,0.961304,-0.794708,a@soda.io
1,2000-01-01 00:00:01,Kevin,1024,0.458458,0.976622,a@soda.io
2,2000-01-01 00:00:02,Michael,1028,0.008827,0.327489,a@soda.io
3,2000-01-01 00:00:03,Sarah,977,-0.025025,0.848088,a@soda.io
4,2000-01-01 00:00:04,Charlie,958,-0.955403,-0.577273,a@soda.io


In [3]:
# Create an artificial pandas dataframe
df_employee = pd.DataFrame(
    {
        "name": ["Bastien", "Titus", "Baturay"],
        "email": ["a@soda.io", "b@soda.io", "c@soda.io"],
    }
)

df_employee.head()

Unnamed: 0,name,email
0,Bastien,a@soda.io
1,Titus,b@soda.io
2,Baturay,c@soda.io


## Create a soda scan object

In [4]:
scan = Scan()
scan.set_scan_definition_name("dask and pandas tutorial")
scan.set_data_source_name("dask")

### Add dataframes to the soda scan object

In [5]:
# Add dask dataframe to scan and assign a dataset name to refer from checks yaml
scan.add_dask_dataframe(dataset_name="timeseries", dask_df=df_timeseries)

# Add pandas dataframe to scan and assign a dataset name to refer from checks yaml
scan.add_pandas_dataframe(dataset_name="employee", pandas_df=df_employee)

### Define checks in yaml format

In the first example, we will check row counts of the two dataframes.

In [6]:
# Define checks in yaml format
# alternatively you can refer to a yaml file using scan.add_sodacl_yaml_file(<filepath>)
row_count_checks = """
for each dataset T:
  datasets:
    - include %
  checks:
    - row_count > 0
"""
scan.add_sodacl_yaml_str(row_count_checks)
scan.execute()
print(scan.get_logs_text())


INFO:soda.scan:Instantiating for each for ['timeseries', 'employee']
INFO:soda.scan:[12:09:04] Scan summary:
INFO:soda.scan:[12:09:04] 2/2 checks PASSED: 
INFO:soda.scan:[12:09:04]     timeseries in dask
INFO:soda.scan:[12:09:04]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:04]     employee in dask
INFO:soda.scan:[12:09:04]       row_count > 0 [PASSED]


INFO   | Soda Core 3.0.46
INFO   | Scan summary:
INFO   | 2/2 checks PASSED: 
INFO   |     timeseries in dask
INFO   |       row_count > 0 [PASSED]
INFO   |     employee in dask
INFO   |       row_count > 0 [PASSED]


Now, we will apply a cross check between pandas and dask dataframes. We will check if the values of `employee.email` exist in `timeseries.email` dataframe. It is expected that the check will fail because `b@soda.io` and `c@soda.io` are not present in `timeseries.email` dataframe.

In [7]:
cross_table_checks = """
checks for employee:
    - values in (email) must exist in timeseries (email) # Error expected
    - row_count same as timeseries # Error expected
"""
scan.add_sodacl_yaml_str(cross_table_checks)
scan.execute()
print(scan.get_logs_text())

INFO:soda.scan:Instantiating for each for ['timeseries', 'employee', 'showtables']
INFO:soda.scan:[12:09:11] Using DefaultSampler
INFO:soda.scan:[12:09:11] Scan summary:
INFO:soda.scan:[12:09:11] 7/9 checks PASSED: 
INFO:soda.scan:[12:09:11]     timeseries in dask
INFO:soda.scan:[12:09:11]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:11]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:11]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:11]     employee in dask
INFO:soda.scan:[12:09:11]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:11]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:11]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:11]     showtables in dask
INFO:soda.scan:[12:09:11]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:11] 2/9 checks FAILED: 
INFO:soda.scan:[12:09:11]     employee in dask
INFO:soda.scan:[12:09:11]       values in (email) must exist in timeseries (email) [FAILED]
INFO:soda.scan:[12:09:11]         value: 2
INFO:soda.scan:[12:09:11] 

INFO   | Soda Core 3.0.46
INFO   | Scan summary:
INFO   | 2/2 checks PASSED: 
INFO   |     timeseries in dask
INFO   |       row_count > 0 [PASSED]
INFO   |     employee in dask
INFO   |       row_count > 0 [PASSED]
INFO   | Using DefaultSampler
INFO   | Scan summary:
INFO   | 7/9 checks PASSED: 
INFO   |     timeseries in dask
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |     employee in dask
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |     showtables in dask
INFO   |       row_count > 0 [PASSED]
INFO   | 2/9 checks FAILED: 
INFO   |     employee in dask
INFO   |       values in (email) must exist in timeseries (email) [FAILED]
INFO   |         value: 2
INFO   |       row_count same as timeseries [FAILED]
INFO   |         value: -2591997
INFO   |         rowCount: 3
INFO   |         otherRowCount: 2592000


Add some custom checks for timeseries data

In [8]:
timeseries_checks = """
checks for timeseries:
  - invalid_count(email) = 0:
      valid format: email
  - valid_count(email) > 0:
      valid format: email
  - duplicate_count(name) < 4:
      samples limit: 2
  - missing_count(y):
      warn: when > -1
  - missing_percent(x) < 5%
  - missing_count(y) = 0
  - avg(x) between -1 and 1
  - max(x) > 0
  - min(x) < 1:
      filter: x > 0.2
  - freshness(timestamp) < 1d
  - values in (email) must exist in employee (email)
"""
scan.add_sodacl_yaml_str(timeseries_checks)
scan.execute()
print(scan.get_logs_text())

INFO:soda.scan:Instantiating for each for ['timeseries', 'employee', 'showtables']
INFO:soda.scan:[12:09:37] Using DefaultSampler
INFO:soda.scan:[12:09:43] Using DefaultSampler
INFO:soda.scan:[12:09:45] Scan summary:
INFO:soda.scan:[12:09:45] 23/30 checks PASSED: 
INFO:soda.scan:[12:09:45]     timeseries in dask
INFO:soda.scan:[12:09:45]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:45]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:45]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:45]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:45]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:45]       values in (email) must exist in employee (email) [PASSED]
INFO:soda.scan:[12:09:45]       row_count > 0 [PASSED]
INFO:soda.scan:[12:09:45]       invalid_count(email) = 0 [PASSED]
INFO:soda.scan:[12:09:45]       valid_count(email) > 0 [PASSED]
INFO:soda.scan:[12:09:45]       missing_count(y) = 0 [PASSED]
INFO:soda.scan:[12:09:45]       missing_percent(x) < 5% [PASSED]
INFO:soda

INFO   | Soda Core 3.0.46
INFO   | Scan summary:
INFO   | 2/2 checks PASSED: 
INFO   |     timeseries in dask
INFO   |       row_count > 0 [PASSED]
INFO   |     employee in dask
INFO   |       row_count > 0 [PASSED]
INFO   | Using DefaultSampler
INFO   | Scan summary:
INFO   | 7/9 checks PASSED: 
INFO   |     timeseries in dask
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |     employee in dask
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |       row_count > 0 [PASSED]
INFO   |     showtables in dask
INFO   |       row_count > 0 [PASSED]
INFO   | 2/9 checks FAILED: 
INFO   |     employee in dask
INFO   |       values in (email) must exist in timeseries (email) [FAILED]
INFO   |         value: 2
INFO   |       row_count same as timeseries [FAILED]
INFO   |         value: -2591997
INFO   |         rowCount: 3
INFO   |         otherRowCount: 2592000
INFO   | Using DefaultSampl