In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session that runs on every available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Basics')
    .getOrCreate()
)

# Show the Spark version so the environment is recorded with the output.
print(spark.version)

3.5.4


In [5]:
# Gzipped CSV transaction extract. NOTE(review): this is an absolute,
# machine-specific path; the notebook will not run elsewhere until it is
# pointed at a shared or relative data directory.
data_path = "C:/Users/gyalm/churn_prediction/churn_prediction/data/week2_for_teacher_demo.csv.gz"

# The first row is the header; column types are inferred by Spark.
df_2018 = spark.read.csv(data_path, header=True, inferSchema=True)
df_2018.show(5)

+----------------+-----------+--------------------+------+-------+--------+------+-----+---------+-----------+------+-------------+
|transaction_date|member_type|           member_id|gender|product|quantity|amount|  age|card_type|branch_name|region|category_name|
+----------------+-----------+--------------------+------+-------+--------+------+-----+---------+-----------+------+-------------+
|      2018-01-17|     member|ffc9e419-cfda-11e...|FEMALE|    420|       2|  5300|50-55|  REGULAR|    D_Store|     2|          PET|
|      2018-04-10|     member|ff39d825-cfda-11e...|  MALE|     69|       4|  7100|40-45|  REGULAR|    D_Store|     2|     CLEANERS|
|      2018-02-19|     member|ff90d1f8-cfda-11e...|FEMALE|   1015|      18|  8100|40-45|  REGULAR|    J_Store|     1|          PET|
|      2018-01-27|     member|ffd5caf9-cfda-11e...|  MALE|     69|      11|  7100|18-25|  REGULAR|    H_Store|     4|     CLEANERS|
|      2018-05-07|     member|ff92f50e-cfda-11e...|FEMALE|   3456|      13| 

In [13]:
# Register the DataFrame as the temp view "df_2018_view" so the following
# cells can query it with spark.sql(); re-running replaces any existing view.
df_2018.createOrReplaceTempView("df_2018_view")

In [14]:
# Sanity check: the temp view is queryable and returns the expected columns.
preview_query = "SELECT * FROM df_2018_view LIMIT 5"
spark.sql(preview_query).show()

+----------------+-----------+--------------------+------+-------+--------+------+-----+---------+-----------+------+-------------+
|transaction_date|member_type|           member_id|gender|product|quantity|amount|  age|card_type|branch_name|region|category_name|
+----------------+-----------+--------------------+------+-------+--------+------+-----+---------+-----------+------+-------------+
|      2018-01-17|     member|ffc9e419-cfda-11e...|FEMALE|    420|       2|  5300|50-55|  REGULAR|    D_Store|     2|          PET|
|      2018-04-10|     member|ff39d825-cfda-11e...|  MALE|     69|       4|  7100|40-45|  REGULAR|    D_Store|     2|     CLEANERS|
|      2018-02-19|     member|ff90d1f8-cfda-11e...|FEMALE|   1015|      18|  8100|40-45|  REGULAR|    J_Store|     1|          PET|
|      2018-01-27|     member|ffd5caf9-cfda-11e...|  MALE|     69|      11|  7100|18-25|  REGULAR|    H_Store|     4|     CLEANERS|
|      2018-05-07|     member|ff92f50e-cfda-11e...|FEMALE|   3456|      13| 

In [15]:
# Number of distinct member_id values across the whole view.
# The result column is aliased "MemberId" but holds a count, not an id.
spark.sql('''SELECT COUNT(DISTINCT member_id) AS MemberId FROM df_2018_view''').show()

+--------+
|MemberId|
+--------+
|  302262|
+--------+



In [41]:
# Number of distinct members with at least one transaction in April 2018.
# A half-open date range replaces the original LIKE "%2018-04%" filter: LIKE
# forces an implicit cast of transaction_date to a string and does substring
# matching, whereas a range predicate compares dates directly, matches the
# BETWEEN-style filters used in the later cells, and stays correct if the
# column ever carries a time component.
spark.sql('''
SELECT COUNT(DISTINCT member_id) AS MemberId
FROM df_2018_view
WHERE transaction_date >= '2018-04-01'
  AND transaction_date <  '2018-05-01'
''').show()

+--------+
|MemberId|
+--------+
|   67717|
+--------+



In [18]:
# Monthly observation windows: (first day, last day, output column name).
# Entries must stay in chronological order, because the first and last tuples
# bound the overall analysis window used in the query below.
month_txn_window = [
    ('2018-01-01', '2018-01-31', 'pm1_total_txn'),
    ('2018-02-01', '2018-02-28', 'pm2_total_txn'),
    ('2018-03-01', '2018-03-31', 'pm3_total_txn'),
    ('2018-04-01', '2018-04-30', 'pm4_total_txn'),
    ('2018-05-01', '2018-05-31', 'pm5_total_txn'),
    ('2018-06-01', '2018-06-30', 'pm6_total_txn')]

# Overall window = start of the first month .. end of the last month.
window_start = month_txn_window[0][0]
window_end = month_txn_window[-1][1]

# One conditional SUM per month. Despite the "_txn" suffix, each column is the
# total purchased *quantity* in that month, not a transaction count.
sum_txn_window = ",\n        ".join([
    f"SUM(CASE WHEN transaction_date BETWEEN '{start_date}' AND '{end_date}' THEN quantity ELSE 0 END) AS {past_month}"
    for start_date, end_date, past_month in month_txn_window
])

monthly_columns = ", ".join([past_month for _, _, past_month in month_txn_window])
all_months_zero = " AND ".join([f"{past_month} = 0" for _, _, past_month in month_txn_window])

# Changes versus the previous version of this query:
#   * The transaction CTE no longer uses SELECT DISTINCT. DISTINCT on
#     (member_id, quantity, transaction_date) silently collapsed genuine
#     transactions that shared a member, date and quantity (e.g. two different
#     products bought on the same day), so the monthly sums were undercounted.
#   * The "distinct members LEFT JOIN transactions" step is gone: the WHERE on
#     the joined side turned it into an inner join that simply returned every
#     in-window transaction with a non-NULL member_id, which is what the
#     filter below selects directly.
#   * DISTINCT after GROUP BY member_id was redundant and has been removed.
#
# NOTE(review): every member in the result has at least one transaction inside
# the window, so "all monthly sums = 0" can only be true when the quantities
# themselves sum to zero. As written, 'Churned' is effectively unreachable;
# the churn rule probably needs to look at only the most recent month(s).
spark.sql(f'''
WITH
aggregated_customers_with_past_txns AS (
    SELECT
        member_id,
        quantity,
        transaction_date
    FROM
        df_2018_view
    WHERE
        member_id IS NOT NULL
        AND transaction_date BETWEEN '{window_start}' AND '{window_end}'
),

aggregated_total_txns_per_month AS (
    SELECT
        member_id,
        {sum_txn_window}
    FROM
        aggregated_customers_with_past_txns
    GROUP BY
        member_id
)

SELECT
    member_id,
    {monthly_columns},
    CASE 
        WHEN {all_months_zero} THEN 'Churned'
        ELSE 'Active'
    END AS churn_status
FROM
    aggregated_total_txns_per_month
LIMIT 
20
''').show()

+--------------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+
|           member_id|pm1_total_txn|pm2_total_txn|pm3_total_txn|pm4_total_txn|pm5_total_txn|pm6_total_txn|churn_status|
+--------------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+
|ffd9e9d8-cfda-11e...|            0|            0|           19|            5|            2|            0|      Active|
|006274b5-cfdb-11e...|            0|            0|            0|            0|           15|            0|      Active|
|ffd1853a-cfda-11e...|            0|            0|           14|            0|            0|            0|      Active|
|ff4e988d-cfda-11e...|            0|            0|           17|            7|           11|           42|      Active|
|ffe9e850-cfda-11e...|            0|            7|            0|            0|            0|            0|      Active|
|ffbb8c2c-cfda-11e...|            0|    

In [45]:
# Monthly observation windows: (first day, last day, output column name).
# Entries must stay in chronological order, because the first and last tuples
# bound the overall analysis window used in the query below.
month_txn_window = [
    ('2018-01-01', '2018-01-31', 'pm1_total_txn'),
    ('2018-02-01', '2018-02-28', 'pm2_total_txn'),
    ('2018-03-01', '2018-03-31', 'pm3_total_txn'),
    ('2018-04-01', '2018-04-30', 'pm4_total_txn'),
    ('2018-05-01', '2018-05-31', 'pm5_total_txn'),
    ('2018-06-01', '2018-06-30', 'pm6_total_txn')]

# Overall window = start of the first month .. end of the last month.
window_start = month_txn_window[0][0]
window_end = month_txn_window[-1][1]

# One conditional SUM per month. Despite the "_txn" suffix, each column is the
# total purchased *quantity* in that month, not a transaction count.
sum_txn_window = ",\n        ".join([
    f"SUM(CASE WHEN transaction_date BETWEEN '{start_date}' AND '{end_date}' THEN quantity ELSE 0 END) AS {past_month}"
    for start_date, end_date, past_month in month_txn_window
])

monthly_columns = ", ".join([past_month for _, _, past_month in month_txn_window])
all_months_zero = " AND ".join([f"{past_month} = 0" for _, _, past_month in month_txn_window])

# Changes versus the previous version of this query:
#   * The transaction CTE no longer uses SELECT DISTINCT. DISTINCT on
#     (member_id, quantity, transaction_date) silently collapsed genuine
#     transactions that shared a member, date and quantity (e.g. two different
#     products bought on the same day), so the monthly sums were undercounted.
#   * The "distinct members LEFT JOIN transactions" step is gone: the WHERE on
#     the joined side turned it into an inner join that simply returned every
#     in-window transaction with a non-NULL member_id, which is what the
#     filter below selects directly.
#
# NOTE(review): target = 1 requires every monthly sum to be 0, but every
# member in the population has at least one transaction inside the window.
# The label is therefore effectively always 0 (the saved output of this cell
# shows only target = 0), so the churn rule probably needs to look at only the
# most recent month(s) before it can serve as a model target.
#
# cnt_member_id and cntd_member_id are expected to be equal because the
# upstream CTE is grouped by member_id; showing both is a uniqueness check.
spark.sql(f'''
WITH
aggregated_customers_with_past_txns AS (
    SELECT
        member_id,
        COALESCE(quantity, 0) AS quantity,
        transaction_date
    FROM
        df_2018_view
    WHERE
        member_id IS NOT NULL
        AND transaction_date BETWEEN '{window_start}' AND '{window_end}'
),

aggregated_total_txns_per_month AS (
    SELECT
        member_id,
        {sum_txn_window}
    FROM
        aggregated_customers_with_past_txns
    GROUP BY
        member_id
),

customers_with_target_definition AS (
    SELECT
        member_id,
        {monthly_columns},
        CASE 
            WHEN {all_months_zero} THEN 1
            ELSE 0
        END AS target
    FROM
        aggregated_total_txns_per_month
),

count_customers_with_target_definition AS (
    SELECT
        target,
        COUNT(member_id) AS cnt_member_id,
        COUNT(DISTINCT member_id) AS cntd_member_id
    FROM
        customers_with_target_definition
    GROUP BY
        target
)

SELECT
    *
FROM
    count_customers_with_target_definition
''').show()

+------+-------------+--------------+
|target|cnt_member_id|cntd_member_id|
+------+-------------+--------------+
|     0|       289307|        289307|
+------+-------------+--------------+

