In [1]:
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import io
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from haystack import Document, component, logging
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
from haystack.dataclasses import ByteStream

logger = logging.getLogger(__name__)


@component
class CSVToDocument:
    """
    Converts CSV files to Documents.

    Each source is decoded to text as a whole and becomes the `content` of one Document;
    the CSV is not parsed into rows or columns here.

    By default, it uses UTF-8 encoding when converting files but
    you can also set a custom encoding.
    It can attach metadata to the resulting documents.

    ### Usage example

    ```python
    from datetime import datetime

    from haystack.components.converters.csv import CSVToDocument

    converter = CSVToDocument()
    results = converter.run(sources=["sample.csv"], meta={"date_added": datetime.now().isoformat()})
    documents = results["documents"]
    print(documents[0].content)
    # 'col1,col2\\nrow1,row1\\nrow2,row2\\n'
    ```
    """

    def __init__(self, encoding: str = "utf-8"):
        """
        Creates a CSVToDocument component.

        :param encoding:
            The encoding of the csv files to convert.
            If the encoding is specified in the metadata of a source ByteStream,
            it overrides this value.
        """
        self.encoding = encoding

    @component.output_types(documents=List[Document])
    def run(
        self,
        sources: List[Union[str, Path, ByteStream]],
        meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
    ):
        """
        Converts each CSV source to a Document.

        A source that cannot be read or decoded is skipped with a logged warning, so the
        returned list can be shorter than `sources`.

        :param sources:
            List of file paths or ByteStream objects.
        :param meta:
            Optional metadata to attach to the documents.
            This value can be either a list of dictionaries or a single dictionary.
            If it's a single dictionary, its content is added to the metadata of all produced documents.
            If it's a list, the length of the list must match the number of sources, because the two lists will
            be zipped.
            If `sources` contains ByteStream objects, their `meta` will be added to the output documents.
            When a key appears both in a source's own `meta` and in this value, this value wins.
        :returns:
            A dictionary with the following keys:
            - `documents`: Created documents
        """
        documents = []

        meta_list = normalize_metadata(meta, sources_count=len(sources))

        for source, metadata in zip(sources, meta_list):
            try:
                bytestream = get_bytestream_from_source(source)
            except Exception as e:
                logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
                continue
            try:
                # An "encoding" entry in the source's own metadata takes precedence over the component default.
                encoding = bytestream.meta.get("encoding", self.encoding)
                # Decode the payload directly; wrapping it in a BytesIO first only added a redundant copy.
                data = bytestream.data.decode(encoding=encoding)
            except Exception as e:
                logger.warning(
                    "Could not convert file {source}. Skipping it. Error message: {error}", source=source, error=e
                )
                continue

            # Later entries win: caller-supplied metadata overrides the source's own metadata on key clashes.
            merged_metadata = {**bytestream.meta, **metadata}
            document = Document(content=data, meta=merged_metadata)
            documents.append(document)

        return {"documents": documents}

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import warnings
from helper import load_env
from haystack import Pipeline
from haystack_integrations.document_stores.pinecone import PineconeDocumentStore
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter
from haystack_integrations.components.embedders.cohere.document_embedder import CohereDocumentEmbedder
# Suppress every warning from this point on, then run the local helper's environment setup
# (presumably loads the API credentials the integrations need -- confirm in helper.py).
warnings.filterwarnings('ignore')
load_env()

# Pinecone-backed store that the writer stage persists into.
# NOTE(review): dimension=1024 presumably matches the embedder model's vector size -- confirm.
document_store = PineconeDocumentStore(
    index="industries",
    namespace="Classification",
    dimension=1024,
    spec={"serverless": {"region": "us-east-1", "cloud": "aws"}},
)

# Stages listed in processing order; each one feeds the next.
pipeline_stages = (
    ("converter", CSVToDocument()),
    ("splitter", DocumentSplitter(split_by="passage", split_length=10, split_overlap=0)),
    ("embedder", CohereDocumentEmbedder(model="embed-english-v3.0")),
    ("writer", DocumentWriter(document_store=document_store)),
)

pipeline = Pipeline()
for stage_name, stage_component in pipeline_stages:
    pipeline.add_component(stage_name, stage_component)

# Wire every stage to its successor: converter -> splitter -> embedder -> writer.
stage_names = [stage_name for stage_name, _component in pipeline_stages]
for upstream, downstream in zip(stage_names, stage_names[1:]):
    pipeline.connect(upstream, downstream)

# Keep the pipeline as the cell's final expression so its summary is still displayed.
pipeline

* 'allow_population_by_field_name' has been renamed to 'populate_by_name'
* 'smart_union' has been removed


<haystack.core.pipeline.pipeline.Pipeline object at 0x28022c910>
🚅 Components
  - converter: CSVToDocument
  - splitter: DocumentSplitter
  - embedder: CohereDocumentEmbedder
  - writer: DocumentWriter
🛤️ Connections
  - converter.documents -> splitter.documents (List[Document])
  - splitter.documents -> embedder.documents (List[Document])
  - embedder.documents -> writer.documents (List[Document])

In [3]:
from datetime import datetime

In [4]:
import pandas as pd
import time
# pipeline.run(sources=["sample.csv"], meta={"date_added": datetime.now().isoformat()})
# Index the GICS table one row at a time: each row is dumped to a scratch CSV,
# echoed back for a quick visual check, and then sent through the indexing pipeline.
scratch_csv = "temp_row.csv"
df = pd.read_csv("Gics_modified_gpt4_v2.csv")
for row_label, record in df.iterrows():
    # A row is a Series; with index=False only its values are written, one per line.
    record.to_csv(scratch_csv, index=False)
    # Read the scratch file back purely to show its last five entries.
    print(pd.read_csv(scratch_csv).tail(5))
    pipeline.run({"converter": {"sources": [scratch_csv], "meta": {}}})
    # Wait one second before the next row (presumably to stay under API rate limits -- confirm).
    time.sleep(1)


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  1.33it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  3.63it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.91it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  3.29it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.71it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  4.47it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.46it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  7.97it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  1.70it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  5.78it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  6.92it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  7.41it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  8.92it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  6.94it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  8.39it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  2.63it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.62it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  6.17it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  8.37it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  3.03it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.19it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  2.61it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  8.37it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  6.65it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  8.22it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00, 13.19it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.33it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  4.62it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.55it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  7.95it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  7.39it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00, 12.68it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  8.66it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  7.88it/s]


                                                    0
7                         Energy Equipment & Services
8                                            10101010
9                                  Oil & Gas Drilling
10  Drilling contractors or owners of drilling rig...
11  [\n  {\n    "risk": "Higher frequency of extre...


Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  8.03it/s]
Upserted vectors: 100%|██████████| 1/1 [00:00<00:00,  9.36it/s]


KeyboardInterrupt: 