[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Praanshu101/Data_Validation/blob/main/Lab2.ipynb)

#### Team members (Team 21):

Praanshu Patel (23110249)

Rishank Soni (23110277)

### Dataset Preparation

In [11]:
# Extraction of the data from the zip file (To be run in Google Colab)

import os
import zipfile

if not os.path.exists("/202412-citibike-tripdata"):
  !wget https://s3.amazonaws.com/tripdata/202412-citibike-tripdata.zip

zip_file_path = '/content/202412-citibike-tripdata.zip'
extraction_folder = '/202412-citibike-tripdata'

# Create the extraction folder
os.makedirs(extraction_folder, exist_ok=True)

# Extract into the folder
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extract('202412-citibike-tripdata_1.csv', path=extraction_folder)


In [12]:
!pip install pandera -q # (To be run in Google Colab)

In [13]:
import pandas as pd
import pandera as pa
import json

import os
import zipfile
import warnings

warnings.filterwarnings('ignore')

In [14]:
# Load the csv files from folder 202412-citibike-tripdata

path = '/202412-citibike-tripdata'
files = os.listdir(path)

# Load the csv files into a dataframe
df = pd.concat([pd.read_csv(path + '/' + f) for f in files], ignore_index = True)

# Display the first 5 rows of the dataframe
df.head()

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,B44E5B10AEE58AD0,classic_bike,2024-12-14 10:58:18.153,2024-12-14 11:11:11.308,Frederick Douglass Blvd & W 145 St,7954.12,E 138 St & 5 Ave,7809.13,40.823061,-73.941928,40.81449,-73.936153,member
1,BC252DC6A6011556,electric_bike,2024-12-12 14:46:12.473,2024-12-12 16:45:37.777,Madison Ave & E 99 St,7443.01,,,40.789485,-73.952429,40.78,-73.96,member
2,6FBE55EF6FE8736D,electric_bike,2024-12-11 07:55:18.770,2024-12-11 08:02:23.460,Columbia St & Kane St,4422.05,,,40.687632,-74.001626,40.69,-74.0,member
3,908890DE7FDCF9FE,electric_bike,2024-12-09 22:51:11.668,2024-12-09 22:57:43.495,E 13 St & 2 Ave,5820.08,E 10 St & 2 Ave,5746.02,40.731539,-73.985302,40.729708,-73.986598,member
4,D5D366379A4DC0A8,classic_bike,2024-12-10 18:48:40.063,2024-12-10 19:10:32.264,11 Ave & W 41 St,6726.01,E 25 St & 1 Ave,6004.07,40.760301,-73.998842,40.738177,-73.977387,member


#### Validation of key fields

Validating the following fields (using pandera):
- ride_id (string)
- rideable_type (categorical)
- started_at (string)
- ended_at (string)
- start_station_name (string)
- start_station_id (string)
- end_station_name (string)
- end_station_id (string)
- start_lat, start_lng (float)
- end_lat, end_lng (float)
- member_casual (categorical)

In [15]:
# Original dataframe columns and data types

df.dtypes

Unnamed: 0,0
ride_id,object
rideable_type,object
started_at,object
ended_at,object
start_station_name,object
start_station_id,object
end_station_name,object
end_station_id,object
start_lat,float64
start_lng,float64


##### Q1 a) Validating the datatype of each column

In [16]:
# Validating the schema of the dataframe (data types of the columns)

schema = pa.DataFrameSchema({
    "ride_id": pa.Column(pa.String),
    "rideable_type": pa.Column(pa.String),
    "started_at": pa.Column(pa.String),
    "ended_at": pa.Column(pa.String),
    "start_station_name": pa.Column(pa.String),
    "start_station_id": pa.Column(pa.String),
    "end_station_name": pa.Column(pa.String),
    "end_station_id": pa.Column(pa.String),
    "start_lat": pa.Column(pa.Float),
    "start_lng": pa.Column(pa.Float),
    "end_lat": pa.Column(pa.Float),
    "end_lng": pa.Column(pa.Float),
    "member_casual": pa.Column(pa.String)
})

try:
    schema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
    print(json.dumps(e.message, indent=2))

{
  "SCHEMA": {
    "SERIES_CONTAINS_NULLS": [
      {
        "schema": null,
        "column": "start_station_name",
        "check": "not_nullable",
        "error": "non-nullable series 'start_station_name' contains null values:7183      NaN8010      NaN9244      NaN10186     NaN10791     NaN         ... 995036    NaN999616    NaN999641    NaN999668    NaN999688    NaNName: start_station_name, Length: 625, dtype: object"
      },
      {
        "schema": null,
        "column": "start_station_id",
        "check": "not_nullable",
        "error": "non-nullable series 'start_station_id' contains null values:7183      NaN8010      NaN9244      NaN10186     NaN10791     NaN         ... 995036    NaN999616    NaN999641    NaN999668    NaN999688    NaNName: start_station_id, Length: 625, dtype: object"
      },
      {
        "schema": null,
        "column": "end_station_name",
        "check": "not_nullable",
        "error": "non-nullable series 'end_station_name' contains null val

Fixing the datatypes and errors:

In [33]:
# Converting the start_station_id and end_station_id to string
df['start_station_id'] = df['start_station_id'].astype(str)
df['end_station_id'] = df['end_station_id'].astype(str)

# Validate the schema of the dataframe
try:
    schema.validate(df, lazy=True)
    print("The dataframe is valid")
except pa.errors.SchemaErrors as e:
    print(json.dumps(e.message, indent=2))


{
  "SCHEMA": {
    "WRONG_DATATYPE": [
      {
        "schema": null,
        "column": "started_at",
        "check": "dtype('str')",
        "error": "expected series 'started_at' to have type str:failure cases:         index            failure_case0            0 2024-12-14 10:58:18.1531            3 2024-12-09 22:51:11.6682            4 2024-12-10 18:48:40.0633            5 2024-12-03 13:14:09.0264            6 2024-12-13 16:07:22.623...        ...                     ...996038  999995 2024-12-06 18:43:51.866996039  999996 2024-12-10 10:34:58.071996040  999997 2024-12-03 14:02:29.375996041  999998 2024-12-05 07:03:08.210996042  999999 2024-12-09 08:33:59.397[996043 rows x 2 columns]"
      },
      {
        "schema": null,
        "column": "ended_at",
        "check": "dtype('str')",
        "error": "expected series 'ended_at' to have type str:failure cases:         index            failure_case0            0 2024-12-14 11:11:11.3081            3 2024-12-09 22:57:43.4952       

##### Q1 b) Analyzing features and writing appropriate checks

In [18]:
# Analyzing each feature

# Rideaable type (only two types of rideable type)
print(df['rideable_type'].value_counts())

# Member_casual (only two types of member_casual)
print(df['member_casual'].value_counts())

# Checking if the start_station_id and end_station_id are the same
print("Number of rows with same start_station_id and end_station_id:")
print(df[df['start_station_id'] == df['end_station_id']].shape[0])

# Checking the range of latitude and longitude
print("Number of rows with latitude and longitude out of range:")
print(df[(df['start_lat'] > 90) | (df['start_lat'] < -90)].shape[0])
print(df[(df['start_lng'] > 180) | (df['start_lng'] < -180)].shape[0])
print(df[(df['end_lat'] > 90) | (df['end_lat'] < -90)].shape[0])
print(df[(df['end_lng'] > 180) | (df['end_lng'] < -180)].shape[0])

# Checking if the start_lat, start_lng and end_lat, end_lng are same
print("Number of rows with same start_lat and start_lng:")
print(df[(df['start_lat'] == df['end_lat']) & (df['start_lng'] == df['end_lng'])].shape[0])

# Checking if the start_time is greater than end_time
print("Number of rows with start_time greater than end_time:")
print(df[df['started_at'] > df['ended_at']].shape[0])

rideable_type
electric_bike    689945
classic_bike     306098
Name: count, dtype: int64
member_casual
member    882038
casual    114005
Name: count, dtype: int64
Number of rows with same start_station_id and end_station_id:
14470
Number of rows with latitude and longitude out of range:
0
0
0
0
Number of rows with same start_lat and start_lng:
14565
Number of rows with start_time greater than end_time:
0


Validating using a schema:

In [19]:
# Validating the above checks using schema

schema2 = pa.DataFrameSchema({
    "ride_id": pa.Column(pa.String),
    "rideable_type": pa.Column(pa.String, checks=[pa.Check.isin(["classic_bike", "electric_bike"])]),
    "started_at": pa.Column(pa.String),
    "ended_at": pa.Column(pa.String),
    "start_station_name": pa.Column(pa.String),
    "start_station_id": pa.Column(pa.String),
    "end_station_name": pa.Column(pa.String),
    "end_station_id": pa.Column(pa.String),
    "start_lat": pa.Column(pa.Float, checks=[pa.Check.greater_than(-90), pa.Check.less_than(90)]),
    "start_lng": pa.Column(pa.Float, checks=[pa.Check.greater_than(-180), pa.Check.less_than(180)]),
    "end_lat": pa.Column(pa.Float, checks=[pa.Check.greater_than(-90), pa.Check.less_than(90)]),
    "end_lng": pa.Column(pa.Float, checks=[pa.Check.greater_than(-180), pa.Check.less_than(180)]),
    "member_casual": pa.Column(pa.String, checks=[pa.Check.isin(["member", "casual"])])
})

try:
    schema2.validate(df, lazy=True)
    print("The data is valid")
except pa.errors.SchemaErrors as e:
    print(json.dumps(e.message, indent=2))


The data is valid


##### Q1 c) Adding data validation rules to verify that start time is lesser than end time (Using Pandera Decorators)

In [20]:
# Validate if the start_time is lesser than end_time using pandera decorators

# Defining the schema for the output of the function (time column should be greater than or equal to 0)
schema3 = pa.DataFrameSchema({
    'time': pa.Column(pa.Float, checks=[pa.Check.ge(0)], nullable=False)
})

@pa.check_output(schema3) # To Validate the output of the function

def validate_schema(df: pd.DataFrame) -> pd.DataFrame:
    # Convert the started_at and ended_at to datetime
    df['started_at'] = pd.to_datetime(df['started_at'])
    df['ended_at'] = pd.to_datetime(df['ended_at'])

    # Calculate the time taken for the ride
    df['time'] = (df['ended_at'] - df['started_at']).dt.total_seconds().astype(float)

    return df

# Validate the schema of the dataframe
df3 = validate_schema(df)
try:
    schema3.validate(df3, lazy=True)
    print("The data is valid")
except pa.errors.SchemaErrors as e:
    print(json.dumps(e.message, indent=2))

The data is valid


Q2
a)
Validate the datatype of each column using Great Expectations

In [21]:
# Remove any existing great_expectations directory
%rm -rf gx

# Install pandas and great_expectations

%pip install pandas
%pip install great_expectations -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.0/5.0 MB[0m [31m38.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m813.6/813.6 kB[0m [31m34.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.2/12.2 MB[0m [31m60.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.7/64.7 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m117.7/117.7 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m739.1/739.1 kB[0m [31m32.0 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following de

In [22]:
# Initialize the data context
import great_expectations as gx
from great_expectations.data_context.types.base import DataContextConfig
from great_expectations.checkpoint import CheckpointResult

import pandas as pd

In [23]:


# Get the Great Expectations context in file mode
context = gx.get_context(mode="file")

# Add a new data source for pandas dataframes
data_source_name = "my_data_source"
data_source = context.data_sources.add_pandas(name=data_source_name)

# Add a new data asset to the data source
data_asset_name = "my_dataframe_data_asset"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)

# Define a batch for the whole dataframe
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch definition")

# Set the batch parameters with the dataframe
batch_parameters = {"dataframe": df}

# Get the batch using the batch definition and parameters
batch = batch_definition.get_batch(batch_parameters)
print(batch.head(4))

  and should_run_async(code)


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

            ride_id  rideable_type              started_at  \
0  B44E5B10AEE58AD0   classic_bike 2024-12-14 10:58:18.153   
1  908890DE7FDCF9FE  electric_bike 2024-12-09 22:51:11.668   
2  D5D366379A4DC0A8   classic_bike 2024-12-10 18:48:40.063   
3  D56FA800710E6478   classic_bike 2024-12-03 13:14:09.026   

                 ended_at                  start_station_name  \
0 2024-12-14 11:11:11.308  Frederick Douglass Blvd & W 145 St   
1 2024-12-09 22:57:43.495                     E 13 St & 2 Ave   
2 2024-12-10 19:10:32.264                    11 Ave & W 41 St   
3 2024-12-03 13:16:23.278                     E 13 St & 2 Ave   

  start_station_id  end_station_name end_station_id  start_lat  start_lng  \
0          7954.12  E 138 St & 5 Ave        7809.13  40.823061 -73.941928   
1          5820.08   E 10 St & 2 Ave        5746.02  40.731539 -73.985302   
2          6726.01   E 25 St & 1 Ave        6004.07  40.760301 -73.998842   
3          5820.08   E 10 St & 2 Ave        5746.02  40

In [24]:
# Create an expectation suite
suite_name = "datatype_validation_suite"
suite = gx.ExpectationSuite(name=suite_name)
context.suites.add(suite)

  and should_run_async(code)


{
  "name": "datatype_validation_suite",
  "id": "41a445e3-6dfc-4ff3-b76b-862c72691135",
  "expectations": [],
  "meta": {
    "great_expectations_version": "1.3.2"
  },
  "notes": null
}

In [25]:
# Add expectations to the suite
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="ride_id", type_="str"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="rideable_type", value_set=["docked_bike", "electric_bike", "classic_bike"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="started_at", type_="str"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="ended_at", type_="str"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="start_station_name", type_="str"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="start_station_id", type_="str"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="end_station_name", type_="str"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="end_station_id", type_="str"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="start_lat", type_="float"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="start_lng", type_="float"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="end_lat", type_="float"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="end_lng", type_="float"
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="member_casual", value_set=["member", "casual"]
    )
)





ExpectColumnValuesToBeInSet(id='345d2ab9-4128-426a-ae3e-41c6cf5873b0', meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='member_casual', mostly=1, row_condition=None, condition_parser=None, value_set=['member', 'casual'])

In [26]:
# define the validation definition with the batch definition and suite
definition_name = "my_validation_definition"
validation_definition = gx.ValidationDefinition(
    data=batch_definition, suite=suite, name=definition_name
)
validation_definition = context.validation_definitions.add(validation_definition) # Add the validation definition to the context

In [27]:
validation_definition_name = "my_validation_definition"
validation_definition = context.validation_definitions.get(validation_definition_name) # Get the validation definition from the context

validation_results = validation_definition.run(batch_parameters=batch_parameters)
print(validation_results)

Calculating Metrics:   0%|          | 0/52 [00:00<?, ?it/s]

{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_of_type",
        "kwargs": {
          "batch_id": "my_data_source-my_dataframe_data_asset",
          "column": "ride_id",
          "type_": "str"
        },
        "meta": {},
        "id": "a14246bc-54ab-42ae-86ab-a06212b11166"
      },
      "result": {
        "element_count": 996043,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": [],
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "unexpected_percent_nonmissing": 0.0,
        "partial_unexpected_counts": [],
        "partial_unexpected_index_list": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config"

### Validation Results


The validation results for the dataset are as follows:

- **Total Expectations Evaluated**: 13
- **Successful Expectations**: 11
- **Unsuccessful Expectations**: 2


#### Successful Expectations:
1. `expect_column_values_to_be_of_type` for column `ride_id`
2. `expect_column_values_to_be_in_set` for column `rideable_type`
3. `expect_column_values_to_be_of_type` for column `start_station_name`
4. `expect_column_values_to_be_of_type` for column `start_station_id`
5. `expect_column_values_to_be_of_type` for column `end_station_name`
6. `expect_column_values_to_be_of_type` for column `end_station_id`
7. `expect_column_values_to_be_of_type` for column `start_lat`
8. `expect_column_values_to_be_of_type` for column `start_lng`
9. `expect_column_values_to_be_of_type` for column `end_lat`
10. `expect_column_values_to_be_of_type` for column `end_lng`
11. `expect_column_values_to_be_in_set` for column `member_casual`

#### Unsuccessful Expectations:
1. `expect_column_values_to_be_of_type` for column `started_at` (Expected: `str`, Observed: `datetime64`)
2. `expect_column_values_to_be_of_type` for column `ended_at` (Expected: `str`, Observed: `datetime64`)

The validation was partially successful with a majority of the expectations being met. However, there were issues with the data types of the `started_at` and `ended_at` columns, which were expected to be strings but were observed as datetime objects.

Q2. b) A mail when encountering a failure

In [28]:
# impot the actions from great_expectations
from great_expectations.checkpoint import (
    EmailAction
)

  and should_run_async(code)


In [29]:
context = gx.get_context()
validation_definitions = [
    context.validation_definitions.get("my_validation_definition")
]

In [30]:
email_action =[EmailAction(
    name="send_email_on_failure",
    smtp_address='smtp.gmail.com',
    smtp_port=587, # 587 is the default port for TLS
    receiver_emails="23110277@iitgn.ac.in", # Receiver email
    sender_login="23110249@iitgn.ac.in", # Sender email
    sender_password="yjxa mofg fvdt orhe",
    use_tls=True,
    notify_on="failure"# Notify on all results
)]

In [31]:
checkpoint_name = "my_checkpoint"
checkpoint = gx.Checkpoint(
    name=checkpoint_name,
    validation_definitions=validation_definitions, # List of ValidationDefinition objects
    actions=email_action, # List of Action objects
    result_format={"result_format": "COMPLETE"},
)

# Save the Checkpoint to the Data Context
context.checkpoints.add(checkpoint)

# Retrieve the Checkpoint later
checkpoint_name = "my_checkpoint"
checkpoint = context.checkpoints.get(checkpoint_name)

In [32]:
# Run the Check and send the results to the email action
validation_results = checkpoint.run(
    batch_parameters=batch_parameters, expectation_parameters=suite
)

Calculating Metrics:   0%|          | 0/52 [00:00<?, ?it/s]

ERROR:great_expectations.checkpoint.actions:Failed to authenticate to the SMTP server at address: smtp.gmail.com
