In [2]:
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import pandas as pd
import os

# The connection string is read from the environment so that no credentials
# are stored in the notebook itself.
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not connection_string:
    # Fail here with a clear message instead of deep inside the Azure SDK.
    raise RuntimeError(
        "AZURE_STORAGE_CONNECTION_STRING is not set; cannot connect to Azure Blob Storage."
    )

container_name = "team5container"
blob_path = "Silver/Product Reviews/reviews.parquet"

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_path)

# Download the blob into an in-memory buffer, then rewind it so that pandas
# starts reading from the first byte.
stream = BytesIO()
blob_client.download_blob().readinto(stream)
stream.seek(0)

df = pd.read_parquet(stream)
df.head()

In [3]:
# Row count of the loaded DataFrame.
len(df)

In [4]:
!pip install transformers==4.30.2 torch==1.13.1 --quiet

In [5]:
from transformers import pipeline
# Load the Hugging Face sentiment-analysis pipeline.
# The checkpoint is named explicitly (it is the one transformers 4.30 picks by
# default for this task) so that results stay the same if the library default
# changes, and so that the "no model was supplied" warning is not emitted.
# It emits the labels 'POSITIVE' / 'NEGATIVE'.
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)


In [9]:
def classify_batch_with_rating(text_list, rating_list, min_confidence=0.65):
    """Label each review as "positive", "negative" or "neutral".

    The model prediction for each text is combined with its rating:

    * a rating that ``int()`` converts to 3 is always "neutral";
    * a rating that cannot be converted to an integer (e.g. None, NaN,
      non-numeric text) is "neutral";
    * a prediction whose score is below ``min_confidence`` is "neutral";
    * otherwise the label is "positive" when the model says 'POSITIVE'
      and "negative" for any other model label.

    Args:
        text_list: Review texts passed to the module-level
            ``sentiment_pipeline`` (called with ``truncation=True``).
        rating_list: Ratings paired positionally with ``text_list``.
        min_confidence: Minimum model score required to trust the model
            label. Defaults to 0.65.

    Returns:
        A list of label strings, one per (prediction, rating) pair.

    Raises:
        Errors raised by the pipeline, or caused by a malformed prediction
        (for example a missing 'score' key), propagate to the caller
        instead of being silently reported as "neutral".
    """
    # Nothing to classify; skip the model call entirely.
    if len(text_list) == 0:
        return []

    results = sentiment_pipeline(text_list, truncation=True)
    output = []
    for res, rating in zip(results, rating_list):
        # Only the rating conversion is allowed to fail quietly: a missing or
        # unparsable rating is treated as "neutral".
        try:
            rating_value = int(rating)
        except (TypeError, ValueError, OverflowError):
            output.append("neutral")
            continue

        if rating_value == 3:
            output.append("neutral")  # Neutral from rating
        elif res['score'] < min_confidence:
            output.append("neutral")  # Neutral if model is uncertain
        else:
            output.append("positive" if res['label'] == 'POSITIVE' else "negative")
    return output

# Classify the reviews one fixed-size slice of the DataFrame at a time and
# collect the labels in row order.
batch_size = 200
sentiments = []

for start in range(0, len(df), batch_size):
    chunk = df.iloc[start:start + batch_size]
    # Keep at most the first 300 whitespace-separated words of every review.
    chunk_texts = [" ".join(str(review).split()[:300]) for review in chunk['review_text']]
    chunk_ratings = chunk['rating'].tolist()
    sentiments += classify_batch_with_rating(chunk_texts, chunk_ratings)

# Attach the collected labels as a new column.
df['sentiment'] = sentiments


In [10]:
# Frequency of each value in the `sentiment` column.
df['sentiment'].value_counts()


In [11]:
# Preview the first rows of the DataFrame.
df.head()

In [12]:
import pyarrow as pa
import pyarrow.parquet as pq

# Serialize the DataFrame to Parquet in memory, then rewind the buffer so the
# upload starts from the first byte.
table = pa.Table.from_pandas(df)
buffer = BytesIO()
pq.write_table(table, buffer)
buffer.seek(0)

# Destination path in the Gold layer.
filename = "Gold/Product Reviews/reviews_with_sentiment.parquet"

# Initialize the blob client. The connection string is read from the
# environment; fail with a clear message if it is missing rather than letting
# the Azure SDK fail on a None value.
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not connection_string:
    raise RuntimeError(
        "AZURE_STORAGE_CONNECTION_STRING is not set; cannot connect to Azure Blob Storage."
    )
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
blob_client = blob_service_client.get_blob_client(container="team5container", blob=filename)

# Upload the Parquet file, replacing any existing blob at that path.
blob_client.upload_blob(buffer, overwrite=True)
print(f"File uploaded successfully to: {filename}")
