In [0]:
from pyspark.sql import functions as f
from datetime import datetime, timedelta

In [0]:
# Cassandra keyspace passed to every read and write in the code below.
KEYSPACE = "bank"
# Table read by get_users_metrics (filtered by its `date` column).
USERS_DAILY_TABLE = "unique_users_daily"
# Table read by get_transactions_metrics (filtered by its `date` column).
TRANSACTIONS_DAILY_TABLE = "successful_transactions_daily"
# Table read by get_bank_statistics and appended to by the final write step.
OUTPUT_TABLE = "bank_statistics_daily"

In [0]:
def get_transactions_metrics(date: str) -> (int, int):
    """
    Get the number of transactions and the capital turnover for the provided day.

    Reads TRANSACTIONS_DAILY_TABLE from the KEYSPACE Cassandra keyspace, keeps
    the rows whose ``date`` column equals ``date`` and aggregates them:
    ``count(transaction_id)`` and ``sum(amount)``.

    Zeroes are returned when there are no records for that day. A NULL
    aggregate (e.g. ``sum(amount)`` when every amount of the day is NULL) is
    also reported as 0, so callers can always do arithmetic on the result.

    :param date: (str) - Date in the format (YYYY-MM-DD).
    :return: (int, int) - Number of transactions and capital turnover.
        NOTE(review): the turnover has whatever Python type Spark produces for
        ``sum(amount)``; it is an int only if ``amount`` is integral - confirm.
    """
    # Get all the transactions for the provided date.
    transactions_df = (
        spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(table=TRANSACTIONS_DAILY_TABLE, keyspace=KEYSPACE)
            .load()
            .filter(f"date = '{date}'")
    )

    # Aggregate per date; because of the filter above there is at most one group.
    metrics_rows = (
        transactions_df
            .groupBy(f.col("date"))
            .agg(
                f.count(f.col("transaction_id")).alias("number_transactions"),
                f.sum(f.col("amount")).alias("capital_turnover")
            )
            .select(f.col("number_transactions"), f.col("capital_turnover"))
            .collect()
    )

    # No group at all means there were no records for that day.
    if not metrics_rows:
        return 0, 0

    metrics = metrics_rows[0]
    number_transactions = metrics.number_transactions
    capital_turnover = metrics.capital_turnover

    # Spark aggregates can be NULL (None in Python); normalise them to 0.
    return (
        0 if number_transactions is None else number_transactions,
        0 if capital_turnover is None else capital_turnover,
    )
    
    
def get_users_metrics(date: str) -> int:
    """
    Count the distinct rows of the daily users table for the provided date.

    The rows are loaded from USERS_DAILY_TABLE in the KEYSPACE Cassandra
    keyspace, restricted to those whose ``date`` column equals ``date``,
    de-duplicated and counted.

    :param date: (str) - Date in the format (YYYY-MM-DD).
    :return: (int) - Number of unique users.
    """
    # Reader configured for the daily users table.
    users_reader = (
        spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(table=USERS_DAILY_TABLE, keyspace=KEYSPACE)
    )

    # Keep only the requested day.
    daily_users = users_reader.load().filter(f"date = '{date}'")

    # De-duplicate the rows, then count them.
    return daily_users.distinct().count()


def get_bank_statistics(date=None):
    """
    Get bank statistics for the provided date.

    If no date was provided, or the table holds no row for that date, the
    column-wise maximum over the whole table is returned instead, as the
    maximum values are regarded to be the last ones.
    Zeroes are returned when there are no records at all: a global ``max``
    over an empty table yields a single row of NULLs, and every NULL value is
    reported as 0 so callers can always do arithmetic on the result.

    :param date: (str) - Date in the format (YYYY-MM-DD).
    :return: (int, int, int) - Number of transactions, unique users and capital turnover.
    """
    stat_columns = ("number_transactions", "number_unique_users", "capital_turnover")

    # Get the statistics.
    stats_df = (
        spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(table=OUTPUT_TABLE, keyspace=KEYSPACE)
            .load()
    )

    # Collect the record for the provided date, if a date was given.
    rows = []
    if date:
        rows = (
            stats_df
                .filter(f"date = '{date}'")
                .select(*stat_columns)
                .collect()
        )

    # No date given, or no record for that date: fall back to the maximum values.
    if not rows:
        rows = (
            stats_df
                .select(*[f.max(name).alias(name) for name in stat_columns])
                .collect()
        )

    # Defensive: a global aggregation is expected to yield exactly one row.
    if not rows:
        return 0, 0, 0

    # NULL statistics (e.g. max over an empty table) are reported as 0.
    latest = rows[0]
    return tuple(0 if latest[name] is None else latest[name] for name in stat_columns)

In [0]:
# Get current and previous day dates. As the job will run at 00:00 (the next day),
# the current day is actually current day - 1 day.
# The clock is read only once so that both dates are derived from the same
# instant and are always exactly one day apart.
run_time = datetime.now()
date = (run_time - timedelta(days=1)).strftime("%Y-%m-%d")
previous_date = (run_time - timedelta(days=2)).strftime("%Y-%m-%d")
# date = "2022-06-07"
# previous_date = "2022-06-06"

# Get transactions metrics.
curr_number_transactions, curr_capital_turnover = get_transactions_metrics(date)
# Get users metrics.
curr_number_users = get_users_metrics(date)
# Get previous day bank statistics.
prev_number_transactions, prev_number_users, prev_capital_turnover = get_bank_statistics(previous_date)

In [0]:
# Build a one-row Spark SQL SELECT: each statistic is the previous value plus
# the value computed for the current day, tagged with the processed date.
query = f"""
SELECT
    CAST({prev_number_transactions + curr_number_transactions} AS BIGINT) AS number_transactions,
    CAST({prev_number_users + curr_number_users} AS BIGINT) AS number_unique_users,
    CAST({prev_capital_turnover + curr_capital_turnover} AS BIGINT) AS capital_turnover,
    CAST('{date}' AS DATE) AS date
"""

# Run the query and append the resulting row to the output table in Cassandra.
(
    spark.sql(query)
        .write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(table=OUTPUT_TABLE, keyspace=KEYSPACE)
        .save()
)