In [6]:
from abc import abstractmethod,ABC
import pandas as pd
import yaml
from pathlib import Path
import numpy as np
from typing import Annotated
from zenml import step, pipeline
from SPARKS_project.src.utils.loggingfiles import logger
'''
This module has 2 classes.
The first class is the entity config, written with the abstract-method approach rather than as a dataclass.
The 2nd class ingests the data from the source and overrides the first class.
ZenML is used to automatically keep a log of the artifacts instead of us having to keep them in memory.
'''

class DataIngestionStrategy(ABC):
    """Abstract interface for a data-ingestion strategy.

    The signature of ``ingest_data`` takes no arguments besides ``self`` so
    that it agrees with the concrete ``Dataingestion`` override below, which
    gets its source location from the constructor.
    """

    @abstractmethod
    def ingest_data(self) -> Annotated[pd.DataFrame, "data"]:
        """Load the data from the strategy's source and return it as a DataFrame."""


class Dataingestion(DataIngestionStrategy):
    """Ingestion strategy that reads a CSV file into a pandas DataFrame."""

    def __init__(self, datapath):
        """Remember where the CSV lives.

        Args:
            datapath: Location of the CSV file; passed unchanged to
                ``pd.read_csv`` when ``ingest_data`` is called.
        """
        self.path = datapath

    def ingest_data(self) -> Annotated[pd.DataFrame, "data"]:
        """Read the CSV at ``self.path``.

        Returns:
            The parsed CSV as a ``pd.DataFrame``.

        Raises:
            Exception: Whatever ``pd.read_csv`` raised (for example
                ``FileNotFoundError``), re-raised after the failure message
                has been printed.
        """
        try:
            return pd.read_csv(self.path)
        except Exception as e:
            # Report and propagate. Swallowing the error here made the method
            # return None, contradicting its DataFrame return annotation and
            # hiding the real cause from the caller.
            print(f"could not read data from {self.path}   : {e}")
            raise

                                                      
    


    

# Common functionalities
To be implemented in common.py

In [7]:
# We need a function to read the YAML file,
# then another function to extract the CSV path; it takes its input from read_yaml.
from SPARKS_project.src.CONSTANTS import CONFIG_INGESTION_YAML
def read_yaml(path=CONFIG_INGESTION_YAML):
    """Load a YAML file, print its parsed content and return it.

    Args:
        path: Location of the YAML file. Defaults to ``CONFIG_INGESTION_YAML``.

    Returns:
        The parsed YAML document. An empty document is returned as an empty
        dict so callers can chain ``.get`` without a ``None`` check.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Decode as UTF-8 explicitly so parsing does not depend on the platform's
    # default text encoding.
    with open(path, "r", encoding="utf-8") as file:
        data = yaml.safe_load(file)

    # yaml.safe_load yields None for an empty document; normalise that case.
    if data is None:
        data = {}

    print(data)
    return data
    
def read_yaml_keys():
    """Look up the CSV location stored in the ingestion YAML config.

    Reads the config through ``read_yaml()`` and fetches the ``source_url``
    entry nested under ``data_ingestion``. The value is printed before it is
    returned.

    Returns:
        The ``source_url`` value, or ``None`` when the key is absent or when
        any error occurred (the error is printed, not raised).
    """
    try:
        config = read_yaml()
        ingestion_section = config.get('data_ingestion', {})
        csv_location = ingestion_section.get('source_url')
        print(csv_location)
    except Exception as e:
        print(f"could not fetch csv path from yaml file   : {e}")
    else:
        return csv_location




{'data_ingestion': {'source_url': 'C:\\Users\\User\\Downloads\\Iris.csv'}}
{'data_ingestion': {'source_url': 'C:\\Users\\User\\Downloads\\Iris.csv'}}
C:\Users\User\Downloads\Iris.csv


'C:\\Users\\User\\Downloads\\Iris.csv'

## The data ingestion pipeline implementation using @step
To be implemented in the steps subpackage.

In [8]:

@step(enable_cache=False)
def data_ingestion_pipeline() -> Annotated[pd.DataFrame, "csv_file"]:
    """ZenML step that loads the CSV referenced by the ingestion config.

    The CSV location comes from ``read_yaml_keys()`` and the file is read
    through ``Dataingestion``. The path and the first five rows are printed.

    Returns:
        The ingested data as a ``pd.DataFrame`` (output name ``csv_file``).

    Raises:
        ValueError: If no CSV path could be obtained from the config, or if
            the ingestion produced no data.
        Exception: Any other error raised while reading the data; it is
            printed and then re-raised so the step fails visibly instead of
            returning ``None`` for an output declared as a DataFrame.
    """
    try:
        path = read_yaml_keys()
        print(path)
        if path is None:
            # read_yaml_keys() returns None when the key is missing or when
            # the YAML file could not be read.
            raise ValueError(
                "no 'source_url' found under 'data_ingestion' in the ingestion config"
            )

        path_obj = Dataingestion(path)
        data = path_obj.ingest_data()
        if data is None:
            # Guard against an ingestion that reports failure by returning None.
            raise ValueError(f"no data could be read from {path}")

        print(data.head(5))
        return data

    except Exception as e:
        print(f"an error     {e}    occured while ingesting data    :")
        # Propagate the failure: a step annotated to output a DataFrame must
        # not silently hand back None.
        raise

In [9]:
# Invoke the ingestion step directly. As the last expression of the cell, its
# return value (the ingested DataFrame) is rendered as the cell output.
data_ingestion_pipeline()

{'data_ingestion': {'source_url': 'C:\\Users\\User\\Downloads\\Iris.csv'}}
C:\Users\User\Downloads\Iris.csv
C:\Users\User\Downloads\Iris.csv
   Id  SepalLengthCm  SepalWidthCm  PetalLengthCm  PetalWidthCm      Species
0   1            5.1           3.5            1.4           0.2  Iris-setosa
1   2            4.9           3.0            1.4           0.2  Iris-setosa
2   3            4.7           3.2            1.3           0.2  Iris-setosa
3   4            4.6           3.1            1.5           0.2  Iris-setosa
4   5            5.0           3.6            1.4           0.2  Iris-setosa


Unnamed: 0,Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
0,1,5.1,3.5,1.4,0.2,Iris-setosa
1,2,4.9,3.0,1.4,0.2,Iris-setosa
2,3,4.7,3.2,1.3,0.2,Iris-setosa
3,4,4.6,3.1,1.5,0.2,Iris-setosa
4,5,5.0,3.6,1.4,0.2,Iris-setosa
...,...,...,...,...,...,...
145,146,6.7,3.0,5.2,2.3,Iris-virginica
146,147,6.3,2.5,5.0,1.9,Iris-virginica
147,148,6.5,3.0,5.2,2.0,Iris-virginica
148,149,6.2,3.4,5.4,2.3,Iris-virginica
