Commit 07e2433
chore: merge parent branch

Sotatek-ThinhVu committed May 6, 2024
2 parents a84447c + 4c4cf3b
Showing 33 changed files with 3,520 additions and 3,522 deletions.
136 changes: 68 additions & 68 deletions src/main/java/org/cardanofoundation/job/listener/ReportsListener.java
@@ -1,68 +1,68 @@
//package org.cardanofoundation.job.listener;
//
//import lombok.RequiredArgsConstructor;
//import lombok.extern.slf4j.Slf4j;
//
//import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.kafka.support.Acknowledgment;
//import org.springframework.stereotype.Component;
//
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//
//import org.cardanofoundation.explorer.common.entity.explorer.PoolReportHistory;
//import org.cardanofoundation.explorer.common.entity.explorer.ReportHistory;
//import org.cardanofoundation.explorer.common.entity.explorer.StakeKeyReportHistory;
//import org.cardanofoundation.explorer.common.model.ReportMessage;
//import org.cardanofoundation.job.repository.explorer.PoolReportHistoryRepository;
//import org.cardanofoundation.job.repository.explorer.StakeKeyReportHistoryRepository;
//import org.cardanofoundation.job.service.PoolReportService;
//import org.cardanofoundation.job.service.StakeKeyReportService;
//
//@Component
//@Slf4j
//@RequiredArgsConstructor
//@ConditionalOnProperty(
// value = "kafka.configuration-enabled",
// matchIfMissing = true,
// havingValue = "true")
//public class ReportsListener {
//
// private final StakeKeyReportHistoryRepository stakeKeyReportHistoryRepository;
// private final PoolReportHistoryRepository poolReportHistoryRepository;
// private final StakeKeyReportService stakeKeyReportService;
// private final PoolReportService poolReportService;
//
// @KafkaListener(topics = "${kafka.listeners.topics.reports}")
// public void consume(
// ConsumerRecord<String, ReportMessage> consumerRecord, Acknowledgment acknowledgment) {
// try {
// ReportMessage reportMessage = consumerRecord.value();
// ReportHistory reportHistory = reportMessage.getReportHistory();
// log.info(
// "Receive report history {} with type {}", reportHistory.getId(), reportHistory.getType());
//
// switch (reportHistory.getType()) {
// case STAKE_KEY:
// StakeKeyReportHistory stakeKeyReportHistory =
// stakeKeyReportHistoryRepository.findByReportHistoryId(reportHistory.getId());
// stakeKeyReportService.exportStakeKeyReport(
// stakeKeyReportHistory, reportMessage.getZoneOffset(), reportMessage.getTimePattern());
// break;
// case POOL_ID:
// PoolReportHistory poolReportHistory =
// poolReportHistoryRepository.findByReportHistoryId(reportHistory.getId());
// poolReportService.exportPoolReport(
// poolReportHistory, reportMessage.getZoneOffset(), reportMessage.getTimePattern());
// break;
// default:
// break;
// }
//
// acknowledgment.acknowledge();
// log.info("Acknowledge report history {}", reportHistory.getId());
// } catch (Exception e) {
// log.error("Consume report history failure: {}", e.getMessage());
// }
// }
//}
package org.cardanofoundation.job.listener;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.cardanofoundation.explorer.common.entity.explorer.PoolReportHistory;
import org.cardanofoundation.explorer.common.entity.explorer.ReportHistory;
import org.cardanofoundation.explorer.common.entity.explorer.StakeKeyReportHistory;
import org.cardanofoundation.explorer.common.model.ReportMessage;
import org.cardanofoundation.job.repository.explorer.PoolReportHistoryRepository;
import org.cardanofoundation.job.repository.explorer.StakeKeyReportHistoryRepository;
import org.cardanofoundation.job.service.PoolReportService;
import org.cardanofoundation.job.service.StakeKeyReportService;

@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(
value = "kafka.configuration-enabled",
matchIfMissing = true,
havingValue = "true")
public class ReportsListener {

private final StakeKeyReportHistoryRepository stakeKeyReportHistoryRepository;
private final PoolReportHistoryRepository poolReportHistoryRepository;
private final StakeKeyReportService stakeKeyReportService;
private final PoolReportService poolReportService;

@KafkaListener(topics = "${kafka.listeners.topics.reports}")
public void consume(
ConsumerRecord<String, ReportMessage> consumerRecord, Acknowledgment acknowledgment) {
try {
ReportMessage reportMessage = consumerRecord.value();
ReportHistory reportHistory = reportMessage.getReportHistory();
log.info(
"Receive report history {} with type {}", reportHistory.getId(), reportHistory.getType());

switch (reportHistory.getType()) {
case STAKE_KEY:
StakeKeyReportHistory stakeKeyReportHistory =
stakeKeyReportHistoryRepository.findByReportHistoryId(reportHistory.getId());
stakeKeyReportService.exportStakeKeyReport(
stakeKeyReportHistory, reportMessage.getZoneOffset(), reportMessage.getTimePattern());
break;
case POOL_ID:
PoolReportHistory poolReportHistory =
poolReportHistoryRepository.findByReportHistoryId(reportHistory.getId());
poolReportService.exportPoolReport(
poolReportHistory, reportMessage.getZoneOffset(), reportMessage.getTimePattern());
break;
default:
break;
}

acknowledgment.acknowledge();
log.info("Acknowledge report history {}", reportHistory.getId());
} catch (Exception e) {
log.error("Consume report history failure: {}", e.getMessage());
}
}
}
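
For context only (this configuration is not part of the commit): the listener above takes an Acknowledgment parameter, which Spring Kafka only injects when the listener container is set to manual acknowledgment. A minimal sketch of such a container factory, with illustrative bean and class names, might look like this:

import org.cardanofoundation.explorer.common.model.ReportMessage;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical sketch, not taken from this repository: enables manual offset commits so that
// acknowledgment.acknowledge() in ReportsListener controls when a record counts as consumed.
@Configuration
public class ReportsKafkaConsumerConfigSketch {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, ReportMessage> kafkaListenerContainerFactory(
      ConsumerFactory<String, ReportMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ReportMessage> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // MANUAL ack mode: offsets are committed only after acknowledgment.acknowledge() is called.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
  }
}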
15 changes: 15 additions & 0 deletions src/main/java/org/cardanofoundation/job/model/TokenTxCount.java
@@ -0,0 +1,15 @@
package org.cardanofoundation.job.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class TokenTxCount {
Long ident;
Long txCount;
}
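
For context only (not part of this commit): TokenTxCount is a plain result holder; the Lombok @AllArgsConstructor generates the two-argument constructor that the JPQL constructor expressions added to AddressTxAmountRepository below bind to. Roughly, each result row is materialized as in this illustrative sketch:

import org.cardanofoundation.job.model.TokenTxCount;

// Hypothetical sketch, not taken from this repository. A query such as
// "SELECT new org.cardanofoundation.job.model.TokenTxCount(ma.id, count(distinct ata.txHash)) ..."
// resolves the fully qualified class name and invokes the generated all-args constructor once per row.
class TokenTxCountMappingSketch {

  TokenTxCount materializeRow(Long ident, Long distinctTxCount) {
    return new TokenTxCount(ident, distinctTxCount);
  }
}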
src/main/java/org/cardanofoundation/job/projection/StakeTxProjection.java
@@ -1,7 +1,6 @@
package org.cardanofoundation.job.projection;

import java.math.BigInteger;
import java.sql.Timestamp;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
@@ -15,5 +14,5 @@ public class StakeTxProjection {

Long txId;
BigInteger amount;
Timestamp time;
Long time;
}
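
For context only (not part of this commit): StakeTxProjection now carries the block time as a Long instead of a java.sql.Timestamp, matching the numeric blockTime selected from AddressTxAmount in the repository queries further down. Assuming the value is a Unix epoch in seconds, a caller could convert it back as in this illustrative sketch:

import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical sketch, not taken from this repository. Assumes StakeTxProjection.time is an
// epoch value in seconds; if it were milliseconds, Instant.ofEpochMilli would apply instead.
class BlockTimeConversionSketch {

  Timestamp toTimestamp(Long blockTimeSeconds) {
    return Timestamp.from(Instant.ofEpochSecond(blockTimeSeconds));
  }
}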
@@ -45,12 +45,16 @@ public void insertAll(List<TokenInfo> tokenInfos) {
field(entityUtil.getColumnField(TokenInfo_.MULTI_ASSET_ID)),
field(entityUtil.getColumnField(TokenInfo_.NUMBER_OF_HOLDERS)),
field(entityUtil.getColumnField(TokenInfo_.VOLUME24H)),
field(entityUtil.getColumnField(TokenInfo_.TOTAL_VOLUME)),
field(entityUtil.getColumnField(TokenInfo_.TX_COUNT)),
field(entityUtil.getColumnField(TokenInfo_.UPDATE_TIME)))
.values(
tokenInfo.getBlockNo(),
tokenInfo.getMultiAssetId(),
tokenInfo.getNumberOfHolders(),
tokenInfo.getVolume24h(),
tokenInfo.getTotalVolume(),
tokenInfo.getTxCount(),
tokenInfo.getUpdateTime());

queries.add(query);

This file was deleted.

AddressTxAmountRepository.java
@@ -2,12 +2,17 @@

import java.util.List;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import org.cardanofoundation.explorer.common.entity.compositeKey.AddressTxAmountId;
import org.cardanofoundation.explorer.common.entity.ledgersync.AddressTxAmount;
import org.cardanofoundation.job.model.TokenTxCount;
import org.cardanofoundation.job.model.TokenVolume;
import org.cardanofoundation.job.projection.StakeTxProjection;
import org.cardanofoundation.job.projection.UniqueAccountTxCountProjection;

public interface AddressTxAmountRepository
@@ -21,4 +26,108 @@ public interface AddressTxAmountRepository
group by account
""")
List<UniqueAccountTxCountProjection> findUniqueAccountsInEpoch(@Param("epochNo") Integer epochNo);

@Query(
value =
"SELECT new org.cardanofoundation.job.projection.StakeTxProjection(tx.id, sum(addTxAmount.quantity), addTxAmount.blockTime)"
+ " FROM AddressTxAmount addTxAmount"
+ " JOIN Tx tx on tx.hash = addTxAmount.txHash"
+ " WHERE addTxAmount.stakeAddress = :stakeAddress"
+ " AND addTxAmount.blockTime >= :fromDate AND addTxAmount.blockTime <= :toDate"
+ " GROUP BY addTxAmount.txHash, addTxAmount.blockTime"
+ " ORDER BY addTxAmount.blockTime DESC")
Page<StakeTxProjection> findTxAndAmountByStake(
@Param("stakeAddress") String stakeAddress,
@Param("fromDate") Long fromDate,
@Param("toDate") Long toDate,
Pageable pageable);

@Query(
"SELECT COUNT(DISTINCT addTxAmount.txHash) FROM AddressTxAmount addTxAmount"
+ " WHERE addTxAmount.stakeAddress = :stakeAddress"
+ " AND addTxAmount.blockTime >= :fromDate AND addTxAmount.blockTime <= :toDate")
Long getCountTxByStakeInDateRange(
@Param("stakeAddress") String stakeAddress,
@Param("fromDate") Long fromDate,
@Param("toDate") Long toDate);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
JOIN Tx tx ON tx.hash = ata.txHash
WHERE ma.id >= :startIdent AND ma.id <= :endIdent
AND tx.id >= :txId
AND ata.quantity > 0
GROUP BY ma.id
""")
List<TokenVolume> sumBalanceAfterTx(
@Param("startIdent") Long startIdent,
@Param("endIdent") Long endIdent,
@Param("txId") Long txId);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
JOIN Tx tx ON tx.hash = ata.txHash
WHERE ma.id IN :multiAssetIds
AND tx.id >= :txId
AND ata.quantity > 0
GROUP BY ma.id
""")
List<TokenVolume> sumBalanceAfterTx(
@Param("multiAssetIds") List<Long> multiAssetIds, @Param("txId") Long txId);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id >= :startIdent AND ma.id <= :endIdent
AND ata.quantity > 0
GROUP BY ma.id
""")
List<TokenVolume> getTotalVolumeByIdentInRange(
@Param("startIdent") Long startIdent, @Param("endIdent") Long endIdent);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenTxCount(ma.id, count(distinct (ata.txHash)))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id >= :startIdent AND ma.id <= :endIdent
GROUP BY ma.id
""")
List<TokenTxCount> getTotalTxCountByIdentInRange(
@Param("startIdent") Long startIdent, @Param("endIdent") Long endIdent);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenVolume(ma.id, sum(ata.quantity))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id IN :multiAssetIds
AND ata.quantity > 0
GROUP BY ma.id
""")
List<TokenVolume> getTotalVolumeByIdentIn(@Param("multiAssetIds") List<Long> multiAssetIds);

@Query(
value =
"""
SELECT new org.cardanofoundation.job.model.TokenTxCount(ma.id, count(distinct (ata.txHash)))
FROM AddressTxAmount ata
JOIN MultiAsset ma ON ata.unit = ma.unit
WHERE ma.id IN :multiAssetIds
GROUP BY ma.id
""")
List<TokenTxCount> getTotalTxCountByIdentIn(@Param("multiAssetIds") List<Long> multiAssetIds);
}
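
For context only (not part of this commit): the range queries above each return one row per multi-asset id, and a batch job can index them before filling the new totalVolume and txCount columns inserted into token_info earlier in this diff. TokenVolume is assumed to expose getIdent() and getVolume(), mirroring TokenTxCount; the class and method names in this sketch are hypothetical:

import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.cardanofoundation.job.model.TokenTxCount;
import org.cardanofoundation.job.model.TokenVolume;

// Hypothetical sketch, not taken from this repository.
class TokenAggregationSketch {

  // One aggregate volume per multi-asset id, keyed by ident.
  Map<Long, BigInteger> volumeByIdent(List<TokenVolume> volumes) {
    return volumes.stream()
        .collect(Collectors.toMap(TokenVolume::getIdent, TokenVolume::getVolume));
  }

  // One distinct-transaction count per multi-asset id, keyed by ident.
  Map<Long, Long> txCountByIdent(List<TokenTxCount> txCounts) {
    return txCounts.stream()
        .collect(Collectors.toMap(TokenTxCount::getIdent, TokenTxCount::getTxCount));
  }
}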

This file was deleted.
