In [5]:
import ijson
import requests
import json
import os
import time
from datetime import datetime

In [7]:
# Elasticsearch configuration
# Base URL of the Elasticsearch node; default `es_host` for ElasticsearchUploader.
ES_HOST = "http://localhost:9200"
# Name of the index this notebook creates, writes to, refreshes and searches.
INDEX_NAME = "articles_content_title"

In [8]:
class ElasticsearchUploader:
    """Minimal Elasticsearch client built on a shared ``requests`` session.

    Provides the operations this notebook needs: a connectivity check,
    index (re)creation, bulk indexing, full-text search and index statistics.
    Methods report failures by printing a message and returning a sentinel
    value (False / None / zeros) instead of raising.
    """

    # Seconds to wait on ordinary requests; without a timeout ``requests``
    # can block forever on an unresponsive node.
    REQUEST_TIMEOUT = 30
    # Bulk requests carry large bodies and get a longer budget.
    BULK_TIMEOUT = 120

    def __init__(self, es_host=ES_HOST):
        """Remember the base URL and open a session that sends JSON by default.

        Args:
            es_host: Base URL of the Elasticsearch node.
        """
        self.es_host = es_host
        self.session = requests.Session()
        self.session.headers.update({'Content-Type': 'application/json'})

    def test_connection(self):
        """Check that the Elasticsearch node answers on its root endpoint.

        Returns:
            True when the node replies with HTTP 200 and a parseable version,
            False on any other status or on any error.
        """
        try:
            response = self.session.get(
                f"{self.es_host}", timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                cluster_info = response.json()
                print(f" Connected to Elasticsearch {cluster_info['version']['number']}")
                return True
            else:
                print(f" Connection failed: {response.status_code}")
                return False
        except Exception as e:
            print(f" Connection error: {e}")
            return False

    def create_index(self, index_name=INDEX_NAME):
        """Drop ``index_name`` if it exists and recreate it with the article mapping.

        WARNING: destructive - any existing index with this name is deleted.

        Args:
            index_name: Name of the index to (re)create.

        Returns:
            True when the index was created, False otherwise.
        """
        mapping = {
            "mappings": {
                "properties": {
                    "article_id": {"type": "keyword"},
                    "url": {"type": "keyword"},
                    "title": {
                        "type": "text",
                        "analyzer": "standard",
                        "fields": {
                            "raw": {"type": "keyword"},
                            "suggest": {"type": "completion"}
                        }
                    },
                    "content": {
                        "type": "text",
                        "analyzer": "standard"
                    },
                    "date": {
                        "type": "date",
                        "format": "yyyy-MM-dd'T'HH:mm:ss'Z'||yyyy-MM-dd||epoch_millis"
                    },
                    "images": {"type": "keyword"},
                    "created_at": {"type": "date"},
                    "updated_at": {"type": "date"}
                }
            },
            "settings": {
                "number_of_shards": 2,
                "number_of_replicas": 1,
                "refresh_interval": "30s"
            }
        }

        index_url = f"{self.es_host}/{index_name}"

        try:
            # Delete the old index if present. The call lives inside the try
            # so that a connection error yields False like every other failure.
            delete_response = self.session.delete(
                index_url, timeout=self.REQUEST_TIMEOUT
            )
            # 404 just means there was nothing to delete.
            if delete_response.status_code not in (200, 404):
                print(f" Could not delete existing index: {delete_response.text}")

            response = self.session.put(
                index_url,
                data=json.dumps(mapping),
                timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code in [200, 201]:
                print(f" Index '{index_name}' created successfully")
                return True
            else:
                print(f" Failed to create index: {response.text}")
                return False

        except Exception as e:
            print(f" Error creating index: {e}")
            return False

    def bulk_upload(self, documents):
        """Index a batch of documents through the ``_bulk`` API.

        Args:
            documents: Sequence of dicts, each with the keys ``"_index"``,
                ``"_id"`` and ``"_source"``.

        Returns:
            A ``(success_count, errors)`` tuple. When the request went through,
            ``errors`` is the list of per-item ``index`` results whose status
            was not 200/201. When the request as a whole failed (serialisation
            error, transport error, non-200 response) the result is
            ``(0, documents)``.
        """
        # Nothing to send; an empty bulk body would be rejected by the server.
        if not documents:
            return 0, []

        try:
            # Build the NDJSON body: one action line plus one source line per
            # document. Serialising inside the try means a document that cannot
            # be JSON-encoded is reported as a failed batch instead of raising.
            lines = []
            for doc in documents:
                action = {
                    "index": {
                        "_index": doc["_index"],
                        "_id": doc["_id"]
                    }
                }
                lines.append(json.dumps(action))
                lines.append(json.dumps(doc["_source"]))
            # The bulk body must end with a newline.
            bulk_body = "\n".join(lines) + "\n"

            response = self.session.post(
                f"{self.es_host}/_bulk",
                data=bulk_body,
                headers={'Content-Type': 'application/x-ndjson'},
                timeout=self.BULK_TIMEOUT
            )

            if response.status_code == 200:
                result = response.json()

                # Count successes and collect per-item failures.
                success_count = 0
                errors = []

                for item in result.get('items', []):
                    if 'index' in item:
                        if item['index'].get('status') in [200, 201]:
                            success_count += 1
                        else:
                            errors.append(item['index'])

                return success_count, errors
            else:
                print(f" Bulk upload failed: {response.status_code} - {response.text}")
                return 0, documents

        except Exception as e:
            print(f" Bulk upload error: {e}")
            return 0, documents

    def search(self, query, size=10, index_name=INDEX_NAME):
        """Run a fuzzy multi-field search with highlighting.

        Args:
            query: Free-text query string.
            size: Maximum number of hits to return.
            index_name: Index to search; defaults to the notebook's index.

        Returns:
            The decoded JSON response on HTTP 200, otherwise None.
        """
        search_body = {
            "query": {
                "multi_match": {
                    "query": query,
                    # Title matches weigh three times as much as content matches.
                    "fields": ["title^3", "content^1"],
                    "type": "best_fields",
                    "fuzziness": "AUTO"
                }
            },
            "highlight": {
                "fields": {
                    "title": {},
                    "content": {
                        "fragment_size": 150,
                        "number_of_fragments": 3
                    }
                }
            },
            "size": size
        }

        try:
            response = self.session.post(
                f"{self.es_host}/{index_name}/_search",
                data=json.dumps(search_body),
                timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code == 200:
                return response.json()
            else:
                print(f" Search failed: {response.text}")
                return None

        except Exception as e:
            print(f" Search error: {e}")
            return None

    def get_index_stats(self, index_name=INDEX_NAME):
        """Fetch document count and on-disk size for an index.

        Args:
            index_name: Index to inspect.

        Returns:
            ``(doc_count, size_in_bytes)``; ``(0, 0)`` when the stats cannot
            be retrieved or parsed.
        """
        try:
            response = self.session.get(
                f"{self.es_host}/{index_name}/_stats",
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                stats = response.json()
                totals = stats['indices'][index_name]['total']
                doc_count = totals['docs']['count']
                size_bytes = totals['store']['size_in_bytes']
                return doc_count, size_bytes
            else:
                return 0, 0
        except Exception as e:
            # Catch Exception rather than everything so Ctrl-C still works.
            print(f" Could not read index stats: {e}")
            return 0, 0

In [9]:
def _build_document(article_id, article_data):
    """Turn one (id, article dict) pair into a bulk-ready document."""
    # One timestamp for both fields so a new document has
    # created_at == updated_at.
    stamp = datetime.now().isoformat()
    return {
        "_index": INDEX_NAME,
        "_id": article_id,
        "_source": {
            "article_id": article_id,
            "url": article_data.get("url", ""),
            "title": article_data.get("title", ""),
            "content": article_data.get("content", ""),
            "date": article_data.get("date"),
            "images": article_data.get("images", []),
            "created_at": stamp,
            "updated_at": stamp
        }
    }


def _upload_batch(uploader, pending):
    """Upload one batch; return (success_count, error_count) and never raise."""
    try:
        success_count, errors = uploader.bulk_upload(pending)
    except Exception as e:
        print(f" Error uploading batch of {len(pending)}: {e}")
        return 0, len(pending)
    return success_count, (len(errors) if errors else 0)


def stream_json_and_upload(json_file_path, batch_size=1000):
    """Stream a large JSON object from disk and bulk-index its entries.

    The file is read incrementally with ``ijson.kvitems`` as a top-level
    mapping of ``article_id -> article dict``; entries whose value is not a
    dict are skipped. The target index is dropped and recreated first.

    Args:
        json_file_path: Path to the JSON file.
        batch_size: Number of documents sent per bulk request.

    Returns:
        True when the file was read to the end (individual documents may
        still have failed; see the printed statistics). False when the file
        is missing, the cluster is unreachable, the index cannot be created,
        or reading/parsing the file fails.
    """
    # Check the input before touching the cluster: create_index() deletes the
    # existing index, which must not happen if there is nothing to load.
    if not os.path.isfile(json_file_path):
        print(f" File not found: {json_file_path}")
        return False

    # Set up the uploader
    uploader = ElasticsearchUploader()

    # Test connection
    if not uploader.test_connection():
        return False

    # Create index
    if not uploader.create_index():
        return False

    print(f" Processing file: {json_file_path}")
    print(f" File size: {os.path.getsize(json_file_path) / (1024**3):.2f} GB")
    print(f" Batch size: {batch_size}")
    print("-" * 50)

    # Counters
    total_processed = 0
    total_success = 0
    total_errors = 0
    batch = []
    start_time = time.time()

    try:
        with open(json_file_path, 'rb') as file:
            parser = ijson.kvitems(file, '')

            for article_id, article_data in parser:
                # Validate
                if not isinstance(article_data, dict):
                    continue

                batch.append(_build_document(article_id, article_data))

                if len(batch) < batch_size:
                    continue

                # Detach the full batch before uploading, so a failing batch
                # is counted once and never re-sent with later articles.
                pending, batch = batch, []
                success_count, error_count = _upload_batch(uploader, pending)

                total_processed += len(pending)
                total_success += success_count
                total_errors += error_count

                # Progress update
                elapsed = time.time() - start_time
                speed = total_processed / elapsed if elapsed > 0 else 0

                print(f" Processed: {total_processed:,} |  {total_success:,} |  {total_errors} |  {speed:.0f} docs/sec")

                if error_count:
                    print(f"    Last batch errors: {error_count}")

            # Upload the final, partially filled batch
            if batch:
                success_count, error_count = _upload_batch(uploader, batch)
                total_processed += len(batch)
                total_success += success_count
                total_errors += error_count

    except Exception as e:
        print(f" Error reading file: {e}")
        return False

    # Final stats
    elapsed = time.time() - start_time
    avg_speed = total_processed / elapsed if elapsed > 0 else 0
    success_rate = (total_success / total_processed * 100) if total_processed > 0 else 0

    print("\n UPLOAD COMPLETED!")
    print(f"⏱  Total time: {elapsed/60:.1f} minutes")
    print(f" Documents processed: {total_processed:,}")
    print(f" Successful: {total_success:,} ({success_rate:.1f}%)")
    print(f" Errors: {total_errors}")
    print(f" Average speed: {avg_speed:.0f} docs/sec")

    # Refresh the index so the stats below see the new documents. This is
    # best-effort: a failure is reported but does not fail the upload.
    try:
        refresh_response = uploader.session.post(f"{uploader.es_host}/{INDEX_NAME}/_refresh")
        if refresh_response.status_code == 200:
            print(" Index refreshed")
        else:
            print(f" Index refresh failed: {refresh_response.status_code}")
    except Exception as e:
        print(f" Index refresh error: {e}")

    # Verify upload
    doc_count, size_bytes = uploader.get_index_stats()
    print(f" Final index stats: {doc_count:,} docs, {size_bytes/(1024**2):.1f} MB")

    return True

def test_search_functionality():
    """Smoke-test search after an upload: run one sample query and print the hits."""
    client = ElasticsearchUploader()

    print("\n TESTING SEARCH")
    print("-" * 30)

    # Sample query, limited to the top three hits
    response = client.search("TuSimple autonomous truck", size=3)

    # A missing/empty response counts as zero hits.
    hit_total = response['hits']['total']['value'] if response else 0
    if not hit_total > 0:
        print(" No search results found")
        return

    print(f" Found {hit_total} results")

    for rank, hit in enumerate(response['hits']['hits'], start=1):
        doc = hit['_source']
        print(f"\n{rank}. {doc['title']}")
        print(f"   Score: {hit['_score']:.2f}")
        print(f"   URL: {doc['url']}")

        # Show the first highlighted content fragment when one is present
        if 'highlight' not in hit:
            continue
        fragments = hit['highlight']
        if 'content' in fragments:
            print(f"   Preview: {fragments['content'][0]}...")

In [10]:
if __name__ == "__main__":
    # Entry point: confirm with the user, run the upload, then smoke-test search.
    print(" ELASTICSEARCH UPLOADER WITH REQUESTS")
    print("=" * 50)

    # Configuration
    json_file = "database_article_to_alls.json"  # Replace with the path to your file
    batch_size = 2000

    # Check file exists
    if not os.path.exists(json_file):
        print(f" File not found: {json_file}")
        # Raise SystemExit instead of calling exit(): exit() is the
        # interactive helper, and in a Jupyter kernel it shuts the kernel down.
        # SystemExit keeps exit status 1 when this runs as a script.
        raise SystemExit(1)

    # Confirm upload
    file_size_gb = os.path.getsize(json_file) / (1024**3)
    print(f" File: {json_file} ({file_size_gb:.2f} GB)")

    response = input("Proceed with upload? (y/N): ")
    # Tolerate stray whitespace around the answer.
    if response.strip().lower() != 'y':
        print("Cancelled.")
    else:
        # Run upload
        success = stream_json_and_upload(json_file, batch_size)

        if success:
            # Test search
            test_search_functionality()
            print("\n All done!")
        else:
            print("\n Upload failed!")

 ELASTICSEARCH UPLOADER WITH REQUESTS
 File: database_article_to_alls.json (1.01 GB)
 Connected to Elasticsearch 9.0.1
 Index 'articles_content_title' created successfully
 Processing file: database_article_to_alls.json
 File size: 1.01 GB
 Batch size: 2000
--------------------------------------------------
 Processed: 2,000 |  2,000 |  0 |  427 docs/sec
 Processed: 4,000 |  4,000 |  0 |  448 docs/sec
 Processed: 6,000 |  6,000 |  0 |  434 docs/sec
 Processed: 8,000 |  8,000 |  0 |  495 docs/sec
 Processed: 10,000 |  10,000 |  0 |  487 docs/sec
 Processed: 12,000 |  12,000 |  0 |  503 docs/sec
 Processed: 14,000 |  14,000 |  0 |  503 docs/sec
 Processed: 16,000 |  16,000 |  0 |  487 docs/sec
 Processed: 18,000 |  18,000 |  0 |  493 docs/sec
 Processed: 20,000 |  20,000 |  0 |  495 docs/sec
 Processed: 22,000 |  22,000 |  0 |  509 docs/sec
 Processed: 24,000 |  24,000 |  0 |  518 docs/sec
 Processed: 26,000 |  26,000 |  0 |  527 docs/sec
 Processed: 28,000 |  28,000 |  0 |  533 docs/sec