In [4]:
import pandas as pd 
import os

def split_dataset(dataset_path, target_folder, num_files):
    # Ensure the target folder exists
    os.makedirs(target_folder, exist_ok=True)
    
    # Load the dataset
    data = pd.read_csv(dataset_path)
    
    # Split the dataset into smaller chunks
    chunk_size = len(data) // num_files
    chunks = [data[i:i + chunk_size] for i in range(0, data.shape[0], chunk_size)]
    
    # Save each chunk to a new file in the target folder
    for i, chunk in enumerate(chunks):
        chunk.to_csv(f"{target_folder}/data_chunk_{i}.csv", index=False)

# Example usage
split_dataset('/home/sachin/DSP/car_price_prediction/data/car_details.csv', '/home/sachin/DSP/airflow-spike-master/input_data/raw-data', 1000)

In [3]:
df = pd.read_csv('/home/sachin/DSP/car_price_prediction/data/car_details.csv')
len(df)

8128

In [5]:
df.head()

Unnamed: 0,name,year,selling_price,km_driven,fuel,seller_type,transmission,owner,mileage,engine,max_power,seats,car_company_name
0,Maruti Swift Dzire VDI,2014,450000,145500,Diesel,Individual,Manual,First Owner,23.4,1248.0,74.0,5.0,Maruti
1,Skoda Rapid 1.5 TDI Ambition,2014,370000,120000,Diesel,Individual,Manual,Second Owner,21.14,1498.0,103.52,5.0,Skoda
2,Honda City 2017-2020 EXi,2006,158000,140000,Petrol,Individual,Manual,Third Owner,17.7,1497.0,78.0,5.0,Honda
3,Hyundai i20 Sportz Diesel,2010,225000,127000,Diesel,Individual,Manual,First Owner,23.0,1396.0,90.0,5.0,Hyundai
4,Maruti Swift VXI BSIII,2007,130000,120000,Petrol,Individual,Manual,First Owner,16.1,1298.0,88.2,5.0,Maruti


In [19]:
import great_expectations as ge
import numpy as np
import datetime

In [24]:

# Pull the file path from the previous task using XCom
#file_path = kwargs['ti'].xcom_pull(key='file_path', task_ids='read-data')

# Load the dataset into a Pandas DataFrame
df1 = pd.read_csv('/home/sachin/DSP/airflow-spike-master/input_data/raw-data/data_chunk_5.csv')
df = ge.from_pandas(df1)


# Expect the "name" column to exist and be a string
df.expect_column_to_exist("name")
df.expect_column_values_to_be_of_type("name", 'str')

# Expect the "year" column to be integers between 1990 and the current year

current_year = datetime.datetime.now().year
df.expect_column_values_to_be_between("year", 1990, current_year)



# Expect "fuel" to be one of the specified options
df.expect_column_values_to_be_in_set("fuel", ["Diesel", "Petrol", "CNG", "LPG", "Electric"])

# Expect "seller_type" to be one of the specified options
df.expect_column_values_to_be_in_set("seller_type", ["Individual", "Dealer", "Trustmark Dealer"])

# Expect "transmission" to be one of the specified options
df.expect_column_values_to_be_in_set("transmission", ["Manual", "Automatic"])

# Expect "owner" to be one of the specified options
df.expect_column_values_to_be_in_set("owner", ["First Owner", "Second Owner", "Third Owner", "Fourth & Above Owner", "Test Drive Car"])


# Expect "seats" to be a non-negative integer and commonly between 2 to 8
df.expect_column_values_to_be_of_type("seats", "int")
df.expect_column_values_to_be_between("seats", 2, 8)

# Expect "car_company_name" to be a string
df.expect_column_values_to_be_of_type("car_company_name", "str")

df.save_expectation_suite('/home/sachin/DSP/airflow-spike-master/output_data/car_data.json')
#kwargs['ti'].xcom_push(key='validation_results', value=validation_results)

In [26]:

# Pull the file path from the previous task using XCom
#file_path = kwargs['ti'].xcom_pull(key='file_path', task_ids='read-data')

# Load the dataset into a Pandas DataFrame
df = pd.read_csv('/home/sachin/DSP/airflow-spike-master/input_data/raw-data/data_chunk_15.csv')

context = ge.data_context.DataContext('/home/sachin/DSP/airflow-spike-master/output_data')
expectation_suite = context.get_expectation_suite('car_data')

# Create a Great Expectations dataset from your DataFrame
ge_df = ge.from_pandas(df, expectation_suite=expectation_suite)

# Validate the dataset against your Expectation Suite
validation_results = ge_df.validate()

# Initialize the storage paths for good and bad data
good_data_path = '/home/sachin/DSP/airflow-spike-master/output_data/good_data/'
bad_data_path = '/home/sachin/DSP/airflow-spike-master/output_data/bad_data/'

# Check if validation passed or failed
# Check if validation passed or failed
if validation_results["success"]:
    # No data quality issues found, store the file in good_data
    print("Success")
    df.to_csv(os.path.join(good_data_path, os.path.basename(file_path)), index=False)
else:
    # Since Great Expectations does not provide row-level failures, we assume all rows failed
    df.to_csv(os.path.join(bad_data_path, os.path.basename(file_path)), index=False)

# Push the validation results to XCom for other tasks to use
#kwargs['ti'].xcom_push(key='validation_results', value=validation_results)

ConfigNotFoundError: Error: No gx directory was found here!
    - Please check that you are in the correct directory or have specified the correct directory.
    - If you have never run Great Expectations in this project, please run `great_expectations init` to get started.
