In [20]:
from kafka import KafkaConsumer
import json
import csv
import os
from datetime import datetime

KAFKA_TOPIC = 'traffy_data'
KAFKA_BROKER = 'localhost:9092'
GROUP_ID = 'traffy_group'
OUTPUT_FILE = '../../data/traffy_cleaned_data.csv'
FLUSH_INTERVAL = 50  # flush the CSV to disk every N written rows
TIMEOUT_MS = 5000    # passed as consumer_timeout_ms: iteration ends after this idle time

# Column names (CSV header); this order is the column order of the output file.
CSV_FIELDS = [
    "ticket_id", "timestamp", "type_array", "organization",
    "comment", "latitude", "longitude", "district",
    "subdistrict", "province", "state", "star", "processing_time",
]


def _deserialize_json(payload):
    """Decode a Kafka message value from UTF-8 JSON.

    Returns None for an empty/missing payload, and also for a payload that
    is not valid UTF-8 or not valid JSON. The deserializer runs inside the
    consumer's iterator, so an exception raised here would terminate the
    whole consume loop instead of skipping the single bad message.
    """
    if not payload:
        return None
    try:
        return json.loads(payload.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Report instead of dropping silently; the leading newline keeps the
        # message off any '\r'-terminated progress line.
        print(f"\nSkipping undecodable Kafka message: {exc}")
        return None


# NOTE(review): with enable_auto_commit=True and a fixed group_id, a re-run
# presumably resumes from the committed offsets ('earliest' only applies when
# the group has none) - confirm this is intended if the output file is
# recreated from scratch on every run.
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_BROKER],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id=GROUP_ID,
    value_deserializer=_deserialize_json,
    consumer_timeout_ms=TIMEOUT_MS,
)

In [21]:
def _parse_type_list(raw_type):
    """Turn a '{a,b,c}'-style string into a list of non-empty labels.

    A value that is already a list/tuple is used as-is (items stringified);
    anything else that is not a string yields an empty list. Blank entries
    are dropped so that '{}' becomes [] rather than [''].
    """
    if isinstance(raw_type, str):
        candidates = raw_type.replace('{', '').replace('}', '').split(',')
    elif isinstance(raw_type, (list, tuple)):
        candidates = [str(item) for item in raw_type]
    else:
        candidates = []
    return [label.strip() for label in candidates if label.strip()]


def _parse_coords(coords):
    """Split a 'longitude,latitude' string into a (latitude, longitude) pair.

    Returns (None, None) unless the value is a string with exactly two
    comma-separated parts that both parse as floats, so a record never ends
    up with only one of the two coordinates populated.
    """
    if not isinstance(coords, str):
        return None, None
    parts = coords.split(',')
    if len(parts) != 2:
        return None, None
    try:
        longitude = float(parts[0].strip())
        latitude = float(parts[1].strip())
    except ValueError:
        return None, None
    return latitude, longitude


def _parse_star(value):
    """Coerce a rating to int, falling back to 0 when it cannot be converted."""
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / non-numeric objects; ValueError: bad strings and
        # NaN; OverflowError: infinity. Anything else (e.g. KeyboardInterrupt)
        # must propagate.
        return 0


def _clean_record(raw_data):
    """Build one output row (keyed by the CSV column names) from a raw message."""
    latitude, longitude = _parse_coords(raw_data.get('coords', ''))
    return {
        "ticket_id": raw_data.get('ticket_id'),
        "timestamp": raw_data.get('timestamp'),
        "type_array": str(_parse_type_list(raw_data.get('type', ''))),
        "organization": raw_data.get('organization'),
        "comment": raw_data.get('comment'),
        "latitude": latitude,
        "longitude": longitude,
        "district": raw_data.get('district'),
        "subdistrict": raw_data.get('subdistrict'),
        "province": raw_data.get('province'),
        "state": raw_data.get('state'),
        "star": _parse_star(raw_data.get('star', 0)),
        # Naive local wall-clock time at which the row was cleaned.
        "processing_time": datetime.now().isoformat(),
    }


# Make sure the output directory exists (no-op when it already does).
folder_path = os.path.dirname(OUTPUT_FILE)
if folder_path:
    os.makedirs(folder_path, exist_ok=True)

# Bound before the try so the summary in `finally` can always read it.
count = 0

try:
    # 'w' recreates the file on every run; utf-8-sig writes a BOM at the start.
    # The context manager flushes and closes the file on every exit path.
    with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8-sig') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=CSV_FIELDS)
        writer.writeheader()
        print("üìù ‡∏™‡∏£‡πâ‡∏≤‡∏á‡πÑ‡∏ü‡∏•‡πå‡πÉ‡∏´‡∏°‡πà (Overwrite) ‡πÄ‡∏£‡∏µ‡∏¢‡∏ö‡∏£‡πâ‡∏≠‡∏¢")

        print(f"üéß ‡∏Å‡∏≥‡∏•‡∏±‡∏á‡∏£‡∏±‡∏ö‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏• (‡∏à‡∏∞‡∏´‡∏¢‡∏∏‡∏î‡πÄ‡∏≠‡∏á‡πÄ‡∏°‡∏∑‡πà‡∏≠‡πÑ‡∏°‡πà‡∏°‡∏µ‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡πÄ‡∏Å‡∏¥‡∏ô {TIMEOUT_MS/1000} ‡∏ß‡∏¥)...")

        # Iteration ends by itself once no message arrives within TIMEOUT_MS.
        for message in consumer:
            raw_data = message.value

            # Empty payloads deserialize to None and carry nothing to write.
            if raw_data is None:
                continue

            try:
                cleaned_record = _clean_record(raw_data)
                writer.writerow(cleaned_record)
            except Exception as e:
                # Best-effort: report the bad row and keep consuming.
                print(f"\n‚ö†Ô∏è Error processing row: {e}")
                continue

            count += 1

            # Periodically push buffered rows to disk and refresh the progress line.
            if count % FLUSH_INTERVAL == 0:
                csv_file.flush()
                print(f"‚úÖ Saved {count} records... (Last: {cleaned_record['ticket_id']})", end='\r')

        print(f"\nüéâ ‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡∏´‡∏°‡∏î‡πÅ‡∏•‡πâ‡∏ß! (Timeout {TIMEOUT_MS}ms)")

except KeyboardInterrupt:
    print(f"\nüõë ‡∏ú‡∏π‡πâ‡πÉ‡∏ä‡πâ‡∏™‡∏±‡πà‡∏á‡∏´‡∏¢‡∏∏‡∏î‡∏Å‡∏≤‡∏£‡∏ó‡∏≥‡∏á‡∏≤‡∏ô")

finally:
    # Runs even when opening or writing the file failed, so the consumer is
    # always released.
    consumer.close()

    print("-" * 40)
    print(f"‚úÖ ‡πÄ‡∏™‡∏£‡πá‡∏à‡∏™‡∏¥‡πâ‡∏ô! ‡∏ö‡∏±‡∏ô‡∏ó‡∏∂‡∏Å‡∏£‡∏ß‡∏°‡∏ó‡∏±‡πâ‡∏á‡∏™‡∏¥‡πâ‡∏ô: {count} ‡∏£‡∏≤‡∏¢‡∏Å‡∏≤‡∏£")
    print(f"üìÇ ‡πÑ‡∏ü‡∏•‡πå‡∏≠‡∏¢‡∏π‡πà‡∏ó‡∏µ‡πà: {OUTPUT_FILE}")

üìù ‡∏™‡∏£‡πâ‡∏≤‡∏á‡πÑ‡∏ü‡∏•‡πå‡πÉ‡∏´‡∏°‡πà (Overwrite) ‡πÄ‡∏£‡∏µ‡∏¢‡∏ö‡∏£‡πâ‡∏≠‡∏¢
üéß ‡∏Å‡∏≥‡∏•‡∏±‡∏á‡∏£‡∏±‡∏ö‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏• (‡∏à‡∏∞‡∏´‡∏¢‡∏∏‡∏î‡πÄ‡∏≠‡∏á‡πÄ‡∏°‡∏∑‡πà‡∏≠‡πÑ‡∏°‡πà‡∏°‡∏µ‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡πÄ‡∏Å‡∏¥‡∏ô 5.0 ‡∏ß‡∏¥)...
‚úÖ Saved 1417150 records... (Last: 2024-HYX8PP)
üõë ‡∏ú‡∏π‡πâ‡πÉ‡∏ä‡πâ‡∏™‡∏±‡πà‡∏á‡∏´‡∏¢‡∏∏‡∏î‡∏Å‡∏≤‡∏£‡∏ó‡∏≥‡∏á‡∏≤‡∏ô
----------------------------------------
‚úÖ ‡πÄ‡∏™‡∏£‡πá‡∏à‡∏™‡∏¥‡πâ‡∏ô! ‡∏ö‡∏±‡∏ô‡∏ó‡∏∂‡∏Å‡∏£‡∏ß‡∏°‡∏ó‡∏±‡πâ‡∏á‡∏™‡∏¥‡πâ‡∏ô: 1417160 ‡∏£‡∏≤‡∏¢‡∏Å‡∏≤‡∏£
üìÇ ‡πÑ‡∏ü‡∏•‡πå‡∏≠‡∏¢‡∏π‡πà‡∏ó‡∏µ‡πà: ../../data/traffy_cleaned_data.csv
