In [1]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Obtain the active SparkSession, creating one if none exists yet.
spark = (
    SparkSession.builder
    .getOrCreate()
)

## We have sales data containing order_date, customer_id, and the quantity each customer ordered.


## We need to count the new customers added in each month.

In [3]:
# Sample sales rows: (order_date, customer_id, order_qty), all kept as strings.
data = [
    ('2021-01-01', 'C1', '20'),
    ('2021-01-01', 'C2', '30'),
    ('2021-02-01', 'C1', '10'),
    ('2021-02-01', 'C3', '15'),
    ('2021-03-01', 'C5', '19'),
    ('2021-03-01', 'C4', '10'),
    ('2021-04-01', 'C3', '13'),
    ('2021-04-01', 'C5', '15'),
    ('2021-04-01', 'C6', '10'),
]
# Every column is declared as a (nullable) string, in the same order as the
# tuples in `data`.
schema = StructType(
    [
        StructField(column_name, StringType())
        for column_name in ("order_date", "customer_id", "order_qty")
    ]
)

In [4]:
# Build the sales DataFrame from the in-memory rows using the explicit schema.
sales_df = spark.createDataFrame(data, schema=schema)

In [5]:
# Preview the raw sales rows (show's defaults spelled out: 20 rows, truncated cells).
sales_df.show(n=20, truncate=True)

+----------+-----------+---------+
|order_date|customer_id|order_qty|
+----------+-----------+---------+
|2021-01-01|         C1|       20|
|2021-01-01|         C2|       30|
|2021-02-01|         C1|       10|
|2021-02-01|         C3|       15|
|2021-03-01|         C5|       19|
|2021-03-01|         C4|       10|
|2021-04-01|         C3|       13|
|2021-04-01|         C5|       15|
|2021-04-01|         C6|       10|
+----------+-----------+---------+



## Extract only the month from order_date so that the transformations can work with the month directly. We could also drop the original order_date and quantity columns to reduce the amount of data processed.

In [12]:
# Add a numeric `month` column (1-12) derived from order_date.
# NOTE: month() discards the year, so orders from the same calendar month of
# different years would share one `month` value; the sample data is all 2021.
sales_modified_df = sales_df.withColumn(
    "month",
    month(col("order_date")),
)
sales_modified_df.show()

+----------+-----------+---------+-----+
|order_date|customer_id|order_qty|month|
+----------+-----------+---------+-----+
|2021-01-01|         C1|       20|    1|
|2021-01-01|         C2|       30|    1|
|2021-02-01|         C1|       10|    2|
|2021-02-01|         C3|       15|    2|
|2021-03-01|         C5|       19|    3|
|2021-03-01|         C4|       10|    3|
|2021-04-01|         C3|       13|    4|
|2021-04-01|         C5|       15|    4|
|2021-04-01|         C6|       10|    4|
+----------+-----------+---------+-----+



Create a window partitioned by customer_id and ordered chronologically, so that row_number numbers each customer's orders starting from 1 for their earliest order.

In [8]:
# Number each customer's orders chronologically; row == 1 marks the customer's
# first order.
# Order by the full order_date rather than the year-less `month` column:
# ISO 'yyyy-MM-dd' strings sort chronologically, whereas ordering by month
# alone would rank a January order of a later year ahead of a December order
# of an earlier year and mislabel which order came first.
customer_window = Window.partitionBy("customer_id").orderBy("order_date")
new_sales_df = sales_modified_df.withColumn(
    "row",
    row_number().over(customer_window),
)

In [9]:
# Inspect the per-customer row numbers (show's defaults spelled out).
new_sales_df.show(n=20, truncate=True)

+----------+-----------+---------+-----+---+
|order_date|customer_id|order_qty|month|row|
+----------+-----------+---------+-----+---+
|2021-04-01|         C6|       10|    4|  1|
|2021-02-01|         C3|       15|    2|  1|
|2021-04-01|         C3|       13|    4|  2|
|2021-03-01|         C4|       10|    3|  1|
|2021-03-01|         C5|       19|    3|  1|
|2021-04-01|         C5|       15|    4|  2|
|2021-01-01|         C1|       20|    1|  1|
|2021-02-01|         C1|       10|    2|  2|
|2021-01-01|         C2|       30|    1|  1|
+----------+-----------+---------+-----+---+



Count the distinct customers grouped by month to get the desired output. Since we are only interested in new customers, each customer's second and later orders are not of interest, so we keep only each customer's first order by filtering on row = 1.

In [13]:
# Keep only each customer's first order (row == 1) and count the distinct
# customers per month of that first order.
# `row` is an integer produced by row_number(), so compare it with the integer
# 1 through a typed Column expression instead of the SQL string "row = '1'",
# which relied on implicit string-to-number coercion.
# The result has no orderBy, so the order of the displayed rows is not guaranteed.
(
    new_sales_df
    .filter(col("row") == 1)
    .groupBy("month")
    .agg(countDistinct("customer_id").alias("new_customers_added"))
    .show()
)

+-----+-------------------+
|month|new_customers_added|
+-----+-------------------+
|    1|                  2|
|    3|                  2|
|    4|                  1|
|    2|                  1|
+-----+-------------------+

