In [None]:
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics.pairwise import cosine_similarity
import re
import time
from collections import Counter
import json
import boto3
import zstandard as zstd
from io import BytesIO
import os
import pickle

def _clean_text(text):
    """Lower-case *text* and blank out every character outside ``a-z 0-9 / - .``."""
    text = str(text).lower()
    text = re.sub(r'[^a-z0-9\s\/\-\.]', ' ', text)
    return re.sub(r'\s+', ' ', text).strip()


def _encode_descriptions(descriptions, batch_size=2000):
    """Embed a list of strings with ``all-MiniLM-L6-v2`` and return one 2-D array."""
    model = SentenceTransformer('all-MiniLM-L6-v2')
    total = len(descriptions)
    chunks = []
    for start in range(0, total, batch_size):
        batch = descriptions[start:start + batch_size]
        chunks.append(model.encode(batch, show_progress_bar=False))
        # Report progress on every tenth batch only, to keep the log short.
        if (start // batch_size) % 10 == 0:
            print(f"Processed {min(start + batch_size, total)}/{total}")
    return np.vstack(chunks)


def _upload_embeddings(embeddings, bucket, bucket_prefix, material_type):
    """Upload *embeddings* to S3 as a zstd-compressed float16 ``.npy`` and return the key."""
    s3_client = boto3.client('s3')

    # float16 halves the payload compared with float32 at the cost of precision.
    embeddings_16 = embeddings.astype(np.float16)

    buffer = BytesIO()
    np.save(buffer, embeddings_16)
    compressed_data = zstd.ZstdCompressor(level=3).compress(buffer.getvalue())

    prefix = f"{bucket_prefix}/" if bucket_prefix else ""
    embeddings_s3_key = f"{prefix}embeddings/{material_type}/embeddings.npy.zst"

    s3_client.put_object(
        Bucket=bucket,
        Key=embeddings_s3_key,
        Body=compressed_data,
        Metadata={
            "material_type": material_type,
            "dimensions": f"{embeddings_16.shape}",
            "dtype": "float16",
            "compression": "zstandard"
        }
    )
    return embeddings_s3_key


def _merge_small_clusters(labels, centroids, min_cluster_size, similarity_threshold):
    """Fold each undersized cluster into its most similar adequately-sized cluster.

    A cluster is "small" when it has at least one member but fewer than
    ``min_cluster_size``. It is merged only when the cosine similarity between
    its centroid and the best target centroid exceeds ``similarity_threshold``.
    Returns a new label array; *labels* is not modified.
    """
    sizes = np.bincount(labels, minlength=len(centroids))
    small = np.flatnonzero((sizes > 0) & (sizes < min_cluster_size))
    large = np.flatnonzero(sizes >= min_cluster_size)

    # Nothing to merge, or nowhere to merge into: cosine_similarity rejects
    # zero-row inputs, so bail out before calling it.
    if small.size == 0 or large.size == 0:
        return labels.copy()

    # Candidate targets are restricted to large clusters. Allowing small or
    # empty clusters as targets can leave a cluster undersized after "merging"
    # (e.g. when the target was itself merged away earlier).
    similarity = cosine_similarity(centroids[small], centroids[large])
    best = similarity.argmax(axis=1)
    best_score = similarity[np.arange(small.size), best]
    accepted = best_score > similarity_threshold

    # redirect[k] is the label that original cluster k ends up with.
    redirect = np.arange(len(centroids))
    redirect[small[accepted]] = large[best[accepted]]
    return redirect[labels]


def _name_clusters(descriptions, labels):
    """Map each cluster id to a slug built from its most frequent raw description."""
    names = {}
    for cluster_id, group in descriptions.groupby(labels):
        most_common_desc = Counter(group.tolist()).most_common(1)[0][0]
        slug = re.sub(r'\s+', '_', _clean_text(most_common_desc))[:50]  # Limit length
        # A description made only of stripped characters cleans to ''; fall
        # back to a generic name so no cluster ends up with an empty key.
        names[int(cluster_id)] = slug or f"cluster_{cluster_id}"
    return names


def cluster_material_descriptions(input_file, material_type='FIN', n_clusters=3000, 
                                 similarity_threshold=0.75, min_cluster_size=5,
                                 bucket=None, bucket_prefix='',
                                 deployment_path='deployment_package'):
    """
    Cluster material descriptions of one material type and write deployment artifacts.

    Pipeline: read the CSV, keep rows whose ``material_type`` matches, lightly
    normalise ``material_description``, embed it with ``all-MiniLM-L6-v2``,
    run MiniBatchKMeans, merge undersized clusters into their most similar
    adequately-sized cluster, renumber the clusters contiguously and name each
    one after its most common description.

    Files written:
      * ``<deployment_path>/<material_type>/cluster_keywords.json`` - cluster id -> name
      * ``<deployment_path>/<material_type>/model.pkl`` - pickled MiniBatchKMeans
      * ``<deployment_path>/<material_type>/metadata.json`` - only when ``bucket`` is set
      * ``output_optimized_clustering_<material_type>.csv`` in the working directory
      * the embeddings (float16, zstd-compressed) in S3 - only when ``bucket`` is set

    Args:
        input_file: CSV path (anything ``pandas.read_csv`` accepts) containing the
            columns ``material_number``, ``material_type`` and ``material_description``.
        material_type: Value of the ``material_type`` column to keep.
        n_clusters: Initial K for MiniBatchKMeans.
        similarity_threshold: Minimum centroid cosine similarity (exclusive)
            required to merge a small cluster into another one.
        min_cluster_size: Clusters with fewer members than this are merge candidates.
        bucket: S3 bucket for the embeddings; when falsy, nothing is uploaded and
            ``metadata.json`` is not written.
        bucket_prefix: Optional key prefix inside ``bucket``.
        deployment_path: Local root directory for the model and JSON files.

    Returns:
        The filtered DataFrame, sorted by cluster, with the added columns
        ``clean_desc``, ``proposedkey`` and ``cluster``.

    Raises:
        KeyError: If a required input column is missing.
        ValueError: If no rows match ``material_type`` or there are fewer
            matching rows than ``n_clusters``.
    """
    start_time = time.time()
    print(f"Starting clustering for {material_type} with initial K={n_clusters}")
    
    # Load and filter data
    print("Loading data...")
    df = pd.read_csv(input_file)

    # Fail fast: every one of these columns is needed later, and discovering a
    # missing one only after embedding and clustering wastes the whole run.
    output_cols = ['material_number', 'material_type', 'material_description', 'proposedkey', 'cluster']
    required_cols = ('material_number', 'material_type', 'material_description')
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise KeyError(f"Input file is missing required column(s): {missing_cols}")

    df_fin = df[df['material_type'] == material_type].copy().reset_index(drop=True)
    print(f"Found {len(df_fin)} {material_type} records")

    # K-means cannot run on these inputs; raise before paying for the
    # embedding step or uploading anything to S3.
    if df_fin.empty:
        raise ValueError(f"No records with material_type == {material_type!r} in {input_file}")
    if len(df_fin) < n_clusters:
        raise ValueError(
            f"n_clusters={n_clusters} exceeds the number of {material_type} "
            f"records ({len(df_fin)})"
        )
    
    # Minimal text cleaning
    df_fin['clean_desc'] = df_fin['material_description'].apply(_clean_text)
    
    # Generate embeddings
    print("Generating embeddings...")
    embeddings = _encode_descriptions(df_fin['clean_desc'].tolist())
    
    # Every local artifact lives under <deployment_path>/<material_type>/
    material_dir = os.path.join(deployment_path, material_type)
    os.makedirs(material_dir, exist_ok=True)
    
    # Store embeddings in S3 with zstandard compression
    embeddings_s3_key = None
    metadata_path = None
    if bucket:
        print("Storing embeddings in S3 with zstandard compression...")
        embeddings_s3_key = _upload_embeddings(embeddings, bucket, bucket_prefix, material_type)
        
        # metadata.json tells consumers where the embeddings were uploaded
        metadata = {
            "material_type": material_type,
            "embedding_path": embeddings_s3_key,
            "embedding_bucket": bucket,
            "model_version": "v1",
            "training_date": time.strftime("%Y-%m-%d"),
            "num_samples": len(embeddings)
        }
        metadata_path = os.path.join(material_dir, "metadata.json")
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=4)
        
        print(f"Embeddings stored at: s3://{bucket}/{embeddings_s3_key}")
    
    # Perform initial clustering
    print(f"Clustering with K={n_clusters}...")
    kmeans = MiniBatchKMeans(n_clusters=n_clusters, random_state=42, 
                            batch_size=500, n_init=3, max_iter=50)
    cluster_labels = kmeans.fit_predict(embeddings)
    
    # Merge very small clusters into their nearest adequately-sized neighbour
    print("Performing optimized merging...")
    merged_labels = _merge_small_clusters(
        cluster_labels, kmeans.cluster_centers_, min_cluster_size, similarity_threshold
    )
    
    # Renumber clusters to 0..n-1; the inverse index is the new label per row
    _, final_labels = np.unique(merged_labels, return_inverse=True)
    
    # Generate cluster names
    print("Generating cluster names...")
    cluster_names = _name_clusters(df_fin['material_description'], final_labels)
    
    # Add results to dataframe
    df_fin['proposedkey'] = [cluster_names[label] for label in final_labels.tolist()]
    df_fin['cluster'] = final_labels
    
    # Sort by cluster number; a stable sort keeps the input order within each
    # cluster, so repeated runs produce the same file.
    df_fin = df_fin.sort_values('cluster', kind='stable').reset_index(drop=True)
    
    # cluster_keywords.json is meant for manual editing (stored locally).
    # cluster_names is already keyed by ascending cluster id.
    keywords_path = os.path.join(material_dir, "cluster_keywords.json")
    with open(keywords_path, "w") as f:
        json.dump(cluster_names, f, indent=4)
    
    # Save the model (stored locally in deployment package).
    # NOTE(review): the pickled estimator predicts the pre-merge K-means
    # labels, whereas cluster_keywords.json is keyed by the merged and
    # renumbered ids; confirm how the consumer maps one onto the other.
    model_path = os.path.join(material_dir, "model.pkl")
    with open(model_path, "wb") as f:
        pickle.dump(kmeans, f)
    
    # Save output (sorted by cluster)
    output_file = f'output_optimized_clustering_{material_type}.csv'
    df_fin[output_cols].to_csv(output_file, index=False)
    
    # Print statistics
    total_time = time.time() - start_time
    cluster_sizes = df_fin['cluster'].value_counts()
    print(f"\nCluster Statistics:")
    print(f"Total clusters: {len(cluster_sizes)}")
    print(f"Clusters with > 10 items: {int((cluster_sizes > 10).sum())}")
    print(f"Clusters with > 5 items: {int((cluster_sizes > 5).sum())}")
    print(f"Clusters with 1 item: {int((cluster_sizes == 1).sum())}")
    print(f"Clusters with 2 items: {int((cluster_sizes == 2).sum())}")
    print(f"Processing time: {total_time/60:.2f} minutes")
    print(f"Output saved to {output_file} (sorted by cluster number)")
    
    # Print deployment information, listing only files that were really written
    print(f"\nDeployment Files Created:")
    print(f"- Model: {model_path}")
    print(f"- Cluster keywords: {keywords_path}")
    if bucket:
        print(f"- Metadata: {metadata_path}")
        print(f"- Embeddings: s3://{bucket}/{embeddings_s3_key}")
    
    return df_fin

if __name__ == "__main__":
    # --- What to cluster -------------------------------------------------
    input_file = 'input.csv'
    material_type = 'FIN'

    # --- Clustering settings ---------------------------------------------
    # initial_k is the starting K for K-means; clusters smaller than
    # min_cluster_size are merged when their centroid similarity to the
    # best target exceeds similarity_threshold.
    initial_k = 3000
    similarity_threshold = 0.75
    min_cluster_size = 5

    # --- Where the embeddings go (S3) ------------------------------------
    bucket = 'your-embedding-bucket'  # Replace with your S3 bucket name
    bucket_prefix = 'keyword-recommendation'  # Optional prefix

    # --- Where the local artifacts go ------------------------------------
    # Use your actual deployment package path
    deployment_path = '/home/sagemaker-user/keyword-recommendation/sample_deployment/deployment_package'

    # Every argument is passed by keyword so the call reads without
    # consulting the function signature.
    result = cluster_material_descriptions(
        input_file=input_file,
        material_type=material_type,
        n_clusters=initial_k,
        similarity_threshold=similarity_threshold,
        min_cluster_size=min_cluster_size,
        bucket=bucket,
        bucket_prefix=bucket_prefix,
        deployment_path=deployment_path,
    )