# Determining daily fare totals by borough

We have been tasked with building a simple data engineering pipeline that takes [NYC Taxi data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) and computes the total amount paid (`total_amount`) for each day in each borough.

We start by importing only the January 2021 data and scripting the analysis for that month by hand. We then package those scripts into a mock data pipeline and run the February 2021 data through it.

To show what can happen when data issues are introduced upstream or inside a pipeline, and how whylogs can help catch them, we then modify the March and April 2021 data in ways that break our estimates: loudly in March and silently in April. Finally, we move the pipeline to a Kafka stream for May and June.

## January 2021 - manual analysis

In [None]:
#!wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-01.csv
#!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

In [None]:
import pandas as pd
import numpy as np

trip_data_01 = pd.read_csv('green_tripdata_2021-01.csv')
lookup_table = pd.read_csv('taxi+_zone_lookup.csv')

trip_data_01 



| | VendorID | lpep_pickup_datetime | lpep_dropoff_datetime | store_and_fwd_flag | RatecodeID | PULocationID | DOLocationID | passenger_count | trip_distance | fare_amount | extra | mta_tax | tip_amount | tolls_amount | ehail_fee | improvement_surcharge | total_amount | payment_type | trip_type | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2.0 | 2021-01-01 00:15:56 | 2021-01-01 00:19:52 | N | 1.0 | 43 | 151 | 1.0 | 1.01 | 5.50 | 0.5 | 0.5 | 0.00 | 0.00 | | 0.3 | 6.80 | 2.0 | 1.0 | 0.00 |
| 1 | 2.0 | 2021-01-01 00:25:59 | 2021-01-01 00:34:44 | N | 1.0 | 166 | 239 | 1.0 | 2.53 | 10.00 | 0.5 | 0.5 | 2.81 | 0.00 | | 0.3 | 16.86 | 1.0 | 1.0 | 2.75 |
| 2 | 2.0 | 2021-01-01 00:45:57 | 2021-01-01 00:51:55 | N | 1.0 | 41 | 42 | 1.0 | 1.12 | 6.00 | 0.5 | 0.5 | 1.00 | 0.00 | | 0.3 | 8.30 | 1.0 | 1.0 | 0.00 |
| 3 | 2.0 | 2020-12-31 23:57:51 | 2021-01-01 00:04:56 | N | 1.0 | 168 | 75 | 1.0 | 1.99 | 8.00 | 0.5 | 0.5 | 0.00 | 0.00 | | 0.3 | 9.30 | 2.0 | 1.0 | 0.00 |
| 4 | 2.0 | 2021-01-01 00:16:36 | 2021-01-01 00:16:40 | N | 2.0 | 265 | 265 | 3.0 | 0.00 | -52.00 | 0.0 | -0.5 | 0.00 | 0.00 | | -0.3 | -52.80 | 3.0 | 1.0 | 0.00 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 76513 | | 2021-01-15 10:35:00 | 2021-01-15 10:51:00 | | | 3 | 147 | | 5.97 | 17.01 | 0.0 | 0.0 | 0.00 | 0.00 | | 0.3 | 17.31 | | | |
| 76514 | | 2021-01-15 10:25:00 | 2021-01-15 10:34:00 | | | 242 | 213 | | 3.83 | 27.27 | 0.0 | 0.0 | 2.75 | 0.00 | | 0.3 | 30.32 | | | |
| 76515 | | 2021-01-15 10:16:00 | 2021-01-15 10:20:00 | | | 181 | 181 | | 0.45 | 12.89 | 0.0 | 0.0 | 2.75 | 0.00 | | 0.3 | 15.94 | | | |
| 76516 | | 2021-01-15 10:16:00 | 2021-01-15 10:58:00 | | | 244 | 72 | | 22.21 | 50.67 | 0.0 | 0.0 | 2.75 | 6.12 | | 0.3 | 59.84 | | | |


In [None]:
#convert dropoff, pickup datetime strings to datetime objects
trip_data_01['lpep_pickup_datetime'] = pd.to_datetime(trip_data_01['lpep_pickup_datetime'])
trip_data_01['lpep_dropoff_datetime'] = pd.to_datetime(trip_data_01['lpep_dropoff_datetime'])

# keep only trips picked up in 2021 (drops stray records such as the 2020-12-31 pickup above)
cleaned = trip_data_01.loc[trip_data_01["lpep_pickup_datetime"].dt.year == 2021]

#calculate duration
duration = cleaned['lpep_dropoff_datetime'] - cleaned['lpep_pickup_datetime']
with_duration = cleaned.assign(duration = duration / np.timedelta64(1, 's'))

#enrich with pickup location data
first_join = with_duration.join(lookup_table.set_index('LocationID'), on='PULocationID')
first_join = first_join.rename(columns={"Borough": "PU_Borough", "Zone": "PU_Zone", "service_zone": "PU_service_zone"})

#enrich with dropoff location data
second_join = first_join.join(lookup_table.set_index('LocationID'), on='DOLocationID')
second_join = second_join.rename(columns={"Borough": "DO_Borough", "Zone": "DO_Zone", "service_zone": "DO_service_zone"})

# drop columns that are not needed downstream
final_dataset_01 = second_join.drop(columns = ['store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'RatecodeID', 'payment_type', 'ehail_fee', 'trip_type', 'congestion_surcharge'])

final_dataset_01
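The cleaning and enrichment steps above can be checked on a handful of rows. The sketch below runs them on two made-up trips and a made-up two-row lookup table (the IDs, boroughs and zone names are illustrative, not the real TLC mapping). It shows the 2020 pickup being filtered out, the duration computed in seconds, and the borough attached by joining on `PULocationID`.

```python
import numpy as np
import pandas as pd

# Illustrative trips; the first pickup falls in 2020
trips = pd.DataFrame({
    "lpep_pickup_datetime": ["2020-12-31 23:57:51", "2021-01-01 00:15:56"],
    "lpep_dropoff_datetime": ["2021-01-01 00:04:56", "2021-01-01 00:19:52"],
    "PULocationID": [1, 2],
    "total_amount": [9.30, 6.80],
})
# Made-up two-row lookup table (not the real TLC zone mapping)
lookup = pd.DataFrame({
    "LocationID": [1, 2],
    "Borough": ["Bronx", "Manhattan"],
    "Zone": ["Zone A", "Zone B"],
    "service_zone": ["Boro Zone", "Yellow Zone"],
})

for col in ["lpep_pickup_datetime", "lpep_dropoff_datetime"]:
    trips[col] = pd.to_datetime(trips[col])

# Keep only 2021 pickups, which drops the 2020-12-31 trip
cleaned = trips.loc[trips["lpep_pickup_datetime"].dt.year == 2021]

# Dividing a timedelta Series by one second gives the duration as float seconds
duration = cleaned["lpep_dropoff_datetime"] - cleaned["lpep_pickup_datetime"]
cleaned = cleaned.assign(duration=duration / np.timedelta64(1, "s"))

# Left join: each trip gets the lookup row whose LocationID equals its PULocationID
enriched = cleaned.join(lookup.set_index("LocationID"), on="PULocationID").rename(
    columns={"Borough": "PU_Borough", "Zone": "PU_Zone", "service_zone": "PU_service_zone"}
)
print(enriched[["PULocationID", "PU_Borough", "duration"]])
```

The remaining trip ran from 00:15:56 to 00:19:52, so its duration is 236 seconds.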

In [None]:
deliverable_01 = final_dataset_01.groupby([pd.Grouper(key = 'lpep_pickup_datetime',freq='D'), pd.Grouper('PU_Borough')])['total_amount'].sum().reset_index()

deliverable_01

| | lpep_pickup_datetime | PU_Borough | total_amount |
|---|---|---|---|
| 0 | 2021-01-01 | Bronx | 3980.35 |
| 1 | 2021-01-01 | Brooklyn | 7737.21 |
| 2 | 2021-01-01 | Manhattan | 6736.69 |
| 3 | 2021-01-01 | Queens | 6178.81 |
| 4 | 2021-01-01 | Staten Island | 235.70 |
| ... | ... | ... | ... |
| 182 | 2021-01-31 | Brooklyn | 9638.95 |
| 183 | 2021-01-31 | Manhattan | 9625.82 |
| 184 | 2021-01-31 | Queens | 5999.33 |
| 185 | 2021-01-31 | Staten Island | 184.84 |
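`pd.Grouper(key='lpep_pickup_datetime', freq='D')` buckets the pickup timestamps by calendar day, and grouping on it together with the borough column yields one row per observed (day, borough) pair. A toy run on four illustrative trips:

```python
import pandas as pd

# Illustrative enriched trips
trips = pd.DataFrame({
    "lpep_pickup_datetime": pd.to_datetime([
        "2021-01-01 00:15", "2021-01-01 13:40", "2021-01-01 09:05", "2021-01-02 08:00",
    ]),
    "PU_Borough": ["Bronx", "Bronx", "Queens", "Bronx"],
    "total_amount": [6.80, 16.86, 8.30, 9.30],
})

# One row per (calendar day, pickup borough); only total_amount is summed
daily = (
    trips.groupby([pd.Grouper(key="lpep_pickup_datetime", freq="D"), "PU_Borough"])["total_amount"]
    .sum()
    .reset_index()
)
print(daily)
```

Selecting `total_amount` before calling `.sum()` restricts the aggregation to that one column, so datetime and string columns in the full trip table are never summed.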


## February 2021 - functioning pipeline

### Defining the pipeline

In [None]:
def deal_with_time(df):
    #convert dropoff, pickup datetime strings to datetime objects
    df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])

    # keep only trips picked up in 2021 (trips from adjacent 2021 months are not removed)
    df = df.loc[df["lpep_pickup_datetime"].dt.year == 2021]

    #calculate duration
    duration = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    df = df.assign(duration = duration / np.timedelta64(1, 's'))
    return df

In [None]:
def enrich_data(df, lookup):
    #enrich with pickup location data
    df = df.join(lookup.set_index('LocationID'), on='PULocationID')
    df = df.rename(columns={"Borough": "PU_Borough", "Zone": "PU_Zone", "service_zone": "PU_service_zone"})

    #enrich with dropoff location data
    df = df.join(lookup.set_index('LocationID'), on='DOLocationID')
    df = df.rename(columns={"Borough": "DO_Borough", "Zone": "DO_Zone", "service_zone": "DO_service_zone"})
    return df

In [None]:
def drop_columns(df):
    # drop columns that are not needed downstream
    df = df.drop(columns = ['store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'RatecodeID', 'payment_type', 'ehail_fee', 'trip_type', 'congestion_surcharge'])
    return df
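Because each step takes a DataFrame and returns one, the steps can be chained. One possible sketch uses a hypothetical `run_steps` helper that also records the row count after each step, which makes a step that silently drops rows easy to spot. The toy steps below only stand in for `deal_with_time`, `enrich_data` and `drop_columns`.

```python
import pandas as pd

def run_steps(df, steps):
    """Hypothetical runner: apply each (name, step) pair in order and record row counts."""
    row_counts = {}
    for name, step in steps:
        df = step(df)
        row_counts[name] = len(df)
    return df, row_counts

# Toy stand-ins for the notebook's pipeline functions
toy = pd.DataFrame({"year": [2020, 2021, 2021], "total_amount": [9.30, 6.80, 16.86]})
steps = [
    ("filter_2021", lambda d: d.loc[d["year"] == 2021]),
    ("drop_year", lambda d: d.drop(columns=["year"])),
]
result, row_counts = run_steps(toy, steps)
print(row_counts)  # {'filter_2021': 2, 'drop_year': 2}
```

With the notebook's functions, the step list would be `[("deal_with_time", deal_with_time), ("enrich_data", lambda d: enrich_data(d, lookup_table)), ("drop_columns", drop_columns)]`.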

### Executing the pipeline

In [None]:
#!wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-02.csv

In [None]:
trip_data_02 = pd.read_csv('green_tripdata_2021-02.csv')

In [None]:
with_duration = deal_with_time(trip_data_02)

second_join = enrich_data(with_duration, lookup_table)

final_dataset_02 = drop_columns(second_join)

In [None]:
deliverable_02 = final_dataset_02.groupby([pd.Grouper(key = 'lpep_pickup_datetime',freq='D'), pd.Grouper('PU_Borough')])['total_amount'].sum().reset_index()

deliverable_02

| | lpep_pickup_datetime | PU_Borough | total_amount |
|---|---|---|---|
| 0 | 2021-02-01 | Bronx | 874.50 |
| 1 | 2021-02-01 | Brooklyn | 2413.46 |
| 2 | 2021-02-01 | Manhattan | 1754.01 |
| 3 | 2021-02-01 | Queens | 1875.52 |
| 4 | 2021-02-01 | Unknown | 38.54 |
| ... | ... | ... | ... |
| 162 | 2021-02-28 | Manhattan | 10707.10 |
| 163 | 2021-02-28 | Queens | 6810.17 |
| 164 | 2021-02-28 | Staten Island | 188.34 |
| 165 | 2021-02-28 | Unknown | 251.26 |


## March 2021 - loudly broken pipeline

In [None]:
#!wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-03.csv

trip_data_03 = pd.read_csv('green_tripdata_2021-03.csv')



In [None]:
# simulate an upstream schema change: PULocationID now arrives as strings
trip_data_03['PULocationID'] = trip_data_03['PULocationID'].apply(str)

In [None]:
with_duration = deal_with_time(trip_data_03)

second_join = enrich_data(with_duration, lookup_table)

final_dataset_03 = drop_columns(second_join)

ValueError: You are trying to merge on object and int64 columns. If you wish to proceed you should use pd.concat
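The error comes from the join key types: `PULocationID` now holds strings (object dtype) while the lookup table's `LocationID` index is `int64`, and pandas refuses to merge string keys with integer keys. A minimal reproduction on toy data, followed by the cast that restores the join:

```python
import pandas as pd

# Toy lookup indexed by an int64 LocationID, as in taxi+_zone_lookup.csv
lookup = pd.DataFrame({"LocationID": [1, 2], "Borough": ["Bronx", "Queens"]}).set_index("LocationID")

# PULocationID arriving as strings (object dtype), like the modified March data
trips = pd.DataFrame({"PULocationID": pd.Series(["1", "2"], dtype=object)})

try:
    trips.join(lookup, on="PULocationID")
    join_failed = False
except ValueError as err:
    join_failed = True
    print(f"ValueError: {err}")

# Casting the key back to int64 restores the join
fixed = trips.assign(PULocationID=trips["PULocationID"].astype("int64")).join(lookup, on="PULocationID")
print(fixed)
```

An explicit cast fixes this particular break only because we know the column was stringified; a value that cannot be parsed as an integer would still raise.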

In [None]:
def enrich_data(df, lookup):
    # ensure PULocationID is an int so it matches the lookup table's int64 LocationID
    df['PULocationID'] = df['PULocationID'].apply(int)

    #enrich with pickup location data
    df = df.join(lookup.set_index('LocationID'), on='PULocationID')
    df = df.rename(columns={"Borough": "PU_Borough", "Zone": "PU_Zone", "service_zone": "PU_service_zone"})

    #enrich with dropoff location data
    df = df.join(lookup.set_index('LocationID'), on='DOLocationID')
    df = df.rename(columns={"Borough": "DO_Borough", "Zone": "DO_Zone", "service_zone": "DO_service_zone"})
    return df

In [None]:
with_duration = deal_with_time(trip_data_03)

second_join = enrich_data(with_duration, lookup_table)

final_dataset_03 = drop_columns(second_join)

final_dataset_03

In [None]:
deliverable_03 = final_dataset_03.groupby([pd.Grouper(key = 'lpep_pickup_datetime',freq='D'), pd.Grouper('PU_Borough')])['total_amount'].sum().reset_index()

deliverable_03

| | lpep_pickup_datetime | PU_Borough | total_amount |
|---|---|---|---|
| 0 | 2021-03-01 | Bronx | 13919.45 |
| 1 | 2021-03-01 | Brooklyn | 23220.32 |
| 2 | 2021-03-01 | Manhattan | 20582.48 |
| 3 | 2021-03-01 | Queens | 14329.67 |
| 4 | 2021-03-01 | Staten Island | 472.24 |
| ... | ... | ... | ... |
| 183 | 2021-03-31 | Brooklyn | 21562.46 |
| 184 | 2021-03-31 | Manhattan | 20373.86 |
| 185 | 2021-03-31 | Queens | 14289.92 |
| 186 | 2021-03-31 | Staten Island | 469.02 |


## April 2021 - silently broken pipeline

In [None]:
#!wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-04.csv

trip_data_04 = pd.read_csv('green_tripdata_2021-04.csv')



In [None]:
# simulate a silent upstream change: amounts truncated to whole dollars and reported in cents
trip_data_04['total_amount'] = 100*trip_data_04['total_amount'].apply(int)
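This modification is silent because nothing downstream fails: the column is still numeric, and only its values and dtype change. On two illustrative fares, truncating with `int` and then scaling by 100 gives:

```python
import pandas as pd

# Two illustrative fares in dollars
total_amount = pd.Series([6.80, 16.86])

# The same transformation as the April cell: truncate to whole dollars, then scale by 100
corrupted = 100 * total_amount.apply(int)
print(corrupted.tolist())  # [600, 1600]
print(total_amount.dtype, corrupted.dtype)  # float64 becomes an integer dtype
```

A missing `total_amount` would make `int()` raise, so this kind of corruption only stays silent when the column has no NaN values.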

In [None]:
with_duration = deal_with_time(trip_data_04)

second_join = enrich_data(with_duration, lookup_table)

final_dataset_04 = drop_columns(second_join)

In [None]:
deliverable_04 = final_dataset_04.groupby([pd.Grouper(key = 'lpep_pickup_datetime',freq='D'), pd.Grouper('PU_Borough')])['total_amount'].sum().reset_index()

deliverable_04

| | lpep_pickup_datetime | PU_Borough | total_amount |
|---|---|---|---|
| 0 | 2021-03-31 | Brooklyn | 4100 |
| 1 | 2021-03-31 | Manhattan | 1200 |
| 2 | 2021-04-01 | Bronx | 1508200 |
| 3 | 2021-04-01 | Brooklyn | 2246800 |
| 4 | 2021-04-01 | Manhattan | 2165200 |
| ... | ... | ... | ... |
| 183 | 2021-04-30 | Unknown | 42100 |
| 184 | 2021-05-01 | Brooklyn | 600 |
| 185 | 2021-05-01 | Manhattan | 10800 |
| 186 | 2021-05-01 | Queens | 900 |


### Using visualizations to identify issues

In [None]:
from whylogs import get_or_create_session
from whylogs.viz import NotebookProfileViewer
session = get_or_create_session()

with session.logger(dataset_name="deliverable_01") as logger:
    logger.log_dataframe(deliverable_01)
    profile_01 = logger.profile

with session.logger(dataset_name="deliverable_04") as logger:
    logger.log_dataframe(deliverable_04)
    profile_04 = logger.profile

visualization = NotebookProfileViewer()
visualization.set_profiles(target_profile=profile_01, reference_profile=profile_04)



In [None]:
visualization.summary_drift_report(preferred_cell_height="1000px")

In [None]:
visualization.distribution_chart(feature_names="PU_Borough")

In [None]:
visualization.double_histogram(feature_names="total_amount")

In [None]:
visualization.feature_statistics(feature_name="total_amount", profile="target")

In [None]:
visualization.feature_statistics(feature_name="total_amount", profile="reference")

### Using constraints to identify issues

In [None]:
from whylogs.core.statistics.constraints import (
    columnValuesTypeEqualsConstraint,
    columnValuesInSetConstraint,
    minGreaterThanEqualConstraint,
    maxLessThanEqualConstraint,
    DatasetConstraints
)

def get_sample_dataset_constraints():
    # https://whylogs.readthedocs.io/en/latest/autoapi/whylogs/core/statistics/constraints/index.html#whylogs.core.statistics.constraints.columnValuesTypeEqualsConstraint
    # expected_type=2 is FRACTIONAL in whylogs' InferredType.Type enum (float values)
    cvtec = columnValuesTypeEqualsConstraint(expected_type=2)

    # https://whylogs.readthedocs.io/en/latest/autoapi/whylogs/core/statistics/constraints/index.html#whylogs.core.statistics.constraints.columnValuesInSetConstraint
    cvisc = columnValuesInSetConstraint(value_set={"Manhattan", "Brooklyn", "Queens", "Bronx", "Unknown", "Staten Island", "EWR"})

    # https://whylogs.readthedocs.io/en/latest/autoapi/whylogs/core/statistics/constraints/index.html#whylogs.core.statistics.constraints.minGreaterThanEqualConstraint
    mgtec = minGreaterThanEqualConstraint(value=20)
    # https://whylogs.readthedocs.io/en/latest/autoapi/whylogs/core/statistics/constraints/index.html#whylogs.core.statistics.constraints.maxLessThanEqualConstraint
    mltec = maxLessThanEqualConstraint(value=50000)

    return DatasetConstraints(
        None,
        value_constraints={"PU_Borough": [cvisc]},
        summary_constraints={"total_amount": [mgtec, mltec, cvtec]},
    )

session = get_or_create_session()

dc = get_sample_dataset_constraints()
constraints_profile = session.log_dataframe(deliverable_04, "manual_constraints", constraints=dc)
constraints_profile.apply_summary_constraints()
constraints_profile.apply_table_shape_constraints()
session.close()

visualization.constraints_report(dc)
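For readers without whylogs at hand, the manual constraints roughly correspond to the pandas checks below. `check_deliverable` is a hypothetical helper, not part of whylogs, and its dtype test only approximates whylogs' per-value type inference. The two toy frames reuse values from the January and April deliverable tables.

```python
import pandas as pd

ALLOWED_BOROUGHS = {"Manhattan", "Brooklyn", "Queens", "Bronx", "Unknown", "Staten Island", "EWR"}

def check_deliverable(df, min_total=20, max_total=50000):
    """Hypothetical pandas approximation of the manual whylogs constraints.

    Returns the names of the checks that fail (an empty list means all pass).
    """
    failures = []
    if not df["PU_Borough"].isin(ALLOWED_BOROUGHS).all():
        failures.append("PU_Borough has values outside the allowed set")
    if not pd.api.types.is_float_dtype(df["total_amount"]):
        failures.append("total_amount is not fractional")
    if df["total_amount"].min() < min_total:
        failures.append(f"total_amount min < {min_total}")
    if df["total_amount"].max() > max_total:
        failures.append(f"total_amount max > {max_total}")
    return failures

# Values taken from the January and April deliverable tables
january_like = pd.DataFrame({"PU_Borough": ["Bronx", "Staten Island"], "total_amount": [3980.35, 235.70]})
april_like = pd.DataFrame({"PU_Borough": ["Manhattan", "Bronx"], "total_amount": [1200, 1508200]})

print(check_deliverable(january_like))  # []
print(check_deliverable(april_like))    # ['total_amount is not fractional', 'total_amount max > 50000']
```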

In [None]:
dc = profile_01.generate_constraints()

session = get_or_create_session()

constraints_profile = session.log_dataframe(deliverable_04, "automatic_constraints", constraints=dc)
constraints_profile.apply_summary_constraints()
constraints_profile.apply_table_shape_constraints()
session.close()

visualization.constraints_report(dc)



## May 2021 - create streaming pipeline

In [None]:
# !wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-05.csv

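trip_data_05 = pd.read_csv('green_tripdata_2021-05.csv')
# write the 100-row sample to its own file, without the index, so the source file stays intact
trip_data_05.sample(n=100, random_state=0).to_csv('green_tripdata_2021-05_sample.csv', index=False)

In [None]:
!python producer.py -f confluent_credentials.txt -t taxi_data_05 -d "green_tripdata_2021-05_sample.csv"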

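Writing a DataFrame with `to_csv` and reading it back with `read_csv` adds an `Unnamed: 0` column unless the index is left out. When a cell overwrites its own input file this way and is re-run, the extra columns pile up, and pandas de-duplicates the repeated names (`Unnamed: 0.1` and so on). A small in-memory round trip shows the effect:

```python
import io
import pandas as pd

df = pd.DataFrame({"fare_amount": [5.5, 10.0]})

# Writing with the default index=True and reading back adds an "Unnamed: 0" column
buf = io.StringIO()
df.to_csv(buf)
buf.seek(0)
with_index = pd.read_csv(buf)
print(with_index.columns.tolist())  # ['Unnamed: 0', 'fare_amount']

# index=False round-trips the columns unchanged
buf = io.StringIO()
df.to_csv(buf, index=False)
buf.seek(0)
without_index = pd.read_csv(buf)
print(without_index.columns.tolist())  # ['fare_amount']
```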

In [None]:
# https://github.com/confluentinc/examples/blob/0a22a9de96757eb772eb34569c9b7ad72821099d/clients/cloud/python/consumer.py

from confluent_kafka import Consumer
import pickle
import ccloud_lib


if __name__ == '__main__':
    
    # Read arguments and configurations and initialize
    config_file = "confluent_credentials.txt"
    topic = "taxi_data_05"
    conf = ccloud_lib.read_ccloud_config(config_file)

    # Create Consumer instance
    # 'auto.offset.reset=earliest' to start reading from the beginning of the
    #   topic if no committed offsets exist
    consumer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
    consumer_conf['group.id'] = 'python_example_group_1'
    consumer_conf['auto.offset.reset'] = 'earliest'
    consumer = Consumer(consumer_conf)

    # Subscribe to topic
    consumer.subscribe([topic])

    # Process messages
    final_dataset_05 = pd.DataFrame()
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # No message available within timeout.
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting for message or event/error in poll()")
                continue
            elif msg.error():
                print('error: {}'.format(msg.error()))
            else:
                # Deserialize the pickled pandas Series sent by producer.py
                # (pickle can run arbitrary code, so only unpickle messages from a trusted producer)
                data = pickle.loads(msg.value())
                temp_df = data.to_frame().T

                with_duration = deal_with_time(temp_df)
                second_join = enrich_data(with_duration, lookup_table)
                final_temp = drop_columns(second_join)

                final_dataset_05 = pd.concat([final_dataset_05, final_temp], sort = False)
                print(data)
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
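The consumer assumes that `producer.py` (not shown) sends each CSV row as a pickled pandas `Series`. The sketch below simulates one message without Kafka: the Series is pickled, unpickled, and turned into a one-row DataFrame with `to_frame().T`, which leaves every column with object dtype. It also shows collecting per-message frames in a list and concatenating once, an alternative to calling `pd.concat` on a growing DataFrame for every message.

```python
import pickle
import pandas as pd

# Assumption: producer.py sends each CSV row as a pickled pandas Series
row = pd.Series({"lpep_pickup_datetime": "2021-05-01 00:15:00", "PULocationID": 43, "total_amount": 6.80})
payload = pickle.dumps(row)  # stands in for msg.value()

# Consumer side: only unpickle payloads from a trusted producer
data = pickle.loads(payload)
temp_df = data.to_frame().T  # one-row DataFrame; every column has object dtype
print(temp_df.dtypes)

# Collect per-message frames and concatenate once, instead of re-copying on every message
frames = [temp_df, temp_df.copy()]
combined = pd.concat(frames, ignore_index=True)
print(combined.shape)  # (2, 3)
```

The object dtypes are one reason the explicit `int` cast in `enrich_data` and `pd.to_datetime` in `deal_with_time` matter when the pipeline runs on streamed records.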


In [None]:
deliverable_05 = final_dataset_05.groupby([pd.Grouper(key = 'lpep_pickup_datetime',freq='D'), pd.Grouper('PU_Borough')])['total_amount'].sum().reset_index()

deliverable_05



In [None]:
session = get_or_create_session()

with session.logger(dataset_name="deliverable_01") as logger:
    logger.log_dataframe(deliverable_01)
    profile_01 = logger.profile

with session.logger(dataset_name="deliverable_05") as logger:
    logger.log_dataframe(deliverable_05)
    profile_05 = logger.profile

visualization = NotebookProfileViewer()
visualization.set_profiles(target_profile=profile_01, reference_profile=profile_05)

visualization.summary_drift_report(preferred_cell_height="1000px")



## June 2021 - validate streaming pipeline

In [None]:
# !wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-06.csv

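trip_data_06 = pd.read_csv('green_tripdata_2021-06.csv')
# write the 100-row sample to its own file, without the index, so the source file stays intact
trip_data_06.sample(n=100, random_state=0).to_csv('green_tripdata_2021-06_sample.csv', index=False)

In [None]:
!python producer.py -f confluent_credentials.txt -t taxi_data_06 -d "green_tripdata_2021-06_sample.csv"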


In [None]:
session = get_or_create_session()
with session.logger(dataset_name="final_dataset_01") as logger:
    logger.log_dataframe(final_dataset_01.head(2000))
    final_dataset_01_profile = logger.profile

total_amount = final_dataset_01_profile.columns['total_amount'].to_summary()

max_plus_twostds = total_amount.number_summary.max + 2*total_amount.number_summary.stddev

def get_sample_dataset_constraints():
    # https://whylogs.readthedocs.io/en/latest/autoapi/whylogs/core/statistics/constraints/index.html#whylogs.core.statistics.constraints.columnValuesTypeEqualsConstraint
    # expected_type=2 is FRACTIONAL in whylogs' InferredType.Type enum (float values)
    cvtec = columnValuesTypeEqualsConstraint(expected_type=2)

    # https://whylogs.readthedocs.io/en/latest/autoapi/whylogs/core/statistics/constraints/index.html#whylogs.core.statistics.constraints.columnValuesInSetConstraint
    cvisc = columnValuesInSetConstraint(value_set={"Manhattan", "Brooklyn", "Queens", "Bronx", "Unknown", "Staten Island", "EWR"})

    # https://whylogs.readthedocs.io/en/latest/autoapi/whylogs/core/statistics/constraints/index.html#whylogs.core.statistics.constraints.maxLessThanEqualConstraint
    # upper bound from the January trip-level profile: max + 2 standard deviations
    mltec = maxLessThanEqualConstraint(value=max_plus_twostds)

    return DatasetConstraints(
        None,
        value_constraints={"PU_Borough": [cvisc]},
        summary_constraints={"total_amount": [mltec, cvtec]},
    )

dc = get_sample_dataset_constraints()
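The upper bound in the June constraints is the January maximum plus two standard deviations, computed on trip-level totals, so it is only meaningful against trip-level data such as the streamed June records. A worked example on three illustrative values: with a maximum of 30 and a sample standard deviation of 10, the bound is 30 + 2 × 10 = 50.

```python
import pandas as pd

# Illustrative per-trip totals standing in for the January sample
january_totals = pd.Series([10.0, 20.0, 30.0])

# Upper bound = observed max + 2 standard deviations.
# pandas .std() is the sample standard deviation (ddof=1); whylogs' stddev may differ slightly.
max_plus_twostds = january_totals.max() + 2 * january_totals.std()
print(max_plus_twostds)  # 50.0

# New trips above the bound would fail a maxLessThanEqual-style check
new_totals = pd.Series([12.5, 75.0])
violations = new_totals[new_totals > max_plus_twostds]
print(violations.tolist())  # [75.0]
```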

In [None]:
# https://github.com/confluentinc/examples/blob/0a22a9de96757eb772eb34569c9b7ad72821099d/clients/cloud/python/consumer.py

from confluent_kafka import Consumer
import pickle
import ccloud_lib

import pandas as pd
from whylogs import get_or_create_session

if __name__ == '__main__':

    # Read arguments and configurations and initialize
    config_file = "confluent_credentials.txt"
    topic = "taxi_data_06"
    conf = ccloud_lib.read_ccloud_config(config_file)

    # Create Consumer instance
    # 'auto.offset.reset=earliest' to start reading from the beginning of the
    #   topic if no committed offsets exist
    consumer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
    consumer_conf['group.id'] = 'python_example_group_1'
    consumer_conf['auto.offset.reset'] = 'earliest'
    consumer = Consumer(consumer_conf)

    # Subscribe to topic
    consumer.subscribe([topic])

    # Process messages
    final_dataset_06 = pd.DataFrame()
    profile_06 = None  # merged profile of all consumed records
    session = get_or_create_session()
    try:
        # 5s is a very short rotation time; we recommend over 30 seconds in general,
        # but for the sake of this demo we rotate quickly
        with session.logger(dataset_name="merger", with_rotation_time="5s", constraints=dc) as logger:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Deserialize the pickled pandas Series sent by producer.py
                    # (only unpickle messages from a trusted producer)
                    data = pickle.loads(msg.value())
                    temp_df = data.to_frame().T

                    with_duration = deal_with_time(temp_df)
                    second_join = enrich_data(with_duration, lookup_table)
                    final_temp = drop_columns(second_join)

                    final_dataset_06 = pd.concat([final_dataset_06, final_temp], sort=False)

                    # the logger checks the record against the constraints in dc
                    logger.log_dataframe(final_temp)

                    # print the running value-constraint report for each column
                    for column_name, constraints in dc.value_constraint_map.items():
                        report = constraints.report()[0]
                        print(f"Checking that {report[0]}: {report[1]} items and {report[2]} failed.")

                    # logger.profile accumulates everything logged since the last rotation,
                    # so merging it after every message would double-count records;
                    # profile each record on its own and merge those disjoint profiles instead
                    record_profile = session.profile_dataframe(final_temp, dataset_name="taxi_data_06")
                    profile_06 = record_profile if profile_06 is None else profile_06.merge(record_profile)

    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets, then close the whylogs session
        consumer.close()
        session.close()
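The streaming loop merges profiles as messages arrive. Merging is only correct when each merged piece summarizes a disjoint set of records. Assuming a rotating whylogs v0 logger's `logger.profile` keeps growing until the next rotation (our reading of the v0 logger), merging that cumulative snapshot after every message would count earlier records again. A toy illustration with a hypothetical `RunningStats` class standing in for a profile:

```python
from dataclasses import dataclass

@dataclass
class RunningStats:
    """Hypothetical mergeable summary (count and sum) standing in for a whylogs profile."""
    count: int = 0
    total: float = 0.0

    def update(self, values):
        for v in values:
            self.count += 1
            self.total += v

    def merge(self, other):
        return RunningStats(self.count + other.count, self.total + other.total)

batches = [[6.8, 16.86], [8.3], [9.3]]

# Correct: summarize each batch on its own, then merge the disjoint summaries
merged = RunningStats()
for batch in batches:
    batch_stats = RunningStats()
    batch_stats.update(batch)
    merged = merged.merge(batch_stats)

# Incorrect: merging a cumulative snapshot after every batch counts earlier batches again
cumulative = RunningStats()
double_counted = RunningStats()
for batch in batches:
    cumulative.update(batch)
    double_counted = double_counted.merge(cumulative)

print(merged.count, double_counted.count)  # 4 9
```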

Checking that values are in {'Brooklyn', 'Manhattan', 'EWR', 'Staten Island', 'Bronx', 'Unknown', 'Queens'}: 1 items and 0 failed.
Checking that values are in {'Brooklyn', 'Manhattan', 'EWR', 'Staten Island', 'Bronx', 'Unknown', 'Queens'}: 2 items and 0 failed.
Checking that values are in {'Brooklyn', 'Manhattan', 'EWR', 'Staten Island', 'Bronx', 'Unknown', 'Queens'}: 3 items and 0 failed.
Checking that values are in {'Brooklyn', 'Manhattan', 'EWR', 'Staten Island', 'Bronx', 'Unknown', 'Queens'}: 4 items and 0 failed.
Checking that values are in {'Brooklyn', 'Manhattan', 'EWR', 'Staten Island', 'Bronx', 'Unknown', 'Queens'}: 5 items and 0 failed.
Checking that values are in {'Brooklyn', 'Manhattan', 'EWR', 'Staten Island', 'Bronx', 'Unknown', 'Queens'}: 6 items and 0 failed.
Checking that values are in {'Brooklyn', 'Manhattan', 'EWR', 'Staten Island', 'Bronx', 'Unknown', 'Queens'}: 7 items and 0 failed.

In [None]:
visualization = NotebookProfileViewer()
# compare trip-level profiles: the January trip sample vs. the streamed June trips
visualization.set_profiles(target_profile=final_dataset_01_profile, reference_profile=profile_06)

visualization.summary_drift_report(preferred_cell_height="1000px")

In [None]:
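# check all streamed June records against a fresh copy of the June constraints
session = get_or_create_session()
dc_06 = get_sample_dataset_constraints()
constraints_profile_06 = session.log_dataframe(final_dataset_06, "constraints_06", constraints=dc_06)
constraints_profile_06.apply_summary_constraints()
constraints_profile_06.apply_table_shape_constraints()
session.close()
visualization.constraints_report(dc_06)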
