In [1]:
import os
os.chdir("../")
import logging
import csv
from datetime import datetime, timedelta, timezone
from binance_sdk_c2c.c2c import C2C, ConfigurationRestAPI, C2C_REST_API_PROD_URL
from turtle import home
from typing import List, Optional
from binance_sdk_c2c.rest_api.models import GetC2CTradeHistoryResponse, GetC2CTradeHistoryResponseDataInner
from src.utils.utils import *

In [2]:
from src.components.data_ingestion import *

In [3]:

class C2CExtended(C2C):
    """Extended C2C API with specific time-based trade history retrieval methods for Vietnam timezone (UTC+7)."""

    def __init__(self, config_rest_api: ConfigurationRestAPI = None) -> None:
        super().__init__(config_rest_api)
        self.max_records = 50  # Maximum records per request as per observed API limit
        self.tz_vietnam = timezone(timedelta(hours=7))  # UTC+7 For Vietnam timezone

    def _fetch_data(
        self,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
    ) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Fetch all trade history records with pagination.
        
        Args:
            start_time (Optional[int]): Start timestamp in milliseconds
            end_time (Optional[int]): End timestamp in milliseconds
            
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of all trade records
        """
        fetch_data = []
        page = 1 

        while True:
            response = self.rest_api.get_c2_c_trade_history(
                start_time=start_time,
                end_time=end_time,
                page=page,
                recv_window=60000
            )

            rate_limits = response.rate_limits
            logging.info(f"Page {page} rate limits: {rate_limits}")

            # Extract the inner data list from the response
            response_data = response.data()
            if response_data.code != '000000' or not response_data.data:
                logging.info(f"Stopping at page {page}: No more data or error (code: {response_data.code})")
                break

            data = response_data.data
            # Verify that trades are within the requested time range
            filtered_data = [
                trade for trade in data
                if start_time <= trade.create_time <= end_time
            ]
            fetch_data.extend(filtered_data)
            logging.info(f"Page {page} retrieved {len(data)} records, {len(filtered_data)} within time range")

            # If fewer than max_records, we've reached the end
            if len(data) < self.max_records:
                break

            page += 1
        
        return fetch_data

    def get_latest(self) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Get trade history from 00:00 of current day to now in Vietnam timezone (UTC+7).
        Includes both BUY and SELL trades.
        
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of trade records
        """
        now = datetime.now(self.tz_vietnam)
        start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
        end_of_today = now.replace(hour=23, minute=59, second=59, microsecond=999000)

        start_time = get_timestamp(start_of_today)
        end_time = get_timestamp(end_of_today)

        return self._fetch_data(start_time, end_time)
    

    def get_latest_by_week(self) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Get trade history from start of current week (Monday) to now in Vietnam timezone (UTC+7).
        Includes both BUY and SELL trades.
        
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of trade records
        """

        now = datetime.now(self.tz_vietnam)

        # Calculate days since Monday (0 = Mon, 1 = Tues,...)
        days_since_monday = now.weekday()
        start_of_week = (now - timedelta(days=days_since_monday)).replace(hour=0, minute=0, second=0, microsecond=0)

        start_time = get_timestamp(start_of_week)
        end_time = get_timestamp(now)

        return self._fetch_data(start_time, end_time)
    
    def get_prev_week_data(self) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Get trade history for the entire previous week (Monday to Sunday) in Vietnam timezone (UTC+7).
        Includes both BUY and SELL trades.
        
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of trade records
        """
        now = datetime.now(self.tz_vietnam)
        days_since_monday = now.weekday()
        start_of_current_week = (now - timedelta(days=days_since_monday)).replace(hour=0, minute=0, second=0, microsecond=0)
        start_of_prev_week = start_of_current_week - timedelta(days=7)
        end_of_prev_week = start_of_prev_week + timedelta(days=7) - timedelta(milliseconds=1)

        start_time = get_timestamp(start_of_prev_week)
        end_time = get_timestamp(end_of_prev_week)

        return self._fetch_data(start_time, end_time)

    def get_latest_by_month(self) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Get trade history from start of current month to now in Vietnam timezone (UTC+7).
        Includes both BUY and SELL trades.
        
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of trade records
        """
        now = datetime.now(self.tz_vietnam)

        start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

        start_time = get_timestamp(start_of_month)
        end_time = get_timestamp(now)

        return self._fetch_data(start_time, end_time)

    def get_prev_month(self) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Get trade history for the entire previous month in Vietnam timezone (UTC+7).
        Includes both BUY and SELL trades.
        
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of trade records
        """

        now = datetime.now(self.tz_vietnam)

        # Get first day of current month
        start_of_current_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

        # Get first day of previous month
        start_of_prev_month = (start_of_current_month - timedelta(days=1)).replace(day=1)

        # Get last day of previous month
        if start_of_prev_month.month == 12:
            end_of_prev_month = start_of_prev_month.replace(year=start_of_prev_month.year + 1,
            month=1,
            day=1)
        else:
            end_of_prev_month = start_of_prev_month.replace(month=start_of_prev_month.month + 1,
            day=1)
        
        end_of_prev_month -= timedelta(milliseconds=1)

        start_time = get_timestamp(start_of_prev_month)
        end_time = get_timestamp(end_of_prev_month)

        return self._fetch_data(start_time, end_time)

    def get_yesterday(self) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Get trade history for the yesterday since today in Vietnam timezone (UTC+7).
        Includes both BUY and SELL trades.
        
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of trade records
        """

        now = datetime.now(self.tz_vietnam)
        
        # Start of yesterday
        start_of_yesterday = (now - timedelta(days=1)).replace(hour=0, 
                                                            minute=0,
                                                            second=0,
                                                            microsecond=0)
        
        # End of yesterday
        end_of_yesterday = start_of_yesterday.replace(hour=23,
                                                    minute=59,
                                                    second=59,
                                                    microsecond=999000)

        start_time = get_timestamp(start_of_yesterday)
        end_time = get_timestamp(end_of_yesterday)

        return self._fetch_data(start_time, end_time)


    def get_custom_range(self, start_date: str, end_date: str) -> List[GetC2CTradeHistoryResponseDataInner]:
        """
        Get trade history for a custom date range in Vietnam timezone (UTC+7).
        Includes both BUY and SELL trades.
        
        Args:
            start_date (str): Start date in format 'YYYY-MM-DD' (e.g., '2025-09-01')
            end_date (str): End date in format 'YYYY-MM-DD' (e.g., '2025-09-23')
        
        Returns:
            List[GetC2CTradeHistoryResponseDataInner]: List of trade records
        """
        try:
            start_dt = datetime.strptime(start_date, '%Y-%m-%d').replace(tzinfo=self.tz_vietnam, hour=0, minute=0, second=0, microsecond=0)
            end_dt = datetime.strptime(end_date, '%Y-%m-%d').replace(tzinfo=self.tz_vietnam, hour=23, minute=59, second=59, microsecond=999000)
            
            # Validate date range (max 30 days as per API limit)
            if (end_dt - start_dt).days > 30:
                raise ValueError("Date range cannot exceed 30 days")

            start_time = get_timestamp(start_dt)
            end_time = get_timestamp(end_dt)

            return self._fetch_data(start_time, end_time)
        except ValueError as e:
            logging.error(f"Invalid date format or range: {str(e)}")
            return []


    def get_trades_for_dates(self, dates: List[str]) -> List[GetC2CTradeHistoryResponseDataInner]:
            """
            Get trade history for a list of arbitrary dates in Vietnam timezone (UTC+7).
            Each date covers from 00:00:00.000 to 23:59:59.999 of that day.
            Includes both BUY and SELL trades.
            
            Args:
                dates (List[str]): List of dates in format 'YYYY-MM-DD' (e.g., ['2025-09-20', '2025-09-23'])
            
            Returns:
                List[GetC2CTradeHistoryResponseDataInner]: List of all trade records for the specified dates
            """
            all_trades = []
            
            for date_str in dates:
                try:
                    # Parse date string to datetime
                    date = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=self.tz_vietnam)
                    start_of_day = date.replace(hour=0, minute=0, second=0, microsecond=0)
                    end_of_day = start_of_day.replace(hour=23, minute=59, second=59, microsecond=999000)

                    start_time = get_timestamp(start_of_day)
                    end_time = get_timestamp(end_of_day)

                    # Validate date range (max 30 days from earliest to latest date)
                    if all_trades:  # Check only if there are previous trades
                        earliest_date = min([datetime.strptime(d, '%Y-%m-%d') for d in dates[:len(all_trades)]])
                        if (date - earliest_date).days > 30:
                            logging.warning(f"Skipping {date_str}: Exceeds 30-day API limit from earliest date {earliest_date.strftime('%Y-%m-%d')}")
                            continue

                    logging.info(f"Fetching trades for {date_str} from "
                                f"{start_of_day.strftime('%Y-%m-%d %H:%M:%S %Z')} to "
                                f"{end_of_day.strftime('%Y-%m-%d %H:%M:%S.%f %Z')}")

                    # Fetch trades for the current date
                    trades = self._fetch_data(start_time, end_time)
                    all_trades.extend(trades)
                    
                except ValueError as e:
                    logging.error(f"Invalid date format for {date_str}: {str(e)}")
                    continue

            return all_trades

In [3]:
from binance_common.configuration import ConfigurationRestAPI
# Create configuration for the REST API
configuration_rest_api = ConfigurationRestAPI(
    api_key=os.getenv("API_KEY", ""),
    api_secret=os.getenv("API_SECRET", ""),
    base_path=os.getenv("BASE_PATH", C2C_REST_API_PROD_URL),
)
c2c = C2CExtended(configuration_rest_api)
# Ví dụ 5: Lấy giao dịch khoảng thời gian tùy chỉnh (01/09/2025 - 10/09/2025)
# Lấy giao dịch cho danh sách các ngày bất kỳ
data = c2c.get_latest_by_week()

INFO:root:Page 1 rate limits: []
INFO:root:Page 1 retrieved 50 records, 50 within time range
INFO:root:Page 2 rate limits: []
INFO:root:Page 2 retrieved 50 records, 50 within time range
INFO:root:Page 3 rate limits: []
INFO:root:Page 3 retrieved 50 records, 50 within time range
INFO:root:Page 4 rate limits: []
INFO:root:Page 4 retrieved 50 records, 50 within time range
INFO:root:Page 5 rate limits: []
INFO:root:Page 5 retrieved 50 records, 50 within time range
INFO:root:Page 6 rate limits: []
INFO:root:Page 6 retrieved 50 records, 50 within time range
INFO:root:Page 7 rate limits: []
INFO:root:Page 7 retrieved 50 records, 50 within time range
INFO:root:Page 8 rate limits: []
INFO:root:Page 8 retrieved 50 records, 50 within time range
INFO:root:Page 9 rate limits: []
INFO:root:Page 9 retrieved 50 records, 50 within time range
INFO:root:Page 10 rate limits: []
INFO:root:Page 10 retrieved 50 records, 50 within time range
INFO:root:Page 11 rate limits: []
INFO:root:Page 11 retrieved 50 rec

In [4]:
len(data)

943

In [1]:
pwd

'd:\\Working\\binance-merchant-trading-flow\\notebook'

In [2]:
import os
os.chdir("../")

In [3]:
JARS_PATH = f"{os.getcwd()}/jars/"

In [4]:
JARS_PATH

'd:\\Working\\binance-merchant-trading-flow/jars/'

In [5]:
import os
from pathlib import Path
from typing import List

from loguru import logger
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaSource,
)

JARS_PATH = Path(os.getcwd()) / "jars"  # Sử dụng Path để dễ xử lý

def get_jar_uri(jar_name: str) -> str:
    """Tạo file URI đúng cho JAR trên Windows/Linux."""
    jar_path = JARS_PATH / jar_name
    if not jar_path.exists():
        raise FileNotFoundError(f"JAR không tồn tại: {jar_path}")
    posix_path = str(jar_path).replace("\\", "/")  # Chuyển sang forward slash
    if os.name == "nt":  # Windows
        return f"file:///{posix_path}"
    else:
        return f"file://{posix_path}"

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    # Add required JARs với URI đúng
    env.add_jars(
        get_jar_uri("flink-connector-kafka-3.0.1-1.18.jar"),  # Connector chính
        get_jar_uri("kafka-clients-3.0.0.jar"),  # Thêm Kafka clients để fix lỗi OffsetResetStrategy
        get_jar_uri("flink-sql-connector-kafka-3.2.0-1.18.jar"),  # Optional
        get_jar_uri("flink-json-1.18.0.jar"),
        get_jar_uri("flink-connector-jdbc-3.2.0-1.18.jar"),
        get_jar_uri("postgresql-42.7.7.jar"),
    )

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("127.0.0.1:9092")
        .set_topics("c2c-trades")
        .set_group_id("flink-datastream-consumer-001")
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source")
    ds.print()

    env.execute("kafka-source-print")


if __name__ == "__main__":
    main()

  import pkg_resources


Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2075)
	at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2075)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2075)
	at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
	at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
	at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
	at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
	at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
	at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
	at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)
	at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
	at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
	at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:126)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:328)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:642)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	... 5 more
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka-source -> Sink: Print to Std. Out' (operator cbc357ccb763df2852fee8c4fc7d55f2).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:624)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:248)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:395)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:408)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to 
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
	... 6 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.kafka.common.KafkaFuture org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames()'
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
	... 6 more


In [7]:
from src.utils.postgresql_client import create_client_from_env
client = create_client_from_env()
print(client.table_exists(table_name = "trades", schema="c2c"))

INFO:src.utils.postgresql_client:Connection pool initialized: 1-10 connections
INFO:src.utils.postgresql_client:Query returned 1 rows


True


In [None]:
from src.utils.postgresql_client import create_client_from_env
client = create_client_from_env()


summary = client.update_order_status(data)  # works with models
print(summary)

INFO:src.utils.postgresql_client:Connection pool initialized: 1-10 connections
INFO:src.utils.postgresql_client:Query returned 0 rows
INFO:src.utils.postgresql_client:[CDC] Insert new order: order_number=22811072821183188992, status=COMPLETED
INFO:src.utils.postgresql_client:Inserting row into c2c.trades: order_number=22811072821183188992
INFO:src.utils.postgresql_client:Query executed successfully. 1 rows affected
INFO:src.utils.postgresql_client:Query returned 0 rows
INFO:src.utils.postgresql_client:[CDC] Insert new order: order_number=22811072698896691200, status=COMPLETED
INFO:src.utils.postgresql_client:Inserting row into c2c.trades: order_number=22811072698896691200
INFO:src.utils.postgresql_client:Query executed successfully. 1 rows affected
INFO:src.utils.postgresql_client:Query returned 0 rows
INFO:src.utils.postgresql_client:[CDC] Insert new order: order_number=22811072528433299456, status=COMPLETED
INFO:src.utils.postgresql_client:Inserting row into c2c.trades: order_number=

In [5]:
output_file = write_to_csv(data, "thisweek")

INFO:root:Trade history written to c2c_trade_history_thisweek.csv with 943 records


In [2]:
from src.utils.minio_uploader import MinioUploader