## Introduction

This is the exploratory data analysis (EDA) template. It provides some starter code to help you begin exploring your data. Every data science workflow involves EDA, so this is a good place to codify it and let others repeat your analysis.

The steps here are a template and tutorial; feel free to overwrite them and make them your own. If you find yourself changing, deleting, or adding the same things for every project you build, that is a good candidate for a change to the template itself. Please post suggestions in #datascience_team or DM @Jake Smart in Slack.

The flow of this template is:
1. Configure environment
2. Get data 
3. Preprocess data
4. Profile the data
5. Do custom EDA (your code goes in this section)
6. Generate Great Expectations (required for all Dagster flows)
7. Persist the expectations

*When committing this notebook to the repo, it is generally preferable to clear all the output for cleaner changelogs. We may look into packages that handle commits with outputs cleanly.*
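
As one way to clear outputs before committing, the sketch below strips them using only the standard library. It assumes the notebook is stored in the nbformat 4 JSON layout (a top-level `cells` list whose code cells carry `outputs` and `execution_count`); the function names are illustrative and not part of this template. Dedicated tools such as `nbstripout` do the same job more thoroughly.

```python
import json
from pathlib import Path


def clear_notebook_outputs(notebook: dict) -> dict:
    """Return a copy of an nbformat-4 notebook dict with code-cell outputs removed."""
    # Round-tripping through JSON is a cheap deep copy for JSON-compatible data.
    cleaned = json.loads(json.dumps(notebook))
    for cell in cleaned.get("cells", []):
        if cell.get("cell_type") == "code":
            cell["outputs"] = []
            cell["execution_count"] = None
    return cleaned


def clear_notebook_file(path: str) -> None:
    """Rewrite the notebook at `path` in place with its outputs cleared."""
    nb_path = Path(path)
    notebook = json.loads(nb_path.read_text(encoding="utf-8"))
    cleaned = clear_notebook_outputs(notebook)
    nb_path.write_text(json.dumps(cleaned, indent=1) + "\n", encoding="utf-8")
```

Calling `clear_notebook_file` on the notebook's path before `git add` keeps the diff limited to source changes.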

***Run this template at least once to generate your expectations before running a Dagster pipeline.***

### Import data ingestion and profiling modules

In [None]:
from drizly_dagster_utils.integrations import snowflake
import pandas as pd
import seaborn as sns
from pandas_profiling import ProfileReport
!jupyter nbextension enable --py widgetsnbextension

## Data Ingestion

### Query
Note: If you are building a project that isn't going into Dagster and you want to change the configuration file, you'll need to adapt the access to the cfg Box object (the dot notation) to match the format of your configuration file.

In [None]:
# The query gets read from config.py defined in the ds_util directory.
# The configuration, cfg, gets its definition from environment variables
# (e.g. if DAGSTER_ENVIRONMENT = prod then prod_config.yaml gets read)
import os
os.chdir('..')
from pipelines.{{cookiecutter.project_slug}}.environments.dev_run_config import dev_run_config
config = dev_run_config
query = config["solids"]["get_{{cookiecutter.project_slug}}_df"]["inputs"]["query"]["value"]
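
The chained lookup above fails with a bare `KeyError` if any level of the run config is missing or renamed. As a hedged sketch, a small helper can report which step of the path was not found. The `example_config` dict, the solid name `get_my_project_df`, and the helper `get_nested` are all hypothetical stand-ins, not part of the template.

```python
# Hypothetical run config mirroring the nesting used above; the solid name
# and query text are placeholders, not values from a real project.
example_config = {
    "solids": {
        "get_my_project_df": {
            "inputs": {"query": {"value": "SELECT 1 AS store_id"}}
        }
    }
}


def get_nested(mapping, keys):
    """Walk `keys` through nested dicts, raising a KeyError that names the missing step."""
    current = mapping
    for depth, key in enumerate(keys):
        if not isinstance(current, dict) or key not in current:
            path = " -> ".join(str(k) for k in keys[: depth + 1])
            raise KeyError(f"missing config key at: {path}")
        current = current[key]
    return current


example_query = get_nested(
    example_config, ["solids", "get_my_project_df", "inputs", "query", "value"]
)
print(example_query)
```

If your configuration file uses a different layout, only the list of keys passed to the helper needs to change.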

In [None]:
# For visibility into what you are loading

print(f'Run config used: {config}')
print(f'Current working directory (inside the Docker container if running there): {os.getcwd()}')
query

In [None]:
# This uses ~/.aws/credentials to access Snowflake (see the cookiecutter setup if you are having issues here)
df = snowflake.SnowflakeConn.from_secret('snowflake/analytics_api').get_pandas_dataframe(query)

## Data Processing

In [None]:
common_timestamp_columns = ['date', 'created_at', 'updated_at', 'dt', 'timestamp', 'event_time']
for column in df.columns:
    if str(column).lower() in common_timestamp_columns:
        print(f"Updating {column} from object to datetime")
        df[column] = pd.to_datetime(df[column])
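
With its default settings, `pd.to_datetime` raises an error when a value cannot be parsed. If your data may contain malformed timestamps, one option is to coerce them to `NaT` and report how many were affected. The following is a minimal sketch of that variant; the helper name `convert_timestamp_columns` and the example frame are illustrative assumptions.

```python
import pandas as pd


def convert_timestamp_columns(frame, candidate_names):
    """Return a copy with matching columns parsed to datetime; unparseable values become NaT."""
    converted = frame.copy()
    wanted = {name.lower() for name in candidate_names}
    for column in converted.columns:
        if str(column).lower() in wanted:
            nulls_before = converted[column].isna().sum()
            converted[column] = pd.to_datetime(converted[column], errors="coerce")
            new_nulls = converted[column].isna().sum() - nulls_before
            print(f"{column}: parsed to datetime, {new_nulls} value(s) could not be parsed")
    return converted


# Hypothetical data: one valid date string and one malformed value.
example = pd.DataFrame({"Created_At": ["2021-01-05", "not a date"], "store_id": [1, 2]})
example = convert_timestamp_columns(example, ["created_at", "updated_at"])
```

Coercion hides bad values rather than fixing them, so the printed count is worth checking before relying on the column.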

## EDA

Basic column information and counts

In [None]:
df.info()
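
`df.info()` prints its summary rather than returning it. When you want the same kind of overview as a DataFrame you can sort or filter, for example to decide which columns deserve a not-null expectation later, a sketch like the following may help. The helper name `summarize_columns` and the example data are hypothetical.

```python
import pandas as pd


def summarize_columns(frame: pd.DataFrame) -> pd.DataFrame:
    """One row per column: dtype, fraction of nulls, and number of distinct non-null values."""
    return pd.DataFrame(
        {
            "dtype": frame.dtypes.astype(str),
            "null_fraction": frame.isna().mean(),
            "n_unique": frame.nunique(dropna=True),
        }
    )


# Hypothetical data with one missing value.
example = pd.DataFrame(
    {"store_id": [1, 1, 2, 2], "store_state": ["MA", None, "NY", "NY"]}
)
summary = summarize_columns(example)
print(summary)
```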

#### Data profiling
The report can be written to disk with `to_file(filename)` or displayed directly in the notebook.

In [None]:
pr = ProfileReport(df, explorative=True)
pr.to_file("00-EDA-template-profile.html")

#### Pairwise analysis

In [None]:
# Set the default theme for seaborn
sns.set_theme()
sns.pairplot(df)
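
A pair plot draws one panel for every pair of numeric columns, so it slows down quickly on wide or long DataFrames. One possible approach, sketched below, is to keep only the numeric columns, cap how many are plotted, and sample the rows first. The helper `pairplot_input`, its default limits, and the `item_count` column are assumptions for illustration.

```python
import pandas as pd


def pairplot_input(frame, max_rows=2000, max_columns=8, random_state=0):
    """Select numeric columns (at most `max_columns`) and sample down to `max_rows` rows."""
    numeric = frame.select_dtypes(include="number")
    numeric = numeric.iloc[:, :max_columns]
    if len(numeric) > max_rows:
        numeric = numeric.sample(n=max_rows, random_state=random_state)
    return numeric


# Hypothetical data: two numeric columns and one text column.
example = pd.DataFrame(
    {
        "store_order_total": [10.0, 25.5, 7.25, 40.0],
        "item_count": [1, 3, 1, 4],
        "store_state": ["MA", "NY", "MA", "CO"],
    }
)
subset = pairplot_input(example, max_rows=3)
# sns.pairplot(subset) would then plot only the sampled numeric columns.
```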

## Generate Great Expectations

#### Import great expectations packages

In [None]:
import great_expectations as ge
import great_expectations.jupyter_ux
from great_expectations.data_context.types.resource_identifiers import ValidationResultIdentifier
from great_expectations.dataset import (
    PandasDataset,
    MetaPandasDataset,
)

#### Convert your dataframe into a GE dataframe

In [None]:
ge_df = ge.from_pandas(df)

#### Load the template 
The directory here is built by the command `great_expectations init`, but the cookiecutter template should have this already populated for you. This is where your expectations will get saved, and this is where Dagster will pull them from to run validation against flows.

In [None]:
# Load the Great Expectations context from the root directory named in the run config
print(config)
great_expectations_directory = config["resources"]["ge_data_context"]["config"]["ge_root_dir"]
context = ge.data_context.DataContext(context_root_dir=great_expectations_directory)
print(f"Loaded great expectations from {great_expectations_directory}")
# Name your suite whatever you want. This is the project template name
expectation_suite_name = "{{cookiecutter.project_slug}}.basic.warning"

# Pass overwrite_existing=True to replace a suite that already exists with this name
es = context.create_expectation_suite(expectation_suite_name)

#### Configure the data you will use for creating expectations

In [None]:
# datasource can be changed to be non-pandas (a local file, a file in s3, another database)
# and the dataset will need to change accordingly
batch_kwargs = {
    'dataset': df,
    'datasource': "pandas"
}

batch = context.get_batch(batch_kwargs, expectation_suite_name)
# Test that the batch has data and see what it is
batch.head()

## Start to explore expectations

In [None]:
df.columns

### Write some expectations
Below are a few sample explicit expectations to get started.

In [None]:
batch.expect_column_values_to_not_be_null("store_id")

In [None]:
batch.expect_column_values_to_not_be_null(column="user_id")

In [None]:
batch.expect_column_values_to_be_between("store_order_total", min_value=0.0, max_value=10000.0)

In [None]:
batch.expect_column_values_to_not_be_in_set(column="store_state", value_set=["fakeville"])

In [None]:
batch.expect_compound_columns_to_be_unique(["store_id", "user_id", "is_gift", "eta"])
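
To make concrete what the sample expectations above assert, here is a rough pandas-only approximation of the same five checks. It is a sketch for intuition, not how Great Expectations evaluates them: the library also reports counts of unexpected values and supports options such as `mostly`. The helper name `basic_checks` and the example rows are hypothetical; the column names are taken from the cells above.

```python
import pandas as pd


def basic_checks(frame: pd.DataFrame) -> dict:
    """Approximate the five sample expectations with plain pandas; True means the check passes."""
    key_columns = ["store_id", "user_id", "is_gift", "eta"]
    # Nulls are dropped first, so they are not counted as out of range here.
    totals = frame["store_order_total"].dropna()
    results = {
        "store_id_not_null": frame["store_id"].notna().all(),
        "user_id_not_null": frame["user_id"].notna().all(),
        "store_order_total_between_0_and_10000": totals.between(0.0, 10000.0).all(),
        "store_state_not_fakeville": (~frame["store_state"].isin(["fakeville"])).all(),
        "compound_key_unique": not frame.duplicated(subset=key_columns, keep=False).any(),
    }
    return {name: bool(passed) for name, passed in results.items()}


# Hypothetical rows that satisfy every check.
example = pd.DataFrame(
    {
        "store_id": [1, 1, 2],
        "user_id": [10, 11, 10],
        "store_order_total": [25.0, 0.0, 99.5],
        "store_state": ["MA", "NY", "MA"],
        "is_gift": [False, False, True],
        "eta": ["30m", "30m", "45m"],
    }
)
print(basic_checks(example))
```

Running a quick check like this during EDA can help you pick bounds and key columns that your data actually satisfies before encoding them as expectations.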

In [None]:
suite = batch.get_expectation_suite()

### Persist the expectations so Dagster can use them

In [None]:
context.save_expectation_suite(suite, expectation_suite_name)

### Test expectations by running validate against a dataset

In [None]:
# validate
results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch])

# build the documentation for the result
validation_result_identifier = results.list_validation_result_identifiers()[0]
context.build_data_docs()
data_docs_urls = context.get_docs_sites_urls(
    resource_identifier=validation_result_identifier,
    only_if_exists=False,
)
urls_to_open = [site["site_url"] for site in data_docs_urls]
url = urls_to_open[0].replace('file:///work/', 'http://localhost:8888/view/')

# If you are running this in the container, paste this URL into your browser.
# If needed, add `+ f'?token={access_token}'` to the end of the line above, where
# access_token is the token generated by your notebook server
url
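
The `replace` call above leaves the URL unchanged, without any warning, when the path does not start with `file:///work/`. A small helper can make that case explicit and optionally append the notebook token. This is a sketch: the function name `to_notebook_url` and the example path are hypothetical, while the two URL prefixes are the ones used in the cell above.

```python
from urllib.parse import urlencode


def to_notebook_url(
    file_url,
    container_root="file:///work/",
    server_root="http://localhost:8888/view/",
    token=None,
):
    """Map a file URL inside the container to a URL served by the notebook server."""
    if not file_url.startswith(container_root):
        raise ValueError(f"expected a URL under {container_root!r}, got {file_url!r}")
    notebook_url = server_root + file_url[len(container_root):]
    if token:
        # urlencode escapes any characters in the token that are unsafe in a URL.
        notebook_url += "?" + urlencode({"token": token})
    return notebook_url


# Hypothetical data docs path inside the container.
example_url = to_notebook_url("file:///work/some_dir/index.html", token="abc123")
print(example_url)
```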

In [None]:
# Save a small sample of the data for use in tests
test_filepath = "pipelines/{{cookiecutter.project_slug}}/data/test.csv"
df.sample(n=min(5, len(df))).to_csv(test_filepath, index=False)
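
Because `sample` draws different rows on every run, the test file changes each time the notebook is executed. If you prefer a stable fixture, one option is to fix the random seed, as in the sketch below. The helper `write_test_sample` and the example frame are illustrative assumptions; the temporary directory stands in for the project's data folder.

```python
import tempfile
from pathlib import Path

import pandas as pd


def write_test_sample(frame, path, n=5, random_state=0):
    """Write up to `n` rows to `path` as CSV; a fixed seed picks the same rows for the same input."""
    sample = frame.sample(n=min(n, len(frame)), random_state=random_state)
    sample.to_csv(path, index=False)
    return sample


# Hypothetical data standing in for the query result.
example = pd.DataFrame(
    {"store_id": range(20), "store_order_total": [float(i) for i in range(20)]}
)
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = Path(tmp_dir) / "test.csv"
    first = write_test_sample(example, sample_path)
    round_trip = pd.read_csv(sample_path)
```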