# Create a new spark Datasource
Use this notebook to configure a new spark Datasource and add it to your project.

In [2]:
import great_expectations as ge
import os
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists
from ruamel import yaml

import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)

## Customize Your Datasource Configuration

**If you are new to Great Expectations Datasources,** you should check out our [how-to documentation](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview)

**My configuration is not so simple - are there more advanced options?**
Glad you asked! Datasources are versatile. Please see our [How To Guides](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview)!

Give your datasource a unique name:

In [27]:
datasource_name = "my_spark_dataframe"

In [None]:
store_backend_defaults = InMemoryStoreBackendDefaults()
data_context_config = DataContextConfig(
    store_backend_defaults=store_backend_defaults,
    checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
)
context = BaseDataContext(project_config=data_context_config)

### For files based Datasources:
Here we are creating an example configuration.  The configuration contains an **InferredAssetFilesystemDataConnector** which will add a Data Asset for each file in the base directory you provided. It also contains a **RuntimeDataConnector** which can accept filepaths.   This is just an example, and you may customize this as you wish!

Also, if you would like to learn more about the **DataConnectors** used in this configuration, including other methods to organize assets, handle multi-file assets, name assets based on parts of a filename, please see our docs on [InferredAssetDataConnectors](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_configure_an_inferredassetdataconnector) and [RuntimeDataConnectors](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_configure_a_runtimedataconnector).


In [None]:
datasource_yaml = f"""
name: my_spark_dataframe
class_name: Datasource
execution_engine:
    class_name: SparkDFExecutionEngine
data_connectors:
    default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - batch_id
"""

# Test Your Datasource Configuration
Here we will test your Datasource configuration to make sure it is valid.

This `test_yaml_config()` function is meant to enable fast dev loops. **If your
configuration is correct, this cell will show you some snippets of the data
assets in the data source.** You can continually edit your Datasource config
yaml and re-run the cell to check until the new config is valid.

If you instead wish to use python instead of yaml to configure your Datasource,
you can use `context.add_datasource()` and specify all the required parameters.

In [None]:
context.test_yaml_config(datasource_yaml)

In [None]:
context.add_datasource(**yaml.load(datasource_yaml))

#### Conneting the spark local and create my dataframe:

In [3]:
from pyspark.sql import SparkSession
import pandas as pd
context = ge.get_context()

22/08/27 16:49:29 WARN Utils: Your hostname, dell-inspiron resolves to a loopback address: 127.0.1.1; using 172.18.181.232 instead (on interface eth0)
22/08/27 16:49:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/27 16:49:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [35]:
spark = SparkSession.builder.master("spark://spark:7077").appName("great_expectation").getOrCreate()

ConnectionRefusedError: [Errno 111] Connection refused

#### Load the file and create Dataframe:

In [24]:
df = spark.read.option("header", True).csv("../../data/Educacao_Basica_2018 - Matricula_Norte.csv", sep="|")

#### TEST YOUT NEW DATASOURCE

In [25]:
type(df)

pyspark.sql.dataframe.DataFrame

In [28]:
batch_request = RuntimeBatchRequest(
    datasource_name=datasource_name,
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="my_spark_dataframe",  # This can be anything that identifies this data_asset for you
    batch_identifiers={"batch_id": "default_identifier"},
    runtime_parameters={"batch_data": df},  # Your dataframe goes here
)

#### the  load data into the Validator:

In [29]:
context.create_expectation_suite(
    expectation_suite_name="spark_educacao_basica_suite", overwrite_existing=True
)

{
  "ge_cloud_id": null,
  "data_asset_type": null,
  "meta": {
    "great_expectations_version": "0.15.17"
  },
  "expectation_suite_name": "spark_educacao_basica_suite",
  "expectations": []
}

## Save Your Datasource Configuration
Here we will save your Datasource in your Data Context once you are satisfied with the configuration. Note that `overwrite_existing` defaults to False, but you may change it to True if you wish to overwrite. Please note that if you wish to include comments you must add them directly to your `great_expectations.yml`.

In [30]:
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="spark_educacao_basica_suite"
)
print(validator.head())

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

22/08/27 17:03:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
  NU_ANO_CENSO                          ID_ALUNO ID_MATRICULA NU_DIA NU_MES  \
0         2018  B587FA609EFCFCAF6786CE479E30E59E    281920109     27      9   
1         2018  D1C0B9634BCD926A0B7D35ED1C40C785    168422655      7      7   
2         2018  AEC84DC24F21E017F4C076E7C0542568    168926791     21      9   
3         2018  C938C2CD9F950AD5C17D12C630B5D0B1    281917630     19     10   
4         2018  75FE0FA9F9877DE92FFB40E79E8C1557    281909633     15      9   

  NU_ANO NU_IDADE_REFERENCIA NU_IDADE NU_DURACAO_TURMA  \
0   1989                  28       29              270   
1   2010                   7        8              255   
2   2010                   7        8              255   
3   2014                   3        4              255   
4   2010                   7        8              255 

In [31]:
context.list_datasources()

[{'execution_engine': {'module_name': 'great_expectations.execution_engine',
   'class_name': 'SparkDFExecutionEngine'},
  'data_connectors': {'default_runtime_data_connector_name': {'module_name': 'great_expectations.datasource.data_connector',
    'class_name': 'RuntimeDataConnector',
    'batch_identifiers': ['batch_id']}},
  'module_name': 'great_expectations.datasource',
  'class_name': 'Datasource',
  'name': 'my_spark_dataframe'}]

#### Create expectations in my suite:

In [32]:
suite = context.get_expectation_suite("spark_educacao_basica_suite")

In [33]:
import great_expectations as ge
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset

my_df = SparkDFDataset(df)

In [34]:
my_df.expect_column_median_to_be_between(column=["NU_DURACAO_TURMA"], min_value=20, max_value=100)
my_df.expect_column_values_to_be_in_set(column=["NU_IDADE"], value_set=[29, 30, 20])

[Stage 7:>                                                         (0 + 8) / 10]

22/08/27 17:04:12 WARN MemoryStore: Not enough space to cache rdd_66_7 in memory! (computed 38.8 MiB so far)
22/08/27 17:04:12 WARN BlockManager: Persisting block rdd_66_7 to disk instead.
22/08/27 17:04:21 WARN MemoryStore: Not enough space to cache rdd_66_2 in memory! (computed 73.3 MiB so far)
22/08/27 17:04:21 WARN BlockManager: Persisting block rdd_66_2 to disk instead.
22/08/27 17:04:22 WARN MemoryStore: Not enough space to cache rdd_66_5 in memory! (computed 75.2 MiB so far)
22/08/27 17:04:22 WARN MemoryStore: Not enough space to cache rdd_66_4 in memory! (computed 74.4 MiB so far)
22/08/27 17:04:22 WARN BlockManager: Persisting block rdd_66_4 to disk instead.
22/08/27 17:04:22 WARN MemoryStore: Not enough space to cache rdd_66_1 in memory! (computed 73.0 MiB so far)
22/08/27 17:04:22 WARN BlockManager: Persisting block rdd_66_1 to disk instead.
22/08/27 17:04:22 WARN BlockManager: Persisting block rdd_66_5 to disk instead.
22/08/27 17:04:23 WARN MemoryStore: Not enough space to

[Stage 7:>                                                         (0 + 8) / 10]

22/08/27 17:05:11 WARN BlockManager: Block rdd_66_4 could not be removed as it was not found on disk or in memory
22/08/27 17:05:11 ERROR Executor: Exception in task 4.0 in stage 7.0 (TID 11)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:361)
	at org.apache.spark.sql.execution.columnar.ColumnBuilder$.ensureFreeSpace(ColumnBuilder.scala:161)
	at org.apache.spark.sql.execution.columnar.NullableColumnBuilder.appendFrom(NullableColumnBuilder.scala:57)
	at org.apache.spark.sql.execution.columnar.NullableColumnBuilder.appendFrom$(NullableColumnBuilder.scala:54)
	at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.org$apache$spark$sql$execution$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:98)
	at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder.appendFrom(CompressibleColumnBuilder.scala:78)

[Stage 7:>                                                         (0 + 9) / 10]

22/08/27 17:05:11 WARN BlockManager: Putting block rdd_66_6 failed due to exception org.apache.spark.TaskKilledException.
22/08/27 17:05:11 WARN BlockManager: Block rdd_66_6 could not be removed as it was not found on disk or in memory
22/08/27 17:05:11 WARN BlockManager: Putting block rdd_66_7 failed due to exception org.apache.spark.TaskKilledException.
22/08/27 17:05:11 WARN BlockManager: Putting block rdd_66_3 failed due to exception org.apache.spark.TaskKilledException.
22/08/27 17:05:11 WARN BlockManager: Block rdd_66_7 could not be removed as it was not found on disk or in memory
22/08/27 17:05:11 WARN TaskSetManager: Lost task 6.0 in stage 7.0 (TID 13) (172.18.181.232 executor driver): TaskKilled (Stage cancelled)
22/08/27 17:05:11 WARN TaskSetManager: Lost task 7.0 in stage 7.0 (TID 14) (172.18.181.232 executor driver): TaskKilled (Stage cancelled)
22/08/27 17:05:11 WARN BlockManager: Block rdd_66_3 could not be removed as it was not found on disk or in memory
22/08/27 17:05:1

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/airtonjr/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airtonjr/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/airtonjr/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/airtonjr/.local/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3398, in run_code
    exec(code_obj, self.user_globa

ConnectionRefusedError: [Errno 111] Connection refused