In [None]:
# Installing spark and findspark packages to run in local
!pip install pyspark==3.3
!pip install findspark

In [None]:
# installing pydeequ Python package. Currently PyDeequ supports only until Spark 3.3
!pip install pydeequ

In [68]:
import findspark
# Locate the local Spark installation (per findspark's docs this makes `pyspark` importable — confirm for your setup).
findspark.init()
# Return the Spark home that findspark resolved; it is displayed as the cell output.
findspark.find()

'/Users/akashdeepgupta/Documents/project-repos/git-repos/aws-rss-discord-bot/venv/lib/python3.12/site-packages/pyspark'

In [69]:
import os

# PyDeequ reads the Spark version from the SPARK_VERSION environment variable,
# so it is set here, ahead of the cell that imports pydeequ.
os.environ.update(SPARK_VERSION="3.3")

print(os.environ["SPARK_VERSION"])

3.3


In [70]:
from pyspark.sql import SparkSession
import pydeequ

# Local session named "pydeequ-example" on master local[4]. The Deequ jar is requested
# through the Maven coordinates exposed by pydeequ, with the f2j artifact excluded.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("pydeequ-example")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

23/12/24 13:21:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Reading NYC Yellow Taxi Trip Data
- Data and Data Dictionary can be found [here](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

In [71]:
# Load the yellow-taxi trip records from the local parquet directory.
yellow_df = spark.read.parquet("nyc-taxi-trips/yellow/")

# printSchema() prints the schema and returns None, so it is called on its own rather
# than bundled into a tuple; the row count is then the value displayed by the cell.
yellow_df.printSchema()
yellow_df.count()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



(None, 3522285)

In [72]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Add a generated `gen_id` column so the uniqueness and column-combination
# uniqueness rules in PyDeequ have something to be checked against.
sample_df = yellow_df.withColumn(
    "gen_id",
    monotonically_increasing_id(),
)

# Metrics Computation

In [73]:
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    Size,
    Completeness,
    Distinctness,
    Uniqueness,
    Compliance,
    Mean,
    Sum,
    Maximum,
)

# Compute a set of dataset-level and column-level metrics over sample_df in one run.
analysisResult = (
    AnalysisRunner(spark)
    .onData(sample_df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("VendorID"))
    .addAnalyzer(Distinctness("VendorID"))
    .addAnalyzer(Uniqueness(["VendorID", "gen_id"]))
    .addAnalyzer(Compliance("payment_type", "payment_type in (1,2,3,4,5,6)"))
    .addAnalyzer(Mean("trip_distance"))
    .addAnalyzer(Sum("total_amount"))
    .addAnalyzer(Maximum("extra"))
    .run()
)

# Convert the successfully computed metrics into a Spark DataFrame and display them.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-----------+---------------+------------+--------------------+
|     entity|       instance|        name|               value|
+-----------+---------------+------------+--------------------+
|     Column|   total_amount|         Sum|1.0274988166009633E8|
|     Column|       VendorID|Distinctness|8.517198352773839E-7|
|     Column|  trip_distance|        Mean|   3.926694986351057|
|    Dataset|              *|        Size|           3522285.0|
|     Column|   payment_type|  Compliance|  0.9560146325467701|
|     Column|       VendorID|Completeness|                 1.0|
|Mutlicolumn|VendorID,gen_id|  Uniqueness|                 1.0|
|     Column|          extra|     Maximum|               17.25|
+-----------+---------------+------------+--------------------+





# Profiling Data

In [74]:
# Profiling Data
from pydeequ.profiles import *

# Profile every column of sample_df.
# ColumnProfilerRunner.onData returns a ColumnProfilerRunBuilder, which is then run.
result = (
    ColumnProfilerRunner(spark)
    .onData(sample_df)
    .run()
)

# Print each column name followed by its profile, one per line.
for col_name, profile in result.profiles.items():
    print(col_name, profile, sep="\n")

                                                                                

DOLocationID
NumericProfiles for column: DOLocationID: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 262,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null,
    "kll": "None",
    "mean": 164.5522420814897,
    "maximum": 265.0,
    "minimum": 1.0,
    "sum": 579599894.0,
    "stdDev": 69.76912935351677,
    "approxPercentiles": []
}
improvement_surcharge
NumericProfiles for column: improvement_surcharge: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 5,
    "dataType": "Fractional",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "0.0",
            1539,
            0.00043693227549729795
        ],
        [
            "-1.0",
            36962,
            0.010493756183840887
        ],
        [
            "0.3",
            916,
            0.0002600584563713612
        ],
        [
            "1.0",
            3482867,
            0.98880896

In [75]:
# Profile only a subset of the columns present in the dataframe
column_profiler = (
    ColumnProfilerRunner(spark)
    .onData(sample_df)
    .restrictToColumns(['VendorID', 'trip_distance', 'total_amount'])
    .run()
)

# Print each profiled column name followed by its profile, one per line.
for col_name, profile in column_profiler.profiles.items():
    print(col_name, profile, sep="\n")

VendorID
NumericProfiles for column: VendorID: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "2",
            2617320,
            0.7430744530894008
        ],
        [
            "1",
            904463,
            0.25678302579149614
        ],
        [
            "6",
            502,
            0.00014252111910308224
        ]
    ],
    "kll": "None",
    "mean": 1.7437870586849162,
    "maximum": 6.0,
    "minimum": 1.0,
    "sum": 6142115.0,
    "stdDev": 0.4397934656174618,
    "approxPercentiles": []
}
trip_distance
NumericProfiles for column: trip_distance: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 4888,
    "dataType": "Fractional",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null,
    "kll": "None",
    "mean": 3.926694986351057,
    "maximum": 205544.17,
    "minimum": 0.0,
  

                                                                                

# ConstraintSuggestion
The available types of constraint suggestions are listed [here](https://github.com/awslabs/python-deequ/blob/master/docs/suggestions.md).

In [76]:
# `json` is used below to pretty-print the result; import it explicitly instead of
# relying on it leaking into the namespace through a star import.
import json

from pydeequ.suggestions import *

# Run the DEFAULT() suggestion rule set against the fare_amount column only.
suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(yellow_df.select("fare_amount"))
    .addConstraintRule(DEFAULT())
    .run()
)

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(fare_amount,None))",
      "column_name": "fare_amount",
      "current_value": "Completeness: 1.0",
      "description": "'fare_amount' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"fare_amount\")"
    }
  ]
}


In [77]:
# Running only specific suggestion rules shows whether a particular rule gets suggested
# and how it would actually be implemented.
# Example to look at: Airport_fee > RetainCompletenessRule()
specific_suggestion_result = (
    ConstraintSuggestionRunner(spark)
    .onData(sample_df)
    .addConstraintRule(NonNegativeNumbersRule())
    .addConstraintRule(RetainTypeRule())
    .addConstraintRule(RetainCompletenessRule())
    .run()
)

# Constraint Suggestions in JSON format
print(json.dumps(specific_suggestion_result, indent=2))




{
  "constraint_suggestions": [
    {
      "constraint_name": "ComplianceConstraint(Compliance('DOLocationID' has no negative values,DOLocationID >= 0,None))",
      "column_name": "DOLocationID",
      "current_value": "Minimum: 1.0",
      "description": "'DOLocationID' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constraint": ".isNonNegative(\"DOLocationID\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('PULocationID' has no negative values,PULocationID >= 0,None))",
      "column_name": "PULocationID",
      "current_value": "Minimum: 1.0",
      "description": "'PULocationID' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constrain

                                                                                

# Constraint Verification
All the available checks that can be used for Verification can be seen [here](https://github.com/awslabs/python-deequ/blob/master/docs/checks.md)

In [78]:
from pydeequ.checks import *
from pydeequ.verification import *

# A Check represents a list of constraints that can be applied to a provided Spark DataFrame
check = Check(spark, CheckLevel.Warning, "NYC Yellow Taxi Trips Oct 2023")

# Attach the constraints to the check and run the verification suite on sample_df.
checkResult = (
    VerificationSuite(spark)
    .onData(sample_df)
    .addCheck(
        check.isComplete("VendorID")
        .isUnique("gen_id")
        .hasUniqueness(["VendorID", "gen_id"], assertion=lambda x: x == 1)
        .hasCompleteness("Airport_fee", assertion=lambda x: x >= 0.95)
        .isNonNegative("fare_amount")
    )
    .run()
)

print(f"Verification Run Status: {checkResult.status}")

# One row per constraint, including its status and message.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

                                                                                

+------------------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check                         |check_level|check_status|constraint                                                                                                                |constraint_status|constraint_message                                                 |
+------------------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
+------------------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------+-----------------+-

In [79]:
# Display every metric value that was computed during the verification run
(
    VerificationResult
    .successMetricsAsDataFrame(spark, checkResult)
    .show(truncate=False)
)

+-----------+---------------------------+------------+------------------+
|entity     |instance                   |name        |value             |
+-----------+---------------------------+------------+------------------+
|Column     |fare_amount is non-negative|Compliance  |0.9894670647037364|
|Column     |gen_id                     |Uniqueness  |1.0               |
|Column     |Airport_fee                |Completeness|0.9560146325467701|
|Column     |VendorID                   |Completeness|1.0               |
|Mutlicolumn|VendorID,gen_id            |Uniqueness  |1.0               |
+-----------+---------------------------+------------+------------------+



# Metrics Repositories
PyDeequ allows us to persist the metrics we computed on dataframes in a so-called `MetricsRepository`. In the following example, we showcase how to store metrics in a filesystem and query them later on.

In [80]:
# Set up the metrics repository. FileSystemMetricsRepository is used here;
# InMemoryMetricsRepository is the other available repository.
from pydeequ.repository import FileSystemMetricsRepository, ResultKey

# Alternative way to obtain a metrics file path:
# metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'pydeequ_metrics/nyc_yellow_metrics.json')
# The path to the metrics file can be an S3 path too.
metrics_file = "./nyc_metrics_yellow.json"
nyc_yellow_repository = FileSystemMetricsRepository(
    spark,
    path=metrics_file,
)
print(metrics_file)

./nyc_metrics_yellow.json


Each set of metrics that we compute needs to be indexed by a so-called `ResultKey`, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.

In [81]:
# The tag uniquely identifies this analysis result inside the repository
key_tags = {"tag": "nyc_yellow_oct_2023"}
resultKey = ResultKey(
    spark,
    ResultKey.current_milli_time(),
    key_tags,
)

### Using this repository in Metrics Computation and Constraint Verification. 

In [82]:
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    Size,
    Completeness,
    Distinctness,
    Uniqueness,
    Compliance,
    Mean,
    Sum,
    Maximum,
)

# Same analyzers as before, but .useRepository + .saveOrAppendResult store the
# results in the metrics JSON file under resultKey.
analysisResult = (
    AnalysisRunner(spark)
    .onData(sample_df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("VendorID"))
    .addAnalyzer(Distinctness("VendorID"))
    .addAnalyzer(Uniqueness(["VendorID", "gen_id"]))
    .addAnalyzer(Compliance("payment_type", "payment_type in (1,2,3,4,5,6)"))
    .addAnalyzer(Mean("trip_distance"))
    .addAnalyzer(Sum("total_amount"))
    .addAnalyzer(Maximum("extra"))
    .useRepository(nyc_yellow_repository)
    .saveOrAppendResult(resultKey)
    .run()
)

# Convert the successfully computed metrics into a Spark DataFrame and display them.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-----------+---------------+------------+--------------------+
|     entity|       instance|        name|               value|
+-----------+---------------+------------+--------------------+
|     Column|   total_amount|         Sum|1.0274988166009633E8|
|     Column|       VendorID|Distinctness|8.517198352773839E-7|
|     Column|  trip_distance|        Mean|   3.926694986351057|
|    Dataset|              *|        Size|           3522285.0|
|     Column|   payment_type|  Compliance|  0.9560146325467701|
|     Column|       VendorID|Completeness|                 1.0|
|Mutlicolumn|VendorID,gen_id|  Uniqueness|                 1.0|
|     Column|          extra|     Maximum|               17.25|
+-----------+---------------+------------+--------------------+





### Loading this back from MetricsRepository


In [83]:
# .before: only look at AnalysisResults whose result key has a smaller value,
# i.e. everything stored up to the current time.
ny_yellow_analysis_result = (
    nyc_yellow_repository.load()
    .before(ResultKey.current_milli_time())
    .getSuccessMetricsAsDataFrame()
)

ny_yellow_analysis_result.show()

+-----------+---------------+------------+--------------------+-------------+-------------------+
|     entity|       instance|        name|               value| dataset_date|                tag|
+-----------+---------------+------------+--------------------+-------------+-------------------+
|     Column|   total_amount|         Sum|1.0274988166009633E8|1703404431776|nyc_yellow_oct_2023|
|     Column|       VendorID|Distinctness|8.517198352773839E-7|1703404431776|nyc_yellow_oct_2023|
|     Column|  trip_distance|        Mean|   3.926694986351057|1703404431776|nyc_yellow_oct_2023|
|    Dataset|              *|        Size|           3522285.0|1703404431776|nyc_yellow_oct_2023|
|     Column|   payment_type|  Compliance|  0.9560146325467701|1703404431776|nyc_yellow_oct_2023|
|     Column|       VendorID|Completeness|                 1.0|1703404431776|nyc_yellow_oct_2023|
|Mutlicolumn|VendorID,gen_id|  Uniqueness|                 1.0|1703404431776|nyc_yellow_oct_2023|
|     Column|       

### What are the benefits?
Let's run the same analysis on NY Yellow Taxi Sep 2023 data

In [84]:
# Load the September 2023 yellow-taxi trip records from the local parquet directory.
sep_yellow_df = spark.read.parquet("nyc-taxi-trips/yellow/sep-2023/")

# printSchema() prints the schema and returns None, so it is called on its own rather
# than bundled into a tuple; the row count is then the value displayed by the cell.
sep_yellow_df.printSchema()
sep_yellow_df.count()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



(None, 2846722)

In [85]:
# A separate ResultKey, with its own tag, for the September analysis
key_tag_sep = {"tag": "nyc_yellow_sep_2023"}
result_key_sep = ResultKey(
    spark,
    ResultKey.current_milli_time(),
    key_tag_sep,
)

In [86]:
# Run the analysis on the September data and store the metrics in the same
# repository under result_key_sep. No Uniqueness analyzer is added here.
sep_analysisResult = (
    AnalysisRunner(spark)
    .onData(sep_yellow_df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("VendorID"))
    .addAnalyzer(Distinctness("VendorID"))
    .addAnalyzer(Compliance("payment_type", "payment_type in (1,2,3,4,5,6)"))
    .addAnalyzer(Mean("trip_distance"))
    .addAnalyzer(Sum("total_amount"))
    .addAnalyzer(Maximum("extra"))
    .useRepository(nyc_yellow_repository)
    .saveOrAppendResult(result_key_sep)
    .run()
)

# Convert the successfully computed metrics into a Spark DataFrame and display them.
sep_analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, sep_analysisResult)
sep_analysisResult_df.show()

+-------+-------------+------------+--------------------+
| entity|     instance|        name|               value|
+-------+-------------+------------+--------------------+
| Column| total_amount|         Sum| 8.478089625005618E7|
| Column|     VendorID|Distinctness|1.053843684068904...|
| Column|trip_distance|        Mean|   4.274268084484396|
|Dataset|            *|        Size|           2846722.0|
| Column| payment_type|  Compliance|  0.9507415898004793|
| Column|     VendorID|Completeness|                 1.0|
| Column|        extra|     Maximum|             10002.5|
+-------+-------------+------------+--------------------+



In [87]:
# Load from the repository again, restricted to result keys before the current time
ny_yellow_analysis_result_new = (
    nyc_yellow_repository.load()
    .before(ResultKey.current_milli_time())
    .getSuccessMetricsAsDataFrame()
)

ny_yellow_analysis_result_new.show()

+-----------+---------------+------------+--------------------+-------------+-------------------+
|     entity|       instance|        name|               value| dataset_date|                tag|
+-----------+---------------+------------+--------------------+-------------+-------------------+
|     Column|   total_amount|         Sum|1.0274988166009633E8|1703404431776|nyc_yellow_oct_2023|
|     Column|       VendorID|Distinctness|8.517198352773839E-7|1703404431776|nyc_yellow_oct_2023|
|     Column|  trip_distance|        Mean|   3.926694986351057|1703404431776|nyc_yellow_oct_2023|
|    Dataset|              *|        Size|           3522285.0|1703404431776|nyc_yellow_oct_2023|
|     Column|   payment_type|  Compliance|  0.9560146325467701|1703404431776|nyc_yellow_oct_2023|
|     Column|       VendorID|Completeness|                 1.0|1703404431776|nyc_yellow_oct_2023|
|Mutlicolumn|VendorID,gen_id|  Uniqueness|                 1.0|1703404431776|nyc_yellow_oct_2023|
|     Column|       

In [88]:
# Filter the repository results by tag: only the September run is returned
ny_yellow_sep_repo = (
    nyc_yellow_repository.load()
    .withTagValues(key_tag_sep)
    .getSuccessMetricsAsDataFrame()
)
ny_yellow_sep_repo.show()

+-------+-------------+------------+--------------------+-------------+-------------------+
| entity|     instance|        name|               value| dataset_date|                tag|
+-------+-------------+------------+--------------------+-------------+-------------------+
| Column| total_amount|         Sum| 8.478089625005618E7|1703404475815|nyc_yellow_sep_2023|
| Column|     VendorID|Distinctness|1.053843684068904...|1703404475815|nyc_yellow_sep_2023|
| Column|trip_distance|        Mean|   4.274268084484396|1703404475815|nyc_yellow_sep_2023|
|Dataset|            *|        Size|           2846722.0|1703404475815|nyc_yellow_sep_2023|
| Column| payment_type|  Compliance|  0.9507415898004793|1703404475815|nyc_yellow_sep_2023|
| Column|     VendorID|Completeness|                 1.0|1703404475815|nyc_yellow_sep_2023|
| Column|        extra|     Maximum|             10002.5|1703404475815|nyc_yellow_sep_2023|
+-------+-------------+------------+--------------------+-------------+---------

### With Constraint Verification
- The repository stores the success metrics, not the verification results, so the schema stays the same in all cases. The same repository can therefore be used, with a different tag, for a verification run.
- If you need to store the verification results themselves, write them directly to your file location or S3 using PySpark, or using `pandas` if the naming convention is important.

In [89]:
# Result key, with its own tag, for the verification run
verify_result_key_nyc_yoct = ResultKey(
    spark,
    ResultKey.current_milli_time(),
    {"tag": "verify_nyc_yellow_oct_2023"},
)

In [90]:
# Fresh Check holding the constraints to verify against sample_df
check = Check(spark, CheckLevel.Warning, "NYC Yellow Taxi Trips Oct 2023")

# Same verification as before; .useRepository / .saveOrAppendResult are called on the
# suite builder (not on the check) and store the metrics under the verification key.
checkResult = (
    VerificationSuite(spark)
    .onData(sample_df)
    .addCheck(
        check.isComplete("VendorID")
        .isUnique("gen_id")
        .hasUniqueness(["VendorID", "gen_id"], assertion=lambda x: x == 1)
        .hasCompleteness("Airport_fee", assertion=lambda x: x >= 0.95)
        .isNonNegative("fare_amount")
    )
    .useRepository(nyc_yellow_repository)
    .saveOrAppendResult(verify_result_key_nyc_yoct)
    .run()
)

print(f"Verification Run Status: {checkResult.status}")

# One row per constraint, including its status and message.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+------------------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check                         |check_level|check_status|constraint                                                                                                                |constraint_status|constraint_message                                                 |
+------------------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
+------------------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------+-----------------+-

In [91]:
# Loading from Repo again
ny_yellow_verification_result = nyc_yellow_repository.load() \
                                .before(ResultKey.current_milli_time()) \
                                .getSuccessMetricsAsDataFrame()

ny_yellow_verification_result.show(truncate=False)

+-----------+---------------------------+------------+---------------------+-------------+--------------------------+
|entity     |instance                   |name        |value                |dataset_date |tag                       |
+-----------+---------------------------+------------+---------------------+-------------+--------------------------+
|Column     |total_amount               |Sum         |1.0274988166009633E8 |1703404431776|nyc_yellow_oct_2023       |
|Column     |VendorID                   |Distinctness|8.517198352773839E-7 |1703404431776|nyc_yellow_oct_2023       |
|Column     |trip_distance              |Mean        |3.926694986351057    |1703404431776|nyc_yellow_oct_2023       |
|Dataset    |*                          |Size        |3522285.0            |1703404431776|nyc_yellow_oct_2023       |
|Column     |payment_type               |Compliance  |0.9560146325467701   |1703404431776|nyc_yellow_oct_2023       |
|Column     |VendorID                   |Completeness|1.

## Loading Metrics from metrics JSON file.


In [93]:
# Path of the metrics file. An S3 location would look like:
# metrics_s3_path = "s3://{bucket_name}/nyc_tlc/pydeequ_metrics/ny_yellow_metrics.json"

metrics_path = "./nyc_metrics_yellow.json"

# Open a repository over the existing metrics file
repository_from_file = FileSystemMetricsRepository(spark, path=metrics_path)

# Read back the metrics whose result key is before the current time
metrics_df = (
    repository_from_file.load()
    .before(ResultKey.current_milli_time())
    .getSuccessMetricsAsDataFrame()
)
metrics_df.show(truncate=False)

+-----------+---------------------------+------------+---------------------+-------------+--------------------------+
|entity     |instance                   |name        |value                |dataset_date |tag                       |
+-----------+---------------------------+------------+---------------------+-------------+--------------------------+
|Column     |total_amount               |Sum         |1.0274988166009633E8 |1703404431776|nyc_yellow_oct_2023       |
|Column     |VendorID                   |Distinctness|8.517198352773839E-7 |1703404431776|nyc_yellow_oct_2023       |
|Column     |trip_distance              |Mean        |3.926694986351057    |1703404431776|nyc_yellow_oct_2023       |
|Dataset    |*                          |Size        |3522285.0            |1703404431776|nyc_yellow_oct_2023       |
|Column     |payment_type               |Compliance  |0.9560146325467701   |1703404431776|nyc_yellow_oct_2023       |
|Column     |VendorID                   |Completeness|1.