# Data Pipeline and Data Engineering Best Practices

This notebook serves as the next step following our data exploration and the completion of the Exploratory Data Analysis (EDA) process, as detailed in the [Data Exploration Notebook](https://github.com/Elghandour-eng/Large-Purchases-by-the-State-of-CA-EDA/blob/main/Data-Exploration.ipynb).

## Objective

The primary goal of this notebook is to implement a data pipeline that cleans and restructures the data without modifying the original dataset. The processed data is stored in a separate MongoDB collection, which acts as a simple "data lake" layer: downstream operations run on this copy while the source collection stays untouched. Keeping raw and processed data separate in this way is a common data engineering practice for preserving the integrity of the original data.

## Key Steps

1. **Data Cleaning**: Remove any inconsistencies or errors in the data to ensure accuracy.
2. **Data Reformatting**: Adjust the data structure to enhance usability and accessibility.
3. **Data Storage**: Store the cleaned and reformatted data in a separate MongoDB collection to preserve the original dataset.
4. **Data Lake Creation**: Establish a data lake to facilitate operations on the processed data without impacting the base data.
5. **Best Practices**: Follow data engineering best practices to ensure efficient and effective data management.

By following these steps, we aim to optimize our data handling processes and support robust data analysis and engineering efforts.

---

## 1- FirstStage

This module defines a framework for creating and executing data pipelines.

### Classes

#### `Step`

- **Description**: An abstract class representing a step in a data pipeline.
- **Method**: 
  - `run(data)`: Abstract method to be implemented by subclasses, defining the operation on the input data.

#### `Pipeline`

- **Description**: Represents a data pipeline composed of multiple steps.
- **Methods**:
  - `__init__(steps)`: Initializes the pipeline with a list of `Step` instances.
  - `execute(data)`: Runs each step in sequence on the input data, returning the final processed data.


In [45]:
from abc import ABC, abstractmethod

import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Don't print warnings
import warnings
warnings.filterwarnings("ignore")

class Step(ABC):
    """
    Abstract class representing a step in a data pipeline.
    """

    @abstractmethod
    def run(self, data):
        """
        Abstract method to be implemented by subclasses.
        This method will define the operation to be performed on the data.
        
        :param data: The input data to be processed.
        :return: The processed data.
        """
        pass


class Pipeline:
    """
    Class representing a data pipeline consisting of multiple steps.
    """

    def __init__(self, steps):
        """
        Initialize the pipeline with a list of steps.

        :param steps: A list of Step instances to be executed in sequence.
        """
        self.steps = steps

    def execute(self, data):
        """
        Execute the pipeline by running each step in sequence.

        :param data: The initial input data to be processed through the pipeline.
        :return: The final processed data after all steps have been executed.
        """
        logging.info("Starting pipeline execution")
        for step in self.steps:
            try:
                logging.info(f"Executing step: {step.__class__.__name__}")
                data = step.run(data)
                logging.info(f"Completed step: {step.__class__.__name__}")
            except Exception as e:
                logging.error(f"Error in step {step.__class__.__name__}: {e}")
                raise
        logging.info("Pipeline execution completed")
        return data
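Conceptually, `Pipeline.execute` is a left fold over the steps: each step's output becomes the next step's input. The sketch below expresses the same idea with `functools.reduce` and plain functions. The step functions `drop_empty` and `double` are hypothetical examples, not part of this project.

```python
from functools import reduce
from typing import Callable, Iterable

import pandas as pd


def run_steps(data: pd.DataFrame,
              steps: Iterable[Callable[[pd.DataFrame], pd.DataFrame]]) -> pd.DataFrame:
    # Feed the output of each step into the next one, in order
    return reduce(lambda acc, step: step(acc), steps, data)


# Hypothetical steps, used only for illustration
def drop_empty(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()


def double(df: pd.DataFrame) -> pd.DataFrame:
    return df * 2


result = run_steps(pd.DataFrame({"x": [1.0, None, 3.0]}), [drop_empty, double])
print(result["x"].tolist())  # [2.0, 6.0]
```

The class-based `Pipeline` adds per-step logging and error reporting on top of this same fold.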

## 2- SecondStage

The `SecondStage` is a critical phase in the data management process, utilizing the `MongoManagement` class to streamline operations involving MongoDB. This stage encompasses the following key activities:

- **Data Export**: 
  - Data is read from a specified MongoDB collection into a pandas DataFrame and also saved as a CSV file. The DataFrame format makes the data easy to manipulate and analyze.

- **Data Processing**:
  - The extracted DataFrame is processed through a predefined pipeline. This pipeline consists of a series of steps designed to transform and clean the data according to the specific requirements of the data model. Each step in the pipeline applies a particular operation, ensuring the data is prepared for its intended use.

- **Data Insertion**:
  - After processing, the transformed data is inserted into a different MongoDB collection. This step ensures that the processed data is stored efficiently, making it readily available for further analysis or application use.

Overall, the `SecondStage` ensures that data is effectively managed, transformed, and stored, enhancing its utility and accessibility for subsequent operations.
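The export, process, and insert cycle can be illustrated without a live database. This sketch assumes the documents are plain dicts, as `collection.find()` yields them, and stops at the list of records that would be passed to `insert_many`; the sample documents are hypothetical.

```python
import pandas as pd

# Hypothetical documents, shaped like what collection.find() yields
docs = [
    {"_id": "a1", "Department Name": "Dept A", "Unit Price": "$10.00"},
    {"_id": "b2", "Department Name": "Dept B", "Unit Price": None},
]

# Export: each document becomes a DataFrame row (keys become columns)
df = pd.DataFrame(docs)

# Process: any transformation; here, drop rows without a price
processed = df.dropna(subset=["Unit Price"])

# Insert: rows become dicts again, ready for insert_many
records = processed.to_dict("records")
print(records)
```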

In [65]:
from pymongo import MongoClient
import pandas as pd
from typing import Optional

class MongoManagement:
    """
    Class to manage MongoDB operations including exporting data to a DataFrame,
    processing it through a pipeline, and inserting it into another collection.
    """

    def __init__(self, mongo_client: MongoClient, database: str):
        """
        Initialize with a MongoDB client and database name.

        :param mongo_client: The MongoDB client instance.
        :param database: The name of the database.
        """
        self.mongo_client = mongo_client
        self.database = mongo_client[database]

    def export_dataset(self, collection: str, path: str) -> pd.DataFrame:
        """
        Export dataset from MongoDB collection to a DataFrame and save as CSV.

        :param collection: The name of the collection to export.
        :param path: The file path to save the CSV.
        :return: The exported DataFrame.
        """
        try:
            collection = self.database[collection]
            df = pd.DataFrame(list(collection.find()))
            df.to_csv(path, index=False)
            print("Data exported successfully.")
            return df
        except Exception as e:
            print(e)
            return pd.DataFrame()  # Return an empty DataFrame in case of an error

    def process_and_insert(self, df: pd.DataFrame, pipeline: 'Pipeline', target_collection: str):
        """
        Process the DataFrame through a pipeline and insert the result into a target collection.

        :param df: The DataFrame to process.
        :param pipeline: The pipeline to process the data.
        :param target_collection: The name of the target collection to insert data.
        """
        try:
            processed_data = pipeline.execute(df)
            self._prepare_and_insert(processed_data, target_collection)
            print("Data processed and inserted successfully.")
        except Exception as e:
            print(e)

    def add_df_to_collection(self, df: pd.DataFrame, collection: str):
        """
        Add a DataFrame to a collection in MongoDB.

        :param df: The DataFrame to add.
        :param collection: The name of the collection to add the DataFrame to.
        """
        try:
            self._prepare_and_insert(df, collection)
            print("Data added to collection successfully.")
        except Exception as e:
            print(e)

    def _prepare_and_insert(self, df: pd.DataFrame, collection: str):
        """
        Prepare the DataFrame by making datetime columns timezone-naive and filling NaT values,
        then insert it into the specified MongoDB collection.

        Errors are re-raised so that callers do not report success after a failed insert.

        :param df: The DataFrame to prepare and insert.
        :param collection: The name of the collection to insert the DataFrame into.
        """
        # Work on a copy so the caller's DataFrame is not modified
        df = df.copy()
        try:
            # Select both naive and timezone-aware datetime columns
            datetime_columns = df.select_dtypes(include=['datetime', 'datetimetz']).columns
            for col in datetime_columns:
                # Drop timezone information first (only needed for tz-aware columns)
                if df[col].dt.tz is not None:
                    df[col] = df[col].dt.tz_localize(None)
                # Fill NaT with a default date
                df[col] = df[col].fillna(pd.Timestamp('1970-01-01'))

            self.database[collection].insert_many(df.to_dict('records'))
        except Exception as e:
            print(f"Error inserting data into collection {collection}: {e}")
            raise

## 3- ThirdStage

The `Identify Model` stage is focused on defining and understanding the data models involved in the data pipeline process. This stage is crucial for ensuring that data is accurately represented and processed according to its structure and intended use. It involves the following components:

- **Input Model**:
  - The input model represents the structure of the data as it is initially extracted from the source. It includes various fields that describe the attributes of the data, such as:
    - `_id`: Unique identifier for each record.
    - `creation_date`: The date the record was created.
    - `purchase_date`: Optional date of purchase.
    - `fiscal_year`: The fiscal year associated with the record.
    - `lpa_number`, `purchase_order_number`, `requisition_number`: Various identifiers related to the acquisition process.
    - `acquisition_type`, `sub_acquisition_type`, `acquisition_method`, `sub_acquisition_method`: Details about the acquisition process.
    - `department_name`, `supplier_code`, `supplier_name`, `supplier_qualifications`, `supplier_zip_code`: Information about the department and supplier.
    - `calcard`, `item_name`, `item_description`: Details about the items involved.
    - `quantity`, `unit_price`, `total_price`: Financial details of the transaction.
    - `classification_codes`, `normalized_unspsc`, `commodity_title`, `class_number`, `class_title`, `family`, `family_title`, `segment`, `segment_title`: Classification and categorization details.
    - `location`: Optional location information.

- **Output Model**:
  - The output model will be identified and defined after applying the necessary processing steps in the pipeline. This model will reflect the transformed state of the data, tailored to meet the specific requirements of subsequent operations or analyses.

This stage ensures that both the input and output data models are clearly defined, facilitating effective data processing and transformation throughout the pipeline.

`Input Model`

In [None]:
from dataclasses import dataclass
from typing import Optional

@dataclass
class InputModel:
    _id: str
    creation_date: str
    purchase_date: Optional[str]
    fiscal_year: str
    lpa_number: Optional[str]
    purchase_order_number: str
    requisition_number: Optional[str]
    acquisition_type: str
    sub_acquisition_type: Optional[str]
    acquisition_method: str
    sub_acquisition_method: Optional[str]
    department_name: str
    supplier_code: Optional[float]
    supplier_name: Optional[str]
    supplier_qualifications: Optional[str]
    supplier_zip_code: Optional[str]
    calcard: str
    item_name: Optional[str]
    item_description: Optional[str]
    quantity: Optional[float]
    unit_price: Optional[str]
    total_price: Optional[str]
    classification_codes: Optional[str]
    normalized_unspsc: Optional[float]
    commodity_title: Optional[str]
    class_number: Optional[float]
    class_title: Optional[str]
    family: Optional[float]
    family_title: Optional[str]
    segment: Optional[float]
    segment_title: Optional[str]
    location: Optional[str]
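The raw MongoDB fields use display names such as `Creation Date`, while `InputModel` uses snake_case attributes, so one way to check a frame against the model is to normalize the column names first. The helpers `to_snake_case` and `missing_fields` and the trimmed `MiniInputModel` below are hypothetical. Note that a name like `Class` would normalize to `class`, not to the model's `class_number`, so some fields still need an explicit mapping.

```python
import re
from dataclasses import dataclass, fields
from typing import Optional


def to_snake_case(column: str) -> str:
    # "Sub-Acquisition Type" -> "sub_acquisition_type", "CalCard" -> "calcard"
    return re.sub(r"[\s\-]+", "_", column.strip().lower())


@dataclass
class MiniInputModel:  # hypothetical subset of InputModel, for illustration
    _id: str
    creation_date: str
    sub_acquisition_type: Optional[str]
    calcard: str


def missing_fields(columns, model) -> set:
    # Model fields that no (normalized) column name provides
    normalized = {to_snake_case(c) for c in columns}
    return {f.name for f in fields(model)} - normalized


print(missing_fields(["_id", "Creation Date", "CalCard"], MiniInputModel))
# {'sub_acquisition_type'}
```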

## 4- FourthStage

In the `Identifying Steps` stage, specific actions are determined based on data exploration and insights. This involves:

- **Data Exploration**: Analyzing the data to understand its characteristics and identify areas needing transformation.
- **Insight Gathering**: Documenting observations to inform decision-making.
- **Decision Making**: Determining necessary transformations based on insights.
- **Step Definition**: Defining specific steps for data cleaning, normalization, or feature extraction.
- **Pipeline Integration**: Incorporating these steps into the pipeline in the correct sequence.

This stage ensures the data pipeline is customized to improve data quality and usability for further analysis.

`Step1` <br>
This step removes specific columns from the DataFrame because more than **50%** of the values in each of those columns are null.

In [None]:
class RemoveSpecificColumnsStep(Step):
    """
    Pipeline step to remove specific columns from the DataFrame.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # List of columns to remove
        columns_to_remove = [
            'LPA Number',
            'Requisition Number',
            'Sub-Acquisition Type',
            'Sub-Acquisition Method',
            'Supplier Qualifications'
        ]
        
        # Drop the specified columns
        data = data.drop(columns=columns_to_remove, errors='ignore')
        
        return data
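Rather than hard-coding the list, the same columns could be derived from the data. A sketch, assuming "more than 50% null" means the share of missing values per column; `columns_above_null_threshold` and the sample frame are hypothetical.

```python
import pandas as pd


def columns_above_null_threshold(df: pd.DataFrame, threshold: float = 0.5) -> list:
    # Fraction of missing values per column; keep columns strictly above the threshold
    null_share = df.isna().mean()
    return null_share[null_share > threshold].index.tolist()


sample = pd.DataFrame({
    "LPA Number": [None, None, None, "X"],
    "Item Name": ["a", "b", None, "d"],
})
print(columns_above_null_threshold(sample))  # ['LPA Number']
```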

`Step2` <br>
Remove `Normalized UNSPSC`, since it holds the same values as `Classification Codes`.

In [None]:
class RemoveNormalizedUNSPSCStep(Step):
    """
    Removes the 'Normalized UNSPSC' column from the DataFrame.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Drop the 'Normalized UNSPSC' column, ignoring errors if it doesn't exist
        data = data.drop(columns=['Normalized UNSPSC'], errors='ignore')
        return data
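Before dropping a column as redundant, it is worth confirming the claim on the data itself. This hypothetical `columns_match` helper assumes the codes are numeric strings in `Classification Codes` and floats in `Normalized UNSPSC`; codes that are not purely numeric would need a different comparison.

```python
import pandas as pd


def columns_match(df: pd.DataFrame, left: str, right: str) -> bool:
    # Compare numerically so that "43211503" and 43211503.0 count as equal;
    # Series.equals treats NaN in the same position as equal
    return pd.to_numeric(df[left], errors="coerce").equals(
        pd.to_numeric(df[right], errors="coerce")
    )


sample = pd.DataFrame({
    "Classification Codes": ["43211503", "44121600", None],
    "Normalized UNSPSC": [43211503.0, 44121600.0, None],
})
print(columns_match(sample, "Classification Codes", "Normalized UNSPSC"))  # True
```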

`Step3`<br>

Remove rows with a missing price. There are only a few such rows, and the EDA showed that they carry no meaningful information.


In [29]:
class RemoveMissingPriceRowsStep(Step):
    """
    Removes rows with missing values in the 'Unit Price' column.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Drop rows where 'Unit Price' is missing
        data = data.dropna(subset=['Unit Price'])
        return data

`Step4`<br>
Encode the `Fiscal Year` values 2012-2013, 2013-2014, and 2014-2015 as 0, 1, and 2. Despite the step's name, this is an ordinal (label) encoding rather than one-hot encoding.

In [31]:
class OneHotEncodeFacielYearStep(Step):
    """
    Encodes the 'Fiscal Year' column as integers, mapping 2012-2013, 2013-2014, and 2014-2015
    to 0, 1, and 2 (an ordinal encoding, despite the class name).
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Define the mapping for the 'Fiscal Year' column
        year_mapping = {
            '2012-2013': 0,
            '2013-2014': 1,
            '2014-2015': 2
        }

        # Map the 'Fiscal Year' column; values not in the mapping become NaN
        data['Fiscal Year'] = data['Fiscal Year'].map(year_mapping)

        return data
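The step maps each fiscal year to a single integer, which is an ordinal encoding; true one-hot encoding would instead create one indicator column per year. The sketch contrasts the two on a small hypothetical series. Which one suits depends on whether a model should treat the years as ordered.

```python
import pandas as pd

years = pd.Series(["2012-2013", "2013-2014", "2014-2015", "2013-2014"], name="Fiscal Year")

# Ordinal (label) encoding: one integer column, as in the pipeline step
ordinal = years.map({"2012-2013": 0, "2013-2014": 1, "2014-2015": 2})

# One-hot encoding: one 0/1 indicator column per fiscal year
one_hot = pd.get_dummies(years, prefix="FY", dtype=int)

print(ordinal.tolist())  # [0, 1, 2, 1]
print(one_hot.columns.tolist())
```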

`Step5`<br>
Convert `Creation Date` and `Purchase Date` from strings to datetime.


In [None]:
class ConvertToDateTimeStep(Step):
    """
    Converts 'Creation Date' and 'Purchase Date' columns from strings to datetime,
    coercing missing or unparsable values to NaT.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Convert 'Creation Date' to datetime; unparsable values become NaT
        data['Creation Date'] = pd.to_datetime(data['Creation Date'], errors='coerce')

        # Convert 'Purchase Date' to datetime; unparsable values become NaT
        data['Purchase Date'] = pd.to_datetime(data['Purchase Date'], errors='coerce')
        
        return data
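Note that `errors='coerce'` does not skip bad values; it replaces anything unparsable with `NaT`. A small sketch on hypothetical inputs, assuming the `MM/DD/YYYY` format that `FillMissingPurchaseDateStep` also uses; passing an explicit `format` avoids month/day guessing.

```python
import pandas as pd

raw = pd.Series(["01/15/2013", "not a date", None])

# Unparsable strings and missing values both become NaT instead of raising
parsed = pd.to_datetime(raw, format="%m/%d/%Y", errors="coerce")

print(parsed.isna().sum())  # 2
```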

`Step6`<br>
Fill missing `Purchase Date` values per `Department Name`.<br>
For each department, compute the mean difference in days between `Creation Date` and `Purchase Date`, then set each missing `Purchase Date` to its `Creation Date` plus that mean (truncated to whole days).

In [36]:
class FillMissingPurchaseDateStep(Step):
    """
    Fills missing values in 'Purchase Date' with the mean difference in days
    between 'Creation Date' and 'Purchase Date' for each 'Department Name'.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Ensure 'Creation Date' and 'Purchase Date' are datetime objects
        data['Creation Date'] = pd.to_datetime(data['Creation Date'], format='%m/%d/%Y', errors='coerce')
        data['Purchase Date'] = pd.to_datetime(data['Purchase Date'], errors='coerce')

        # Calculate the difference in days between 'Creation Date' and 'Purchase Date'
        data['days_diff'] = (data['Purchase Date'] - data['Creation Date']).dt.days

        # Calculate the mean difference in days for each 'Department Name'
        mean_days_diff = data.groupby('Department Name')['days_diff'].mean()

        # Function to fill missing 'Purchase Date' values
        def fill_purchase_date(row):
            if pd.isna(row['Purchase Date']) and not pd.isna(row['Creation Date']):
                mean_diff = mean_days_diff.get(row['Department Name'], 0)
                # Ensure mean_diff is a valid number
                if pd.notna(mean_diff):
                    return row['Creation Date'] + pd.Timedelta(days=int(mean_diff))
            return row['Purchase Date']

        # Apply the function to fill missing 'Purchase Date' values
        data['Purchase Date'] = data.apply(fill_purchase_date, axis=1)

        # Drop the temporary 'days_diff' column
        data = data.drop(columns=['days_diff'])

        return data
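`DataFrame.apply(..., axis=1)` calls a Python function once per row, which is slow on roughly 340k rows. The same fill can be expressed with `groupby(...).transform('mean')`. This is a sketch of an alternative, not the notebook's implementation; `fill_purchase_date_vectorized` is a hypothetical name, and the sample data mirrors the Step6 unit test.

```python
import numpy as np
import pandas as pd


def fill_purchase_date_vectorized(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()
    lag_days = (data["Purchase Date"] - data["Creation Date"]).dt.days
    # Mean lag per department, broadcast back to every row of that department
    mean_lag = lag_days.groupby(data["Department Name"]).transform("mean")
    # Truncate toward zero, matching int(mean_diff) in the original step
    offset = pd.to_timedelta(np.trunc(mean_lag), unit="D")
    data["Purchase Date"] = data["Purchase Date"].fillna(data["Creation Date"] + offset)
    return data


sample = pd.DataFrame({
    "Department Name": ["Dept A", "Dept A", "Dept B", "Dept B", "Dept C"],
    "Creation Date": pd.to_datetime(
        ["01/01/2023", "01/05/2023", "02/01/2023", "02/10/2023", "03/01/2023"], format="%m/%d/%Y"),
    "Purchase Date": pd.to_datetime(
        ["01/03/2023", None, "02/05/2023", None, "03/05/2023"], format="%m/%d/%Y"),
})
result = fill_purchase_date_vectorized(sample)
print(result["Purchase Date"].tolist())
```

One behavioral difference: rows with a missing `Department Name` stay unfilled here, whereas the row-wise version falls back to a lag of 0 days.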

`Unit Test For Step6`

In [39]:
import numpy as np
from datetime import datetime

def test_fill_missing_purchase_date():
    # Create a sample DataFrame with 'Creation Date' in "MM/DD/YYYY" format
    data = pd.DataFrame({
        'Department Name': ['Dept A', 'Dept A', 'Dept B', 'Dept B', 'Dept C'],
        'Creation Date': [
            '01/01/2023', '01/05/2023', '02/01/2023', '02/10/2023', '03/01/2023'
        ],
        'Purchase Date': [
            '01/03/2023', np.nan, '02/05/2023', np.nan, '03/05/2023'
        ]
    })

    # Convert date columns to datetime with the specified format
    data['Creation Date'] = pd.to_datetime(data['Creation Date'], format='%m/%d/%Y', errors='coerce')
    data['Purchase Date'] = pd.to_datetime(data['Purchase Date'], format='%m/%d/%Y', errors='coerce')

    # Expected results
    expected_purchase_dates = [
        datetime(2023, 1, 3),  # No change
        datetime(2023, 1, 7),  # Filled with mean difference for Dept A
        datetime(2023, 2, 5),  # No change
        datetime(2023, 2, 14), # Filled with mean difference for Dept B
        datetime(2023, 3, 5)   # No change
    ]

    # Instantiate the step
    step = FillMissingPurchaseDateStep()

    # Apply the transformation
    processed_data = step.run(data)

    # Check the results
    result_purchase_dates = processed_data['Purchase Date'].tolist()
    assert result_purchase_dates == expected_purchase_dates, f"Test failed! Expected {expected_purchase_dates} but got {result_purchase_dates}"

    print("Test passed!")

test_fill_missing_purchase_date()

Test passed!


`Step7`<br>

Encode a YES/NO column as binary: `NO` becomes 0 and `YES` becomes 1. Despite the step's name, this is a binary mapping rather than one-hot encoding.

In [20]:
class OneHotEncodeYesNoStep(Step):
    """
    Encodes a YES/NO column as binary integers, mapping 'NO' to 0 and 'YES' to 1.
    """

    def __init__(self, column_name: str):
        self.column_name = column_name

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Map 'NO' to 0 and 'YES' to 1
        data[self.column_name] = data[self.column_name].map({'NO': 0, 'YES': 1})
        
        return data
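`Series.map` with a dictionary turns every unmatched value into `NaN`, so entries with stray spacing or lowercase letters would silently become missing. Whether the `CalCard` column contains such variants is an assumption to check against the data; this sketch shows the failure mode and one way to normalize first.

```python
import pandas as pd

calcard = pd.Series(["YES", "NO", "yes ", None])

# map() returns NaN for any value not in the dictionary, including case/whitespace variants
strict = calcard.map({"NO": 0, "YES": 1})

# Normalizing first catches those variants; missing values stay missing
normalized = calcard.str.strip().str.upper().map({"NO": 0, "YES": 1})

print(strict.isna().sum(), normalized.isna().sum())  # 2 1
```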

`Step8`<br>
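Remove the `Requisition Number` column, since it duplicates `Purchase Order Number`. `RemoveSpecificColumnsStep` (Step1) already drops this column, so in the full pipeline this step has no effect; `errors='ignore'` keeps it from failing.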

In [21]:
class RemoveRequisitionNumberStep(Step):
    """
    Removes the 'Requisition Number' column from the DataFrame.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Drop the 'Requisition Number' column, ignoring errors if it doesn't exist
        data = data.drop(columns=['Requisition Number'], errors='ignore')
        return data

`Step9`<br>
Remove duplicate rows (2,084 of them in this dataset). The `_id` column is excluded from the comparison because every document has a unique `_id`.

In [22]:
class RemoveDuplicateRowsStep(Step):
    """
    Removes duplicate rows from the DataFrame, ignoring the '_id' column.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Deduplicate on every column except '_id' (unique per document), then
        # re-attach '_id' by index; note that this moves '_id' to the last column
        data = data.loc[:, data.columns != '_id'].drop_duplicates().join(data['_id'])
        return data
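The same deduplication can be written with the `subset` argument of `drop_duplicates`, which avoids the select-then-`join` round trip and keeps `_id` in its original position instead of moving it to the end. A sketch on hypothetical data:

```python
import pandas as pd

data = pd.DataFrame({
    "_id": ["a", "b", "c"],
    "Item Name": ["pen", "pen", "desk"],
    "Quantity": [1, 1, 2],
})

# Compare every column except _id and keep the first occurrence of each duplicate group
subset = [col for col in data.columns if col != "_id"]
deduped = data.drop_duplicates(subset=subset, keep="first")

print(deduped["_id"].tolist())  # ['a', 'c']
```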


`Step10`<br>
Split the Location string into 3 columns, `Zip Code`, `Latitude` and `Longitude`.

In [52]:
class SplitLocationStep(Step):
    """
    Splits the 'Location' string into 'Zip Code', 'Latitude', and 'Longitude' columns,
    and removes the original 'Location' column.

    Expected format: the zip code, a newline, then "(<latitude>, <longitude>)".
    Missing or malformed entries produce NaN in all three new columns.
    """

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        # Capture zip code, latitude, and longitude in one pass; rows that do not
        # match the expected format (including missing values) yield NaN
        parts = data['Location'].str.extract(
            r'^(?P<zip>[^\n]+)\n\((?P<lat>[-+]?\d*\.?\d+),\s*(?P<lon>[-+]?\d*\.?\d+)\)$'
        )

        # Assign the extracted values; NaN (rather than pd.NA) keeps the rows BSON-encodable
        data['Zip Code'] = parts['zip']
        data['Latitude'] = pd.to_numeric(parts['lat'], errors='coerce')
        data['Longitude'] = pd.to_numeric(parts['lon'], errors='coerce')

        # Remove the original 'Location' column
        return data.drop(columns=['Location'])

`Unit Test For Step10`

In [None]:
import unittest

import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal


class TestSplitLocationStep(unittest.TestCase):
    def setUp(self):
        # Sample data for testing, including a missing and a malformed entry
        self.data = pd.DataFrame({
            'Location': [
                '12345\n(38.662263, -121.346136)',
                '67890\n(34.052235, -118.243683)',
                None,
                '54321\n(40.712776, -74.005974)',
                'InvalidFormat'
            ]
        })

        # Expected output: missing and malformed locations become NaN
        self.expected_data = pd.DataFrame({
            'Zip Code': ['12345', '67890', np.nan, '54321', np.nan],
            'Latitude': [38.662263, 34.052235, np.nan, 40.712776, np.nan],
            'Longitude': [-121.346136, -118.243683, np.nan, -74.005974, np.nan]
        })

    def test_split_location(self):
        # Apply the transformation
        result_data = SplitLocationStep().run(self.data)

        # Let assert_frame_equal raise so that unittest records a failure
        assert_frame_equal(result_data, self.expected_data)


# Run the tests inside the notebook without shutting down the kernel
unittest.main(argv=['ignored'], exit=False)

---

## 5- FifthStage
Connect to MongoDB and export the `penny` collection to a DataFrame.


In [None]:
import os
from pymongo import MongoClient
from dotenv import load_dotenv

# Load environment variables from a .env file
load_dotenv()

# Get the MongoDB URI from the environment variable
mongo_uri = os.getenv('MONGO_URI')

# Connect to MongoDB
client = MongoClient(mongo_uri)

# Initialize MongoManagement
mongo_manager = MongoManagement(client, 'penny')

# Export data from the 'penny' collection
df = mongo_manager.export_dataset('penny', 'exported_data.csv')

In [23]:
df.shape # Check the shape of the DataFrame

(346018, 32)

## 6- SixthStage

This stage runs all the pipeline steps, in order, on the DataFrame exported from MongoDB.

In [53]:
# Identify the steps to be executed
steps = [
    RemoveSpecificColumnsStep(),
    RemoveNormalizedUNSPSCStep(),
    RemoveMissingPriceRowsStep(),
    OneHotEncodeFacielYearStep(),
    ConvertToDateTimeStep(),
    FillMissingPurchaseDateStep(),
    OneHotEncodeYesNoStep('CalCard'),
    RemoveRequisitionNumberStep(),
    RemoveDuplicateRowsStep(),
    SplitLocationStep()
]

# Create the pipeline
pipeline = Pipeline(steps)

# Apply the pipeline
cleaned_df = pipeline.execute(df)

2024-11-09 17:31:35,718 - INFO - Starting pipeline execution
2024-11-09 17:31:35,720 - INFO - Executing step: RemoveSpecificColumnsStep
2024-11-09 17:31:35,949 - INFO - Completed step: RemoveSpecificColumnsStep
2024-11-09 17:31:35,950 - INFO - Executing step: RemoveNormalizedUNSPSCStep
2024-11-09 17:31:36,225 - INFO - Completed step: RemoveNormalizedUNSPSCStep
2024-11-09 17:31:36,226 - INFO - Executing step: RemoveMissingPriceRowsStep
2024-11-09 17:31:36,579 - INFO - Completed step: RemoveMissingPriceRowsStep
2024-11-09 17:31:36,581 - INFO - Executing step: OneHotEncodeFacielYearStep
2024-11-09 17:31:36,630 - INFO - Completed step: OneHotEncodeFacielYearStep
2024-11-09 17:31:36,631 - INFO - Executing step: ConvertToDateTimeStep
2024-11-09 17:31:37,220 - INFO - Completed step: ConvertToDateTimeStep
2024-11-09 17:31:37,221 - INFO - Executing step: FillMissingPurchaseDateStep
2024-11-09 17:31:40,330 - INFO - Completed step: FillMissingPurchaseDateStep
2024-11-09 17:31:40,331 - INFO - Exec

In [55]:
cleaned_df.shape # Check the shape of the cleaned DataFrame

(343904, 28)

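## 7- SeventhStage

Store the cleaned data in a new MongoDB collection named `Pipeline`, leaving the source collection unchanged.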

In [75]:
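# Run the pipeline on the raw export and insert the result into a new 'Pipeline' collection.
# This reruns every step on df; mongo_manager.add_df_to_collection(cleaned_df, 'Pipeline')
# would insert the already-cleaned DataFrame instead.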
mongo_manager.process_and_insert(df, pipeline, 'Pipeline')

2024-11-09 17:48:43,351 - INFO - Starting pipeline execution
2024-11-09 17:48:43,352 - INFO - Executing step: RemoveSpecificColumnsStep
2024-11-09 17:48:43,709 - INFO - Completed step: RemoveSpecificColumnsStep
2024-11-09 17:48:43,709 - INFO - Executing step: RemoveNormalizedUNSPSCStep
2024-11-09 17:48:43,992 - INFO - Completed step: RemoveNormalizedUNSPSCStep
2024-11-09 17:48:43,993 - INFO - Executing step: RemoveMissingPriceRowsStep
2024-11-09 17:48:44,350 - INFO - Completed step: RemoveMissingPriceRowsStep
2024-11-09 17:48:44,350 - INFO - Executing step: OneHotEncodeFacielYearStep
2024-11-09 17:48:44,400 - INFO - Completed step: OneHotEncodeFacielYearStep
2024-11-09 17:48:44,400 - INFO - Executing step: ConvertToDateTimeStep
2024-11-09 17:48:44,955 - INFO - Completed step: ConvertToDateTimeStep
2024-11-09 17:48:44,956 - INFO - Executing step: FillMissingPurchaseDateStep
2024-11-09 17:48:48,466 - INFO - Completed step: FillMissingPurchaseDateStep
2024-11-09 17:48:48,466 - INFO - Exec

Data processed and inserted successfully.


## 8- EighthStage

Identify the `OutputModel` from the data types of the cleaned DataFrame.

In [76]:
cleaned_df.dtypes # Check the data types of the cleaned DataFrame

Creation Date            datetime64[ns]
Purchase Date            datetime64[ns]
Fiscal Year                       int64
Purchase Order Number            object
Acquisition Type                 object
Acquisition Method               object
Department Name                  object
Supplier Code                   float64
Supplier Name                    object
Supplier Zip Code                object
CalCard                           int64
Item Name                        object
Item Description                 object
Quantity                        float64
Unit Price                       object
Total Price                      object
Classification Codes             object
Commodity Title                  object
Class                           float64
Class Title                      object
Family                          float64
Family Title                     object
Segment                         float64
Segment Title                    object
_id                              object


In [77]:
from dataclasses import dataclass
from typing import Optional
import datetime

@dataclass
class OutputModel:
    creation_date: Optional[datetime.datetime]
    purchase_date: Optional[datetime.datetime]
    fiscal_year: Optional[int]
    purchase_order_number: Optional[str]
    acquisition_type: Optional[str]
    acquisition_method: Optional[str]
    department_name: Optional[str]
    supplier_code: Optional[float]
    supplier_name: Optional[str]
    supplier_zip_code: Optional[str]
    calcard: Optional[int]
    item_name: Optional[str]
    item_description: Optional[str]
    quantity: Optional[float]
    unit_price: Optional[str]
    total_price: Optional[str]
    classification_codes: Optional[str]
    commodity_title: Optional[str]
    class_: Optional[float]  # 'class' is a reserved keyword in Python, so use 'class_'
    class_title: Optional[str]
    family: Optional[float]
    family_title: Optional[str]
    segment: Optional[float]
    segment_title: Optional[str]
    _id: Optional[str]
    zip_code: Optional[str]
    latitude: Optional[float]  # SplitLocationStep converts coordinates to numbers
    longitude: Optional[float]
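Turning a cleaned record into an `OutputModel` instance requires mapping display names to attribute names, including `Class` to `class_`. A sketch under assumptions: `MiniOutputModel`, `SPECIAL_NAMES`, and `record_to_model` are hypothetical, and fields absent from the record are set to `None`.

```python
import datetime
import re
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class MiniOutputModel:  # hypothetical subset of OutputModel
    creation_date: Optional[datetime.datetime]
    calcard: Optional[int]
    class_: Optional[float]
    latitude: Optional[float]


# Column names that do not follow the simple lower-case/underscore rule
SPECIAL_NAMES = {"Class": "class_"}


def record_to_model(record: dict, model):
    wanted = {f.name for f in fields(model)}
    kwargs = {}
    for column, value in record.items():
        name = SPECIAL_NAMES.get(column, re.sub(r"[\s\-]+", "_", column.strip().lower()))
        if name in wanted:  # columns the model does not declare are skipped
            kwargs[name] = value
    # Fields absent from the record default to None
    for name in wanted - kwargs.keys():
        kwargs[name] = None
    return model(**kwargs)


row = {"Creation Date": datetime.datetime(2013, 1, 15), "CalCard": 1,
       "Class": 4321.0, "Unit Price": "$5.00"}
instance = record_to_model(row, MiniOutputModel)
print(instance)
```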