In [36]:
import sys
from datetime import datetime
import json
import time
import pandas as pd
from kafka import KafkaProducer
from pathlib import Path
from loguru import logger
from typing import Any, Dict
import pytz
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


# Модель данных netflow
class Router(BaseModel):
    """One netflow session record, validated before it is sent to Kafka as JSON."""

    IdSession: int
    IdPSX: int
    IdSubscriber: int
    StartSession: datetime
    # Optional: pandas NaT values are mapped to None by the validator below
    EndSession: datetime | None
    Duration: int
    UpTx: int
    DownTx: int
    SourceFile: str

    @field_validator('EndSession', mode='before')
    @classmethod
    def convert_nat_to_none(cls, v: object) -> object:
        """Replace pandas' missing-timestamp marker (NaT) with None; pass anything else through."""
        return None if v is pd.NaT else v


class Settings(BaseSettings):
    """Producer configuration; defaults below can be overridden from the environment or a .env file."""

    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        env_ignore_empty=True,  # empty environment variables fall back to the defaults
    )

    current_timezone: str = Field(default='Europe/Moscow')  # timezone of the replayed timeline
    kafka_broker: str = Field(default='localhost:9092')
    kafka_topic: str = Field(default='csv_data_topic')
    csv_directory: str = Field(default='../data/TelecomX/telecom100k/')  # folder with psx_* source files
    log_file: str = Field(default="logs/netflow_producer.log")
    time_pointer_file: str = Field(default='logs/time_pointer.json')  # JSON file with the replay state
    # Pause before sending the next data batch; the replay loop treats it as minutes
    wait_time: int = Field(default=10)


settings = Settings()

# Register the file log sink. Each logger.add() call creates an additional sink, so
# re-running this cell would duplicate every log line; drop the previous run's sink first.
_previous_log_sink_id = globals().get('_log_sink_id')
if _previous_log_sink_id is not None:
    try:
        logger.remove(_previous_log_sink_id)
    except ValueError:
        pass  # the sink was already removed elsewhere
_log_sink_id = logger.add(settings.log_file)

KAFKA_BROKER = settings.kafka_broker
KAFKA_TOPIC = settings.kafka_topic
CSV_DIRECTORY = settings.csv_directory

# Timezone object for the replay timeline (used with both pandas and datetime)
current_timezone = pytz.timezone(settings.current_timezone)

In [11]:
def get_kafka_producer(bootstrap_servers: str | None = None) -> KafkaProducer:
    """Create a KafkaProducer with basic settings.

    Args:
        bootstrap_servers: broker address(es); defaults to ``KAFKA_BROKER`` from the settings.

    Returns:
        A KafkaProducer that encodes str message values as UTF-8 (bytes are sent unchanged).

    Raises:
        Any exception raised by the KafkaProducer constructor; it is logged together with
        its traceback and re-raised.

    NOTE: production use still needs connection error handling, security settings, etc.
    """
    def _serialize(value) -> bytes:
        # Values are JSON strings (model_dump_json); already-encoded bytes pass through as-is
        return value if isinstance(value, bytes) else str(value).encode('utf-8')

    try:
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers or KAFKA_BROKER,
            value_serializer=_serialize,
            retries=5,                # resend on failures
            linger_ms=10,             # small delay before sending, to batch messages
            max_request_size=1048576  # request size limit (1 MiB)
        )
    except Exception as e:
        # logger.exception also records the traceback, which logger.error would drop
        logger.exception(f"Ошибка инициализации KafkaProducer: {e}")
        raise
    logger.info("KafkaProducer успешно инициализирован!")
    return producer


In [42]:
def wait_until(target_time: datetime):
    """Block until the wall clock reaches ``target_time``.

    ``target_time`` may be naive (compared against local time) or timezone-aware, including
    a tz-aware ``pd.Timestamp``. "Now" is taken in the target's own timezone, so naive and
    aware values are never compared (that comparison raises TypeError).
    Returns immediately if the target is already in the past.
    """
    while True:
        # datetime.now(None) -> naive local time; datetime.now(tz) -> aware time in that zone
        now = datetime.now(tz=target_time.tzinfo)
        if now >= target_time:
            break  # target reached
        time_left = (target_time - now).total_seconds()
        time.sleep(min(time_left, 1))  # sleep at most 1 second, then re-check the clock


def send_dataframe(dataframe: pd.DataFrame, model: BaseModel, producer: KafkaProducer, headers: dict = None):
    """Отправка датафрейма Pandas в топик Kafka"""
    headers_list = [(key, str(headers[key]).encode('utf8')) for key in headers]
    for rec in dataframe.to_dict(orient='records'):
        # print(f"{rec=}")
        producer.send(KAFKA_TOPIC, value=model(**rec).model_dump_json(), headers=headers_list)

In [26]:
# Exploration: load the first r0 file and check how its timestamps are shifted onto "now"
r0_logs = sorted(Path(CSV_DIRECTORY).glob('psx_66.1_*.txt'))
r1_logs = sorted(Path(CSV_DIRECTORY).glob('psx_66.2_*.txt'))
r2_logs = sorted(Path(CSV_DIRECTORY).glob('psx_66.3_*.txt'))
r3_logs = sorted(Path(CSV_DIRECTORY).glob('psx_62.0_*.csv'))
r4_logs = sorted(Path(CSV_DIRECTORY).glob('psx_69.0_*.csv'))
r5_logs = sorted(Path(CSV_DIRECTORY).glob('psx_65.0_*.csv'))

csv_file0 = r0_logs[0]
# Parse dates the same way the replay loop does (dayfirst=True)
df0 = pd.read_csv(csv_file0, parse_dates=['StartSession', 'EndSession'], sep='|', dayfirst=True).rename(columns={"Duartion": "Duration"})
# txt sources are recorded in 'Etc/GMT-5' (= UTC+05:00, the Etc/GMT sign is inverted);
# convert to the configured timezone instead of a hardcoded one
for column in ('StartSession', 'EndSession'):
    df0[column] = df0[column].dt.tz_localize('Etc/GMT-5').dt.tz_convert(current_timezone)
df0.info()

# Data time of the file, taken from its name ('psx_66.1_<YYYY-MM-DD HH:MM:SS>')
df_time = pd.Timestamp(csv_file0.stem[9:], tz=current_timezone)
print(f'{df_time=}')
df_delta = datetime.now(tz=current_timezone) - df_time
df0['StartSession'] + df_delta

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1564 entries, 0 to 1563
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype                        
---  ------        --------------  -----                        
 0   IdSession     1564 non-null   int64                        
 1   IdPSX         1564 non-null   int64                        
 2   IdSubscriber  1564 non-null   int64                        
 3   StartSession  1564 non-null   datetime64[ns, Europe/Moscow]
 4   EndSession    20 non-null     datetime64[ns, Europe/Moscow]
 5   Duration      1564 non-null   int64                        
 6   UpTx          1564 non-null   int64                        
 7   DownTx        1564 non-null   int64                        
dtypes: datetime64[ns, Europe/Moscow](2), int64(6)
memory usage: 97.9 KB
df_time=Timestamp('2024-01-01 00:10:00+0300', tz='Europe/Moscow')


  df0 = pd.read_csv(csv_file0, parse_dates=['StartSession','EndSession'],sep='|').rename(columns={"Duartion": "Duration"})


0      2025-05-01 05:07:58.230541+03:00
1      2025-05-01 05:26:41.230541+03:00
2      2025-05-01 14:39:01.230541+03:00
3      2025-05-01 16:05:22.230541+03:00
4      2025-05-01 07:26:51.230541+03:00
                     ...               
1559   2025-05-01 15:59:33.230541+03:00
1560   2025-05-01 14:18:51.230541+03:00
1561   2025-05-01 13:21:04.230541+03:00
1562   2025-05-01 09:24:47.230541+03:00
1563   2025-05-01 15:24:11.230541+03:00
Name: StartSession, Length: 1564, dtype: datetime64[ns, Europe/Moscow]

In [27]:
# Source file lists: one sorted list per router; the replay loop pairs them up by position
(r0_logs, r1_logs, r2_logs,
 r3_logs, r4_logs, r5_logs) = (
    sorted(Path(CSV_DIRECTORY).glob(mask))
    for mask in (
        'psx_66.1_*.txt', 'psx_66.2_*.txt', 'psx_66.3_*.txt',
        'psx_62.0_*.csv', 'psx_69.0_*.csv', 'psx_65.0_*.csv',
    )
)

producer = get_kafka_producer()

[32m2025-05-01 19:10:17.826[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_kafka_producer[0m:[36m15[0m - [1mKafkaProducer успешно инициализирован![0m


In [49]:
def save_state(path: Path | str, current_time: pd.Timestamp, time_pointer: pd.Timestamp):
    """Сохраняет текущее время и поток в файл"""
    record = {
        'current_time': current_time.isoformat() #.strftime('%Y-%m-%d %H:%M:%S'), 
        , 'time_pointer': time_pointer.isoformat() #.strftime('%Y-%m-%d %H:%M:%S')
        }
    with open(Path(path), 'wt') as state_file:
        state_file.write(json.dumps(record))
    logger.debug('State has saved')


def load_state(path: Path | str, tz: pytz.BaseTzInfo) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Возвращает текущее время и поток (current_time, time_pointer) из файла"""
    path = Path(path)
    with open(path, 'rt') as state_file:
        current_state = json.loads(state_file.readline())
    logger.debug('State has loaded')
    return pd.Timestamp(current_state['current_time'], tz=tz), pd.Timestamp(current_state['time_pointer'], tz=tz)


In [44]:


# Replay loop: every `settings.wait_time` minutes of wall-clock time, send one batch of
# recorded data (one file per router) with timestamps shifted onto the current timeline.

# Data time covered by one batch of files. The saved time pointer (data time from which a
# restart resumes) is the batch time plus this period; assumes files are cut every 10 minutes.
DATA_PERIOD = pd.Timedelta(minutes=10)

# (CSV separator, source timezone) per file family.
# NOTE: tz-database 'Etc/GMT-5' means UTC+05:00 — the sign of Etc/GMT* zones is inverted.
TXT_FORMAT = ('|', 'Etc/GMT-5')
CSV_FORMAT = (',', 'Etc/GMT-6')


def _read_source_file(df_file: Path, sep: str, source_tz: str, target_tz, df_delta: pd.Timedelta) -> pd.DataFrame:
    """Read one source file and move its session timestamps onto the replay timeline.

    StartSession/EndSession are localized to ``source_tz``, converted to ``target_tz`` and
    shifted by ``df_delta``; the file name is stored in the ``SourceFile`` column.
    """
    df = pd.read_csv(df_file, sep=sep, parse_dates=['StartSession', 'EndSession'], dayfirst=True)
    df = df.rename(columns={"Duartion": "Duration"})  # the source header misspells "Duration"
    for column in ('StartSession', 'EndSession'):
        df[column] = df[column].dt.tz_localize(source_tz).dt.tz_convert(target_tz) + df_delta
    df['SourceFile'] = df_file.name
    return df


# Restore the replay clock and the data-time pointer saved by a previous run, if any
if Path(settings.time_pointer_file).exists():
    current_time, current_time_pointer = load_state(settings.time_pointer_file, tz=current_timezone)
else:
    # Timestamp.now(tz=...) is the true current instant; localizing the naive datetime.now()
    # would be wrong whenever the host timezone differs from current_timezone.
    current_time = pd.Timestamp.now(tz=current_timezone)
    current_time_pointer = pd.Timestamp('1900-01-01 00:00:00', tz=current_timezone)

log_lists = (r0_logs, r1_logs, r2_logs, r3_logs, r4_logs, r5_logs)
if len({len(logs) for logs in log_lists}) > 1:
    # zip() below stops at the shortest list, so the remaining files would be silently ignored
    logger.warning(f"Source file lists differ in length: {[len(logs) for logs in log_lists]}; "
                   "replay stops at the shortest one")

try:
    # Each iteration takes one file per router for the same data time
    for i, (r0_file, r1_file, r2_file, r3_file, r4_file, r5_file) in enumerate(zip(*log_lists)):
        logger.info(f"Итерация: {i}, время: {current_time}")

        # Data time of this batch, from the r0 file name ('psx_66.1_<YYYY-MM-DD HH:MM:SS>')
        df_time = pd.Timestamp(r0_file.stem[9:], tz=current_timezone)

        # Skip batches that were already sent before a restart
        if df_time < current_time_pointer:
            logger.debug(f"Пропускаю время: {df_time.strftime('%Y-%m-%d %H:%M:%S')}")
            continue

        next_time = current_time + pd.Timedelta(minutes=settings.wait_time)
        df_delta = current_time - df_time  # shift from data time to replay time

        batch = [(f, *TXT_FORMAT) for f in (r0_file, r1_file, r2_file)]
        batch += [(f, *CSV_FORMAT) for f in (r3_file, r4_file, r5_file)]
        for df_file, sep, source_tz in batch:
            logger.debug(f"{df_file=}")
            df = _read_source_file(df_file, sep=sep, source_tz=source_tz,
                                   target_tz=current_timezone, df_delta=df_delta)
            send_dataframe(dataframe=df, model=Router, producer=producer, headers={'df_file': df_file})
            # Spread the files of one batch over part of the wait period
            time.sleep(settings.wait_time * 60 / 10)
        producer.flush()

        # Persist progress so that a restart resumes after this batch
        save_state(
            path=settings.time_pointer_file,
            current_time=next_time,
            time_pointer=df_time + DATA_PERIOD,
        )

        # Wait for the next replay period
        wait_until(next_time)
        current_time = next_time
finally:
    # Deliver anything still buffered, also when the loop is interrupted (e.g. KeyboardInterrupt)
    producer.flush()

[32m2025-05-01 19:29:07.928[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m21[0m - [1mИтерация: 0, время: 2025-05-01 19:29:07.926932+03:00[0m
[32m2025-05-01 19:29:07.932[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m37[0m - [34m[1mdf_file=PosixPath('../data/TelecomX/telecom100k/psx_66.1_2024-01-01 00:10:00.txt')[0m


KeyboardInterrupt: 

In [18]:
# Sample record for checking the model's serialization
rt = Router(
    IdSession=1,
    IdPSX=2,
    IdSubscriber=3,
    StartSession=pd.Timestamp('2024-01-01'),
    EndSession=None,
    Duration=10,
    UpTx=1,
    DownTx=1,
    SourceFile='abc_file',
)

In [21]:
# JSON form of the sample record (send_dataframe publishes model_dump_json() output)
Router.model_dump_json(rt)

'{"IdSession":1,"IdPSX":2,"IdSubscriber":3,"StartSession":"2024-01-01T00:00:00","EndSession":null,"Duration":10,"UpTx":1,"DownTx":1,"SourceFile":"abc_file"}'

In [24]:
# Python-dict form of the sample record
Router.model_dump(rt)

{'IdSession': 1,
 'IdPSX': 2,
 'IdSubscriber': 3,
 'StartSession': Timestamp('2024-01-01 00:00:00'),
 'EndSession': None,
 'Duration': 10,
 'UpTx': 1,
 'DownTx': 1,
 'SourceFile': 'abc_file'}

In [33]:
# Rows of the last processed frame that have an EndSession value (NaT rows dropped)
df['EndSession'].dropna()

16     2025-05-01 17:06:28.187621+03:00
82     2025-05-01 17:01:36.187621+03:00
113    2025-05-01 17:01:41.187621+03:00
214    2025-05-01 17:01:57.187621+03:00
231    2025-05-01 17:07:41.187621+03:00
273    2025-05-01 17:03:52.187621+03:00
360    2025-05-01 17:02:00.187621+03:00
488    2025-05-01 17:08:23.187621+03:00
538    2025-05-01 17:08:58.187621+03:00
742    2025-05-01 17:07:23.187621+03:00
743    2025-05-01 17:02:04.187621+03:00
800    2025-05-01 17:05:17.187621+03:00
1055   2025-05-01 17:08:33.187621+03:00
1180   2025-05-01 17:02:31.187621+03:00
1327   2025-05-01 17:00:31.187621+03:00
1335   2025-05-01 17:04:39.187621+03:00
1371   2025-05-01 17:05:28.187621+03:00
1398   2025-05-01 17:04:55.187621+03:00
1470   2025-05-01 17:04:07.187621+03:00
1546   2025-05-01 17:06:13.187621+03:00
Name: EndSession, dtype: datetime64[ns, Europe/Moscow]