In [None]:
pip install plotly

# Importing Environment Variables

In [1]:
import os
import sys

import openai

# Presumably lets this notebook import shared helpers that live two levels up.
sys.path.append('../..')

from dotenv import find_dotenv, load_dotenv

# Locate the local .env file and load its variables into os.environ.
_ = load_dotenv(find_dotenv())

# Credentials are read only from the environment; a missing one raises KeyError.
openai.api_key = os.environ['OPENAI_API_KEY']
# NOTE(review): the two Google values below are not used by any visible cell.
google_api_key = os.environ['GOOGLE_API_KEY']
google_cse_id = os.environ['GOOGLE_CSE_ID']

# Spark Session Initialization

In [2]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("English_SDK_Demo_with_FIFA_WC_Final_Results")
    .getOrCreate()
)

# OpenAI Initialization

In [3]:
from pyspark_ai import SparkAI
from langchain.chat_models import ChatOpenAI

# Chat model that writes the SQL / Python shown in the cell outputs below;
# temperature=0 keeps the generated code as repeatable as possible.
llm = ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0)

# verbose=True prints the generated SQL/code for every request.
spark_ai = SparkAI(llm, spark_session=spark, verbose=True)

# Expose the Gen AI helpers on Spark DataFrames themselves (the `.ai`
# accessor used as `df.ai.transform(...)`, `df.ai.plot(...)` and so on).
spark_ai.activate()

[92mINFO: [0mThe pyspark.sql.connect.dataframe module could not be imported. This might be due to your PySpark version being below 3.4.


# Data Ingestion

In [4]:
# Parse the Wikipedia page and let the LLM turn its finals table into a
# DataFrame (the generated ingestion SQL is printed in the output).
wc_df = spark_ai.create_df(
    "https://en.wikipedia.org/wiki/List_of_FIFA_World_Cup_finals"
)
wc_df.show(5)

[92mINFO: [0mParsing URL: https://en.wikipedia.org/wiki/List_of_FIFA_World_Cup_finals

[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mworld_cup_finals[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m
[34mSELECT[39;49;00m[37m [39;49;00m
[37m    [39;49;00m[34myear[39;49;00m,[37m[39;49;00m
[37m    [39;49;00mwinners,[37m[39;49;00m
[37m    [39;49;00mrunners_up,[37m[39;49;00m
[37m    [39;49;00mvenue,[37m[39;49;00m
[37m    [39;49;00m[34mlocation[39;49;00m,[37m[39;49;00m
[37m    [39;49;00mattendance[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00m
[37m    [39;49;00m([34mVALUES[39;49;00m[37m [39;49;00m
[37m        [39;49;00m([34m1930[39;49;00m,[37m [39;49;00m[33m'Uruguay'[39;49;00m,[37m [39;49;00m[33m'Argentina'[39;49;00m,[37m [39;49;00m[33m'Estadio Centenario'[39;49;

# Data Transformations

In [5]:
# Keep only the two columns needed for the attendance trend, oldest final first.
att_df = wc_df.ai.transform(
    "select only year and attendance order by year ascending"
)
att_df.show(5)

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m[34myear[39;49;00m,[37m [39;49;00mattendance[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00m[34myear[39;49;00m[37m [39;49;00m[34mASC[39;49;00m[37m[39;49;00m



+----+----------+
|year|attendance|
+----+----------+
|1930|     68346|
|1934|     55000|
|1938|     45000|
|1950|    173850|
|1954|     62500|
+----+----------+
only showing top 5 rows



In [6]:
# Step 1: number the finals chronologically.
att_df2 = att_df.ai.transform(
    "add a new column called 'serial_no' with year and attendance order by year ascending"
)
# Step 2: attach the previous final's attendance (generated SQL uses LAG over year).
att_df3 = att_df2.ai.transform(
    "get attendance from previous serial_no and add that as a new column called 'prev_attd' along with year and attendance order by year ascending"
)
att_df3.show(5)

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m[34myear[39;49;00m,[37m [39;49;00mattendance,[37m [39;49;00mROW_NUMBER()[37m [39;49;00mOVER[37m [39;49;00m([34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00m[34myear[39;49;00m[37m [39;49;00m[34mASC[39;49;00m)[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00mserial_no[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00m[34myear[39;49;00m[37m [39;49;00m[34mASC[39;49;00m[37m[39;49;00m

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m
[37m  [39;49;00m[34myear[39;49;00m,[37m [39;49;00m
[37m  [39;49;00mattendance,[37m [39;49;00m
[37m  [39;49;00mLAG(attendance)[37m [39;49;00mOVER[37m [39;49;00m([34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00m[34myear[39;49;00m)[37m [39;49;

In [11]:
# `spark_ai.udf` has the LLM generate this function's body from its signature
# and docstring, so the docstring below is effectively the prompt.
# NOTE(review): the generated body (see output) divides by prev_attd, so a
# zero value there would raise at query time.
@spark_ai.udf
def attendance_diff_w_prev_year(attendance: int, prev_attd: int) -> int:
    """Calculate 'attendance' difference with 'prev_attd' as percentage change. Return as integer"""
    ...


# For a plain Python function, `spark.udf.register` defaults the SQL return
# type to STRING; declare INT so the column matches the `-> int` annotation.
spark.udf.register(
    "attendance_diff_w_prev_year", attendance_diff_w_prev_year, returnType="int"
)
att_df3.createOrReplaceTempView("att_df3")

df = spark.sql(
    "select year, attendance, prev_attd, "
    "attendance_diff_w_prev_year(attendance, prev_attd) as attendance_diff_percentage "
    "from att_df3"
)
df.show(n=5)

[92mINFO: [0mCreating following Python UDF:
[34mdef[39;49;00m [32mattendance_diff_w_prev_year[39;49;00m(attendance, prev_attd) -> [36mint[39;49;00m:[37m[39;49;00m
    [34mif[39;49;00m attendance [35mis[39;49;00m [35mnot[39;49;00m [34mNone[39;49;00m [35mand[39;49;00m prev_attd [35mis[39;49;00m [35mnot[39;49;00m [34mNone[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [36mint[39;49;00m(((attendance - prev_attd) / prev_attd) * [34m100[39;49;00m)[37m[39;49;00m



# Plot

In [7]:
# The LLM writes and runs Plotly code for this request (a line plot of
# attendance by year in the recorded output).
att_df.ai.plot("attendance over year, print all years")

[92mINFO: [0mTo visualize the result of the `df` dataframe using Plotly, we can follow these steps:

1. Import the necessary libraries: `pyspark.sql` for Spark SQL operations and `plotly.graph_objects` for creating the plot.
2. Convert the Spark DataFrame `df` into a Pandas DataFrame using the `toPandas()` method.
3. Create a line plot using Plotly, with the `year` column as the x-axis and the `attendance` column as the y-axis.
4. Display the plot directly using the `show()` method.

Here's the code that accomplishes the above steps:


```
[34mimport[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mas[39;49;00m [04m[36mF[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objects[39;49;00m [34mas[39;49;00m [04m[36mgo[39;49;00m[37m[39;49;00m
[37m[39;49;00m
[37m# Convert Spark DataFrame to Pandas DataFrame[39;49;00m

In [8]:
# Fold 'West Germany' into 'Germany' so both count as one team.
winner_df = wc_df.ai.transform(
    "select winner, rename 'West Germany' as 'Germany'"
)
# The LLM generates the Plotly pie-chart code.
winner_df.ai.plot(
    "pie chart with top 6 winners"
)

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m
[37m  [39;49;00m[34mCASE[39;49;00m[37m [39;49;00m
[37m    [39;49;00m[34mWHEN[39;49;00m[37m [39;49;00mwinners[37m [39;49;00m=[37m [39;49;00m[33m'West Germany'[39;49;00m[37m [39;49;00m[34mTHEN[39;49;00m[37m [39;49;00m[33m'Germany'[39;49;00m[37m[39;49;00m
[37m    [39;49;00m[34mELSE[39;49;00m[37m [39;49;00mwinners[37m[39;49;00m
[37m  [39;49;00m[34mEND[39;49;00m[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00mwinner[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00m
[37m  [39;49;00mtemp_view_for_transform[37m[39;49;00m

[92mINFO: [0mTo visualize the result of the `df` dataframe using Plotly, we can create a pie chart with the top 6 winners. Here's the Python code to achieve this:


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m[37m[39;49;00m
[37m[39;49;0

In [9]:
# Same normalisation as for winners; the source column is `runners_up` and the
# generated SQL exposes it as `runner`.
runner_df = wc_df.ai.transform(
    "select runner, rename 'West Germany' as 'Germany'"
)
runner_df.ai.plot(
    "pie chart with top 10 runner ups"
)

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m
[37m  [39;49;00m[34mCASE[39;49;00m[37m [39;49;00m
[37m    [39;49;00m[34mWHEN[39;49;00m[37m [39;49;00mrunners_up[37m [39;49;00m=[37m [39;49;00m[33m'West Germany'[39;49;00m[37m [39;49;00m[34mTHEN[39;49;00m[37m [39;49;00m[33m'Germany'[39;49;00m[37m[39;49;00m
[37m    [39;49;00m[34mELSE[39;49;00m[37m [39;49;00mrunners_up[37m[39;49;00m
[37m  [39;49;00m[34mEND[39;49;00m[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00mrunner[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00m
[37m  [39;49;00mtemp_view_for_transform[37m[39;49;00m

[92mINFO: [0mTo visualize the result of the `df` dataframe using Plotly, we can create a pie chart with the top 10 runner-ups. Here's the Python code to achieve this:


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m[37m[39;49;00m
[37

In [32]:
# Build the finalists table here instead of relying on `finalist_df` from a
# later cell: the bare `.show()` only worked after out-of-order execution and
# raised NameError on Restart & Run All.
finalist_df = wc_df.ai.transform(
    "select year and winners as finalists, union with select year and "
    "runners_up as finalists, order by year ascending"
)
finalist_df.show(5)

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m[34myear[39;49;00m,[37m [39;49;00mwinners[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00mfinalists[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mUNION[39;49;00m[37m[39;49;00m
[34mSELECT[39;49;00m[37m [39;49;00m[34myear[39;49;00m,[37m [39;49;00mrunners_up[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00mfinalists[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00m[34myear[39;49;00m[37m [39;49;00m[34mASC[39;49;00m[37m[39;49;00m



+----+--------------+
|year|     finalists|
+----+--------------+
|1930|       Uruguay|
|1930|     Argentina|
|1934|         Italy|
|1934|Czechoslovakia|
|1938|       Hungary|
+----+--------------+
only showing top 5 rows



In [38]:
# Normalise 'West Germany' -> 'Germany' in both finalist columns so the two
# names are counted as one team.
wc_clean_df = wc_df.ai.transform("rename 'West Germany' as 'Germany' in winners and runners_up")

# Stack winners and runners-up into a single `finalists` column. The prompt
# names the real columns (`winners`, `runners_up`) rather than `winner` /
# `runner`, and no longer embeds the indentation and trailing spaces that a
# multi-line triple-quoted string carried into the LLM request.
finalist_df = wc_clean_df.ai.transform(
    "select year and winners, union with select year and runners_up; "
    "name the combined column finalists; order by year ascending"
)

finalist_df2 = finalist_df.ai.transform("select finalists")
finalist_df2.ai.plot("bar chart of finalists from most to least")

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m
[37m    [39;49;00m[34myear[39;49;00m,[37m[39;49;00m
[37m    [39;49;00m[34mCASE[39;49;00m[37m [39;49;00m[34mWHEN[39;49;00m[37m [39;49;00mwinners[37m [39;49;00m=[37m [39;49;00m[33m'West Germany'[39;49;00m[37m [39;49;00m[34mTHEN[39;49;00m[37m [39;49;00m[33m'Germany'[39;49;00m[37m [39;49;00m[34mELSE[39;49;00m[37m [39;49;00mwinners[37m [39;49;00m[34mEND[39;49;00m[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00mwinners,[37m[39;49;00m
[37m    [39;49;00m[34mCASE[39;49;00m[37m [39;49;00m[34mWHEN[39;49;00m[37m [39;49;00mrunners_up[37m [39;49;00m=[37m [39;49;00m[33m'West Germany'[39;49;00m[37m [39;49;00m[34mTHEN[39;49;00m[37m [39;49;00m[33m'Germany'[39;49;00m[37m [39;49;00m[34mELSE[39;49;00m[37m [39;49;00mrunners_up[37m [39;49;00m[34mEND[39;49;00m[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00mrunners_up,[37m[39;49;00m
[37m    [

[92mINFO: [0mTo visualize the result of `df` using plotly, we can follow these steps:

1. Import the necessary libraries:

```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mas[39;49;00m [04m[36mF[39;49;00m[37m[39;49;00m
```

2. Group the dataframe by the `finalists` column and count the number of occurrences:

```
df_grouped = df.groupBy([33m"[39;49;00m[33mfinalists[39;49;00m[33m"[39;49;00m).agg(F.count([33m"[39;49;00m[33m*[39;49;00m[33m"[39;49;00m).alias([33m"[39;49;00m[33mcount[39;49;00m[33m"[39;49;00m))[37m[39;49;00m
```

3. Sort the dataframe by the count in descending order:

```
df_sorted = df_grouped.orderBy(F.desc([33m"[39;49;00m[33mcount[39;49;00m[33m"[39;49;00m))[37m[39;49;00m

# Explain

## Explaining a Spark DataFrame

In [39]:
# Spark's own physical plan; "simple" is what explain() prints with no arguments.
finalist_df2.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [finalists#579]
   +- Sort [year#12 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(year#12 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=1857]
         +- HashAggregate(keys=[year#12, finalists#579], functions=[])
            +- Exchange hashpartitioning(year#12, finalists#579, 200), ENSURE_REQUIREMENTS, [plan_id=1854]
               +- HashAggregate(keys=[year#12, finalists#579], functions=[])
                  +- Union
                     :- LocalTableScan [year#12, finalists#579]
                     +- LocalTableScan [year#587, finalists#588]




## Explaining with OpenAI

In [40]:
# Ask the LLM for a plain-English summary of how this DataFrame is produced.
finalist_df2.ai.explain()

"In summary, this dataframe is retrieving the finalists of the World Cup for each year. It combines the winners and runners-up of each year, replacing 'West Germany' with 'Germany' in the process. The results are sorted by year."

# Verification

In [44]:
# The LLM turns the expectation into a Python check function and runs it
# against the DataFrame (the generated function is printed in the output).
finalist_df2.ai.verify("expect Germany is total 8 finalists")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mhas_8_finalists[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [37m# Filter the DataFrame to only include rows where finalists is equal to Germany[39;49;00m[37m[39;49;00m
    germany_finalists = df.filter(df.finalists == [33m"[39;49;00m[33mGermany[39;49;00m[33m"[39;49;00m)[37m[39;49;00m
[37m[39;49;00m
    [37m# Get the number of rows where finalists is equal to Germany[39;49;00m[37m[39;49;00m
    num_germany_finalists = germany_finalists.count()[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if the number of Germany finalists is equal to 8[39;49;00m[37m[39;49;00m
    [34mif[39;49;00m num_germany_finalists == [34m8[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mTrue[39;49;00m[37m[39;49;00m
    [34melse[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
[37m[39;49;00m
result = has_8_finalists(df)[37m[39;49;00m

[92mINFO: 

# In-memory Cache

In [45]:
# NOTE(review): presumably persists spark_ai's in-memory cache of LLM results
# so later runs can reuse them — confirm against the pyspark-ai docs.
spark_ai.commit()