In [None]:
# Databricks notebook source
from typing import List

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.window import Window

from pipelines import utils
from pipelines.context import Context
from pipelines.etl import readers, writers
from pipelines.etl.paths import Path
from pipelines.references import currencies

In [None]:
# COMMAND ----------

# Source tables read by run_pipeline() and handed to transform().
SILVER_PAYMENTS = "base.bitx.vw_payments"
SILVER_DEPOSITS = "base.bitx.vw_deposits"
SILVER_BANK_TRANSACTIONS = "base.bitx.bank_transactions"
SILVER_CURRENCY_CONVERTER = "analytics.charter.currency_converter"

# HIVE_TABLE is passed together with OUTPUT_TABLE to the utils table helpers
# in the __main__ block; OUTPUT_TABLE is the Delta table that run_pipeline()
# overwrites with the transform() result.
HIVE_TABLE = "bitx_analytics.deposits_extended"
OUTPUT_TABLE = "analytics.bitx.deposits"

In [None]:
# Materialised once at import time from currencies.currency_fiats(); used as
# the default ``fiat_currencies`` filter of get_payment_deposits().
FIAT_CURRENCIES = list(currencies.currency_fiats())

# COMMAND ----------

# Window used by transform() to number each user's deposits ("deposit_number").
# ``a`` is the alias of the payment-deposit rows and ``b`` the alias of the
# left-joined deposit details, so ``b.created_at`` can be NULL or tied between
# rows. row_number() over an ordering with ties is non-deterministic, hence the
# payment timestamp and payment id are appended as tie-breakers. They only
# refine the original ordering; rows that were already strictly ordered by
# ``b.created_at`` keep their relative position.
windowUserRowNumber = Window.partitionBy(F.col("a.user_id")).orderBy(
    F.asc(F.col("b.created_at")),
    F.asc(F.col("a.timestamp")),
    F.asc(F.col("a.id")),
)

# COMMAND ----------


def get_payment_deposits(
    payments_df: DataFrame, fiat_currencies: List[str] = FIAT_CURRENCIES
) -> DataFrame:
    """Keeps only the payment rows that represent fiat deposits.

    A row is kept when all of the following hold:

    * its ``currency`` is one of ``fiat_currencies``;
    * its ``type`` is one of the deposit type codes listed below;
    * its ``principal_e8`` is strictly positive (a credit).
    """
    deposit_type_codes = [1, 2, 3, 4, 5, 6, 21]

    keep_row = (
        payments_df["currency"].isin(fiat_currencies)
        & payments_df["type"].isin(deposit_type_codes)
        & (payments_df["principal_e8"] > 0)
    )
    return payments_df.filter(keep_row)


def get_deposit_timestamps(payment_deposits_df: DataFrame) -> DataFrame:
    """Computes the earliest and latest deposit time per user and deposit type.

    The result has one row per (``user_id``, ``type``) pair with the columns
    ``first_deposit_at`` and ``last_deposit_at``.
    """
    # ``timestamp`` is divided by 1000 before the timestamp cast
    # (presumably epoch milliseconds -- confirm against the source table).
    deposit_at = (F.col("timestamp") / 1000).cast(T.TimestampType())

    first_deposit_at = F.min(deposit_at).alias("first_deposit_at")
    last_deposit_at = F.max(deposit_at).alias("last_deposit_at")

    per_user_and_type = payment_deposits_df.groupBy("user_id", "type")
    return per_user_and_type.agg(first_deposit_at, last_deposit_at)


def get_deposit_infos(
    deposits_df: DataFrame,
    payment_deposits_df: DataFrame,
    bank_transactions_df: DataFrame,
) -> DataFrame:
    """Collects the deposit attributes plus their settlement information.

    Deposits are inner-joined to ``payment_deposits_df`` on the payment id, so
    only deposits backed by one of those payments remain. The bank transaction
    referenced by ``settlement_recon_entry`` is left-joined and contributes
    ``settled_at`` (its ``created_at``) and ``statement_date``.
    """
    deposits = deposits_df.alias("a")
    payment_deposits = payment_deposits_df.alias("b")
    settlements = bank_transactions_df.alias("c")

    matches_payment = F.col("a.payment_id") == F.col("b.id")
    matches_settlement = F.col("a.settlement_recon_entry") == F.col("c.id")

    # Columns taken unchanged from the deposits table.
    deposit_columns = [
        "a.payment_type_artifact_id",
        "a.created_at",
        "a.payment_id",
        "a.payment_type",
        "a.status",
        "a.status_reason",
        "a.status_note",
        "a.card_network",
        "a.deadline",
    ]
    settled_at = F.col("c.created_at").alias("settled_at")
    statement_date = F.col("c.statement_date").alias("statement_date")

    with_payments = deposits.join(payment_deposits, matches_payment, "inner")
    with_settlements = with_payments.join(settlements, matches_settlement, "left")
    return with_settlements.select(*deposit_columns, settled_at, statement_date)


# COMMAND ----------


def transform(
    payments_df: DataFrame,
    deposits_df: DataFrame,
    bank_transactions_df: DataFrame,
    currency_converter_df: DataFrame,
    lookups_df: DataFrame,
    payment_provider_type_dims_df: DataFrame,
    payment_provider_dims_df: DataFrame,
    operational_accounts_df: DataFrame,
) -> DataFrame:
    """Builds the extended deposits dataset from fiat deposit payments.

    The fiat deposit payments (see ``get_payment_deposits``) are the driving
    rows (alias ``a``). Everything else is left-joined onto them, so a payment
    is never dropped because enrichment data is missing.

    Args:
        payments_df: Raw payments; filtered down to fiat deposits.
        deposits_df: Deposit records, matched to payments by payment id.
        bank_transactions_df: Bank transactions; used both for settlement
            details (via ``get_deposit_infos``) and, joined on ``payment_id``,
            for the payment rail.
        currency_converter_df: Conversion rates keyed by ``reference_at`` and
            ``currency``.
        lookups_df: Enum-to-name lookup rows, joined once per ``lookup_type``.
        payment_provider_type_dims_df: Payment provider types, keyed by ``id``.
        payment_provider_dims_df: Payment providers, keyed by ``id``.
        operational_accounts_df: Operational bank accounts, keyed by
            ``account_number``.

    Returns:
        The joined payment rows with deposit, bank, provider, lookup and
        conversion attributes, in the column order of the final ``select``.
    """
    payment_deposits_df = get_payment_deposits(payments_df)
    deposit_timestamps_df = get_deposit_timestamps(payment_deposits_df)
    deposit_infos_df = get_deposit_infos(
        deposits_df, payment_deposits_df, bank_transactions_df
    )

    # ``a.timestamp`` is divided by 1000 before the timestamp cast
    # (presumably epoch milliseconds -- confirm against the payments table).
    payment_at = (F.col("a.timestamp") / 1000).cast(T.TimestampType())
    # Payment time rounded to the nearest 15-minute mark, used to pick the
    # conversion-rate row via ``d.reference_at``.
    rate_reference_at = (
        F.round(F.col("a.timestamp") / (1000 * 15 * 60)) * 15 * 60
    ).cast(T.TimestampType())

    enriched_df = (
        payment_deposits_df.alias("a")
        .join(
            deposit_infos_df.alias("b"),
            F.col("a.id") == F.col("b.payment_id"),
            how="left",
        )
        .join(
            bank_transactions_df.alias("c"),
            F.col("a.id") == F.col("c.payment_id"),
            how="left",
        )
        .join(
            currency_converter_df.alias("d"),
            (rate_reference_at == F.col("d.reference_at"))
            & (F.col("a.currency") == F.col("d.currency")),
            how="left",
        )
        .withColumn(
            "amount_usd",
            F.col("a.principal_e8") / 1e8 * F.col("d.average_price_per_usd"),
        )
        .join(
            deposit_timestamps_df.alias("e"),
            (F.col("a.user_id") == F.col("e.user_id"))
            & (F.col("a.type") == F.col("e.type")),
            how="left",
        )
    )

    # One left join against the lookup table per (alias, key column, lookup type).
    # NOTE(review): alias "f" is joined but none of its columns are selected
    # below; kept so the join sequence is unchanged.
    lookup_joins = [
        ("f", "b.payment_type", "deposits_payment_type"),
        ("g", "a.bank_account", "payments_bank_name"),
        ("h", "c.payment_rail", "payment_rail_name"),
        ("i", "b.status_reason", "status_reason_description"),
        ("j", "b.status", "status_name"),
        ("k", "a.type", "payments_type"),
    ]
    for lookup_alias, key_column, lookup_type in lookup_joins:
        enriched_df = enriched_df.join(
            lookups_df.alias(lookup_alias),
            (F.col(key_column) == F.col(f"{lookup_alias}.lookup_enum"))
            & (F.col(f"{lookup_alias}.lookup_type") == lookup_type),
            how="left",
        )

    enriched_df = (
        enriched_df.join(
            payment_provider_type_dims_df.alias("ppt"),
            F.col("b.payment_type") == F.col("ppt.id"),
            how="left",
        )
        .join(
            payment_provider_dims_df.alias("pp"),
            F.col("ppt.payment_provider_id") == F.col("pp.id"),
            how="left",
        )
        .join(
            operational_accounts_df.alias("ac"),
            F.col("a.bank_account") == F.col("ac.account_number"),
            how="left",
        )
    )

    card_network_lower = F.lower(F.col("b.card_network"))

    return (
        enriched_df.withColumn(
            "card_network_standardised",
            F.when(card_network_lower.like("%visa%"), "Visa")
            .when(card_network_lower.like("%master%"), "MasterCard")
            .otherwise(F.col("b.card_network")),
        )
        # Fall back to the first two characters of the currency code when the
        # operational account has no country.
        .withColumn(
            "bank_country",
            F.coalesce(F.col("ac.country"), F.substring(F.col("a.currency"), 1, 2)),
        )
        # Prefer the operational account name over the lookup name.
        .withColumn(
            "bank_name",
            F.coalesce(F.col("ac.name"), F.col("g.lookup_name")),
        )
        .withColumn("deposit_number", F.row_number().over(windowUserRowNumber))
        .withColumn(
            "bank_region",
            F.when(F.col("bank_country").isin(["ZA", "NG", "ZM", "UG"]), "Africa")
            .when(
                F.col("bank_country").isin(["MY", "ID", "SG", "AU"]),
                "Southeast Asia",
            )
            .when(F.col("bank_country").isin(["EU", "GB", "FR"]), "Europe")
            .when(F.col("bank_country") == "US", "North America")
            .otherwise("Other"),
        )
        .select(
            # payment deposits
            "a.account_id",
            "a.user_id",
            "a.currency",
            F.col("a.bank_account").alias("bank_account"),
            "bank_name",
            "bank_country",
            "bank_region",
            "a.type",
            F.col("k.lookup_name").alias("type_name_description"),
            F.col("a.id").alias("payment_id"),
            payment_at.alias("payment_at"),
            payment_at.cast(T.DateType()).alias("p_payment_date"),
            # Qualified with the payments alias (as in ``amount_usd``) so the
            # reference stays unambiguous across the joined tables.
            (F.col("a.principal_e8") / 1e8).alias("amount"),
            F.col("amount_usd"),
            F.coalesce(
                F.col("b.created_at"),
                F.col("c.created_at"),
                payment_at,
            ).alias("created_at"),
            # deposit infos
            "b.payment_type",
            F.col("ppt.name").alias("payment_type_name"),
            F.col("pp.provider_name").alias("provider_name"),
            F.col("pp.country").alias("provider_country"),
            "b.status",
            F.col("j.lookup_name").alias("status_name"),
            "b.status_reason",
            F.col("i.lookup_name").alias("status_reason_description"),
            "b.status_note",
            "b.card_network",
            "card_network_standardised",
            "b.deadline",
            "b.settled_at",
            "b.statement_date",
            # bank transactions
            "c.payment_rail",
            F.col("h.lookup_name").alias("payment_rail_name"),
            # currency converter
            "d.average_price_per_usd",
            # deposit timestamps
            "e.first_deposit_at",
            "e.last_deposit_at",
            "deposit_number",
            "b.payment_type_artifact_id",
        )
    )


# COMMAND ----------


def run_pipeline(ctx: Context) -> None:
    """Reads the source tables, runs ``transform`` and overwrites OUTPUT_TABLE.

    The result is saved as a Delta table in overwrite mode with
    ``overwriteSchema`` enabled.
    """
    spark = ctx.spark

    output_df = transform(
        payments_df=spark.read.table(SILVER_PAYMENTS),
        deposits_df=spark.read.table(SILVER_DEPOSITS),
        bank_transactions_df=spark.read.table(SILVER_BANK_TRANSACTIONS),
        currency_converter_df=spark.read.table(SILVER_CURRENCY_CONVERTER),
        lookups_df=spark.table("analytics.bitx.vw_lookups"),
        payment_provider_type_dims_df=spark.read.table(
            "bitx_analytics.payment_provider_type_dims"
        ),
        payment_provider_dims_df=spark.read.table(
            "bitx_analytics.payment_provider_dims"
        ),
        operational_accounts_df=spark.read.table(
            "bitx_analytics.operational_accounts"
        ),
    )

    delta_writer = output_df.write.format("delta").mode("overwrite")
    delta_writer = delta_writer.option("overwriteSchema", "true")
    delta_writer.saveAsTable(OUTPUT_TABLE)


# COMMAND ----------

# Script entry point: the table helpers from pipelines.utils are called before
# and after the pipeline run, in this fixed order.
if __name__ == "__main__":
 # The context supplies the Spark session used by every step below.
 context = Context.for_databricks()
 # NOTE(review): presumably prepares OUTPUT_TABLE from HIVE_TABLE before the
 # run -- confirm against pipelines.utils.
 utils.create_managed_uc_table(context.spark, HIVE_TABLE, OUTPUT_TABLE)
 run_pipeline(context)
 # NOTE(review): presumably finalises the Hive-to-UC table upgrade after the
 # data has been written -- confirm against pipelines.utils.
 utils.upgrade_managed_hive_table(context.spark, HIVE_TABLE, OUTPUT_TABLE)

# COMMAND ----------