# DataFrame transform example with the Spark SQL ReAct Agent

This example is adapted from the blog post [Introducing Window Functions in Spark SQL](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html).

In [1]:
from pyspark_ai import SparkAI

spark_ai = SparkAI(verbose=True)
spark_ai.activate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/15 11:28:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Create the productRevenue DataFrame
df = spark_ai._spark.createDataFrame(
    [
        ("Normal", "Cellphone", 6000),
        ("Normal", "Tablet", 1500),
        ("Mini", "Tablet", 5500),
        ("Mini", "Cellphone", 5000),
        ("Foldable", "Cellphone", 6500),
        ("Foldable", "Tablet", 2500),
        ("Pro", "Cellphone", 3000),
        ("Pro", "Tablet", 4000),
        ("Pro Max", "Cellphone", 4500)
    ],
    ["product", "category", "revenue"]
)

In [3]:
df.ai.transform("What are the best-selling and the second best-selling products in every category?").show()

INFO: Creating temp view for the transform:
df.createOrReplaceTempView("spark_ai_temp_view_d566e4")

> Entering new chain...
Thought: I will use the ROW_NUMBER() function to rank the products in each category by revenue in descending order. Then I will filter the results to only include the products with a rank of 1 or 2.
Action: query_validation
Action Input: 
```
WITH ranked_products AS (
  SELECT 
    product, 
    category, 
    revenue, 
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  FROM spark_ai_temp_view_d566e4
)
SELECT product, category, revenue
FROM ranked_products
WHERE rank IN (1, 2)
```
Observation: OK
Thought: I now know the final answer.
Final Answer: 
```
WITH ranked_products AS (
  SELECT 
    product, 
    category, 
    revenue, 
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  FR

+--------+---------+-------+
| product| category|revenue|
+--------+---------+-------+
|Foldable|Cellphone|   6500|
|  Normal|Cellphone|   6000|
|    Mini|   Tablet|   5500|
|     Pro|   Tablet|   4000|
+--------+---------+-------+
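
The ranking logic in the generated query can be cross-checked without Spark. The sketch below is plain Python, not what the agent runs: it groups the same rows by category, sorts each group by revenue in descending order, and keeps the first two, which is what filtering `ROW_NUMBER()` to 1 and 2 selects. The function name `top_n_per_category` is made up for this illustration.

```python
from collections import defaultdict

# Same rows as the productRevenue DataFrame above
rows = [
    ("Normal", "Cellphone", 6000),
    ("Normal", "Tablet", 1500),
    ("Mini", "Tablet", 5500),
    ("Mini", "Cellphone", 5000),
    ("Foldable", "Cellphone", 6500),
    ("Foldable", "Tablet", 2500),
    ("Pro", "Cellphone", 3000),
    ("Pro", "Tablet", 4000),
    ("Pro Max", "Cellphone", 4500),
]


def top_n_per_category(rows, n=2):
    """Keep the n highest-revenue rows of each category (hypothetical helper)."""
    by_category = defaultdict(list)
    for product, category, revenue in rows:
        by_category[category].append((product, category, revenue))

    result = []
    for category in sorted(by_category):
        # Equivalent of ORDER BY revenue DESC within one partition
        ranked = sorted(by_category[category], key=lambda r: r[2], reverse=True)
        result.extend(ranked[:n])
    return result


for row in top_n_per_category(rows):
    print(row)
```

`ROW_NUMBER()` gives tied rows distinct numbers in an arbitrary order. This data has no ties within a category, so the result is deterministic; if tied products should share a rank, `RANK()` or `DENSE_RANK()` would be the functions to ask for.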



                                                                                

In [3]:
df.ai.transform("What is the difference between the revenue of each product and the revenue of the best-selling product in the same category of that product?").show()

INFO: Creating temp view for the transform:
df.createOrReplaceTempView("spark_ai_temp_view_8f41a9")

> Entering new chain...
Thought: I need to find the revenue of the best-selling product in each category first. Then, I will join this result with the original table and calculate the difference in revenue.
Action: query_sql_db
Action Input: "SELECT category, MAX(revenue) as max_revenue FROM spark_ai_temp_view_8f41a9 GROUP BY category"

Observation: [('Cellphone', '6500'), ('Tablet', '5500')]
Thought: I can join this result with the original table and calculate the difference in revenue.
Action: query_validation
Action Input: 
```
SELECT 
    a.product, 
    a.category, 
    a.revenue, 
    b.max_revenue, 
    (b.max_revenue - a.revenue) as revenue_difference 
FROM 
    spark_ai_temp_view_8f41a9 a 
JOIN 
    (SELECT category, MAX(revenue) as max_revenue FROM spark_ai_temp_view_8f41a9 GROUP BY category) b 
ON 
    a.category = b.category
```
Observation: OK
Thought: I now know the final answer.
Final Answer: 
```
SELECT 
    a.product, 
    a.category, 
    a.revenue, 
    b.max_revenue, 
    (b.max_revenue - a.revenue) as revenue_difference 
FROM 
    spark_ai_temp_view_8f41a9 a 
JOIN 
    (SELECT category, MAX(revenue) as max_revenue FROM spark_ai_temp_view_8f41a9 GROUP BY category) b 
ON 
    a.category = b.category
```

> Finished chain.
+--
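
The join-on-aggregate pattern in the generated query has two steps: compute the maximum revenue per category, then subtract each row's revenue from its category maximum. The sketch below redoes those two steps in plain Python on the same input rows. It is an independent check of the arithmetic, not the notebook's Spark output, and the row order Spark returns may differ. The function name `revenue_gap_to_best` is made up for this illustration.

```python
# Same rows as the productRevenue DataFrame above
rows = [
    ("Normal", "Cellphone", 6000),
    ("Normal", "Tablet", 1500),
    ("Mini", "Tablet", 5500),
    ("Mini", "Cellphone", 5000),
    ("Foldable", "Cellphone", 6500),
    ("Foldable", "Tablet", 2500),
    ("Pro", "Cellphone", 3000),
    ("Pro", "Tablet", 4000),
    ("Pro Max", "Cellphone", 4500),
]


def revenue_gap_to_best(rows):
    """Return (product, category, revenue, max_revenue, revenue_difference) per row."""
    # Step 1: the GROUP BY category / MAX(revenue) subquery
    max_by_category = {}
    for _, category, revenue in rows:
        max_by_category[category] = max(revenue, max_by_category.get(category, revenue))

    # Step 2: the join back to the original rows plus the subtraction
    return [
        (product, category, revenue, max_by_category[category], max_by_category[category] - revenue)
        for product, category, revenue in rows
    ]


for row in revenue_gap_to_best(rows):
    print(row)
```

The per-category maxima computed in step 1 are 6500 for Cellphone and 5500 for Tablet, which agrees with the agent's intermediate observation.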

In [5]:
df.ai.transform("Pivot the data by product and the revenue for each product").show()

INFO: Creating temp view for the transform:
df.createOrReplaceTempView("spark_ai_temp_view_096b0f")

> Entering new chain...
Thought: Spark SQL does not support dynamic pivot operations, which are required to transpose the table as requested. I should get all the distinct values of column product.
Action: query_sql_db
Action Input: "SELECT DISTINCT product FROM spark_ai_temp_view_096b0f"
Observation: [('Normal',), ('Mini',), ('Foldable',), ('Pro',), ('Pro Max',)]
Thought: I can write a query to pivot the table by product and sum the revenue for each product.
Action: query_validation
Action Input: SELECT * FROM spark_ai_temp_view_096b0f PIVOT (SUM(revenue) FOR product IN ('Normal', 'Mini', 'Foldable', 'Pro', 'Pro Max'))
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT * FROM spark_ai_temp_view_096b0f

In [6]:
df.ai.transform("Pivot the data by category and the revenue for each product").show()

INFO: Creating temp view for the transform:
df.createOrReplaceTempView("spark_ai_temp_view_48845a")

> Entering new chain...
Thought: Spark SQL does not support dynamic pivot operations, which are required to transpose the table as requested. I should get all the distinct values of column category.
Action: query_sql_db
Action Input: "SELECT DISTINCT category FROM spark_ai_temp_view_48845a"
Observation: [('Cellphone',), ('Tablet',)]
Thought: I can write a query to pivot the table by category and sum the revenue for each product and category combination.
Action: query_validation
Action Input: SELECT * FROM spark_ai_temp_view_48845a PIVOT (SUM(revenue) FOR category IN ('Cellphone', 'Tablet'))
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT * FROM spark_ai_temp_view_48845a PIVOT (SUM(revenue) FOR catego
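
Both pivot runs follow the same two steps: look up the distinct values of the pivot column, then write them into an explicit `IN` list. The sketch below shows the second step as plain string building. `build_pivot_query` is a hypothetical helper written for this illustration; it is not part of pyspark-ai, and it refuses values that would need escaping rather than guessing at quoting rules.

```python
def build_pivot_query(view_name, agg_expr, pivot_col, pivot_values):
    """Build a Spark SQL PIVOT query with an explicit IN list (hypothetical helper)."""
    for value in pivot_values:
        # Keep the sketch simple: reject values that would need escaping
        if "'" in value or "\\" in value:
            raise ValueError(f"unsupported pivot value: {value!r}")

    in_list = ", ".join(f"'{value}'" for value in pivot_values)
    return f"SELECT * FROM {view_name} PIVOT ({agg_expr} FOR {pivot_col} IN ({in_list}))"


# Distinct values as returned by the agent's SELECT DISTINCT step
query = build_pivot_query(
    "spark_ai_temp_view_48845a", "SUM(revenue)", "category", ["Cellphone", "Tablet"]
)
print(query)
```

With the two category values from the observation above, the helper produces the same statement the agent submitted to `query_validation`.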

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

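def generate_ddl(df, table_name, sample_rows=0):
    """Return a DDL string describing `df`.

    With sample_rows == 0, return a CREATE TABLE statement built from df.dtypes.
    With sample_rows > 0, return a CREATE TEMP VIEW statement that inlines the
    first `sample_rows` rows as VALUES. Every non-null value is rendered as a
    double-quoted string, so the original column types are not preserved.
    """
    # Schema-only form: one "name type" pair per column
    col_defs = [f"{col_name} {dtype}" for col_name, dtype in df.dtypes]
    ddl = f"CREATE TABLE {table_name}({', '.join(col_defs)})"

    if sample_rows > 0:
        sample_data = df.limit(sample_rows).collect()

        # Format the sample rows as SQL VALUES tuples
        values_str_list = []
        for row in sample_data:
            values_str_list.append(
                "(" + ", ".join([f'"{str(item)}"' if item is not None else "NULL" for item in row]) + ")"
            )
        values_str = ", ".join(values_str_list)

        # Replace the CREATE TABLE statement with a CREATE TEMP VIEW over the sampled rows
        col_names = ", ".join([f"`{col_name}`" for col_name, _ in df.dtypes])
        ddl = f"CREATE TEMP VIEW `{table_name}` AS SELECT * FROM VALUES {values_str} as ({col_names});"

    return ddl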
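
`generate_ddl` wraps every sampled value in double quotes, so numeric columns become strings in the resulting view and a value that itself contains a double quote would break the statement. The sketch below shows one possible type-aware alternative for rendering a row. Both helper names are hypothetical, and the escaping assumes Spark's default parser settings, where backslash is the escape character inside string literals and double-quoted text is read as a string.

```python
def to_sql_literal(value):
    """Render a Python value as a SQL literal (hypothetical helper, not part of generate_ddl)."""
    if value is None:
        return "NULL"
    # bool is a subclass of int, so it has to be tested first
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    # Finite numbers are emitted unquoted; NaN and infinity are not handled here
    if isinstance(value, (int, float)):
        return repr(value)
    # Assumption: backslash escapes are honoured inside string literals
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def format_values_row(row):
    """Format one row as a parenthesised VALUES tuple."""
    return "(" + ", ".join(to_sql_literal(v) for v in row) + ")"


print(format_values_row((1, "foo")))
print(format_values_row((None, 'say "hi"')))
```

Swapping this into the sampling loop would change the generated statement, for example `(1, "foo")` instead of `("1", "foo")`, so any consumer that expects all-string columns would need to be checked first.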

In [5]:
spark = SparkSession.builder.master("local[1]").appName('SparkApp').getOrCreate()
df = spark.createDataFrame([(1, 'foo'), (2, 'bar'), (3, 'baz')], ['i', 's'])


In [6]:
print(generate_ddl(df, "t1"))

CREATE TABLE t1(i bigint, s string)


In [7]:
print(generate_ddl(df, "t1", sample_rows=2))

CREATE TEMP VIEW `t1` AS SELECT * FROM VALUES ("1", "foo"), ("2", "bar") as (`i`, `s`);


                                                                                

In [8]:
spark.sql("""CREATE TEMP VIEW `1-10015132-16` AS SELECT * FROM VALUES ("aleksandar radojević","25","serbia","center","1999-2000","barton cc (ks)"),("shawn respert","31","united states","guard","1997-98","michigan state"),("quentin richardson","n/a","united states","forward","2013-present","depaul"),("alvin robertson","7, 21","united states","guard","1995-96","arkansas"),("carlos rogers","33, 34","united states","forward-center","1995-98","tennessee state"),("roy rogers","9","united states","forward","1998","alabama"),("jalen rose","5","united states","guard-forward","2003-06","michigan"),("terrence ross","31","united states","guard","2012-present","washington") as (`Player`,`No.`,`Nationality`,`Position`,`Years in Toronto`,`School/Club Team`);
""")

DataFrame[]

In [10]:
generate_ddl(spark.table("`1-10015132-16`"), "t3", sample_rows=3)

'CREATE TEMP VIEW `t3` AS SELECT * FROM VALUES ("aleksandar radojević", "25", "serbia", "center", "1999-2000", "barton cc (ks)"), ("shawn respert", "31", "united states", "guard", "1997-98", "michigan state"), ("quentin richardson", "n/a", "united states", "forward", "2013-present", "depaul") as (`Player`, `No.`, `Nationality`, `Position`, `Years in Toronto`, `School/Club Team`);'