# Problem Statement: Identify First-Time and Repeat Customers by Date
You are given a transactional dataset of customer orders in a table named `customer_orders`.
For each `order_date`, determine:
- The number of first-time customers, i.e. customers placing their very first order on that date
- The number of repeat customers, i.e. customers ordering on that date who had already placed an order on an earlier date



In [26]:
#spark Session
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Build the notebook's Spark session: app name "spark intro", master "local[*]".
# getOrCreate() hands back an already-running session if one exists.
spark = SparkSession.builder.appName("spark intro").master("local[*]").getOrCreate()

In [4]:
# Sample orders. Each tuple is (order_id, customer_id, order_date, order_amount)
# and is expanded into a dict with those keys before being handed to Spark.
data = [
    dict(zip(("order_id", "customer_id", "order_date", "order_amount"), record))
    for record in (
        (1, 100, "2022-01-01", 2000),
        (2, 200, "2022-01-01", 2500),
        (3, 300, "2022-01-01", 2100),
        (4, 100, "2022-01-02", 2000),
        (5, 400, "2022-01-02", 2200),
        (6, 500, "2022-01-02", 2700),
        (7, 100, "2022-01-03", 3000),
        (8, 400, "2022-01-03", 1000),
        (9, 600, "2022-01-03", 3000),
    )
]

# Schema is inferred from the dicts; order_date stays a plain string column.
df = spark.createDataFrame(data)

df.show()

+-----------+------------+----------+--------+
|customer_id|order_amount|order_date|order_id|
+-----------+------------+----------+--------+
|        100|        2000|2022-01-01|       1|
|        200|        2500|2022-01-01|       2|
|        300|        2100|2022-01-01|       3|
|        100|        2000|2022-01-02|       4|
|        400|        2200|2022-01-02|       5|
|        500|        2700|2022-01-02|       6|
|        100|        3000|2022-01-03|       7|
|        400|        1000|2022-01-03|       8|
|        600|        3000|2022-01-03|       9|
+-----------+------------+----------+--------+



In [27]:
# Each customer's earliest order_date is computed twice on purpose, to show two
# equivalent approaches: a SQL window expression via expr() (first_order_date)
# and the DataFrame Window API (first_order_date2).
window_spec = Window.partitionBy("customer_id")

df1 = (
    df
    # SQL-expression flavour of the per-customer minimum date.
    .withColumn("first_order_date", expr("min(order_date) over(partition by customer_id)"))
    # Window-API flavour; `min` is the pyspark.sql.functions one (star import).
    .withColumn("first_order_date2", min("order_date").over(window_spec))
    # 1 when this order falls on the customer's first order date, else 0.
    .withColumn("first_order_flag", expr(" case when first_order_date = order_date then 1 else 0 end"))
    # 1 when this order falls on any other date than the first one, else 0.
    .withColumn("repeat_order_flag", expr(" case when first_order_date != order_date then 1 else 0 end"))
)
df1.show()

+-----------+------------+----------+--------+----------------+-----------------+----------------+-----------------+
|customer_id|order_amount|order_date|order_id|first_order_date|first_order_date2|first_order_flag|repeat_order_flag|
+-----------+------------+----------+--------+----------------+-----------------+----------------+-----------------+
|        100|        2000|2022-01-01|       1|      2022-01-01|       2022-01-01|               1|                0|
|        100|        2000|2022-01-02|       4|      2022-01-01|       2022-01-01|               0|                1|
|        100|        3000|2022-01-03|       7|      2022-01-01|       2022-01-01|               0|                1|
|        200|        2500|2022-01-01|       2|      2022-01-01|       2022-01-01|               1|                0|
|        300|        2100|2022-01-01|       3|      2022-01-01|       2022-01-01|               1|                0|
|        400|        2200|2022-01-02|       5|      2022-01-02| 

In [31]:
# Roll the per-order flags up to one row per order_date, sorted by date.
#
# `sum` and `col` are the pyspark.sql.functions versions brought in by the star
# import, not the Python builtins.
#
# The alias has to be applied to the aggregate returned by sum(...). Aliasing
# the column passed *into* sum(...) leaves the output column named
# "sum(repeat_order_flag AS repeat_customer_count)".
#
# NOTE: the flags are per order row, so a customer with several orders on the
# same date contributes once per order to these counts.
df2 = (
    df1.groupBy("order_date")
    .agg(
        sum("first_order_flag").alias("first_customer_count"),
        sum("repeat_order_flag").alias("repeat_customer_count"),
    )
    .orderBy(col("order_date").asc())
)

df2.show()

+----------+--------------------+-----------------------------------------------+
|order_date|first_customer_count|sum(repeat_order_flag AS repeat_customer_count)|
+----------+--------------------+-----------------------------------------------+
|2022-01-01|                   3|                                              0|
|2022-01-02|                   2|                                              1|
|2022-01-03|                   1|                                              2|
+----------+--------------------+-----------------------------------------------+

