In [10]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy

import json
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from collections import Counter
from IPython.display import HTML


In [11]:
# Create the stream execution environment and set its parallelism to 2.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)

# Register the Kafka connector jar with the environment.
# NOTE(review): the jar file name carries version 1.17.2 while the path is
# under flink-1.20.0 — confirm the connector build matches the installed Flink.
env.add_jars("file:///opt/flink-1.20.0/lib/flink-sql-connector-kafka-1.17.2.jar")

# Kafka source settings: bootstrap servers, topic to read, and consumer group id.
kafka_servers = "209a01:9092,209a02:9092,209a03:9092"
# Single-broker alternative: kafka_servers = "209a01:9092"
source_topic = "word_topic"
consume_group_id = "emotion_test_handler"


In [12]:
# Build the Kafka source: read string values from `source_topic`, starting
# at the latest offsets, as a member of the `consume_group_id` consumer group.
kafka_source = (
    KafkaSource.builder()
    .set_bootstrap_servers(kafka_servers)
    .set_topics(source_topic)
    .set_group_id(consume_group_id)
    .set_starting_offsets(KafkaOffsetsInitializer.latest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

In [13]:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.functions import ProcessFunction, MapFunction
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.state import ValueStateDescriptor
from datetime import datetime

# Create the Flink stream execution environment.
# NOTE(review): this rebinds `env`, which an earlier cell already created and
# configured with set_parallelism(2). If get_execution_environment() returns a
# fresh environment here, that parallelism setting is not carried over — confirm
# whether the second environment is intentional.
env = StreamExecutionEnvironment.get_execution_environment()
# NOTE(review): set_stream_time_characteristic appears to be deprecated in
# recent Flink releases — confirm it is still needed for this Flink version.
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)  # use event time
env.add_jars("file:///opt/flink-1.20.0/lib/flink-sql-connector-kafka-1.17.2.jar")

# Create the Flink table environment on top of the stream environment.
t_env = StreamTableEnvironment.create(env)



# Map function that separates a record's event time from its payload.
class EventTimeAssigner(MapFunction):
    """Split a record into its payload and an epoch-millisecond timestamp."""

    def map(self, value):
        """Return ``(payload, timestamp_ms)`` for one record.

        Args:
            value: Mapping with a ``'time'`` entry whose string form matches
                ``%Y-%m-%dT%H:%M:%S``.

        Returns:
            A tuple of (a new dict holding every entry of ``value`` except
            ``'time'``, the parsed time as integer milliseconds since the
            epoch).

        Raises:
            KeyError: if ``value`` has no ``'time'`` entry.
            ValueError: if the time string does not match the expected format.
        """
        event_time = datetime.strptime(str(value['time']), '%Y-%m-%dT%H:%M:%S')
        # The parsed datetime is naive, so .timestamp() interprets it in the
        # local timezone of the running process.
        # Timestamps are whole milliseconds; return an int rather than a float.
        timestamp_ms = int(event_time.timestamp() * 1000)

        # Strip the time field from a copy so the caller's record is not mutated.
        payload = dict(value)
        del payload['time']
        return (payload, timestamp_ms)


# Accumulate the total count of every word seen so far.
class WordCountProcessFunction(ProcessFunction):
    """Keep running per-word totals in Flink value state and print them.

    Each incoming element is expected to carry a word -> count mapping at
    index 0; those counts are added to the totals held in state.

    NOTE(review): the state handle comes from ``runtime_context.get_state``;
    presumably this requires the function to run on a keyed stream — confirm
    against how the function is wired into the job.
    """

    def __init__(self):
        # The descriptor's second argument must be a Flink TypeInformation,
        # not a Python class; the pickled type lets the state hold a plain
        # dict of word -> accumulated count.
        self.state_descriptor = ValueStateDescriptor("word_count", Types.PICKLED_BYTE_ARRAY())
        self.state = None

    def open(self, runtime_context):
        """Obtain the value-state handle from the runtime context."""
        self.state = runtime_context.get_state(self.state_descriptor)

    def process_element(self, value, ctx: 'ProcessFunction.Context'):
        """Merge one element's word counts into state and print the totals.

        Args:
            value: Indexable element whose item 0 is a mapping of
                word -> count.
            ctx: Process-function context (unused here).
        """
        word_counts = value[0]
        # Start from an empty mapping when nothing has been stored yet.
        current_state = self.state.value() or {}

        # Add this element's counts onto the running totals.
        for word, count in word_counts.items():
            current_state[word] = current_state.get(word, 0) + count

        # Persist the merged totals back into state.
        self.state.update(current_state)

        # Print the totals after every element; nothing is emitted downstream.
        print(f"Current Word Count: {current_state}")

In [14]:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction
from datetime import datetime, timedelta
import json
# Create the data stream from the Kafka source, using a watermark strategy
# for monotonously increasing timestamps.
ds = env.from_source(source=kafka_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name="Kafka Source"
)
# Debug aid (disabled): ds.print()
# Module-level dictionary holding the per-field counts for the current window.
field_counts = {}
last_timestamp = None  # event time at which the current counting window started (None until the first record)
window_duration = timedelta(seconds=10)  # length of the counting window: 10 seconds

# MapFunction that counts JSON field names and manages the time window by hand.
class CountFieldsInWindow(MapFunction):
    """Tally JSON field names into the module-level ``field_counts`` dict.

    The window is tracked manually through the module-level ``last_timestamp``
    and ``window_duration``: when a record's time is more than
    ``window_duration`` past ``last_timestamp`` (or no record has been seen
    yet), the current tallies are printed, written to ``data.json`` and
    cleared before the record is counted.
    """

    def map(self, value):
        """Count the fields of one JSON record and pass the record through.

        Args:
            value: JSON text decoding to an object that has a ``'time'``
                entry formatted as ``%Y-%m-%d %H:%M:%S``.

        Returns:
            The input ``value``, unchanged.
        """
        global field_counts, last_timestamp

        record = json.loads(value)
        event_time = datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")

        # The window is over on the very first record or once more than
        # `window_duration` has elapsed since the window started.
        window_expired = (
            last_timestamp is None
            or event_time - last_timestamp > window_duration
        )
        if window_expired:
            last_timestamp = event_time
            print(f"Resetting counts at {event_time}")
            print(f"Current field counts: {field_counts}")

            # Serialise first, then overwrite the snapshot file with the result.
            snapshot = json.dumps(field_counts, ensure_ascii=False, indent=4)
            with open('data.json', 'w', encoding='utf-8') as snapshot_file:
                snapshot_file.write(snapshot)

            # Start the new window with empty tallies.
            field_counts.clear()

        # Every field name except the time field counts once per record.
        for field_name in record:
            if field_name == 'time':
                continue
            field_counts[field_name] = field_counts.get(field_name, 0) + 1

        return value

# Apply the custom MapFunction to the stream.
ds.map(CountFieldsInWindow())

# Run the streaming job.
env.execute("Count Fields in Time Window")

Resetting counts at 2024-12-10 16:57:23
Current field counts: {}
Resetting counts at 2024-12-10 16:58:00
Current field counts: {'满意': 680, '但': 12, '为什么': 4, '比': 20, '国产': 12, '主流': 2, '安卓': 10, '手机': 26, '贵': 6, '两千': 2, '蒆': 2, '已': 2, '入手': 4, '谷歌': 20, '拿': 12, '我': 30, '给': 8, '自己': 22, '长脸': 2, '真': 10, '不要脸': 4, '额': 2, '再': 4, '怎么样': 2, '是': 86, '我们': 6, '中国': 2, '人': 8, '的': 172, '东西': 8, '赢': 6, '学家': 2, '觉得': 2, '已经': 16, '把': 4, '高通': 8, '压着': 2, '打': 10, '了': 200, '幽默': 72, '爵士': 4, '哥': 18, '达到': 2, '做': 6, '才': 8, '点': 8, '时间': 4, '发育': 2, '啊': 54, '挺': 10, '好': 20, '追上': 2, '曾经': 2, '不断进步': 2, '就行': 4, '全自研': 4, '含金量': 2, '就是': 16, '太贵': 2, '不': 52, '知道': 8, '还': 42, '以为': 4, '菜': 2, '拒绝': 2, '垃圾': 12, '华为': 46, '好怕': 2, '碰掉': 2, '三星': 8, '产': 4, '很': 6, '合理': 4, '真不容易': 2, '吃瓜': 2, '吃': 4, '瓜': 2, '非常': 4, '继续': 6, '加油': 18, '諴': 2, '不亏': 2, '神笔': 2, '马良': 6, '完全': 2, '接受': 6, '不了': 2, '哈哈哈': 14, '纳米': 8, '跟': 8, '中': 2, '画质': 2, '哈哈': 8, '高画质': 2, '带不动': 2, '小时': 4, 

KeyboardInterrupt: 