In [1]:
import pandas as pd

In [2]:
# Read only a small sample of the trips file. The schema-preview and
# table-creation cells below are derived from this frame's inferred dtypes,
# so a full load is not needed yet.
sample_rows = 100
df = pd.read_csv(
    '../taxi-data/yellow_tripdata_sample_2020-09.csv',
    parse_dates=['pickup_datetime', 'dropoff_datetime'],
    nrows=sample_rows,
)

In [3]:
# Preview the CREATE TABLE statement pandas derives from the sample frame.
# With no `con`, get_schema falls back to generic SQLite types
# (REAL / TEXT / INTEGER / TIMESTAMP in the output below).
sqlite_ddl = pd.io.sql.get_schema(df, name='yellow_taxi_data')
print(sqlite_ddl)

CREATE TABLE "yellow_taxi_data" (
"vendor_id" REAL,
  "pickup_datetime" TIMESTAMP,
  "dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "rate_code_id" REAL,
  "store_and_fwd_flag" TEXT,
  "pickup_location_id" INTEGER,
  "dropoff_location_id" INTEGER,
  "payment_type" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [4]:
from sqlalchemy import create_engine

In [5]:
import os
from urllib.parse import quote_plus

# Connection settings come from the environment so the database password does not
# have to be committed with the notebook. Every fallback reproduces the original
# hardcoded URL: postgresql://postgres:example@db:5432/taxi_db
PG_USER = os.environ.get('PG_USER', 'postgres')
PG_PASSWORD = os.environ.get('PG_PASSWORD', 'example')
PG_HOST = os.environ.get('PG_HOST', 'db')
PG_PORT = os.environ.get('PG_PORT', '5432')
PG_DB = os.environ.get('PG_DB', 'taxi_db')

# quote_plus keeps passwords containing URL-reserved characters ('@', ':', '/')
# from corrupting the connection URL; it is a no-op for the default password.
TAXI_DB_URL = 'postgresql://{user}:{password}@{host}:{port}/{db}'.format(
    user=PG_USER,
    password=quote_plus(PG_PASSWORD),
    host=PG_HOST,
    port=PG_PORT,
    db=PG_DB,
)

engine = create_engine(TAXI_DB_URL)

In [6]:
# Same schema preview, but rendered through the SQLAlchemy engine. The column types
# therefore use the PostgreSQL dialect (FLOAT(53), BIGINT,
# TIMESTAMP WITHOUT TIME ZONE in the output below).
postgres_ddl = pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine)
print(postgres_ddl)


CREATE TABLE yellow_taxi_data (
	vendor_id FLOAT(53), 
	pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	rate_code_id FLOAT(53), 
	store_and_fwd_flag TEXT, 
	pickup_location_id BIGINT, 
	dropoff_location_id BIGINT, 
	payment_type FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [7]:
# Create an empty yellow_taxi_data table whose columns come from the sample frame.
# if_exists='replace' drops any existing table of that name first. The rows
# themselves are appended by the next cell.
# NOTE(review): to_sql also writes the DataFrame index as an extra "index" column
# by default (it shows up in the validator preview further down). Pass index=False
# both here and in the append cell if that column is not wanted.
header_only = df.head(n=0)
header_only.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [9]:
# Reload the complete CSV (this time without an nrows cap) and append every row
# to the table created by the previous cell, writing 1000 rows per batch.
# NOTE(review): because of if_exists='append', re-running this cell on its own
# inserts the rows a second time.
df = pd.read_csv(
    '../taxi-data/yellow_tripdata_sample_2020-09.csv',
    parse_dates=['pickup_datetime', 'dropoff_datetime'],
)

rows_written = df.to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists='append',
    chunksize=1000,
)
rows_written

10000

In [10]:
import great_expectations as gx

from great_expectations.checkpoint import Checkpoint

from sqlalchemy import create_engine

from sqlalchemy_utils import database_exists, create_database

import os
from urllib.parse import quote_plus

# Great Expectations context; the datasource registered below points GX at the
# Postgres database that the pandas cells above loaded.
context = gx.get_context()

# Build the GX connection string from the environment instead of committing the
# password with the notebook. The fallbacks reproduce the original URL, and the
# port is made explicit (5432, the same port the pandas engine uses).
# quote_plus keeps passwords with URL-reserved characters from breaking the URL.
PG_CONNECTION_STRING = "postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}".format(
    user=os.environ.get("PG_USER", "postgres"),
    password=quote_plus(os.environ.get("PG_PASSWORD", "example")),
    host=os.environ.get("PG_HOST", "db"),
    port=os.environ.get("PG_PORT", "5432"),
    db=os.environ.get("PG_DB", "taxi_db"),
)

# NOTE(review): add_postgres presumably raises if a datasource named
# "pg_datasource" already exists in this context (e.g. a persisted GX project on
# a second Run All). Confirm, and consider the add_or_update variant if the
# installed GX version provides it.
pg_datasource = context.sources.add_postgres(
    name="pg_datasource", connection_string=PG_CONNECTION_STRING
)

In [11]:
# Register the Postgres table as a GX data asset and build a request for its rows.
# NOTE(review): like add_postgres, add_table_asset likely fails if the asset is
# already registered (e.g. when re-running this cell) — confirm.
pg_datasource.add_table_asset(
    name="yellow_taxi_data", table_name="yellow_taxi_data"
)

batch_request = pg_datasource.get_asset("yellow_taxi_data").build_batch_request()

# TODO: placeholder name carried over from the GX template. Choose a descriptive
# suite name; note that changing it makes GX create/use a different suite.
expectation_suite_name = "insert_your_expectation_suite_name_here"
# Ensure a suite with this name exists in the context before validating against it.
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)

# Quick look at the first rows GX reads back from the table.
print(validator.head())

# Expectations: passenger_count must be populated, and congestion_surcharge must
# lie between 0 and 1000.
validator.expect_column_values_to_not_be_null(column="passenger_count")

validator.expect_column_values_to_be_between(
    column="congestion_surcharge", min_value=0, max_value=1000
)

# Persist the suite, keeping expectations even if they failed on this batch.
validator.save_expectation_suite(discard_failed_expectations=False)

my_checkpoint_name = "my_sql_checkpoint"

# Checkpoint: validate the batch against the suite, store the validation result,
# and rebuild the Data Docs site.
checkpoint = Checkpoint(
    name=my_checkpoint_name,
    run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
    data_context=context,
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {"name": "update_data_docs", "action": {"class_name": "UpdateDataDocsAction"}},
    ],
)

checkpoint_result = checkpoint.run()

# Report the overall outcome so a failing expectation is visible in the cell
# output, not only inside Data Docs.
print(f"Checkpoint {my_checkpoint_name!r} success: {checkpoint_result.success}")

print(checkpoint.get_config().to_yaml_str())

# Open the rendered Data Docs, including the validation result just stored.
context.open_data_docs()


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

   index  vendor_id     pickup_datetime    dropoff_datetime  passenger_count  \
0      0        2.0 2020-09-01 07:15:16 2020-09-01 07:22:47              1.0   
1      1        1.0 2020-09-16 12:50:33 2020-09-16 12:58:19              1.0   
2      2        1.0 2020-09-26 01:29:44 2020-09-26 01:44:32              1.0   
3      3        1.0 2020-09-12 17:33:04 2020-09-12 17:37:07              1.0   
4      4        2.0 2020-09-29 08:42:14 2020-09-29 08:54:59              1.0   

   trip_distance  rate_code_id store_and_fwd_flag  pickup_location_id  \
0           1.52           1.0                  N                 142   
1           0.90           1.0                  N                 244   
2           4.60           1.0                  N                 100   
3           0.80           1.0                  N                 140   
4           1.49           1.0                  N                 238   

   dropoff_location_id  payment_type  fare_amount  extra  mta_tax  tip_amount  \

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/19 [00:00<?, ?it/s]

name: my_sql_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: '%Y%m%d-%H%M%S-my-run-name-template'
expectation_suite_name: insert_your_expectation_suite_name_here
batch_request:
  datasource_name: pg_datasource
  data_asset_name: yellow_taxi_data
  options: {}
  batch_slice:
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
evaluation_parameters: {}
runtime_configuration: {}
validations: []
profilers: []
ge_cloud_id:
expectation_suite_ge_cloud_id:



In [12]:
# Open the rendered Data Docs again without re-running validation; the previous
# cell already calls open_data_docs() once at its end.
context.open_data_docs()