In [1]:
%%configure -f
{
    "conf": {
        "spark.sql.legacy.parquet.datetimeRebaseModeInRead": "LEGACY",
        "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY"
    }
}

In [2]:
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

from typing import Iterable

# Bootstrap the Glue wrappers on top of the existing Spark session.
# NOTE(review): `spark` is read here before this notebook assigns it, so this
# cell assumes the kernel predefines a `spark` session — confirm for the
# runtime you use.
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)

# Rebind `spark` to the session exposed by the GlueContext so that every
# later cell goes through the Glue-managed session.
spark = glueContext.spark_session

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Load one parquet part file of model predictions from the conformed bucket.
predictions: DataFrame = (
    spark.read
    .format('parquet')
    .option("recursiveFileLookup", "true")
    .load('s3://cvm-krz-conformed-7a4f071/predictions/part-00000-ebbf45d3-a716-4675-b300-7682d5438db9-c000.snappy.parquet')
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Narrow the frame to the join key and the offer, then inspect schema and rows.
predictions = predictions.select(['idi_counterparty', 'next_best_offer'])
predictions.printSchema()
predictions.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- idi_counterparty: string (nullable = true)
 |-- next_best_offer: string (nullable = true)

+----------------+---------------+
|idi_counterparty|next_best_offer|
+----------------+---------------+
|             137|       offer_01|
|             137|       offer_02|
|             137|       offer_03|
|             139|       offer_06|
|             139|       offer_07|
|             139|       offer_08|
|             128|       offer_04|
|             128|       offer_02|
|             128|       offer_05|
+----------------+---------------+

In [10]:
# Hand-written fixture mapping a global counterparty id (idi_gcr) to the
# source-system id (idi_src) it has in each business unit (bu).
# The commented rows are alternative samples kept for experimentation.
data = [
    dict(idi_gcr=139, idi_src=171, bu='A'),
    dict(idi_gcr=139, idi_src=271, bu='B'),
    # dict(idi_gcr=121235671, idi_src=371, bu='C'),
    # dict(idi_gcr=121235672, idi_src=171, bu='A'),
    dict(idi_gcr=138, idi_src=271, bu='B'),
    # dict(idi_gcr=121235672, idi_src=371, bu='C'),
    # dict(idi_gcr=121235673, idi_src=171, bu='A'),
    # dict(idi_gcr=121235673, idi_src=271, bu='B'),
    # dict(idi_gcr=121235673, idi_src=371, bu='C'),
]
# The schema is inferred from the dicts; no explicit types are supplied.
id_mapping: DataFrame = spark.createDataFrame(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Eyeball the fixture; the schema was inferred from dicts, so check the
# column order and types printed here rather than assuming the literal order.
id_mapping.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-------+-------+
| bu|idi_gcr|idi_src|
+---+-------+-------+
|  A|    139|    171|
|  B|    139|    271|
|  B|    138|    271|
+---+-------+-------+

In [12]:
# Exploratory left join: every prediction row is kept, mapping columns are
# null where the counterparty has no entry in id_mapping.
# NOTE(review): idi_counterparty is a string column while idi_gcr comes from
# Python ints, so the equality relies on Spark's implicit cast — confirm this
# is acceptable for real ids.
a = (
    predictions
    .join(id_mapping, on=predictions['idi_counterparty'] == id_mapping['idi_gcr'], how='left')
    .drop('idi_counterparty')
)
a.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+----+-------+-------+
|next_best_offer|  bu|idi_gcr|idi_src|
+---------------+----+-------+-------+
|       offer_01|null|   null|   null|
|       offer_02|null|   null|   null|
|       offer_03|null|   null|   null|
|       offer_04|null|   null|   null|
|       offer_02|null|   null|   null|
|       offer_05|null|   null|   null|
|       offer_06|   A|    139|    171|
|       offer_06|   B|    139|    271|
|       offer_07|   A|    139|    171|
|       offer_07|   B|    139|    271|
|       offer_08|   A|    139|    171|
|       offer_08|   B|    139|    271|
+---------------+----+-------+-------+

In [14]:
# One row per distinct counterparty that appears in the predictions.
a = predictions.select('idi_counterparty').dropDuplicates()

# An inner join keeps only counterparties that have at least one mapping row.
# The pivot then yields one row per idi_gcr with one array column per
# business unit present in the data, holding the distinct idi_src values.
a = (
    a.join(id_mapping, a.idi_counterparty == id_mapping.idi_gcr, 'inner')
    .drop('idi_counterparty')
    .groupBy('idi_gcr')
    .pivot('bu')
    .agg(collect_set(col('idi_src')))
)

# `r` starts as the pivoted frame and is reshaped further below.
r = a

from typing import Dict


def reverted_meld(df: DataFrame, cols: Dict[str, str], default: str = 'dummy') -> DataFrame:
    """Turn per-source array columns into single string columns.

    For every ``source -> target`` pair in ``cols``:

    * if ``source`` is one of ``df.columns``, ``target`` is set to element 0
      of that column cast to string, or to ``default`` when that element is
      null;
    * otherwise ``target`` is set to ``default`` on every row.

    The ``source`` column is dropped afterwards, unless it has the same name
    as ``target`` (dropping it would remove the column that was just written).

    Args:
        df: Frame to transform. Columns named by the keys of ``cols`` are
            indexed with ``[0]``, so they are presumably array columns —
            only element 0 is kept, any further elements are discarded.
        cols: Mapping of source column name to target column name.
        default: Placeholder written when no value is available.

    Returns:
        The transformed DataFrame with one string column per value of ``cols``.
    """
    for source, target in cols.items():
        if source in df.columns:
            first_item = col(source)[0]
            value = when(first_item.isNull(), default).otherwise(first_item.cast(StringType()))
        else:
            # Source column absent from this frame: fill the target with the placeholder.
            value = lit(default).cast(StringType())

        df = df.withColumn(target, value)
        if source != target:
            df = df.drop(source)
    return df


# Collapse the pivoted business-unit arrays into one id column per channel;
# business units missing from the pivot receive the placeholder value.
r = reverted_meld(
    r,
    {
        'A': 'crf_id',
        'B': 'share_id',
        'C': 'braze_id'
    }
)

# Attach the offers back to each counterparty. The join condition references
# `r` itself (the frame being joined) rather than an earlier frame it was
# derived from, so the cell keeps working if the upstream steps change.
r = (
    r.join(predictions, r.idi_gcr == predictions.idi_counterparty, how='left')
    .drop('idi_counterparty')
    .withColumnRenamed('idi_gcr', 'gcr_id')
    .withColumnRenamed('next_best_offer', 'offer')
)

r.show(100, truncate=False)

# r = a.withColumn('bu_id', concat(col('bu'), col('idi_src')))
#
# r = r.groupBy('idi_gcr').agg(collect_list(concat(col('idi_src'), col('bu'), col('next_best_offer'))))
# r.orderBy('idi_gcr').show(20, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+------+--------+--------+--------+
|gcr_id|crf_id|share_id|braze_id|offer   |
+------+------+--------+--------+--------+
|139   |171   |271     |dummy   |offer_08|
|139   |171   |271     |dummy   |offer_07|
|139   |171   |271     |dummy   |offer_06|
+------+------+--------+--------+--------+

In [None]:
from pyspark.sql.window import Window

# Number the offers of each counterparty. Ordering a window only by its own
# partition key leaves every row tied, which makes row_number() arbitrary
# between runs; `offer` is added as a tiebreaker so the ranks are reproducible.
# NOTE(review): this ranks offers alphabetically within a counterparty. If the
# predictions carry a score or priority, order by that column instead.
rank_window = Window.partitionBy('gcr_id').orderBy('gcr_id', 'offer')
asd = r.withColumn('rank', row_number().over(rank_window))
asd.show(truncate=False)
asd.printSchema()
