In [1]:
from ruamel import yaml

import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)
from great_expectations.util import get_context

In [9]:
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
)

# Programmatic project configuration declaring one datasource,
# "my_spark_datasource", that runs on the SparkDFExecutionEngine.
# NOTE(review): `data_context_config` is not read by any later cell visible
# here -- confirm whether it was meant to be handed to get_context().
data_context_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "my_spark_datasource": DatasourceConfig(
            class_name="Datasource",
            execution_engine=dict(class_name="SparkDFExecutionEngine"),
        ),
    },
    anonymous_usage_statistics=dict(enabled=True),
)

In [11]:
# Second project configuration: the same datasource name, this time described
# as a plain dict that points at the SparkDFDatasource / SparkDFDataset classes.
# NOTE(review): `project_config` is not read by any later cell visible here --
# confirm whether it is still needed.
project_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "my_spark_datasource": dict(
            data_asset_type=dict(
                class_name="SparkDFDataset",
                module_name="great_expectations.dataset",
            ),
            class_name="SparkDFDatasource",
            module_name="great_expectations.datasource",
            batch_kwargs_generators={},
        ),
    },
)

In [13]:
# Obtain the Data Context used by every following cell, via the package-level
# helper called with no arguments.
# NOTE(review): neither `data_context_config` nor `project_config` from the
# earlier cells is passed in here -- confirm that is intended.
context = gx.get_context()

In [14]:
# Keyword arguments describing the datasource "my_spark_dataframe": a Spark
# execution engine plus one RuntimeDataConnector whose batches are identified
# by a single "batch_id" key. Key order is kept as originally written.
datasource_config = dict(
    name="my_spark_dataframe",
    class_name="Datasource",
    execution_engine=dict(class_name="SparkDFExecutionEngine"),
    data_connectors=dict(
        default_runtime_data_connector_name=dict(
            class_name="RuntimeDataConnector",
            batch_identifiers=["batch_id"],
        ),
    ),
)

In [15]:
# Serialise the datasource dict to YAML and hand it to the context's config
# checker. The call stays the last expression of the cell, so its return value
# is still echoed below the printed report.
context.test_yaml_config(
    yaml_config=yaml.dump(datasource_config),
)

Attempting to instantiate class from config...
	Instantiating as a Datasource, since class_name is Datasource
	Successfully instantiated Datasource


ExecutionEngine class name: SparkDFExecutionEngine
Data Connectors:
	default_runtime_data_connector_name:RuntimeDataConnector

	Available data_asset_names (0 of 0):
		Note : RuntimeDataConnector will not have data_asset_names until they are passed in through RuntimeBatchRequest

	Unmatched data_references (0 of 0): []



<great_expectations.datasource.new_datasource.Datasource at 0x7f3a6f8ad600>

In [16]:
# Add the datasource to the context; the dict's entries (name, class_name,
# execution_engine, data_connectors) are unpacked as keyword arguments.
context.add_datasource(**datasource_config)

<great_expectations.datasource.new_datasource.Datasource at 0x7f3a702227d0>

In [21]:
from pyspark.sql import SparkSession

# Get a Spark session from the builder (getOrCreate: reuse or start one).
spark = SparkSession.builder.getOrCreate()

# Three sample rows with integer columns a, b and c.
data = [
    dict(a=1, b=2, c=3),
    dict(a=4, b=5, c=6),
    dict(a=7, b=8, c=9),
]

# Turn the row dicts into the Spark DataFrame validated further down.
df = spark.createDataFrame(data)

In [22]:
# Describe the in-memory DataFrame as a batch, addressed to the datasource and
# runtime data connector named in `datasource_config`.
batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_dataframe",
    data_connector_name="default_runtime_data_connector_name",
    # Free-form label: any name that identifies this data asset for you.
    data_asset_name="asset_spark_data_oeee",
    # Must supply the "batch_id" key declared in the connector's batch_identifiers.
    batch_identifiers=dict(batch_id="default_identifier"),
    # The DataFrame to validate is passed directly as the batch data.
    runtime_parameters=dict(batch_data=df),
)

In [23]:
# Create the expectation suite "test_suite". overwrite_existing=True is passed
# so that (presumably) re-running this cell replaces a suite already stored
# under that name instead of failing.
context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)

# Pair the runtime batch (the Spark DataFrame) with the suite; the validator is
# the object expectations are run against.
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)

# Preview the first rows. Leaving the call as the cell's last expression lets
# the notebook render the result with its rich repr rather than the plain text
# that print() would force.
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

   a  b  c
0  1  2  3
1  4  5  6
2  7  8  9


+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+

