# Riesgo_PySpark.ipynb
<h> by PySpark
#### Estado: **Ok**
<p> Source: Se realizará la lectura de STG2 para obtener los dataframes necesarios para generar la tabla de 'Riesgos', y cargarla en formato Delta Table para su posterior consulta.
<p> Se tomará la información desde el STG2 para generar un consumible de STG3 en formato Delta Table.
<p> Inputs:
  <ol>
    <li>Player</li>
    <li>Depositos y Retiros</li>
    <li>Apuestas y Premios</li>
    <li>Otras Operaciones</li>
  </ol>
<p> Output: Delta Table actualizado de "Riesgos" y su carga en el Warehouse de Databricks.

## 1. Start Up

### Declaración de librerías utilizadas
<p> pyspark.sql.functions = Importa funciones de PySpark para manipular y trabajar con columnas y datos en los DataFrames.
<p> pyspark.sql.types = Importa tipos de datos para definir la estructura de los DataFrames.

In [0]:
from pyspark.sql.functions import col, date_format, lit, coalesce
from pyspark.sql.types import IntegerType, StringType, DateType, TimestampType, LongType, DecimalType
from pyspark.sql import Window
from delta.tables import DeltaTable
from datetime import datetime

### Nomenclatura de Fuentes y Auxiliares
<ul>
  <li>df = dataframe</li>
  <li>p = player</li>
  <li>ayp = apuestas y premios</li>
  <li>dyr = depósitos y retiros</li>
  <li>op = otras operaciones</li>
  <li>ad = apuestas deportivas</li>
</ul>

## Input

In [0]:
# Source tables from the productive catalog. All reads are lazy; nothing is
# materialized until an action runs further down the notebook.
df_p = spark.table('path.productivo.player')                      # player master
df_ayp = spark.table('path.productivo.apuestas_y_premios')        # bets and prizes
df_dyr = spark.table('path.productivo.depositos_y_retiros')       # deposits and withdrawals
df_op = spark.table('path.productivo.otras_operaciones')          # other operations
df_ad = spark.table('path.productivo.apuestas_deportivas')        # sports bets
df_b = spark.table('path.productivo.balance')                     # balances
df_churn = spark.table('path.productivo.interacciones_player')    # player status per periodo (Caso 7)

In [0]:
# Sports-betting dictionary table. NOTE(review): not referenced in the cells shown here.
df_dic_ad = spark.table('path.productivo.diccionario_aadd')

## Caso N° 1
<p> Realizó algún Cash Out en el mes

In [0]:
# Caso 1: player-months with at least one cash-out (combination_status_id == 6).
# Each distinct (partner_id, partner_player_id, mes) gets the fixed score Caso1 = 5.
df_Caso1 = (
    df_ad
    .where(col('combination_status_id') == 6)
    .select(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha'), 'yyyy-MM').alias('mes'),
    )
    .dropDuplicates()
    .orderBy('partner_player_id')
    .withColumn('Caso1', lit(5))
)

## Caso N° 2
<p> Realizó una apuesta de cuota baja

In [0]:
# Caso 2: player-months with at least one operation_type_id == 4 (the code Caso 5
# uses for bets) placed at a low combined odd, 1 <= combination_odds <= 1.07.
# Each distinct (partner_id, partner_player_id, mes) gets the fixed score Caso2 = 5.
_cuota_baja = col('combination_odds').between(1, 1.07)
df_Caso2 = (
    df_ad
    .where(_cuota_baja & (col('operation_type_id') == 4))
    .select(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha'), 'yyyy-MM').alias('mes'),
    )
    .dropDuplicates()
    .orderBy('partner_player_id')
    .withColumn('Caso2', lit(5))
)

## Caso N° 3
<p> Cantidad de IPs distintas por mes

In [0]:
# Raw login activity and users of the migrated platform (parquet volumes).
df_la = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/bplaybet_arg/dbo/login_activities')
df_usr = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/bplaybet_arg/dbo/users')

# Static mapping for the migrated users: "id" is matched against users.bookmaker_id
# and "partner" is the partner code prepended to the player key.
columns = ["id", "partner"]
data = [(46, 460), (13, 130), (88, 256)]
df_partner_Migra = spark.createDataFrame(data, schema=columns)

In [0]:
from pyspark.sql.functions import date_format, col, countDistinct, concat, when, format_string

# Caso 3 (migrated platform): number of distinct login IPs per player and month.
# login_activities -> users (user_id == id) -> partner mapping (bookmaker_id == id);
# both are inner joins, so only users whose bookmaker_id appears in
# df_partner_Migra are kept.
# Output columns: partner, partner_player_id, Mes, cant_ips. Note the first column
# is named 'partner', whereas the Create_Caso3 frames name it 'partner_id'.
df_Caso3_GT = df_la.join(df_usr, df_la["user_id"] == df_usr["id"])\
        .join(df_partner_Migra, df_usr["bookmaker_id"] == df_partner_Migra["id"])\
        .select(
    date_format(df_la["created_at"], 'yyyy-MM').alias('Mes'),
    df_partner_Migra["partner"],
    #df_usr["id"],
    # partner_player_id = partner code followed by a player key, where the key is:
    #   - alira_id when it is not null;
    #   - otherwise, for bookmaker_id 88, bookmaker_id followed by the user id
    #     zero-padded to 8 digits;
    #   - otherwise the raw user id.
    # The concatenated value is cast to LongType.
    concat(df_partner_Migra["partner"], when(df_usr['alira_id'].isNotNull(), df_usr['alira_id'])
            .when(df_usr["bookmaker_id"].isin(88), concat(df_usr["bookmaker_id"], format_string("%08d", df_usr["id"]))) \
                .otherwise(df_usr["id"])).cast(LongType()).alias("partner_player_id"), # migrated-platform player key
    df_la["ip"]
).groupBy('partner', 'partner_player_id', 'Mes').agg(countDistinct('ip').alias('cant_ips')).orderBy('partner_player_id', 'Mes')

In [0]:
    # Player snapshots from the four OGP sources (dwbplay_py, _sf, _caba, _cordoba).
df_p0 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_py/OGP/player')
df_p1 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_sf/OGP/player')
df_p2 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_caba/OGP/player')
df_p3 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_cordoba/OGP/player')
    # IP logs from the same four sources, in the same order as the player frames.
df_ip0 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_py/OGP/ip_log/')
df_ip1 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_sf/OGP/ip_log/')
df_ip2 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_caba/OGP/ip_log/')
df_ip3 = spark.read.format('parquet').load('/Volumes/boldtproddatalake/boldtdatalake/dwbplay_cordoba/OGP/ip_log/')

In [0]:
# Cantidad de IPs
def Create_Caso3(p, ip):
    p = p.filter((col('type') == 3))

    result = ip.join(p, p["id"] == ip["player"]).select(
        p["partner"].alias('partner_id'),
        concat(p["partner"], p["id"]).alias("partner_player_id"),
        date_format(ip["created_date"], 'yyyy-MM').alias('Mes'),
        ip["ip_address"]
    ).groupBy('partner_id', 'partner_player_id', 'Mes').agg(countDistinct('ip_address').alias('cant_ips')).orderBy('partner_player_id', 'Mes')
    return result

In [0]:
# Caso 3 frames for each OGP source.
df_Caso3_0 = Create_Caso3(df_p0, df_ip0)
df_Caso3_1 = Create_Caso3(df_p1, df_ip1)
df_Caso3_2 = Create_Caso3(df_p2, df_ip2)
# NOTE(review): the original author marked this source "No funciona" (does not
# work) — confirm its output before relying on it.
df_Caso3_3 = Create_Caso3(df_p3, df_ip3)

# Combine all sources plus the migrated-platform frame. df_Caso3_GT names its
# partner column 'partner' while Create_Caso3 uses 'partner_id'; renaming it lets
# the frames be combined by column name instead of by position, so a column
# reordering in either side cannot silently misalign the data.
df_Caso3 = (
    df_Caso3_0
    .unionByName(df_Caso3_1)
    .unionByName(df_Caso3_2)
    .unionByName(df_Caso3_3)
    .unionByName(df_Caso3_GT.withColumnRenamed('partner', 'partner_id'))
)

# Score by distinct IPs in the month: <=9 -> 1, <=19 -> 2, <=29 -> 4, otherwise 5.
df_Caso3 = df_Caso3.withColumn(
    "Caso3",
    when(col("cant_ips") <= 9, lit(1))
    .when(col("cant_ips") <= 19, lit(2))
    .when(col("cant_ips") <= 29, lit(4))
    .otherwise(lit(5))
)

## Caso N° 4
<p> Bonos Convertidos

In [0]:
# NOTE: this import shadows Python's built-in ``sum`` for every later cell.
from pyspark.sql.functions import count, sum

# Caso 4: converted bonuses. Sum of movement_amount for operation_type_id 5 on
# balance_type_id 1, per player and month; only non-negative totals are kept.
_bonos_convertidos = df_op.where(
    (col('operation_type_id') == 5) & (col('balance_type_id') == 1)
)
df_Caso4 = (
    _bonos_convertidos
    .groupBy(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha_operacion'), 'yyyy-MM').alias('Mes'),
    )
    .agg(sum(col('movement_amount')).alias('monto'))
    .where(col('monto') >= 0)
    .orderBy('partner_player_id', 'Mes')
)

# Score by converted amount: <=10000 -> 2, <=43500 -> 4, >43500 -> 5. The final
# fallback (1) only applies to a null monto, which the >= 0 filter already removes.
# NOTE(review): a total of exactly 0 scores 2, while players without rows end up
# with the neutral 1 — confirm this is intended.
df_Caso4 = df_Caso4.withColumn(
    "Caso4",
    when(col("monto") <= 10000, lit(2))
    .when(col("monto") <= 43500, lit(4))
    .when(col("monto") > 43500, lit(5))
    .otherwise(lit(1))
)

## Caso N° 5
<p> GGR menor a 0 y monto jugado en bonos mayor al jugado en efectivo.
<p> Se marca cuando el GGR es menor a 0 (la ganancia es del player) y el monto apostado en bono es mayor al monto apostado en efectivo.

In [0]:
# Caso 5 inputs: cash GGR and cash vs. bonus bet amounts, per player and month.

## Cash GGR: every balance_type_id == 1 movement, sign-flipped and summed.
df_ayp_efectivo = df_ayp.where(col('balance_type_id') == 1)
df_Caso5_GGR = (
    df_ayp_efectivo
    .groupBy(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha_operacion'), 'yyyy-MM').alias('Mes'),
    )
    .agg(sum(col('movement_amount') * -1).alias('GGR'))
    .orderBy('partner_player_id', 'Mes')
)

## Bet amounts (operation_type_id == 4) labelled by balance type: 1 -> cash, 8 -> bonus.
_tipo_apuesta = (
    when(col('balance_type_id') == 1, 'Ap_Eftvo')
    .when(col('balance_type_id') == 8, 'Ap_Bono')
    .alias('tipo')
)
df_Ap = (
    df_ayp
    .where((col('operation_type_id') == 4) & col('balance_type_id').isin(1, 8))
    .groupBy(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha_operacion'), 'yyyy-MM').alias('Mes'),
        _tipo_apuesta,
    )
    # The column is called 'GGR' but holds the (sign-flipped) bet amount.
    .agg(sum(col('movement_amount') * -1).alias('GGR'))
    .orderBy('partner_player_id', 'Mes')
)

## One row per player and month with Ap_Eftvo / Ap_Bono columns (missing -> 0).
df_pivot = (
    df_Ap
    .groupBy("partner_id", "partner_player_id", "Mes")
    .pivot("tipo", ["Ap_Eftvo", "Ap_Bono"])
    .sum("GGR")
)
df_Caso5_Ap = df_pivot.fillna(0.0, subset=["Ap_Eftvo", "Ap_Bono"])

## Inner join: only player-months with both a cash GGR and bet rows survive.
df_Caso5_unificado = (
    df_Caso5_GGR
    .join(df_Caso5_Ap, ['partner_id', 'partner_player_id', 'Mes'], 'inner')
    .orderBy('partner_player_id', 'Mes')
)

In [0]:
# Caso 5 = 5 when the player is ahead of the house in cash (GGR < 0) and bet more
# in bonus than in cash that month; otherwise 1.
_player_gana = col("GGR") < 0
_mas_bono_que_efectivo = col("Ap_Bono") > col("Ap_Eftvo")
df_Caso5 = df_Caso5_unificado.withColumn(
    "Caso5",
    when(_player_gana & _mas_bono_que_efectivo, lit(5)).otherwise(lit(1)),
)

## Caso N° 6
<p> Depósito menor a (retiro + saldo disponible)

In [0]:
# Caso 6 inputs: deposits and withdrawals with transaction_status_id == 2
# (presumably completed — confirm) per player and month, plus the cash balance.
# transaction_type_id 1 is a deposit; every other type is counted as a withdrawal.
_tipo_transaccion = (
    when(col('transaction_type_id') == 1, 'Deposito')
    .otherwise('Retiro')
    .alias('tipo')
)
df_C6_dyr = (
    df_dyr
    .where(col('transaction_status_id') == 2)
    .groupBy(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha'), 'yyyy-MM').alias('Mes'),
        _tipo_transaccion,
    )
    .agg(sum('transaction_amount').alias('monto'))
)

## One row per player and month with Deposito / Retiro columns (missing -> 0).
df_pivot = (
    df_C6_dyr
    .groupBy("partner_id", "partner_player_id", "Mes")
    .pivot("tipo", ["Deposito", "Retiro"])
    .sum("monto")
)
df_Caso6_dyr = df_pivot.fillna(0.0, subset=["Deposito", "Retiro"])

## Attach the cash balance (balance_type_id == 1). The join uses partner_player_id
## only, so the same balance row is attached to every month of the player, and a
## player with several such rows yields one output row per balance row.
_saldo_efectivo = df_b.where(col('balance_type_id') == 1)
df_Caso6_unificado = (
    df_Caso6_dyr
    .join(_saldo_efectivo, 'partner_player_id')
    .select(
        df_Caso6_dyr['partner_id'],
        df_Caso6_dyr['partner_player_id'],
        df_Caso6_dyr['Mes'],
        df_Caso6_dyr['Deposito'],
        df_Caso6_dyr['Retiro'],
        df_b['balance_amount'].alias('Saldo Disponible'),
    )
)

In [0]:
# Caso 6: when deposits are below (withdrawals + available balance), score by the
# deposited amount: <= 100,000 -> 2, <= 200,000 -> 4, otherwise 5. In every other
# case (condition false or null) the score is 1.
_deposito_insuficiente = col("Deposito") < (col("Retiro") + col("Saldo Disponible"))
df_Caso6 = df_Caso6_unificado.withColumn(
    "Caso6",
    when(_deposito_insuficiente & (col("Deposito") <= 100000), lit(2))
    .when(_deposito_insuficiente & (col("Deposito") <= 200000), lit(4))
    .when(_deposito_insuficiente, lit(5))
    .otherwise(lit(1)),
)

## Caso 7
<p> FTD o depósito luego de 3 meses de inactividad

In [0]:
from pyspark.sql import functions as F, Window

# Caso 7: FTD, or return to activity after 3 consecutive inactive periods.
# Rows of df_churn are ordered per player by periodo.
w = Window.partitionBy("partner_player_id").orderBy("periodo")

# inactivo = 1 when status is "Churn" or "Abandono", otherwise 0 (null status -> 0).
df_flag = df_churn.withColumn(
    "inactivo",
    F.when(F.col("status").isin("Churn", "Abandono"), F.lit(1)).otherwise(F.lit(0)),
)

# Number of inactive rows among the 3 rows immediately before the current one.
# It equals 3 only when all three previous rows were inactive, which already
# implies the previous row was inactive; with fewer than 3 prior rows it is < 3
# or null. (The former "racha_inactivo" column subtracted two identical
# row_number windows, so it was always 0 and never used; it has been removed.)
# NOTE(review): the frame counts rows, not calendar months — it matches
# "3 months" only if df_churn has one row per player for every consecutive
# periodo. Also, any non-Churn/Abandono status counts as activity; the notebook
# heading speaks of a deposit — confirm.
_inactivo_3_previos = F.sum("inactivo").over(w.rowsBetween(-3, -1))

df_resultado = df_flag.withColumn(
    "Caso7",
    F.when(F.col("status") == "FTD", 5)
     .when((F.col("inactivo") == 0) & (_inactivo_3_previos == 3), 5),
)

# Keep only flagged periods; periodo is exposed as 'mes' to join with the other Casos.
df_Caso7 = df_resultado.filter(F.col("Caso7").isNotNull()).select(
    "partner_player_id", F.col("periodo").alias('mes'), "Caso7"
)

## Caso 8
<p> Contraseñas duplicadas, y si alguno de los usuarios que comparten la contraseña se encuentra en estado "contrato anulado".

In [0]:
  # Player snapshots for Caso 8, restricted to status != 1 and type == 3.
  # These replace the df_p0..df_p3 frames read earlier for Caso 3.
_filtro_caso8 = (col("status") != 1) & (col("type") == 3)
df_p0 = spark.read.format("parquet").load("/Volumes/boldtproddatalake/boldtdatalake/dwbplay_py/OGP/player").where(_filtro_caso8)
df_p1 = spark.read.format("parquet").load("/Volumes/boldtproddatalake/boldtdatalake/dwbplay_sf/OGP/player").where(_filtro_caso8)
df_p2 = spark.read.format("parquet").load("/Volumes/boldtproddatalake/boldtdatalake/dwbplay_caba/OGP/player").where(_filtro_caso8)
df_p3 = spark.read.format("parquet").load("/Volumes/boldtproddatalake/boldtdatalake/dwbplay_cordoba/OGP/player").where(_filtro_caso8)

In [0]:
from pyspark.sql import functions as F

def Caso8(p):
    """Flag players that share a password with another player of the same source.

    Parameters
    ----------
    p : DataFrame
        Player snapshot with ``partner``, ``id``, ``status``, ``type`` and
        ``password`` columns. Only rows with ``status != 1`` and ``type == 3``
        are considered.

    Returns
    -------
    DataFrame
        Columns ``partner_id``, ``partner_player_id``, ``status_id``,
        ``password`` and ``Caso8``, where Caso8 is:

        * 5 if the password is shared and at least one sharer has status_id 7,
        * 3 if the password is shared and no sharer has status_id 7,
        * null if the password is not shared or is null.

    Duplicates are only detected within the DataFrame passed in (one source).
    """
    p = p.filter((col("status") != 1) & (col("type") == 3))
    df = p.select(
        p["partner"].alias("partner_id"),
        concat(p["partner"], p["id"].cast(DecimalType(10, 0))).alias("partner_player_id"),
        p["status"].alias("status_id"),
        p["password"].alias("password"),
    )

    por_password = Window.partitionBy("password")

    # Step 1: how many players share each password. All null passwords land in a
    # single window partition, so they are excluded explicitly; otherwise every
    # player without a password would be reported as a duplicate.
    dup = df.withColumn(
        "dup_count",
        F.when(
            F.col("password").isNotNull(),
            F.count("partner_player_id").over(por_password),
        ),
    )

    # Step 2: does any player sharing the password have status_id 7
    # ("contrato anulado" according to the notebook heading — confirm)?
    flag = dup.withColumn(
        "has_status7",
        F.max(F.when(F.col("status_id") == 7, 1).otherwise(0)).over(por_password),
    )

    # Step 3: score; null when the password is not duplicated.
    res = flag.withColumn(
        "Caso8",
        F.when(
            F.col("dup_count") > 1,
            F.when(F.col("has_status7") == 1, 5).otherwise(3),
        ),
    ).drop("dup_count", "has_status7")

    # NOTE(review): the password column is carried into the output; consider
    # dropping it if no downstream consumer needs it.
    return res


df_Caso8 = (
    Caso8(df_p0)
    .unionByName(Caso8(df_p1))
    .unionByName(Caso8(df_p2))
    .unionByName(Caso8(df_p3))
)

### Casos

In [0]:
# Short aliases for the Caso frames.
(df_C1, df_C2, df_C3, df_C4, df_C5, df_C6, df_C7, df_C8) = (
    df_Caso1,  # cash-out performed in the month
    df_Caso2,  # low-odds bet placed
    df_Caso3,  # distinct IPs per month
    df_Caso4,  # converted bonuses
    df_Caso5,  # GGR < 0 and bonus bets > cash bets
    df_Caso6,  # deposit < (withdrawals + available balance)
    df_Caso7,  # FTD or return after 3 consecutive inactive periods
    df_Caso8,  # duplicated password with status_id = 7
)

In [0]:
# Raw (unscored) metrics per player and month, complementing the Caso scores.
# df_C1 / df_C2 are redefined here as event counts instead of constant scores.
df_C1 = (
    df_ad.filter(col('combination_status_id') == 6)
    .groupBy(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha'), 'yyyy-MM').alias('mes'),
    )
    .agg(count('*').alias('cant_CashOut'))
    .orderBy('partner_player_id')
)

df_C2 = (
    df_ad.filter(
        (col('combination_odds') >= 1)
        & (col('combination_odds') <= 1.07)
        & (col('operation_type_id') == 4)
    )
    .groupBy(
        'partner_id',
        'partner_player_id',
        date_format(col('fecha'), 'yyyy-MM').alias('mes'),
    )
    .agg(count('*').alias('cant_ApCuotaBaja'))
    .orderBy('partner_player_id')
)

# Full outer joins on (player, month) keep a player-month present in any source;
# the inner join with df_p keeps only known players.
res = (
    df_C1
    .join(df_C2, ["partner_player_id", "mes"], "full")
    .join(df_C3, ["partner_player_id", "mes"], "full")
    .join(df_C4, ["partner_player_id", "mes"], "full")
    .join(df_C5, ["partner_player_id", "mes"], "full")
    .join(df_C6, ["partner_player_id", "mes"], "full")
    .join(df_p, ["partner_player_id"], "inner")
    .select(
        df_p["partner_id"],
        df_p["partner_player_id"],
        # The month may come from any of the joined frames, including df_C6;
        # without it, player-months present only in Caso 6 would get a null mes
        # and could not be matched by (partner_player_id, mes) later on.
        coalesce(
            df_C1["mes"],
            df_C2["mes"],
            df_C3["mes"],
            df_C4["mes"],
            df_C5["mes"],
            df_C6["mes"],
        ).alias("mes"),
        df_C1["cant_CashOut"],
        df_C2["cant_ApCuotaBaja"],
        df_C3["cant_ips"],
        df_C4["monto"].alias("bono_conv"),
        df_C5["GGR"],
        df_C5["Ap_Eftvo"],
        df_C5["Ap_Bono"],
        df_C6["Deposito"],
        df_C6["Retiro"],
        df_C6["Saldo Disponible"].alias("Saldo_Disponible"),
    )
)

# Metrics missing for a player-month are reported as zero.
res = res.fillna({
    'cant_CashOut': 0,
    'cant_ApCuotaBaja': 0,
    'cant_ips': 0,
    'bono_conv': 0.00,
    'GGR': 0.00,
    'Ap_Eftvo': 0.00,
    'Ap_Bono': 0.00,
    'Deposito': 0.00,
    'Retiro': 0.00,
    'Saldo_Disponible': 0.00,
})

## Unificado

In [0]:
from pyspark.sql.functions import coalesce, col, when, lit

# Unified Caso scores: one row per player and month.
# - Casos 1-7 are full-outer-joined on (partner_player_id, mes), so a player-month
#   present in any Caso frame is kept.
# - Caso 8 has no month, so it is joined on partner_player_id only: its score is
#   repeated on every month of the player, and a player present only in Caso 8
#   gets a row with a null mes.
# - The inner join with df_p keeps only players present in df_p.
df_res = (
    df_Caso1.alias("df1")
    .join(df_Caso2.alias("df2"), ["partner_player_id", "mes"], "full")
    .join(df_Caso3.alias("df3"), ["partner_player_id", "mes"], "full")
    .join(df_Caso4.alias("df4"), ["partner_player_id", "mes"], "full")
    .join(df_Caso5.alias("df5"), ["partner_player_id", "mes"], "full")
    .join(df_Caso6.alias("df6"), ["partner_player_id", "mes"], "full")
    .join(df_Caso7.alias("df7"), ["partner_player_id", "mes"], "full")
    .join(df_Caso8.alias("df8"), ["partner_player_id"], "full")
    .join(df_p.alias("dfp"), ["partner_player_id"], "inner")
    .select(
        df_p["partner_id"],
        # NOTE(review): partner_player_id is already the merged join key of the
        # USING joins above; this coalesce looks redundant.
        coalesce(
            col("df1.partner_player_id"),
            col("df2.partner_player_id"),
            col("df3.partner_player_id"),
            col("df4.partner_player_id"),
            col("df5.partner_player_id"),
            col("df6.partner_player_id"),
            col("df7.partner_player_id"),
            col("df8.partner_player_id"),
            col("dfp.partner_player_id")
        ).alias("partner_player_id"),
        # First non-null month among the monthly Caso frames.
        coalesce(
            df_Caso1["mes"], 
            df_Caso2["mes"], 
            df_Caso3["mes"], 
            df_Caso4["mes"], 
            df_Caso5["mes"], 
            df_Caso6["mes"],
            df_Caso7["mes"]).alias("mes"),
        df_Caso1["Caso1"],
        df_Caso2["Caso2"],
        df_Caso3["Caso3"],
        df_Caso4["Caso4"],
        df_Caso5["Caso5"],
        df_Caso6["Caso6"],
        df_Caso7["Caso7"],
        df_Caso8["Caso8"],
        # FTD is taken from df_p.Camada_deposito.
        df_p["Camada_deposito"].alias("FTD")
    ).orderBy("partner_player_id", "mes")
)

# Fix the column order and default every missing Caso score to the neutral value 1.
df_res = df_res.select(
    "partner_id", "partner_player_id", "FTD", "mes", "Caso1", "Caso2", "Caso3", "Caso4", "Caso5", "Caso6", "Caso7", "Caso8"
).fillna({'Caso1': 1, 'Caso2': 1, 'Caso3': 1, 'Caso4': 1, 'Caso5': 1, 'Caso6': 1, 'Caso7': 1, 'Caso8': 1})

In [0]:
# Weighted score per Caso: Res<n> = Caso<n> * weight. The weights add up to 1.00,
# so a player-month with every Caso at the neutral 1 has a weighted total of 1.00.
_PESOS_CASO = [
    ('Caso1', 'Res1', 0.09),
    ('Caso2', 'Res2', 0.09),
    ('Caso3', 'Res3', 0.07),
    ('Caso4', 'Res4', 0.07),
    ('Caso5', 'Res5', 0.18),
    ('Caso6', 'Res6', 0.18),
    ('Caso7', 'Res7', 0.14),
    ('Caso8', 'Res8', 0.18),
]

# Each Caso column is immediately followed by its weighted Res column.
_columnas_resultado = ['partner_id', 'partner_player_id', 'FTD', 'mes']
for _caso, _res, _peso in _PESOS_CASO:
    _columnas_resultado.append(_caso)
    _columnas_resultado.append((col(_caso) * _peso).alias(_res))

df_resultado_final = df_res.select(*_columnas_resultado)

# Attach the raw metrics; a full outer join keeps rows present on only one side.
df_resultado_final = df_resultado_final.join(
    res, ["partner_player_id", "mes", "partner_id"], 'full'
)

In [0]:
# Persist the detailed risk table: full overwrite, with mergeSchema enabled so
# new columns can be added to the existing table schema.
(
    df_resultado_final.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable('path.productivo.riesgo')
)

## Resumen mes a mes

In [0]:
# Monthly summary: Resultado is the sum of all eight weighted Caso scores
# (Res1..Res8), cast to DECIMAL(10,2). Res8 is included so Caso 8 contributes to
# the summary and an all-neutral player-month totals 1.00 — the same value later
# used to fill periods without data.
df_resumen_res = df_resultado_final.select(
    "partner_id",
    "partner_player_id",
    "FTD",
    col("mes").alias("periodo"),
    (
        col("Res1") + col("Res2") + col("Res3") + col("Res4")
        + col("Res5") + col("Res6") + col("Res7") + col("Res8")
    ).cast(DecimalType(10, 2)).alias('Resultado'),
)

In [0]:
from pyspark.sql.functions import current_date
calendario = spark.table("path.productivo.calendario")

# Distinct "yyyy-MM" periods from the calendar, up to and including the current
# month, in ascending order.
_mes_actual = date_format(current_date(), 'yyyy-MM')
periodo = (
    calendario
    .select(date_format('Fecha', 'yyyy-MM').alias('periodo'))
    .distinct()
    .where(col('periodo') <= _mes_actual)
    .orderBy('periodo')
)

In [0]:
# Short names for the inputs of the gap-filling step below.
df_datos, df_periodos = df_resumen_res, periodo

In [0]:
from pyspark.sql.functions import lit, coalesce

# Dense player x period grid: every calendar period from the player's FTD onward,
# with Resultado defaulting to the neutral 1.00 when the period has no data.

# Step 1: one row per (partner_id, partner_player_id, FTD).
df_ids = df_datos.select("partner_id", "partner_player_id", "FTD").distinct()

# Step 2: pair each player with every period on or after their FTD.
# NOTE(review): this compares periodo ('yyyy-MM') with FTD as-is — confirm FTD has
# the same format. Players with a null FTD match no period and drop out, and data
# for periods before FTD is discarded.
df_completo = df_ids.join(df_periodos, df_periodos["periodo"] >= df_ids["FTD"], "inner")

# Step 3: attach the actual monthly results where they exist.
_resultados = df_datos.select(
    "partner_id",
    "partner_player_id",
    "periodo",
    col("Resultado").cast(DecimalType(10, 2)).alias("Resultado"),
)

# Steps 4-5: neutral 1.00 for periods without a result, then sort.
df_final = (
    df_completo
    .join(_resultados, on=["partner_id", "partner_player_id", "periodo"], how="left")
    .fillna({"Resultado": 1.00})
    .orderBy("partner_id", "partner_player_id", "periodo")
)


In [0]:
# Persist the dense monthly risk summary (full overwrite).
(
    df_final.write
    .mode("overwrite")
    .saveAsTable('path.productivo.resumen_riesgo')
)