# Optimize the query plan II

Suppose we want to join questions with users. We also want to apply a UDF (which performs some computation on the question's `body` field) and use a window function to number each user's questions in order of creation date.

See the suboptimal query below that performs this task, and try to rewrite it to achieve a more efficient execution plan. Specifically, try to eliminate the Exchange operator from the query plan.

In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, udf, row_number
from pyspark.sql.types import IntegerType

import os

In [None]:
spark = (
    SparkSession
    .builder
    .appName('Optimize II')
    .getOrCreate()
)

In [None]:
base_path = os.getcwd()

# Project root: three directory levels above the notebook's working directory
project_path = '/'.join(base_path.split('/')[:-3])

questions_input_path = os.path.join(project_path, 'data/questions-json')
users_input_path = os.path.join(project_path, 'data/users')

In [None]:
# Turn off broadcast joins so that Spark uses a sort merge join (SMJ). We assume that in practice
# both datasets are large, in which case Spark would choose SMJ anyway.

spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
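With broadcasting disabled, Spark will typically plan this equi-join as a sort merge join: both sides are shuffled by `user_id` (an Exchange with hash partitioning), sorted by `user_id` within each partition, and then each pair of matching partitions is merged. The merge step for a single partition can be sketched in plain Python. `sort_merge_join` and the sample rows are hypothetical, for illustration only; this is not Spark's implementation.

```python
def sort_merge_join(left, right, key):
    """Inner-join two lists of dicts on `key` by sorting both sides and merging.

    Single-partition illustration: Spark first shuffles both sides so that
    equal keys land in the same partition, then sorts and merges per partition.
    """
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1  # no match for this left key; advance the left side
        elif lk > rk:
            j += 1  # no match for this right key; advance the right side
        else:
            # Find the run of rows with this key on the right side
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            # Pair every left row with this key against the whole right run
            while i < len(left) and left[i][key] == lk:
                for r in right[j:j_end]:
                    out.append({**left[i], **r})
                i += 1
            j = j_end
    return out


# Hypothetical sample rows standing in for users and questions
users_rows = [{'user_id': 2, 'name': 'b'}, {'user_id': 1, 'name': 'a'}, {'user_id': 3, 'name': 'c'}]
questions_rows = [
    {'user_id': 1, 'q': 'x'},
    {'user_id': 2, 'q': 'y'},
    {'user_id': 1, 'q': 'z'},
    {'user_id': 4, 'q': 'w'},
]
joined = sort_merge_join(users_rows, questions_rows, 'user_id')
```

Note that the joined output comes out grouped and ordered by the join key, because both inputs had to be sorted by it.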

In [None]:
usersDF = spark.read.parquet(users_input_path)

questionsDF = spark.read.json(questions_input_path)

#### UDF:

The UDF below is just a simple function that returns the length of a string, which could easily be done with the native PySpark function `length`. For the sake of this example, however, suppose that this function encapsulates some complex logic that cannot be expressed natively.

In [None]:
@udf(IntegerType())
def get_length_udf(s):
    # Return None for null bodies instead of failing with a TypeError
    return len(s) if s is not None else None

#### Window definition:

In [None]:
w = Window.partitionBy('user_id').orderBy('creation_date')
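The window `w` numbers the rows within each `user_id` group in `creation_date` order. To compute it, Spark's Window operator needs all rows of a given `user_id` in the same partition and sorted by `user_id`, `creation_date`, which is what can force an Exchange and a Sort into the plan. The semantics alone can be illustrated with a plain-Python sketch. `row_numbers` is a hypothetical helper, not Spark's implementation.

```python
from collections import defaultdict
from operator import itemgetter


def row_numbers(rows, partition_key, order_key, out_col='question_n'):
    """Number rows 1, 2, ... within each partition, in order of `order_key`.

    Ties are broken by input order here (sorted() is stable), whereas Spark
    gives no ordering guarantee for ties in row_number().
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_key]].append(row)
    result = []
    for group in groups.values():
        for n, row in enumerate(sorted(group, key=itemgetter(order_key)), start=1):
            result.append({**row, out_col: n})
    return result


# Hypothetical sample questions
questions = [
    {'user_id': 1, 'creation_date': '2020-03-01'},
    {'user_id': 2, 'creation_date': '2020-01-15'},
    {'user_id': 1, 'creation_date': '2020-01-10'},
]
numbered = row_numbers(questions, 'user_id', 'creation_date')
```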

In [None]:
(
    usersDF
    .join(questionsDF, 'user_id')
    .withColumn('question_len', get_length_udf('body'))
    .withColumn('question_n', row_number().over(w))
    .write
    .mode('overwrite')
    .format('noop')
    .save()
)

# Task:

The query above is suboptimal. Try to rewrite it to achieve a better plan that leads to more efficient execution.

Hint:
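* inspect the query plan (for example with `explain()`)
* eliminate the extra Exchange from the plan (the shuffles required by the sort merge join itself will stay)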

In [None]:
# your code here:


In [None]:
spark.stop()