## Simple ETL ingestor pipeline to upload data to Zilliz cloud vector database

#### Step 1 : Text processing

In [11]:
import pandas as pd


In [12]:
# load data
file_name = "crops_state (1).csv"

metadata = {
    "source": "Department of Statistics Malaysia",
    "dataset_name": "Crop Area and Production by State",
    "source_url": "https://www.dosm.gov.my",
    "data_year": "2017-2022",
    "file_name" : f'{file_name}'
}

df = pd.read_csv(f'/home/digital_dungeon01/rag-chatbot-mvp/database/data-collection/{file_name}')
print(df.head())


      state        date   crop_type  planted_area  production
0  Malaysia  2017-01-01  cash_crops       20763.1    217847.5
1  Malaysia  2018-01-01  cash_crops       19028.2    215087.6
2  Malaysia  2019-01-01  cash_crops       17082.0    221310.2
3  Malaysia  2020-01-01  cash_crops       19090.9    223506.8
4  Malaysia  2021-01-01  cash_crops       20070.3    239011.9


In [13]:
def row_to_text(row):
    return (
        f"In {row['state']}, on {row['date']}, "
        f"the crop type is {row['crop_type']} with a planted area of "
        f"{row['planted_area']} hectares that produced {row['production']} tonnes."
    )

In [14]:
df["text"] = df.apply(row_to_text, axis=1) 
df_processed = df 
df_processed.head()

Unnamed: 0,state,date,crop_type,planted_area,production,text
0,Malaysia,2017-01-01,cash_crops,20763.1,217847.5,"In Malaysia, on 2017-01-01, the crop type is c..."
1,Malaysia,2018-01-01,cash_crops,19028.2,215087.6,"In Malaysia, on 2018-01-01, the crop type is c..."
2,Malaysia,2019-01-01,cash_crops,17082.0,221310.2,"In Malaysia, on 2019-01-01, the crop type is c..."
3,Malaysia,2020-01-01,cash_crops,19090.9,223506.8,"In Malaysia, on 2020-01-01, the crop type is c..."
4,Malaysia,2021-01-01,cash_crops,20070.3,239011.9,"In Malaysia, on 2021-01-01, the crop type is c..."


#### Step 2 : Text Chunking

In [15]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from datetime import datetime

In [None]:
splitter = RecursiveCharacterTextSplitter (chunk_size=200, chunk_overlap=50)

chunks = []
chunk_counter = 0

# Loop each row
for _, row in df_processed.iterrows():
    split_texts = splitter.split_text(row["text"])
    chunk_counter += 1
    
    for chunk_text in split_texts:
        chunks.append({
            "text": chunk_text,
            
            # data fields
            "state": str(row["state"]),
            "date": str(row["date"]),
            "crop_type": str(row["crop_type"]),
            "planted_area": float(row["planted_area"]),
            "production": float(row["production"]),
            
            # metadata
            "source": metadata["source"],
            "dataset_name": metadata["dataset_name"],
            "source_url": metadata["source_url"],
            "data_year": metadata["data_year"],
            
            # tracking
            "file_name" : metadata["file_name"],
            "chunk_id": f"chunk_{chunk_counter:06d}",
            "created_at": datetime.now().isoformat()
            
        })


In [26]:
print(f"total : {len(chunks)}")
print(chunks[:2])

total : 864
[{'text': 'In Malaysia, on 2017-01-01, the crop type is cash_crops with a planted area of 20763.1 hectares that produced 217847.5 tonnes.', 'state': 'Malaysia', 'date': '2017-01-01', 'crop_type': 'cash_crops', 'planted_area': 20763.1, 'production': 217847.5, 'source': 'Department of Statistics Malaysia', 'dataset_name': 'Crop Area and Production by State', 'source_url': 'https://www.dosm.gov.my', 'data_year': '2017-2022', 'file_name': 'crops_state (1).csv', 'chunk_id': 'chunk_000001', 'created_at': '2025-10-25T02:15:57.455629'}, {'text': 'In Malaysia, on 2018-01-01, the crop type is cash_crops with a planted area of 19028.2 hectares that produced 215087.6 tonnes.', 'state': 'Malaysia', 'date': '2018-01-01', 'crop_type': 'cash_crops', 'planted_area': 19028.2, 'production': 215087.6, 'source': 'Department of Statistics Malaysia', 'dataset_name': 'Crop Area and Production by State', 'source_url': 'https://www.dosm.gov.my', 'data_year': '2017-2022', 'file_name': 'crops_state (1

#### Step 3 : Embedding
> - gonna use local embedding = transformer - cuda 

In [27]:
import torch
from sentence_transformers import SentenceTransformer

# cuda
device = "cuda" if torch.cuda.is_available() else "cpu"
model = SentenceTransformer('all-MiniLM-L6-v2', device=device)

# model.to(device)

In [28]:
texts = [chunk["text"] for chunk in chunks]
embeddings = model.encode(texts, show_progress_bar=True, convert_to_tensor=False)

for i, chunk in enumerate(chunks):
    chunk["vector"] = embeddings[i].tolist()


Batches:   0%|          | 0/27 [00:00<?, ?it/s]

Batches: 100%|██████████| 27/27 [00:01<00:00, 16.25it/s]


In [33]:
print(chunks[0])
print(f"total : {len(chunks)} chunks")

{'text': 'In Malaysia, on 2017-01-01, the crop type is cash_crops with a planted area of 20763.1 hectares that produced 217847.5 tonnes.', 'state': 'Malaysia', 'date': '2017-01-01', 'crop_type': 'cash_crops', 'planted_area': 20763.1, 'production': 217847.5, 'source': 'Department of Statistics Malaysia', 'dataset_name': 'Crop Area and Production by State', 'source_url': 'https://www.dosm.gov.my', 'data_year': '2017-2022', 'file_name': 'crops_state (1).csv', 'chunk_id': 'chunk_000001', 'created_at': '2025-10-25T02:15:57.455629', 'vector': [0.009799950756132603, 0.04136119782924652, -0.020146677270531654, -0.006750763393938541, 0.01361558586359024, -0.061234068125486374, 0.019660567864775658, -0.0285214651376009, -0.05232812464237213, -0.009885789826512337, 0.08972034603357315, -0.14824290573596954, -0.02272200584411621, -0.027730803936719894, -0.030332693830132484, 0.02045491151511669, 0.009871172718703747, -0.06126254424452782, -0.04810376465320587, -0.008865313604474068, 0.096079006791

#### Step 4 : Upload to Zilliz Cloud and Upload

In [None]:
from pymilvus import MilvusClient
from agent.config import settings

client = MilvusClient(
    uri=settings.ZILLIZ_URL,
    token=settings.ZILLIZ_TOKEN
)

In [38]:
collection_name = "crop_by_state_data"

# drop collection if exists
if collection_name in client.list_collections():
    client.drop_collection(collection_name)

# Create collection if not exists
if collection_name not in client.list_collections():
    client.create_collection(
        collection_name=collection_name,
        dimension=384,  
        metric_type="COSINE",
        auto_id=True
    )

In [39]:
result = client.insert(collection_name=collection_name, data=chunks)
print(result)

{'insert_count': 864, 'ids': [461027871141728175, 461027871141728176, 461027871141728177, 461027871141728178, 461027871141728179, 461027871141728180, 461027871141728181, 461027871141728182, 461027871141728183, 461027871141728184, 461027871141728185, 461027871141728186, 461027871141728187, 461027871141728188, 461027871141728189, 461027871141728190, 461027871141728191, 461027871141728192, 461027871141728193, 461027871141728194, 461027871141728195, 461027871141728196, 461027871141728197, 461027871141728198, 461027871141728199, 461027871141728200, 461027871141728201, 461027871141728202, 461027871141728203, 461027871141728204, 461027871141728205, 461027871141728206, 461027871141728207, 461027871141728208, 461027871141728209, 461027871141728210, 461027871141728211, 461027871141728212, 461027871141728213, 461027871141728214, 461027871141728215, 461027871141728216, 461027871141728217, 461027871141728218, 461027871141728219, 461027871141728220, 461027871141728221, 461027871141728222, 4610278711

In [40]:
client.flush(collection_name)

In [41]:
# print stats
print(client.get_collection_stats(collection_name))


{'row_count': 864}


#### Step 5 : Test the semantic search

In [None]:
from langchain_huggingface import HuggingFaceEmbeddings


device = "cuda" if torch.cuda.is_available() else "cpu"
device = "cpu"
embeddings_model = HuggingFaceEmbeddings(model_name='all-MiniLM-L6-v2', model_kwargs={'device': device})
client = MilvusClient(uri=settings.ZILLIZ_URL, token=settings.ZILLIZ_TOKEN)
collection_name = "crop_by_state_data"

In [43]:
def semantic_search(query, k):
  
    query_vector = embeddings_model.embed_query(query)
    
    results = client.search(
        collection_name=collection_name,
        data=[query_vector],
        limit=k,
        output_fields=[
            "text", "state", "date", "crop_type", "planted_area", "production",
            "source", "dataset_name", "source_url", "data_year",
            "chunk_id", "created_at"
        ]
    )  
    
     # json output
    json_results = []
    for hits in results:
        for hit in hits:
            json_results.append({
                "text": hit['entity']['text'],
                
                # data fields
                "state": hit['entity']['state'],
                "date": hit['entity']['date'],
                "crop_type": hit['entity']['crop_type'],
                "planted_area": hit['entity']['planted_area'],
                "production": hit['entity']['production'],
                
                # metadata
                "source": hit['entity']['source'],
                "dataset_name": hit['entity']['dataset_name'],
                "source_url": hit['entity']['source_url'],
                "data_year": hit['entity']['data_year'],
                
                # tracking
                "chunk_id": hit['entity']['chunk_id'],
                "created_at": hit['entity']['created_at'],
                
                # score
                "score": hit['distance']
            })
    
    return json_results

In [44]:
results = semantic_search("crop production in Johor", 12)

In [45]:
results

[{'text': 'In Johor, on 2021-01-01, the crop type is industrial_crops with a planted area of 12975.7 hectares that produced 123560.5 tonnes.',
  'state': 'Johor',
  'date': '2021-01-01',
  'crop_type': 'industrial_crops',
  'planted_area': 12975.7,
  'production': 123560.5,
  'source': 'Department of Statistics Malaysia',
  'dataset_name': 'Crop Area and Production by State',
  'source_url': 'https://www.dosm.gov.my',
  'data_year': '2017-2022',
  'chunk_id': 'chunk_000089',
  'created_at': '2025-10-25T02:15:57.474848',
  'score': 0.8633418679237366},
 {'text': 'In Johor, on 2020-01-01, the crop type is industrial_crops with a planted area of 12463.1 hectares that produced 110575.5 tonnes.',
  'state': 'Johor',
  'date': '2020-01-01',
  'crop_type': 'industrial_crops',
  'planted_area': 12463.1,
  'production': 110575.5,
  'source': 'Department of Statistics Malaysia',
  'dataset_name': 'Crop Area and Production by State',
  'source_url': 'https://www.dosm.gov.my',
  'data_year': '2017