In [2]:
import pandas as pd
import pandera as pa
import json
from typing import Optional, Any

class DataSchemaAndStatistics:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.schema: Optional[pa.DataFrameSchema] = None

    def infer_schema(self) -> pa.DataFrameSchema:
        """
        Infers a schema from the dataset based on column data types.
        """
        # Automatically infer the schema based on the data's structure
        self.schema = pa.infer_schema(self.data)
        print("Schema inferred automatically from dataset.")
        return self.schema

    def validate_data(self, new_data: pd.DataFrame) -> Optional[pd.DataFrame]:
        """
        Validates new data against the inferred schema.
        """
        if not self.schema:
            raise ValueError("Schema has not been inferred. Run infer_schema() first.")
        
        try:
            validated_data = self.schema.validate(new_data, lazy=True)
            print("New data validated successfully.")
            return validated_data
        except pa.errors.SchemaErrors as e:
            print("Schema validation errors found:")
            print(e)
            return None

    def save_schema(self, file_path: str):
        """
        Saves the inferred schema to a specified file path in JSON format.
        """
        if not self.schema:
            raise ValueError("Schema has not been inferred. Run infer_schema() first.")
        
        # Convert schema to a dictionary and save it as JSON
        schema_dict = self.schema.to_json()
        with open(file_path, 'w') as file:
            json.dump(schema_dict, file, indent=4)
        print(f"Schema saved successfully at {file_path}")

    def load_schema(self, file_path: str):
        """
        Loads a schema from a specified file path in JSON format.
        """
        with open(file_path, 'r') as file:
            schema_dict = json.load(file)
        self.schema = pa.DataFrameSchema.from_json(schema_dict)
        print(f"Schema loaded successfully from {file_path}")


In [3]:
# Load your data
data = pd.read_csv("all_merged_zones_weather_demand_data.csv")
exclude_columns = ["value-units", "zone", "subba-name", "datetime"]
float_cols = ["precipMM", "precipInches"]

# Convert all other columns to integers
for col in data.columns:
    if col not in exclude_columns and col not in float_cols:
        data[col] = pd.to_numeric(data[col], errors='coerce').fillna(0).astype(int)  # Convert and handle any NaNs
    elif col in float_cols:
        data[col] = pd.to_numeric(data[col], errors='coerce').fillna(0).astype(float)  # Convert and handle any NaNs
    else:
        data[col] = data[col].astype(str)


# Initialize the schema and stats generator
schema_stats_generator = DataSchemaAndStatistics(data)


  data = pd.read_csv("all_merged_zones_weather_demand_data.csv")


In [28]:
stats = schema_stats_generator.generate_statistics()  # Already calculated stats in Airflow
tfdv.visualize_statistics(stats)

Statistics generated successfully.


In [None]:
schema = schema_stats_generator.infer_schema()

# After new data validation, you can visualize anomalies if they exist
sample_data = {
    "datetime": ["2019-06-05T17", "2019-06-05T18"],
    "tempF": [82, 85],
    "windspeedMiles": [12, 6],
    "weatherCode": [176, 113],
    "precipMM": [0.3, 0.0],
    "precipInches": [0.0, 0.0],
    "humidity": [81, 25],
    "visibility": [9, 10],
    "visibilityMiles": [5, 6],
    "pressure": [1008, 1008],
    "pressureInches": [30, 30],
    "cloudcover": [87, 9],
    "HeatIndexC": [32, 29],
    "HeatIndexF": [90, 84],
    "DewPointC": [24, 12],
    "DewPointF": [76, 53],
    "WindChillC": [28, 30],
    "WindChillF": [82, 85],
    "WindGustMiles": [23, 9],
    "WindGustKmph": [36, 14],
    "FeelsLikeC": [32, 29],
    "FeelsLikeF": [90, 84],
    "uvIndex": [6, 7],
    "subba-name": ["ERCO - Coast", "ERCO - Far West"],
    "value": [13395, 3442],
    "value-units": ["megawatthours", "megawatthours"],
    "zone": ["COAS", "FWES"]
}

# Convert the dictionary to a DataFrame
new_data = pd.DataFrame(sample_data)
anomalies = schema_stats_generator.validate_data(new_data)

Schema inferred automatically from dataset.




New data validated successfully.
