# Create a new spark Datasource
Use this notebook to configure a new spark Datasource and add it to your project.

In [1]:
import great_expectations as ge
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists
#context = ge.get_context()

In [13]:
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import S3StoreBackendDefaults

data_context_config = DataContextConfig(
    datasources={},
    store_backend_defaults=S3StoreBackendDefaults(default_bucket_name="lake-dev"),
)
context = BaseDataContext(project_config=data_context_config)

TypeError: __init__() got an unexpected keyword argument 'endpoint_url'

21/10/02 22:45:05 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
21/10/02 22:45:05 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:873)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:154)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:262)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:169)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$Mess

## Customize Your Datasource Configuration

**If you are new to Great Expectations Datasources,** you should check out our [how-to documentation](https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_datasources.html)

**My configuration is not so simple - are there more advanced options?**
Glad you asked! Datasources are versatile. Please see our [How To Guides](https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_datasources.html)!

Give your datasource a unique name:

In [3]:
datasource_name = "minio_5"

### For files based Datasources:
Here we are creating an example configuration.  The configuration contains an **InferredAssetFilesystemDataConnector** which will add a Data Asset for each file in the base directory you provided. It also contains a **RuntimeDataConnector** which can accept filepaths.   This is just an example, and you may customize this as you wish!

Also, if you would like to learn more about the **DataConnectors** used in this configuration, including other methods to organize assets, handle multi-file assets, name assets based on parts of a filename, please see our docs on [InferredAssetDataConnectors](https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_datasources/how_to_configure_an_inferredassetdataconnector.html) and [RuntimeDataConnectors](https://docs.greatexpectations.io/en/latest/guides/how_to_guides/creating_batches/how_to_configure_a_runtime_data_connector.html). 


In [4]:
#spark.hadoop.fs.s3a.access.key": minio_conn.login,
#spark.hadoop.fs.s3a.secret.key": minio_conn.password,
#spark.hadoop.fs.s3a.endpoint": "http{}://{}:{}".format("", minio_conn.host, minio_conn.port),
example_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
    class_name: SparkDFExecutionEngine
    spark_config: 
        spark.master: "spark://0.0.0.0:7077"
        spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
        spark.hadoop.fs.s3a.path.style.access: true
        spark.hadoop.fs.s3a.access.key: "admin"
        spark.hadoop.fs.s3a.secret.key: "q8Lyl30TwfiOsr7qzeim"
        spark.hadoop.fs.s3a.endpoint": "http://rpi.home.net:9000"
        spark.jars.packages: "org.apache.hadoop:hadoop-aws:2.7.3"
data_connectors:
    default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - default_identifier_name
    default_inferred_data_connector_name:
        class_name: InferredAssetS3DataConnector
        bucket: lake-dev
        prefix: /02_silver/
        endpoint_url: http://rpi.home.net:9000
        boto3_options:
          endpoint_url: http://rpi.home.net:9000
          aws_access_key_id: admin
          aws_secret_access_key: q8Lyl30TwfiOsr7qzeim
        default_regex:
            pattern: (.*)\.parquet
            group_names:
                - data_asset_name

"""
print(example_yaml)


name: minio_5
class_name: Datasource
execution_engine:
    class_name: SparkDFExecutionEngine
    spark_config: 
        spark.master: "spark://0.0.0.0:7077"
        spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
        spark.hadoop.fs.s3a.path.style.access: true
        spark.hadoop.fs.s3a.access.key: "admin"
        spark.hadoop.fs.s3a.secret.key: "q8Lyl30TwfiOsr7qzeim"
        spark.hadoop.fs.s3a.endpoint": "http://rpi.home.net:9000"
        spark.jars.packages: "org.apache.hadoop:hadoop-aws:2.7.3"
data_connectors:
    default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - default_identifier_name
    default_inferred_data_connector_name:
        class_name: InferredAssetS3DataConnector
        bucket: lake-dev
        prefix: /02_silver/
        boto3_options:
          endpoint_url: http://rpi.home.net:9000
          aws_access_key_id: admin
          aws_secret_access_key: q8Lyl30TwfiOsr7qzeim
  

# Test Your Datasource Configuration
Here we will test your Datasource configuration to make sure it is valid.

This `test_yaml_config()` function is meant to enable fast dev loops. **If your
configuration is correct, this cell will show you some snippets of the data
assets in the data source.** You can continually edit your Datasource config
yaml and re-run the cell to check until the new config is valid.

If you instead wish to use python instead of yaml to configure your Datasource,
you can use `context.add_datasource()` and specify all the required parameters.

In [5]:
context.test_yaml_config(yaml_config=example_yaml)

Attempting to instantiate class from config...
	Instantiating as a Datasource, since class_name is Datasource
:: loading settings :: url = jar:file:/home/melo/miniconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/melo/.ivy2/cache
The jars for the packages stored in: /home/melo/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0a8af237-61ff-435b-bdca-281fa8c595bc;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.3 in central
	found org.apache.hadoop#hadoop-common;2.7.3 in central
	found org.apache.hadoop#hadoop-annotations;2.7.3 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commons-io#commons-io;2.4 in central
	found commons-net#commons-net;3.1 in central
	found commons-collections#commons-collections

	Successfully instantiated Datasource


ExecutionEngine class name: SparkDFExecutionEngine
Data Connectors:
	default_inferred_data_connector_name : InferredAssetS3DataConnector

	Available data_asset_names (3 of 3):
		02_silver/awl_bg/part-00000-42655813-c3f6-408e-9fea-b11c5f1f605a-c000.snappy (1 of 1): ['02_silver/awl_bg/part-00000-42655813-c3f6-408e-9fea-b11c5f1f605a-c000.snappy.parquet']
		02_silver/awl_bg/part-00000-73c20957-7bd7-4eb3-b7bc-a6a65503fc2a-c000.snappy (1 of 1): ['02_silver/awl_bg/part-00000-73c20957-7bd7-4eb3-b7bc-a6a65503fc2a-c000.snappy.parquet']
		02_silver/awl_bg/part-00000-f565697a-6f21-488a-9217-979226a95002-c000.snappy (1 of 1): ['02_silver/awl_bg/part-00000-f565697a-6f21-488a-9217-979226a95002-c000.snappy.parquet']

	Unmatched data_references (3 of 3):['02_silver/awl_bg/_delta_log/00000000000000000000.json', '02_silver/awl_bg/_delta_log/00000000000000000001.json', '02_silver/awl_bg/_delta_log/00000000000000000002.json']

	default_runtime_data_connector_name:Run

<great_expectations.datasource.new_datasource.Datasource at 0x7f9e12074fa0>

## Save Your Datasource Configuration
Here we will save your Datasource in your Data Context once you are satisfied with the configuration. Note that `overwrite_existing` defaults to False, but you may change it to True if you wish to overwrite. Please note that if you wish to include comments you must add them directly to your `great_expectations.yml`.

In [6]:
sanitize_yaml_and_save_datasource(context, example_yaml, overwrite_existing=True)
context.list_datasources()

21/10/02 22:36:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/10/02 22:36:09 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


[{'class_name': 'Datasource',
  'module_name': 'great_expectations.datasource',
  'data_connectors': {'default_runtime_data_connector_name': {'module_name': 'great_expectations.datasource.data_connector',
    'batch_identifiers': ['default_identifier_name'],
    'class_name': 'RuntimeDataConnector'},
   'default_inferred_data_connector_name': {'module_name': 'great_expectations.datasource.data_connector',
    'default_regex': {'pattern': '(.*)\\.parquet',
     'group_names': ['data_asset_name']},
    'prefix': '/02_silver/',
    'bucket': 'lake-dev',
    'boto3_options': {'endpoint_url': 'http://rpi.home.net:9000',
     'aws_access_key_id': 'admin',
     'aws_secret_access_key': 'q8Lyl30TwfiOsr7qzeim'},
    'class_name': 'InferredAssetS3DataConnector'}},
  'execution_engine': {'class_name': 'SparkDFExecutionEngine',
   'module_name': 'great_expectations.execution_engine',
   'spark_config': {'spark.master': 'spark://0.0.0.0:7077',
    'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s

In [7]:
from great_expectations.core.batch import Batch, BatchRequest, RuntimeBatchRequest

In [8]:
batch_request = BatchRequest(
    datasource_name="minio_5",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="02_silver/awl_bg/part-00000-42655813-c3f6-408e-9fea-b11c5f1f605a-c000.snappy",
    batch_spec_passthrough={
        "reader_method": "parquet",
        "boto3_options": {
            "endpoint_url": "http://rpi.home.net:9000"
        },
        "endpoint_url": "http://rpi.home.net:9000"
    }
)

In [11]:
batch_request = RuntimeBatchRequest(
    datasource_name="minio_5",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="teste_asset",  # this can be anything that identifies this data_asset for you
    runtime_parameters={"path": "02_silver/awl_bg/part-00000-42655813-c3f6-408e-9fea-b11c5f1f605a-c000.snappy.parquet", "boto3_options": {
            "endpoint_url": "http://rpi.home.net:9000"
        },
        "endpoint_url": "http://rpi.home.net:9000"},  # Add your S3 path here.
    batch_identifiers={"default_identifier_name": "default_identifier"},
)

In [21]:
import os
os.environ["AWS_ACCESS_KEY_ID"] = "admin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "q8Lyl30TwfiOsr7qzeim"
os.environ["AWS_ENDPOINT_URL"] = "http://rpi.home.net:9000"
os.environ["BOTO_CONFIG"] = "~/.aws/credential"

In [12]:
context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)
print(validator.head())

ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjectsV2 operation: The AWS Access Key Id you provided does not exist in our records.

Now you can close this notebook and delete it!