<a href="https://colab.research.google.com/github/hashangit/pySparkAI-demo/blob/main/PySpark_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installing Python and Pip

In [None]:

!sudo apt-get update -y
!sudo apt-get install python3.11
!sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1
!sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.10.12 2


In [None]:
!python --version

Python 3.11.8


In [None]:
!curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
!python3 get-pip.py --force-reinstall

## Installing Packages

In [None]:
!pip install pyspark pyarrow pyspark-ai openai langchain langchain-openai

## Secrets

In [None]:
from google.colab import userdata
userdata.get('OPENAI_API_KEY')

## Initiate PySparkAI

In [15]:
import openai
from langchain_openai import ChatOpenAI
from pyspark_ai import SparkAI
from pyspark.sql import SparkSession
from google.colab import userdata
import os

model = "gpt-4-0125-preview"
# model="gpt-3.5-turbo-0125"

os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
llm = ChatOpenAI(model=model, temperature=0)
sparkai = SparkAI(llm=llm, verbose=True)
sparkai.activate()

## Creating the df

In [None]:
link = "https://docs.google.com/spreadsheets/d/1VbUa5AvOBgArjoigeuZbptKKoqpKKB5WPgcV77AYAIg/edit#gid=1630247784"
#link = "https://docs.google.com/spreadsheets/d/17-oqBiI2hVAQZ58NzvhqKM0QnAOFF2lbZRnTxHG7UFI/edit#gid=1637711490"

auto_df = sparkai.create_df(link)
auto_df.show(5)

## Dataframe Explaination

In [None]:
print(auto_df.ai.explain())

## Applying Transformations

In [None]:
Topic_Performance = auto_df.ai.transform("advertising performance by topic where the highest conversions at lowest cost would mean the best performance. lowest cost per conversion = best performance. define topic based on the landing page url taken as context (url: Topic, https://creately.com/lp/concept-map-maker/:Concept Map, https://creately.com/lp/org-chart-software/:Orgchart)")

In [23]:
Topic_Performance.show(20)

+-----+-----------------+----------+---------------------------+
|Topic|Total Conversions|Total Cost|Average Cost Per Conversion|
+-----+-----------------+----------+---------------------------+
|Other|               23|    124.37|                   1.535556|
+-----+-----------------+----------+---------------------------+



## Verifying

In [29]:
auto_df.ai.verify("expect cost_per_conversion to be below $20")

[92mINFO: [0mLLM Output:
def cost_per_conversion_below_20(df) -> bool:
    # Import necessary PySpark SQL functions
    from pyspark.sql.functions import col

    # Check if the maximum cost_per_conversion is below $20
    max_cost_per_conversion = df.select(col("cost_per_conversion")).rdd.max()[0]
    if max_cost_per_conversion < 20:
        return True
    else:
        return False

result = cost_per_conversion_below_20(df)


INFO:spark_ai:LLM Output:
def cost_per_conversion_below_20(df) -> bool:
    # Import necessary PySpark SQL functions
    from pyspark.sql.functions import col

    # Check if the maximum cost_per_conversion is below $20
    max_cost_per_conversion = df.select(col("cost_per_conversion")).rdd.max()[0]
    if max_cost_per_conversion < 20:
        return True
    else:
        return False

result = cost_per_conversion_below_20(df)


[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mcost_per_conversion_below_20[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [37m# Import necessary PySpark SQL functions[39;49;00m[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if the maximum cost_per_conversion is below $20[39;49;00m[37m[39;49;00m
    max_cost_per_conversion = df.select(col([33m"[39;49;00m[33mcost_per_conversion[39;49;00m[33m"[39;49;00m)).rdd.max()[[34m0[39;49;00m][37m[39;49;00m
    [34mif[39;49;00m max_cost_per_conversion < [34m20[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mTrue[39;49;00m[37m[39;49;00m
    [34melse[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
[37m[39;49;00m
result = cost_per_conversion_below_2

INFO:spark_ai:Generated code:
[34mdef[39;49;00m [32mcost_per_conversion_below_20[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [37m# Import necessary PySpark SQL functions[39;49;00m[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if the maximum cost_per_conversion is below $20[39;49;00m[37m[39;49;00m
    max_cost_per_conversion = df.select(col([33m"[39;49;00m[33mcost_per_conversion[39;49;00m[33m"[39;49;00m)).rdd.max()[[34m0[39;49;00m][37m[39;49;00m
    [34mif[39;49;00m max_cost_per_conversion < [34m20[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mTrue[39;49;00m[37m[39;49;00m
    [34melse[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
[37m[39;49;00m
result = cost_per_conversion_below_20

[92mINFO: [0m
Result: True


INFO:spark_ai:
Result: True


True

## Visualization

In [None]:
auto_df.show(5)

In [34]:
auto_df.ai.plot("Plot a bar chart with matplotlib to show the impressions, conversions, cost and cost per conversion by landing page. impressions should be plot on right y axis while the other 3 on the left y axis")

[92mINFO: [0mTo achieve the visualization of the DataFrame `df` using plotly as per the given requirements, follow the steps below. This includes aggregating the data in Spark, converting it to a Pandas DataFrame, and then plotting it using Plotly. Note that we're using Plotly for plotting as per the initial request, even though the purpose mentions matplotlib, assuming it's a typo given the context.

First, ensure you have a Spark session running:


```
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m SparkSession[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m functions [34mas[39;49;00m F[37m[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objects[39;49;00m [34mas

INFO:spark_ai:To achieve the visualization of the DataFrame `df` using plotly as per the given requirements, follow the steps below. This includes aggregating the data in Spark, converting it to a Pandas DataFrame, and then plotting it using Plotly. Note that we're using Plotly for plotting as per the initial request, even though the purpose mentions matplotlib, assuming it's a typo given the context.

First, ensure you have a Spark session running:


```
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m SparkSession[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m functions [34mas[39;49;00m F[37m[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objects[39;49;00m [34mas[

'from pyspark.sql import SparkSession\nfrom pyspark.sql import functions as F\nimport pandas as pd\nimport plotly.graph_objects as go\nfrom plotly.subplots import make_subplots\n\n# Start Spark session\nspark = SparkSession.builder.getOrCreate()\n# Aggregate data\ndf_agg = df.groupBy("landing_page").agg(\n    F.sum("impressions").alias("total_impressions"),\n    F.sum("conversions").alias("total_conversions"),\n    F.sum("cost").alias("total_cost"),\n    (F.sum("cost") / F.sum("conversions")).alias("avg_cost_per_conversion")\n).orderBy("landing_page")\n# Convert to Pandas DataFrame\npdf = df_agg.toPandas()\n# Create subplots with 2 y-axes\nfig = make_subplots(specs=[[{"secondary_y": True}]])\n\n# Add bar charts for conversions, cost, and cost per conversion on the left y-axis\nfig.add_trace(go.Bar(x=pdf[\'landing_page\'], y=pdf[\'total_conversions\'], name=\'Conversions\'), secondary_y=False)\nfig.add_trace(go.Bar(x=pdf[\'landing_page\'], y=pdf[\'total_cost\'], name=\'Cost\'), secondar