In [1]:
import os
import sys,json, requests
from snowflake.snowpark.types import StringType
from snowflake.snowpark.functions import udf
from snowflake.snowpark import Session
import pandas as pd

In [9]:
# Endpoint of the Ollama text-generation API
API_BASE = "http://ollama:11434/api/generate"

# Snowflake connection settings: every value except the protocol is read
# from the environment, so a missing variable fails here with a KeyError.
connection_params = dict(
    host=os.environ['SNOWFLAKE_HOST'],
    port=os.environ['SNOWFLAKE_PORT'],
    protocol='https',
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    password=os.environ["SNOW_PASSWORD"],
    role=os.environ["SNOW_ROLE"],
    warehouse=os.environ["SNOW_WAREHOUSE"],
    database=os.environ["SNOW_DATABASE"],
    schema=os.environ["SNOW_SCHEMA"],
    user=os.environ["SNOW_USER"],
)

# Open the Snowpark session shared by the cells below
session = Session.builder.configs(connection_params).create()

# Row cap meant for trial runs
maxrecords = 10


In [19]:
#@udf(name="build_prompts", is_permanent=True, stage_location="@REVIEW_DATA", replace=True, session=session)
def build_prompt(ASIN: str, reviewText: str):
    """Return the LLM prompt for one product.

    Args:
        ASIN: product identifier, embedded in the prompt text.
        reviewText: the product's reviews, appended verbatim at the end.

    Returns:
        The prompt as a single string.
    """
    # Adjacent literals are concatenated by the parser into one string.
    return (
        f'Here are 20 product reviews for a music product with an ID of "{ASIN}". '
        'Respond back only as only JSON! '
        'Only provide a single record returned. '
        'Provide the product "description", product "name", '
        'a summary of all the reviews as "review_summary", '
        'the "ASIN" and product "features" based on the content of these reviews. '
        'The "features" should be a string describing the features and NOT JSON. '
        'Do not include the ASIN in the description field. '
        f'The reviews for the product are: {reviewText}'
    )

def get_reviews(ASIN: str):
    """Build a one-row pandas frame with the reviews and LLM prompt of one product.

    Reads at most 20 rows of PRODUCT_REVIEWS whose ASIN matches, concatenates
    their non-null REVIEWTEXT values (each followed by a newline) and passes
    the result to build_prompt.

    Args:
        ASIN: product identifier to filter the reviews table on.

    Returns:
        pandas DataFrame with a single row and the columns ASIN, REVIEWTEXT
        and PROMPT.
    """
    reviews_table = session.table('PRODUCT_REVIEWS')
    # Keep this product's rows only, capped at 20 reviews
    product_reviews = reviews_table.filter(reviews_table["ASIN"] == ASIN).limit(20)

    # Fetch the rows locally; missing review texts are left out
    review_texts = product_reviews.toPandas()["REVIEWTEXT"].tolist()
    concat_reviews = "".join(text + "\n" for text in review_texts if text is not None)

    return pd.DataFrame({
        "ASIN": [ASIN],
        "REVIEWTEXT": [concat_reviews],
        "PROMPT": [build_prompt(ASIN, concat_reviews)],
    })

def getProducts(limit=None):
    """Build the prompt table for the distinct products in PRODUCT_REVIEWS.

    Args:
        limit: optional cap on the number of distinct ASINs to process
            (handy for test runs). None, the default, processes all of them.

    Returns:
        pandas DataFrame with one row per ASIN and the columns ASIN,
        REVIEWTEXT and PROMPT (the frames produced by get_reviews, stacked).
        An empty frame with those columns is returned when the table yields
        no ASINs.
    """
    # Get the distinct list of asins from the reviews table
    df = session.table('PRODUCT_REVIEWS')
    df = df.select(df["ASIN"]).distinct()
    if limit is not None:
        df = df.limit(limit)

    ASINS = df.toPandas()["ASIN"].tolist()
    if not ASINS:
        # pd.concat raises ValueError on an empty list, so answer with an
        # empty frame that still has the columns downstream cells expect.
        return pd.DataFrame(columns=["ASIN", "REVIEWTEXT", "PROMPT"])

    # One single-row frame per product
    df_list = []
    for ASIN in ASINS:
        print(ASIN)
        df_list.append(get_reviews(ASIN))

    # Stack the per-product frames into one table with a fresh index
    return pd.concat(df_list, ignore_index=True)

In [None]:
# Build the table of prompts (columns ASIN, REVIEWTEXT, PROMPT) for the products
df_s = getProducts()
 
# Preview the first rows as the cell output
df_s.head()

In [None]:
# Send every prompt to the Ollama endpoint and collect the answers that parse
# as JSON into resp_list. A prompt whose request fails, whose stream reports
# an error, or whose answer is not valid JSON is skipped.
prompt_list = df_s["PROMPT"].values.tolist()
schema = os.environ["SNOW_SCHEMA"]
db = os.environ["SNOW_DATABASE"]

# (connect, read) timeout in seconds. Without one a stalled server would
# block this cell forever; the read value bounds the wait between chunks.
REQUEST_TIMEOUT = (10, 600)

counter = 0  # number of prompts answered with parseable JSON so far
resp_list = []
for p in prompt_list:
    resp = []  # streamed response fragments for the current prompt

    print(counter)
    try:
        # The context manager releases the streamed connection even when the
        # body is abandoned half-way through.
        with requests.post(API_BASE,
                           json={
                               'model': 'mistral',
                               'prompt': p
                           },
                           stream=True,
                           timeout=REQUEST_TIMEOUT) as r:
            r.raise_for_status()

            if r.status_code != 200:
                print("Failed to get a successful response, status code:", r.status_code)
                continue

            try:
                stream_error = None
                for line in r.iter_lines():
                    # iter_lines may yield empty keep-alive lines; they are
                    # not JSON and must not discard the whole record.
                    if not line:
                        continue
                    body = json.loads(line)
                    if 'error' in body:
                        stream_error = body['error']
                        break
                    # the response streams one fragment at a time
                    resp.append(body.get('response', ''))

                if stream_error is not None:
                    # Skip this record instead of aborting the whole batch
                    print("An error occurred:", stream_error)
                    continue

                # The model is asked to answer with JSON; parse the joined text
                resp_dict = json.loads("".join(resp))

                resp_list.append(resp_dict)
                print(resp_dict)
                counter += 1

            except json.JSONDecodeError as e:
                print("JSON parsing error:", e)
                print("Issue with data so just skipping record")

    except requests.exceptions.RequestException as e:
        # Covers connection failures, timeouts and HTTP error statuses
        print("An error occurred:", e)
print("COMPLETED")


In [None]:
# Turn the collected LLM answers into a table and store it as PRODUCTS.
resp_df = pd.DataFrame(resp_list)

schema = os.environ["SNOW_SCHEMA"]
db = os.environ["SNOW_DATABASE"]

if resp_df.empty:
    # Nothing was collected: selecting the columns below would fail with a
    # KeyError, so report it and leave the existing table alone.
    print("No LLM responses were collected; PRODUCTS was not written.")
else:
    # Keep only the expected columns (this drops any extras the model added).
    # The explicit copy makes the column assignment below act on an
    # independent frame rather than on a selection of another one.
    resp_df = resp_df[["ASIN", "description", "features", "name", "review_summary"]].copy()
    # note sometimes we get an object here so we are forcing it to be all strings
    resp_df['features'] = resp_df['features'].astype(str)

    # overwrite=True replaces the current contents of PRODUCTS
    session.write_pandas(df=resp_df, table_name='PRODUCTS', schema=schema, database=db, overwrite=True, quote_identifiers=False, auto_create_table=True)

    print("Data inserted into snowflake!")