In [30]:
import csv
import os
from kafka import KafkaProducer

# กำหนด Kafka Broker
KAFKA_BROKER = '127.0.0.1:9092'

# สร้าง Kafka Producer
producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])

# กำหนดขีดจำกัดของขนาดฟิลด์ที่ csv จะอ่านได้
csv.field_size_limit(1000 * 1024 * 1024)

def get_topic_from_file(file_path):
    """
    ดึงปีจากชื่อไฟล์เพื่อใช้เป็นชื่อ topic
    """
    base_name = os.path.basename(file_path)
    year = None
    # พยายามดึงปีจากชื่อไฟล์ (รองรับทั้งชื่อที่มี '_' และไม่มี '_')
    for part in base_name.replace('.', '_').split('_'):  # แปลง '.' เป็น '_' เพื่อแยกคำ
        if part.isdigit() and len(part) == 4:  # ตรวจสอบว่าเป็นตัวเลข 4 หลัก
            year = part
            break
    return year  # คืนค่าปีเป็น topic (เช่น '2018', '2019')


def process_csv(file_path):
    """
    ประมวลผลไฟล์ CSV และส่งข้อมูลไปยัง Kafka ตาม topic
    """
    print(f"Processing file: {file_path}")
    topic_name = get_topic_from_file(file_path)  # กำหนด topic จากปีในชื่อไฟล์
    if not topic_name:
        print(f"Cannot determine topic for file: {file_path}")
        return

    print(f"Using topic: {topic_name}")

    with open(file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            key = row.get('Title', 'default_key')  # ใช้คอลัมน์ 'Title' เป็น key หรือกำหนดค่า default
            value = '฿'.join([str(row[col]) for col in reader.fieldnames])  # แปลงข้อมูลทั้งหมดเป็น string

            # ส่งข้อมูลไปยัง Kafka
            producer.send(
                topic_name,
                key=str(key).encode('utf-8'),
                value=value.encode('utf-8')
            )

# รายชื่อไฟล์ CSV ที่ต้องการประมวลผล
csv_files = [
    '../ExtractedData/2018.csv', '../ExtractedData/2019.csv', '../ExtractedData/2020.csv', 
    '../ExtractedData/2021.csv', '../ExtractedData/2022.csv', '../ExtractedData/2023.csv',
    '../scopusNewData/scopus_papers_2018_full_details.csv', '../scopusNewData/scopus_papers_2019_full_details.csv', 
    '../scopusNewData/scopus_papers_2020_full_details.csv', '../scopusNewData/scopus_papers_2021_full_details.csv', 
    '../scopusNewData/scopus_papers_2022_full_details.csv', '../scopusNewData/scopus_papers_2023_full_details.csv', 
    '../scopusNewData/scopus_papers_2024_full_details.csv'
]

# ตรวจสอบว่าไฟล์มีอยู่จริงและประมวลผลแต่ละไฟล์
for csv_file in csv_files:
    if os.path.exists(csv_file):
        process_csv(csv_file)
    else:
        print(f"File not found: {csv_file}")


Processing file: ../ExtractedData/2018.csv
Using topic: 2018
Processing file: ../ExtractedData/2019.csv
Using topic: 2019
Processing file: ../ExtractedData/2020.csv
Using topic: 2020
Processing file: ../ExtractedData/2021.csv
Using topic: 2021
Processing file: ../ExtractedData/2022.csv
Using topic: 2022
Processing file: ../ExtractedData/2023.csv
Using topic: 2023
Processing file: ../scopusNewData/scopus_papers_2018_full_details.csv
Using topic: 2018
Processing file: ../scopusNewData/scopus_papers_2019_full_details.csv
Using topic: 2019
Processing file: ../scopusNewData/scopus_papers_2020_full_details.csv
Using topic: 2020
Processing file: ../scopusNewData/scopus_papers_2021_full_details.csv
Using topic: 2021
Processing file: ../scopusNewData/scopus_papers_2022_full_details.csv
Using topic: 2022
Processing file: ../scopusNewData/scopus_papers_2023_full_details.csv
Using topic: 2023
Processing file: ../scopusNewData/scopus_papers_2024_full_details.csv
Using topic: 2024


In [31]:
import os
import csv
from kafka import KafkaConsumer

# กำหนด Kafka Broker
KAFKA_BROKER = '127.0.0.1:9092'

# กำหนดจำนวนคอลัมน์ที่ต้องการ (8 คอลัมน์)
COLUMN_HEADERS = ['Title', 'Abstract', 'Author', 'Aggregation_Type', 'Publisher', 'Publication_Date', 'Institutions', 'Keywords']
MAX_COLUMNS = len(COLUMN_HEADERS)

# สร้างโฟลเดอร์สำหรับเก็บไฟล์ CSV ที่ส่งออก
OUTPUT_DIR = './output_csv'
os.makedirs(OUTPUT_DIR, exist_ok=True)

def consume_and_write_to_csv(topic_name):
    """
    ดึงข้อมูลจาก Kafka topic และเขียนลงไฟล์ CSV โดยใช้ Title เป็น Key และจำกัดไม่เกิน 8 คอลัมน์
    """
    print(f"Starting consumer for topic: {topic_name}")

    # สร้าง Kafka Consumer สำหรับ topic ที่กำหนด
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=[KAFKA_BROKER],
        group_id=f'{topic_name}_consumer_group',
        auto_offset_reset='latest',  # อ่านจากข้อความใหม่ที่สุด
        enable_auto_commit=False,  # ปิดการ commit อัตโนมัติ
        value_deserializer=lambda x: x.decode('utf-8'),
        key_deserializer=lambda x: x.decode('utf-8'),
        consumer_timeout_ms=5000  # หยุดเมื่อไม่มีข้อความใหม่ในเวลาที่กำหนด
    )

    # ชื่อไฟล์ CSV สำหรับ topic นี้
    output_file = os.path.join(OUTPUT_DIR, f'{topic_name}.csv')

    # เปิดไฟล์ CSV เพื่อเขียนข้อมูล
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(COLUMN_HEADERS)  # เขียน header

        message_count = 0  # ตัวนับข้อความ
        for message in consumer:
            value = message.value  # อ่าน value ของ message

            # แยก value กลับเป็นแต่ละฟิลด์โดยใช้ตัวแยก '฿'
            fields = value.split('฿')

            # ตรวจสอบว่าคอลัมน์เกิน 8 คอลัมน์หรือไม่
            if len(fields) > MAX_COLUMNS:
                fields = fields[:MAX_COLUMNS]  # ตัดฟิลด์ที่เกิน
            elif len(fields) < MAX_COLUMNS:
                # ถ้าฟิลด์น้อยกว่า 8 คอลัมน์ เติมช่องว่างให้ครบ
                fields.extend([''] * (MAX_COLUMNS - len(fields)))

            # เขียนข้อมูลในแต่ละ message ลง CSV
            writer.writerow(fields)
            message_count += 1

        print(f"Finished writing to {output_file}. Total messages: {message_count}")
    consumer.commit()  # คอมมิต offset หลังจากอ่านเสร็จ

    consumer.close()

# เรียกใช้คำสั่งสำหรับแต่ละ topic
consume_and_write_to_csv('2018')
consume_and_write_to_csv('2019')
consume_and_write_to_csv('2020')
consume_and_write_to_csv('2021')
consume_and_write_to_csv('2022')
consume_and_write_to_csv('2023')
consume_and_write_to_csv('2024')


Starting consumer for topic: 2018
Finished writing to ./output_csv/2018.csv. Total messages: 3993
Starting consumer for topic: 2019
Finished writing to ./output_csv/2019.csv. Total messages: 4495
Starting consumer for topic: 2020
Finished writing to ./output_csv/2020.csv. Total messages: 4963
Starting consumer for topic: 2021
Finished writing to ./output_csv/2021.csv. Total messages: 5481
Starting consumer for topic: 2022
Finished writing to ./output_csv/2022.csv. Total messages: 5695
Starting consumer for topic: 2023
Finished writing to ./output_csv/2023.csv. Total messages: 4684
Starting consumer for topic: 2024
Finished writing to ./output_csv/2024.csv. Total messages: 2162


In [27]:
import os
import csv
from kafka import KafkaConsumer

# กำหนด Kafka Broker
KAFKA_BROKER = '127.0.0.1:9092'

# กำหนดจำนวนคอลัมน์ที่ต้องการ (8 คอลัมน์)
COLUMN_HEADERS = ['Title', 'Abstract', 'Author', 'Aggregation_Type', 'Publisher', 'Publication_Date', 'Institutions', 'Keywords']
MAX_COLUMNS = len(COLUMN_HEADERS)

# สร้างโฟลเดอร์สำหรับเก็บไฟล์ CSV ที่ส่งออก
OUTPUT_DIR = './output_csv'
os.makedirs(OUTPUT_DIR, exist_ok=True)

def consume_and_write_to_csv(topic_name):
    """
    ดึงข้อมูลจาก Kafka topic และเขียนลงไฟล์ CSV โดยใช้ Title เป็น Key และจำกัดไม่เกิน 8 คอลัมน์
    """
    print(f"Starting consumer for topic: {topic_name}")

    # สร้าง Kafka Consumer สำหรับ topic ที่กำหนด
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=[KAFKA_BROKER],
        group_id=f'{topic_name}_consumer_group',
        auto_offset_reset='earliest',  # อ่านข้อความตั้งแต่แรก
        value_deserializer=lambda x: x.decode('utf-8'),
        key_deserializer=lambda x: x.decode('utf-8'),
        consumer_timeout_ms=5000  # หยุดเมื่อไม่มีข้อความใหม่ในเวลาที่กำหนด
    )

    # ชื่อไฟล์ CSV สำหรับ topic นี้
    output_file = os.path.join(OUTPUT_DIR, f'{topic_name}.csv')

    # เปิดไฟล์ CSV เพื่อเขียนข้อมูล
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(COLUMN_HEADERS)  # เขียน header

        message_count = 0  # ตัวนับข้อความ
        for message in consumer:
            value = message.value  # อ่าน value ของ message

            # แยก value กลับเป็นแต่ละฟิลด์โดยใช้ตัวแยก '฿'
            fields = value.split('฿')

            # ตัดหรือเติมให้มีจำนวนไม่เกิน 8 คอลัมน์
            fields = fields[:MAX_COLUMNS]  # ตัดฟิลด์ที่เกิน
            while len(fields) < MAX_COLUMNS:
                fields.append('')  # เติมช่องว่างถ้าข้อมูลน้อยกว่า 8 คอลัมน์

            # เขียนข้อมูลในแต่ละ message ลง CSV
            writer.writerow(fields)
            message_count += 1

        print(f"Finished writing to {output_file}. Total messages: {message_count}")
    consumer.close()

# เรียกใช้คำสั่งสำหรับแต่ละ topic
consume_and_write_to_csv('2018')
consume_and_write_to_csv('2019')
consume_and_write_to_csv('2020')
consume_and_write_to_csv('2021')
consume_and_write_to_csv('2022')
consume_and_write_to_csv('2023')
consume_and_write_to_csv('2024')


Starting consumer for topic: 2018
Finished writing to ./output_csv/2018.csv. Total messages: 3993
Starting consumer for topic: 2019
Finished writing to ./output_csv/2019.csv. Total messages: 4495
Starting consumer for topic: 2020
Finished writing to ./output_csv/2020.csv. Total messages: 4963
Starting consumer for topic: 2021
Finished writing to ./output_csv/2021.csv. Total messages: 5481
Starting consumer for topic: 2022
Finished writing to ./output_csv/2022.csv. Total messages: 22780
Starting consumer for topic: 2023
Finished writing to ./output_csv/2023.csv. Total messages: 18736
Starting consumer for topic: 2024
Finished writing to ./output_csv/2024.csv. Total messages: 8648
