# Installation

In [None]:
%pip install openai
%pip install pyspark

In [None]:
%pip show openai

# Library

In [None]:
import os
import time

import openai
from openai import AzureOpenAI
from openai import OpenAIError, RateLimitError, APIError
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row


# Remove before submission

In [None]:
# Read the Azure OpenAI endpoint and key from the environment so real
# credentials never have to be typed into (or saved with) the notebook.
# The original placeholder strings remain as fallbacks when the variables
# are not set, so existing usage keeps working.
ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT", "Your Endpoint")
API_KEY = os.environ.get("AZURE_OPENAI_API_KEY", "Your API")

In [None]:
# System prompt listing the only product models the assistant may answer with.
# It is joined with explicit newlines so every catalogue entry sits on its own
# line; backslash continuations inside a single string literal would fold the
# whole list into one run-on line padded with source indentation.
_PRODUCT_MODEL_SYSTEM_PROMPT = "\n".join([
    "You are helping to evaluate the twitter review and estimate the product model.",
    "This is the list of product:",
    "Category -> subcategory -> product model",
    "Bikes -> Road Bikes -> Road-250",
    "Bikes -> Road Bikes -> Road-550-W",
    "Bikes -> Mountain Bikes -> Mountain-200",
    "Accessories -> Helmets -> Sport-100",
    "Clothing -> Caps -> Cycling Cap",
    "Clothing -> Gloves -> Half-Finger Gloves",
    "Clothing -> Jerseys -> Long-Sleeve Logo Jersey",
    "If you are not sure about the product model, please state it as N.A.",
    "Please do not guess the product model outside of this list.",
    "Please *ONLY* answer the product model without any other extra texts",
])


def estimate_productModel(review, max_retries=5, initial_wait_time=30):
    """Ask the Azure OpenAI chat model which catalogue product a review refers to.

    Args:
        review: Text of the review. ``None`` or blank text is answered with
            "N.A." without calling the API.
        max_retries: Maximum number of API attempts before giving up.
        initial_wait_time: Seconds to sleep after the first transient failure;
            the wait doubles after every further failure.

    Returns:
        The model's answer with surrounding whitespace stripped, or "N.A."
        when there is nothing to classify, the model returns no text, a
        non-retryable error occurs, or every attempt fails.
    """
    # Nothing to classify: skip the slow, billable API call entirely.
    if review is None or not str(review).strip():
        return "N.A."

    API_VERSION = "2024-02-01"
    MODEL_NAME = "gpt-4-turbo-2024-04-09"

    client = AzureOpenAI(
        azure_endpoint=ENDPOINT,
        api_key=API_KEY,
        api_version=API_VERSION,
    )

    messages = [
        {"role": "system", "content": _PRODUCT_MODEL_SYSTEM_PROMPT},
        {"role": "user", "content": str(review)},
    ]

    wait_time = initial_wait_time
    for attempt in range(1, max_retries + 1):
        try:
            completion = client.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
            )
            # The response may carry no choices or a null message body;
            # fall back to "N.A." instead of returning None or raising.
            content = completion.choices[0].message.content if completion.choices else None
            response = content.strip() if content and content.strip() else "N.A."
            print(response)
            return response

        except RateLimitError as e:
            # Must precede APIError: RateLimitError is one of its subclasses.
            failure = f"Rate limit error (429) encountered: {e}"

        except APIError as e:
            status_code = getattr(e, "status_code", None)
            # Other 4xx responses (bad request, authentication, not found...)
            # will not succeed on retry, so do not sleep through the backoff.
            if status_code is not None and 400 <= status_code < 500:
                print(f"Non-retryable API error ({status_code}) encountered: {e}")
                break
            # No status code means the request never got an HTTP response.
            failure = f"Server error ({status_code or 'connection'}) encountered: {e}"

        except OpenAIError as e:
            print("An unexpected OpenAI error occurred:", e)
            break

        # Only transient failures reach this point.
        if attempt == max_retries:
            # Do not sleep after the final attempt; there is nothing left to retry.
            print(f"{failure}. Giving up after {max_retries} attempts.")
            break
        print(f"{failure}. Retrying {attempt}/{max_retries - 1} after {wait_time} seconds...")
        time.sleep(wait_time)
        wait_time *= 2

    return "N.A."

In [None]:
# API_VERSION = "2024-02-01"
# MODEL_NAME = "gpt-4-turbo-2024-04-09"
# review = "Just spent the day trying to figure out my new Mountain-100, @MountainBikes. It's like a relic from the past! So complicated to use and it's silver... really, in 2011? #OverIt #MountainBikeFails"

# client = AzureOpenAI(
#     azure_endpoint=ENDPOINT,
#     api_key=API_KEY,
#     api_version=API_VERSION,
# )

# MESSAGES = [
#     {"role": "system", "content": "You are helping to evalute the twitter review and estimate the product model.\
#     This is the list of product:\
#     Category → subcategory → product model\
#     Bikes -> Road Bikes -> Road-250\
#     Bikes -> Road Bikes -> Road-550-W\
#     Bikes -> Mountain Bikes -> Mountain-200\
#     Accessories -> Helmets -> Sport-100\
#     Clothing -> Caps -> Cycling Cap\
#     Clothing -> Gloves -> Half-Finger Gloves\
#     Clothing -> Jerseys -> Long-Sleeve Logo Jersey.\
#     If you are not sure about the prudct model. Please state it as N.A.\
#     Please do not guess the product model outside of this list.\
#     Please *ONLY* answer the product model without any other extra texts\
#     "},
#     {"role": "user", "content": review},]


# completion = client.chat.completions.create(
#             model=MODEL_NAME,
#             messages=MESSAGES,)
# response = completion.model_dump_json(indent=2)


# Initialise Spark session and define schema for the new DataFrame

In [None]:

# Every output column is stored as a nullable string, in this order.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("reviews", "review_date", "username", "product_model", "review_source")
])

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName("ProductModelEstimation").getOrCreate()

# Retrieve table into DataFrame

In [None]:
# Read the Bronze Twitter reviews and pull every row to the driver as a list,
# because the product-model estimation below runs row by row in plain Python.
bronze_reviews_query = "SELECT * FROM AdventureWorks_Lakehouse.Ops_Bronze.social_twitterreviews"
df = spark.sql(bronze_reviews_query)
reviews_data = df.collect()

# Populate the product model into the DataFrame

In [None]:
# Estimate a product model for every collected review.
processed_rows = []
for row in reviews_data:
    review = row["reviews"]
    review_date = row["review_date"]
    username = row["username"]

    product_model = estimate_productModel(review)

    processed_rows.append(Row(
        reviews=review,
        review_date=review_date,
        username=username,
        product_model=product_model,
        review_source="twitter"
    ))

# Build the DataFrame and persist it once, after the loop has finished.
# Doing this inside the loop rebuilt the DataFrame and overwrote the whole
# table on every single review.
df_with_product_model = spark.createDataFrame(processed_rows, schema)
if processed_rows:
    # Skip the write when there were no reviews, so an empty input does not
    # replace an existing Silver table with an empty one.
    df_with_product_model.write.mode("overwrite").saveAsTable("AdventureWorks_Lakehouse.Ops_Silver.social_twitterreviews")



In [None]:
# Inspect the accumulated Row objects; as the last expression of the cell,
# the whole list is rendered as the cell output.
processed_rows

In [None]:
# Build a Spark DataFrame from the accumulated rows, applying the explicit
# all-string schema rather than relying on type inference.
df_with_product_model = spark.createDataFrame(processed_rows, schema)

### Save the product model estimates to the Silver table (optional)

In [None]:
# Opt-in persistence: set `save` to True to replace the Silver table (data and
# schema) with the current estimates. Left off so a plain run does not write.
save = False
if save:
    silver_writer = df_with_product_model.write.mode("overwrite").option("overwriteSchema", "true")
    silver_writer.saveAsTable("AdventureWorks_Lakehouse.Ops_Silver.social_twitterreviews")


## Save the product model estimation in batches (used as a backup in case of kernel disconnection)

In [None]:
# Checkpointed variant of the estimation loop: rows are appended to the Silver
# table every `batch_size` reviews, so a kernel disconnect loses at most one
# unfinished batch.
# NOTE(review): this cell appends. Re-running it, or running it after the
# overwrite cell earlier in the notebook, adds the same reviews again.
# NOTE(review): review_source is "Twitter" here but "twitter" in the earlier
# estimation loop. Confirm which casing downstream consumers expect.
processed_rows = []
batch_size = 50  # Number of rows to write per batch


def _append_batch(rows):
    """Append ``rows`` to the Silver Twitter-reviews table.

    The explicit ``schema`` is applied instead of type inference, which fails
    when a column is null for every row in a batch and could otherwise give
    different batches different column types.
    """
    df_batch = spark.createDataFrame(rows, schema)
    df_batch.write.mode("append").saveAsTable("AdventureWorks_Lakehouse.Ops_Silver.social_twitterreviews")


for row in reviews_data:
    review = row["reviews"]
    review_date = row["review_date"]
    username = row["username"]

    # Get product model for the review
    product_model = estimate_productModel(review)

    # Add row to the pending batch
    processed_rows.append(Row(
        reviews=review,
        review_date=review_date,
        username=username,
        product_model=product_model,
        review_source="Twitter"  # Add review source
    ))

    # The list length is the batch counter: flush and start over when full.
    if len(processed_rows) >= batch_size:
        _append_batch(processed_rows)
        processed_rows.clear()

# Write any remaining rows that did not make a full batch
if processed_rows:
    _append_batch(processed_rows)


# Rename the table columns and add aliases

In [None]:
# Opt-in reload: set `load` to True to take the estimates from the saved Silver
# table instead of the in-memory DataFrame built earlier in the notebook.
load = False
if load:
    silver_reviews_query = "SELECT * FROM AdventureWorks_Lakehouse.Ops_Silver.social_twitterreviews"
    df_with_product_model = spark.sql(silver_reviews_query)
    display(df_with_product_model)

## Get the information from the ProductModel database

In [None]:
# Product-model lookup (id + name). `Name` is renamed so it cannot be confused
# with the review columns once the two DataFrames are joined.
df_map_model_id = (
    spark.sql("SELECT ProductModelId, Name FROM AdventureWorks_Lakehouse.dbo.Production_ProductModel")
    .withColumnRenamed("Name", "ProductModelName_DB")
)
display(df_map_model_id)

## Join Table from the ProductModel database

In [None]:
from pyspark.sql import functions as F

# Column expressions used by the steps below.
trimmed_product_model = F.trim(F.col("product_model"))
parsed_review_date = F.to_date(F.col("review_date"), "dd/MM/yyyy")

# 1. Strip stray whitespace from the model answer so it can match the lookup name.
df_with_product_model = df_with_product_model.withColumn("product_model", trimmed_product_model)

# 2. Tag every review with a generated ID.
df_with_id = df_with_product_model.withColumn("ID", F.monotonically_increasing_id())

# 3. Parse the dd/MM/yyyy text date, then derive a yyyy-MM bucket from the parsed value.
df_with_date = (
    df_with_id
    .withColumn("review_date", parsed_review_date)
    .withColumn("PostedYearMonth", F.date_format("review_date", "yyyy-MM"))
)

# 4. Left join keeps reviews whose model name has no match (their ProductModelId is null).
model_name_matches = df_with_date["product_model"] == df_map_model_id["ProductModelName_DB"]
df_with_mapped_id = df_with_date.join(df_map_model_id, model_name_matches, "left")
display(df_with_mapped_id)

## Rename the columns

In [None]:
# Output column order, and the new name for each column that is renamed.
# Columns without an entry in `output_aliases` keep their current name.
output_sources = [
    "ID",
    "review_date",
    "PostedYearMonth",
    "review_source",
    "username",
    "ProductModelId",
    "product_model",
    "reviews",
]
output_aliases = {
    "review_date": "PostedDate",
    "review_source": "ReviewSource",
    "username": "ReviewerUsername",
    "product_model": "ProductModelName",
    "reviews": "Review",
}
df_renamed = df_with_mapped_id.select(*[
    F.col(source).alias(output_aliases[source]) if source in output_aliases else F.col(source)
    for source in output_sources
])

In [None]:
# Preview the selected and renamed columns.
display(df_renamed)

## Save the table to Silver level

In [None]:
# Replace the Silver Social_Reviews table, including its schema, with the
# renamed DataFrame (which carries the generated 'ID' column).
social_reviews_writer = df_renamed.write.mode("overwrite").option("overwriteSchema", "true")
social_reviews_writer.saveAsTable("AdventureWorks_Lakehouse.Ops_Silver.Social_Reviews")


In [None]:
# Read back up to 1000 rows of the saved Silver table to check the write.
social_reviews_preview_query = "SELECT * FROM AdventureWorks_Lakehouse.Ops_Silver.Social_Reviews LIMIT 1000"
df = spark.sql(social_reviews_preview_query)
display(df)