# DataFrame Transforms with the Spark SQL ReAct Agent

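This notebook uses the [Kaggle Subscription Dataset](https://www.kaggle.com/datasets/gsagar12/dspp1/data) (`customer_product.csv` and `product_info.csv`) to compare hand-written Spark SQL with `pyspark-ai`'s natural-language `df.ai.transform` on three metrics: active subscriptions, subscription starts, and subscription churn.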

In [1]:
## Install dependencies (uncomment on first run).
# Use `%pip` rather than `!pip` so the packages land in the running kernel's environment.
# The notebook also imports `langchain` (for ChatOpenAI); install it as well if
# pyspark-ai does not already pull it in.
# %pip install pyspark pyspark-ai

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [12]:
# Single SparkSession for the whole notebook; getOrCreate() hands back the
# already-running session when this cell is re-executed.
spark = (
    SparkSession.builder
    .appName("subscription")
    .getOrCreate()
)

In [4]:
# Calendar of every day from start_date to end_date (both inclusive), one row per day.
start_date, end_date = '2021-01-01', '2021-12-31'
dates_query = (
    "SELECT sequence("
    f"to_date('{start_date}'), to_date('{end_date}'), interval 1 day"
    ") as date"
)
# sequence() yields a single row holding an array of dates; explode() flattens
# that array into one row per date, keeping the column name "date".
dates_df = spark.sql(dates_query).withColumn("date", F.explode("date"))
dates_df.show()

                                                                                

+----------+
|      date|
+----------+
|2021-01-01|
|2021-01-02|
|2021-01-03|
|2021-01-04|
|2021-01-05|
|2021-01-06|
|2021-01-07|
|2021-01-08|
|2021-01-09|
|2021-01-10|
|2021-01-11|
|2021-01-12|
|2021-01-13|
|2021-01-14|
|2021-01-15|
|2021-01-16|
|2021-01-17|
|2021-01-18|
|2021-01-19|
|2021-01-20|
+----------+
only showing top 20 rows



In [13]:
# Register each raw CSV (first row used as header, no schema inference, so
# columns are read as strings) as a SQL temp view named after the table.
csv_views = {
    "customer_product": "customer_product.csv",
    "product_info": "product_info.csv",
}
for view_name, csv_path in csv_views.items():
    spark.read.csv(csv_path, header=True).createOrReplaceTempView(view_name)

In [6]:
# Sanity check: earliest and latest signup date present in the raw data.
signup_range_sql = "select min(CAST(signup_date_time as DATE)), max(CAST(signup_date_time as DATE)) from customer_product"
signup_range_df = spark.sql(signup_range_sql)
signup_range_df.show()



+-----------------------------------+-----------------------------------+
|min(CAST(signup_date_time AS DATE))|max(CAST(signup_date_time AS DATE))|
+-----------------------------------+-----------------------------------+
|                         2017-01-01|                         2021-12-31|
+-----------------------------------+-----------------------------------+



                                                                                

In [7]:
# Prepare the analysis dataset: one row per customer subscription, joined to its
# product metadata (name, price, billing_cycle). The signup/cancel timestamps are
# cast to DATE so later cells can compare them against a plain analysis date.
# `join` here is an inner join: a subscription whose product has no match in
# product_info is dropped. p.price / p.billing_cycle are not cast, so they keep the
# type the CSV reader produced (strings, since no schema is inferred on read).
sql_query = """
select c.customer_id, 
c.product, 
CAST(c.signup_date_time as DATE) as signup_date_time, 
CAST(c.cancel_date_time AS DATE) AS cancel_date_time,
p.name,
p.price,
p.billing_cycle
from customer_product c
join product_info p on c.product = p.product_id
"""
# A missing cancel_date_time stays NULL after the cast (see the output below).
df = spark.sql(sql_query)
df.show()

+-----------+-------+----------------+----------------+--------------------+-----+-------------+
|customer_id|product|signup_date_time|cancel_date_time|                name|price|billing_cycle|
+-----------+-------+----------------+----------------+--------------------+-----+-------------+
|      C2448|  prd_1|      2017-01-01|            NULL| annual_subscription| 1200|           12|
|      C2449|  prd_1|      2017-01-01|      2021-09-05| annual_subscription| 1200|           12|
|      C2450|  prd_1|      2017-01-01|      2019-01-13| annual_subscription| 1200|           12|
|      C2451|  prd_2|      2017-01-01|            NULL|monthly_subscription|  125|            1|
|      C2452|  prd_1|      2017-01-01|      2021-06-28| annual_subscription| 1200|           12|
|      C2453|  prd_1|      2017-01-01|            NULL| annual_subscription| 1200|           12|
|      C2454|  prd_1|      2017-01-01|      2021-06-01| annual_subscription| 1200|           12|
|      C2455|  prd_1|      201

## Active Subscriptions

### Traditional SQL

In [8]:
# Active subscriptions per product on a specific date: customers who signed up on
# or before `analysis_date` and had not cancelled by then (no cancel date, or a
# cancel date strictly after it).
# Register the joined frame so this and the following SQL cells can query it by name.
df.createOrReplaceTempView("customer_product_combined")
analysis_date = '2021-10-05'
# `DATE '...'` is a typed literal: a malformed analysis_date fails with a parse
# error instead of being implicitly cast to NULL (non-ANSI mode), which would
# silently filter out every row.
# GROUP BY 1,2,3 refers to SELECT-list positions; position 1 is the constant date label.
subscritions_sql = f"""
SELECT
    '{analysis_date}' as analysis_date,
    product,
    name as product_name,
    COUNT(distinct customer_id) AS subscription_count
FROM
    customer_product_combined
WHERE
    signup_date_time <= DATE '{analysis_date}'
    AND (cancel_date_time IS NULL OR cancel_date_time > DATE '{analysis_date}')
GROUP BY
    1,2,3
"""
spark.sql(subscritions_sql).show()



+-------------+-------+--------------------+------------------+
|analysis_date|product|        product_name|subscription_count|
+-------------+-------+--------------------+------------------+
|   2021-10-05|  prd_1| annual_subscription|            252793|
|   2021-10-05|  prd_2|monthly_subscription|            135700|
+-------------+-------+--------------------+------------------+



                                                                                

### Generative AI (pyspark-ai)

In [16]:
from pyspark_ai import SparkAI
from langchain.chat_models import ChatOpenAI

In [39]:
# Chat model that pyspark-ai uses to turn natural-language requests into SQL.
# ChatOpenAI reads the API key from the OPENAI_API_KEY environment variable, so no
# credential is hardcoded here.
# temperature=0 makes the generated SQL more repeatable across "Restart & Run All";
# a higher temperature lets the same prompt yield different queries on each run.
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)

spark_ai = SparkAI(llm=llm, verbose=True)
# activate() enables the `df.ai` accessor used by the transform cells below.
spark_ai.activate()

In [40]:
# Ask the agent for active subscribers per product on 2021-10-05, so the result can be
# compared with the hand-written SQL above.
active_prompt = "calculate active subscription customers for each product at a specific date 2021-10-05"
active_ai_df = df.ai.transform(active_prompt)
active_ai_df.show()

[92mINFO: [0mCreating temp view for the transform:
df.createOrReplaceTempView([33m"[39;49;00m[33mspark_ai_temp_view_2ec336[39;49;00m[33m"[39;49;00m)[37m[39;49;00m



[1m> Entering new AgentExecutor chain...[0m
**************text: Final Answer: 

SELECT product, COUNT(DISTINCT customer_id) AS active_subscription_customers
FROM spark_ai_temp_view_2ec336
WHERE signup_date_time <= '2021-10-05' AND (cancel_date_time > '2021-10-05' OR cancel_date_time IS NULL)
GROUP BY product **************
[32;1m[1;3mFinal Answer: 

SELECT product, COUNT(DISTINCT customer_id) AS active_subscription_customers
FROM spark_ai_temp_view_2ec336
WHERE signup_date_time <= '2021-10-05' AND (cancel_date_time > '2021-10-05' OR cancel_date_time IS NULL)
GROUP BY product[0m

[1m> Finished chain.[0m
[92mINFO: [0mSQL query:
[34mSELECT[39;49;00m[37m [39;49;00mproduct,[37m [39;49;00m[34mCOUNT[39;49;00m([34mDISTINCT[39;49;00m[37m [39;49;00mcustomer_id)[37m [39;49;00m[34mAS[39;49;00m[37m 

[Stage 29:>                                                         (0 + 3) / 3]

+-------+-----------------------------+
|product|active_subscription_customers|
+-------+-----------------------------+
|  prd_1|                       252793|
|  prd_2|                       135700|
+-------+-----------------------------+



                                                                                

## Subscription Starts

### Traditional SQL

In [22]:
# New subscriptions (starts) per product: customers whose signup date is exactly
# `analysis_date` (defined in the active-subscriptions cell).
# `DATE '...'` is a typed literal: a malformed analysis_date fails with a parse
# error instead of being implicitly cast to NULL and silently matching no rows.

start_sql = f"""
SELECT
    '{analysis_date}' as analysis_date,
    product,
    name as product_name,
    COUNT(distinct customer_id) AS subscription_count
FROM
    customer_product_combined
WHERE
    signup_date_time = DATE '{analysis_date}'
GROUP BY
    1,2,3
"""
spark.sql(start_sql).show()




+-------------+-------+--------------------+------------------+
|analysis_date|product|        product_name|subscription_count|
+-------------+-------+--------------------+------------------+
|   2021-10-05|  prd_1| annual_subscription|               199|
|   2021-10-05|  prd_2|monthly_subscription|               229|
+-------------+-------+--------------------+------------------+



                                                                                

### Generative AI (pyspark-ai)

In [None]:
# Ask the agent for subscriptions that started on 2021-10-05, per product.
starts_prompt = "get new subscriptions for each product at date 2021-10-05"
starts_ai_df = df.ai.transform(starts_prompt)
starts_ai_df.show()

## Subscription Churn

### Traditional SQL

In [None]:
# Churn per product: customers whose subscription was cancelled exactly on
# `analysis_date` (defined in the active-subscriptions cell).
# Held in `churn_sql` so the subscription-starts query stored in `start_sql`
# is not overwritten.
# `DATE '...'` is a typed literal: a malformed analysis_date fails with a parse
# error instead of being implicitly cast to NULL and silently matching no rows.

churn_sql = f"""
SELECT
    '{analysis_date}' as analysis_date,
    product,
    name as product_name,
    COUNT(distinct customer_id) AS churn_count
FROM
    customer_product_combined
WHERE
    cancel_date_time = DATE '{analysis_date}'
GROUP BY
    1,2,3
"""
spark.sql(churn_sql).show()

### Generative AI (pyspark-ai)

In [None]:
# Ask the agent how many customers churned on 2021-10-05, per product.
churn_prompt = "calculate how many customers churn for each product at 2021-10-05"
churn_ai_df = df.ai.transform(churn_prompt)
churn_ai_df.show()

In [20]:
# Top 10 customers by tenure, measured up to the cancel date or, for customers with
# no cancel date, up to current_date. The agent picks the exact formula; in the
# recorded run it used DATEDIFF (days).
# NOTE: because current_date is used, the tenure values change on every run.
# Many customers share the same maximum tenure, so which 10 rows appear among the
# ties is not guaranteed to be stable.
tenure_prompt = "give me top 10 most tenure customers, if cancel_date_time is null, use current_date"
tenure_ai_df = df.ai.transform(tenure_prompt)
tenure_ai_df.show()

[92mINFO: [0mCreating temp view for the transform:
df.createOrReplaceTempView([33m"[39;49;00m[33mspark_ai_temp_view_703cd8[39;49;00m[33m"[39;49;00m)[37m[39;49;00m



[1m> Entering new AgentExecutor chain...[0m
**************text: Thought: I will select the `customer_id`, `name`, and calculate the tenure using the `signup_date_time` and `cancel_date_time` columns. If `cancel_date_time` is null, I will use the current date. Then I will order the results by tenure in descending order and limit the output to the top 10 customers.

Action: query_validation
Action Input: 
SELECT customer_id, name, DATEDIFF(IF(cancel_date_time IS NULL, current_date, cancel_date_time), signup_date_time) AS tenure
FROM spark_ai_temp_view_703cd8
ORDER BY tenure DESC
LIMIT 10
 **************
[32;1m[1;3mThought: I will select the `customer_id`, `name`, and calculate the tenure using the `signup_date_time` and `cancel_date_time` columns. If `cancel_date_time` is null, I will use the current date. Then

                                                                                

+-----------+--------------------+------+
|customer_id|                name|tenure|
+-----------+--------------------+------+
|    C131975|monthly_subscription|  2480|
|    C131994| annual_subscription|  2480|
|    C131976| annual_subscription|  2480|
|      C2451|monthly_subscription|  2480|
|    C131977| annual_subscription|  2480|
|    C131987| annual_subscription|  2480|
|    C131991| annual_subscription|  2480|
|    C131989| annual_subscription|  2480|
|      C2453| annual_subscription|  2480|
|    C131990| annual_subscription|  2480|
+-----------+--------------------+------+

