#### Creating the project

```bash
sentiment_sifters_project/
│
├── app/
│   ├── __init__.py          # Initialization file for the app module
│   ├── db_connection.py     # Contains database connection logic
│   ├── data_processing.py   # Contains functions for processing data (CSV, etc.)
│   └── utils.py             # Helper functions like cleaning data
│   └── db_setup.py  # This will handle the logic for creating databases and tables (if they don’t exist).
│   ├── ssis_packages/
│   │   └── OLTP_to_OLAP.dtsx
│   └── ssis_execution.py
│
├── data/                    # Store your CSV files
│   ├── processed/           # Folder for processed CSV files
│
├── scripts/                 # Folder for SQL stored procedures and queries
│   ├── stored_procedures.sql # SQL script for creating stored procedures
│
├── main.py                  # Entry point for running the data processing pipeline
│
└── README.md                # Documentation for the project

```

#### Folders setup

In [2]:
# import os

# def create_project_structure(base_dir):
#     # Define the folder structure
#     folders = [
#         'app',
#         'data',
#         'scripts',
#     ]

#     # Define the empty files
#     files = {
#         'app/__init__.py': '',
#         'app/db_connection.py': '',
#         'app/data_processing.py': '',
#         'app/utils.py': '',
#         'app/db_setup.py': '',
#         'main.py': '',

#     }

#     # Create the folders
#     for folder in folders:
#         folder_path = os.path.join(base_dir, folder)
#         os.makedirs(folder_path, exist_ok=True)

#     # Create the files
#     for file_path, content in files.items():
#         full_file_path = os.path.join(base_dir, file_path)
#         with open(full_file_path, 'w') as f:
#             f.write(content)

#     print(f"Project structure created at {base_dir}")

# # Specify the base directory where you want to create the project
# base_dir = r'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project'

# # Run the function to create the project structure
# create_project_structure(base_dir)


#### CSV columns
```bash
['unique_id', 'asin', 'product_name', 'product_type', 'helpful', 'rating', 'title', 'date', 'reviewer', 'reviewer_location', 'review_text', 'country', 'region']

```

#### 1. Database Connection (db_connection.py)
This file will handle connecting to the SQL Server and provide a reusable connection function.

In [1]:
%%file 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_connection.py'
# app/db_connection.py
import pyodbc

def get_oltp_connection():
    """Returns connection to the OLTP (sentiment_sifters) database."""
    connection = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        'SERVER=DESKTOP-4U8DFLK;'
        'DATABASE=sentiment_sifters;'
        'UID=sifter_login;PWD=sifter_login'
    )
    return connection

def get_olap_connection():
    """Returns connection to the OLAP (sentiment_warehouse) database."""
    connection = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        'SERVER=DESKTOP-4U8DFLK;'
        'DATABASE=sentiment_warehouse;'
        'UID=sifter_login;PWD=sifter_login'
    )
    return connection

def get_master_connection():
    """Returns connection to the master database for creating other databases."""
    connection = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        'SERVER=DESKTOP-4U8DFLK;'
        'DATABASE=master;'
        'UID=sifter_login;PWD=sifter_login'
    )
    return connection



Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_connection.py


#### db_setup.py

In [22]:
%%writefile 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_setup.py'
# app/db_setup.py
from app.db_connection import get_master_connection, get_oltp_connection, get_olap_connection
import logging

def create_database_if_not_exists(database_name):
    """Creates a database if it does not exist in the master DB."""
    connection = get_master_connection()
    cursor = connection.cursor()

    cursor.execute(f"""
        IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'{database_name}')
        BEGIN
            CREATE DATABASE [{database_name}]
        END
    """)
    cursor.commit()
    cursor.close()
    connection.close()

def create_oltp_tables():
    """Creates the tables for the OLTP (sentiment_sifters) database if they don't exist."""
    connection = get_oltp_connection()
    cursor = connection.cursor()

    # Create Product, Reviewer, and Review tables
    cursor.execute("""
        IF OBJECT_ID('Product', 'U') IS NULL
        CREATE TABLE Product (
            product_id INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL UNIQUE,  
            product_name NVARCHAR(255) NOT NULL,  
            product_type NVARCHAR(100)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('Reviewer', 'U') IS NULL
        CREATE TABLE Reviewer (
            reviewer_id INT IDENTITY(1,1) PRIMARY KEY,
            reviewer_name NVARCHAR(255) NOT NULL,  
            reviewer_location NVARCHAR(255),  
            country NVARCHAR(255),  
            region NVARCHAR(255)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('Review', 'U') IS NULL
        CREATE TABLE Review (
            review_id INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL,  
            product_id INT NOT NULL,
            reviewer_id INT NOT NULL,
            helpful_votes INT,
            total_votes INT,
            rating INT CHECK (rating BETWEEN 1 AND 5),
            review_title NVARCHAR(255),  
            review_date DATE,
            review_text NVARCHAR(MAX),
            helpful_ratio AS (CAST(helpful_votes AS DECIMAL(5,2)) / NULLIF(total_votes, 0)) PERSISTED,
            sentiment_label NVARCHAR(50),
            sentiment_summary NVARCHAR(MAX),
            sentiment_score DECIMAL(5,2),
            keyword NVARCHAR(255),
            FOREIGN KEY (product_id) REFERENCES Product(product_id),
            FOREIGN KEY (reviewer_id) REFERENCES Reviewer(reviewer_id)
        )
    """)
    
    cursor.commit()
    cursor.close()
    connection.close()

def create_olap_tables():
    """Creates the tables for the OLAP (sentiment_warehouse) database if they don't exist."""
    connection = get_olap_connection()
    cursor = connection.cursor()

    # Create Dimension tables (Product, Reviewer, Sentiment, Date, Keyword)
    cursor.execute("""
        IF OBJECT_ID('DimProduct', 'U') IS NULL
        CREATE TABLE DimProduct (
            ProductKey INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL UNIQUE,
            product_name NVARCHAR(255),
            product_type NVARCHAR(100)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('DimReviewer', 'U') IS NULL
        CREATE TABLE DimReviewer (
            ReviewerKey INT IDENTITY(1,1) PRIMARY KEY,
            reviewer_name NVARCHAR(255),
            reviewer_location NVARCHAR(255),
            country NVARCHAR(255),
            region NVARCHAR(255)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('DimSentiment', 'U') IS NULL
        CREATE TABLE DimSentiment (
            SentimentKey INT IDENTITY(1,1) PRIMARY KEY,
            SentimentLabel NVARCHAR(50),
            SentimentSummary NVARCHAR(MAX),
            SentimentScore DECIMAL(5,2)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('DimDate', 'U') IS NULL
        CREATE TABLE DimDate (
            DateKey INT IDENTITY(1,1) PRIMARY KEY,
            ReviewDate DATE NOT NULL,
            Year INT,
            Quarter INT,
            Month INT,
            Day INT,
            Weekday INT,
            IsWeekend BIT
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimKeyword', 'U') IS NULL
        CREATE TABLE DimKeyword (
            KeywordKey INT IDENTITY(1,1) PRIMARY KEY,
            Keyword NVARCHAR(255)
        )
    """)

    # Create FactReview table
    cursor.execute("""
        IF OBJECT_ID('FactReview', 'U') IS NULL
        CREATE TABLE FactReview (
            FactKey INT IDENTITY(1,1) PRIMARY KEY,
            ProductKey INT NOT NULL,
            ReviewerKey INT NOT NULL,
            SentimentKey INT NOT NULL,
            DateKey INT NOT NULL,
            KeywordKey INT,
            HelpfulVotes INT,
            TotalVotes INT,
            HelpfulRatio AS (CAST(HelpfulVotes AS DECIMAL(5,2)) / NULLIF(TotalVotes, 0)) PERSISTED,
            Rating INT CHECK (Rating BETWEEN 1 AND 5),
            ReviewTitle NVARCHAR(255),
            ReviewText NVARCHAR(MAX),
            ReviewDate DATE,
            Year AS YEAR(ReviewDate) PERSISTED,
            Quarter AS (MONTH(ReviewDate) - 1) / 3 + 1 PERSISTED,
            Month AS MONTH(ReviewDate) PERSISTED,
            Day AS DAY(ReviewDate) PERSISTED,
            IsPositiveReview AS (
                CASE WHEN Rating > 3 THEN 1 ELSE 0 END
            ) PERSISTED,
            FOREIGN KEY (ProductKey) REFERENCES DimProduct(ProductKey),
            FOREIGN KEY (ReviewerKey) REFERENCES DimReviewer(ReviewerKey),
            FOREIGN KEY (SentimentKey) REFERENCES DimSentiment(SentimentKey),
            FOREIGN KEY (DateKey) REFERENCES DimDate(DateKey),
            FOREIGN KEY (KeywordKey) REFERENCES DimKeyword(KeywordKey)
        )
    """)

    cursor.commit()
    cursor.close()
    connection.close()


Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_setup.py


#### 2. Utility Functions (utils.py)
This will include helper functions such as cleaning reviewer names.

In [4]:
%%file 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\utils.py'
# app/utils.py
def clean_reviewer_name(name):
    return name.strip().replace('"', '').replace("'", "''")  # Remove surrounding quotes and handle SQL single quotes

def generate_sentiment_analysis(review_text):
    """
    This function will perform sentiment analysis and keyword extraction.
    Replace this with actual sentiment analysis code using an ML model.
    """
    # Dummy sentiment analysis results for now
    sentiment_label = "Neutral"  # Could be Positive, Negative, Neutral
    sentiment_summary = "This is a summary of the sentiment."  # Summarize sentiment
    sentiment_score = 0.5  # Sentiment score, e.g., 0.5 for neutral
    keyword = "example_keyword"  # Extract keyword from the text
    return sentiment_label, sentiment_summary, sentiment_score, keyword



Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\utils.py


#### 3. Data Processing Functions (data_processing.py)
Here, we will define the core functions for reading and inserting data.

In [1]:
%%file 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\data_processing.py'
# app/data_processing.py
import os
import csv
from datetime import datetime
from app.db_connection import get_oltp_connection, get_olap_connection
from app.utils import clean_reviewer_name

def process_csv_files(folder_path):
    # Use the OLTP connection since we're dealing with the sentiment_sifters (OLTP) data
    connection = get_oltp_connection()  
    cursor = connection.cursor()

    for filename in os.listdir(folder_path):
        if filename.endswith(".csv"):
            file_path = os.path.join(folder_path, filename)
            process_single_file(file_path, cursor)
    
    cursor.close()
    connection.close()

def process_single_file(file_path, cursor):
    with open(file_path, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            reviewer_name = clean_reviewer_name(row['reviewer'])

            # Insert Product
            cursor.execute("EXEC sp_InsertProduct ?, ?, ?", 
                           row['asin'], row['product_name'], row['product_type'])
            product_id = cursor.execute("SELECT product_id FROM Product WHERE asin = ?", row['asin']).fetchone()[0]
            
            # Insert Reviewer
            cursor.execute("EXEC sp_InsertReviewer ?, ?, ?, ?", 
                           reviewer_name, row['reviewer_location'], row['country'], row['region'])
            
            reviewer_result = cursor.execute("SELECT reviewer_id FROM Reviewer WHERE LOWER(reviewer_name) = LOWER(?)", reviewer_name).fetchone()
            
            if not reviewer_result:
                print(f"Reviewer not found for {row['reviewer']}. Skipping this review.")
                continue
            
            reviewer_id = reviewer_result[0]  # Extract reviewer_id

            # Process helpful votes
            helpful_votes, total_votes = process_helpful_votes(row['helpful'])

            # Convert date
            review_date = datetime.strptime(row['date'], "%B %d, %Y").date()

            # Convert rating
            rating = process_rating(row['rating'])

            # Insert Review with empty sentiment and keyword fields for now
            cursor.execute(
                "EXEC sp_InsertReview ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
                row['asin'], product_id, reviewer_id, helpful_votes, total_votes, rating, row['title'],
                review_date, row['review_text'], None, None, None, None  # Empty sentiment and keyword fields
            )
            
            # Commit after each row
            cursor.connection.commit()

def process_helpful_votes(helpful_str):
    helpful_votes, total_votes = 0, 0
    if 'of' in helpful_str:
        helpful_votes = int(helpful_str.split(' of ')[0].strip())
        total_votes = int(helpful_str.split(' of ')[1].strip())
    return helpful_votes, total_votes

def process_rating(rating_str):
    try:
        return int(float(rating_str))  # Convert to float first, then to int
    except ValueError:
        return None  # Handle invalid ratings


Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\data_processing.py


#### 4. Main Entry Point (main.py)
This is the file you will run to start the processing.

In [3]:
%%writefile 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\main.py'
# main.py
import logging
from app.db_setup import create_database_if_not_exists, create_oltp_tables, create_olap_tables
from app.data_processing import process_csv_files
from app.sentiment_processing import process_sentiment_analysis
from app.ssis_execution import run_ssis_package


# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
    folder_path = r'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\data\processed'
    
    logging.info("Starting the ETL pipeline...")

    try:
        # Step 1: Create databases if they don't exist
        # create_database_if_not_exists('sentiment_sifters')
        # create_database_if_not_exists('sentiment_warehouse')
        
        # Step 2: Create tables for OLTP and OLAP
        #create_oltp_tables()
        create_olap_tables()

        # Step 3: Process CSV files (if you have implemented it)
        # process_csv_files(folder_path)
        # logging.info("CSV processing pipeline completed successfully.")
        
        # Step 4: Run sentiment analysis (if you have implemented it)
        # logging.info("Starting sentiment analysis...")
        # process_sentiment_analysis()
        # logging.info("Sentiment analysis completed successfully.")
        #Step5: Run the SSIS packages
        print("Starting SSIS packages...")
        run_ssis_package('OLTP_to_OLAP.dtsx')
        print("SSIS packages completed.")
        
    except Exception as e:
        logging.error(f"Error occurred during processing: {e}")
        raise

if __name__ == "__main__":
    main()


Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\main.py


#### app/__init__.py

In [2]:
%%file 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\__init__.py'
# app/__init__.py

Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\__init__.py


#### app/sentiment_processing.py

In [3]:
%%writefile 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\sentiment_processing.py'
import logging
import mlflow
import mlflow.pyfunc
from transformers import pipeline
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from app.db_connection import get_oltp_connection  # Using OLTP for sentiment analysis
from sklearn.feature_extraction.text import TfidfVectorizer
import string
import re

# Initialize pipelines outside functions for reuse
sentiment_pipeline = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english", revision="714eb0f")

# Simplified keyword extraction, avoiding stopwords and trivial words
def extract_keywords(text):
    stopwords = set(["the", "is", "in", "at", "of", "and", "a", "an", "to", "for", "on", "it", "this", "that"])  # Add more common words
    words = re.findall(r'\b\w+\b', text.lower())  # Extract words
    keywords = [word for word in words if word not in stopwords and len(word) > 2]  # Filter out stopwords and short words
    return ' '.join(keywords[:2])  # Limit to 2 keywords

# Sentiment analysis function
def perform_sentiment_analysis(text):
    max_length = 512
    truncated_text = text[:max_length]
    try:
        result = sentiment_pipeline(truncated_text)[0]
        sentiment_label = result['label']
        sentiment_score = result['score']
        return sentiment_label, sentiment_score
    except Exception as e:
        logging.error(f"Sentiment analysis failed: {e}")
        raise e

# Simplified summary generation using the title
def summarize_text_using_title(title):
    try:
        summary = title[:50]  # Truncate the title to a max length of 50 characters as a simplified summary
        return summary
    except Exception as e:
        logging.error(f"Summary generation failed: {e}")
        raise e

# Function to process a single review
def process_single_review(review):
    review_id, review_text, rating, review_title = review
    combined_text = f"{review_text} {rating} {review_title}"

    try:
        start_time = time.time()  # Start time for each row
        sentiment_label, sentiment_score = perform_sentiment_analysis(combined_text)
        keyword = extract_keywords(review_text)
        summary = summarize_text_using_title(review_title)
        time_taken = time.time() - start_time  # Time taken for the row

        logging.info(f"Row {review_id} processed in {time_taken:.2f} seconds.")
        return (review_id, sentiment_label, sentiment_score, keyword, summary, time_taken)

    except Exception as e:
        logging.error(f"Error during processing for review_id {review_id}: {e}")
        return None

# Process sentiment, keyword extraction, and summarization, and log to MLflow
def process_sentiment_analysis():
    mlflow.start_run(run_name="Optimized Sentiment Analysis with Keywords and Summarization")

    connection = get_oltp_connection()  # Connecting to OLTP for sentiment processing
    cursor = connection.cursor()

    # Log model info
    mlflow.log_param("model_name", "distilbert-base-uncased-finetuned-sst-2-english")
    mlflow.log_param("max_sequence_length", 512)

    # Fetch reviews
    cursor.execute("SELECT review_id, review_text, rating, review_title FROM Review WHERE sentiment_label IS NULL")
    reviews = cursor.fetchall()

    total_reviews = len(reviews)
    mlflow.log_metric("total_reviews_to_process", total_reviews)

    start_time = time.time()

    total_time = 0  # Track total processing time

    # Batch processing size
    BATCH_SIZE = 20  # Tune this value for optimal performance

    # Using ThreadPoolExecutor for concurrent execution
    with ThreadPoolExecutor(max_workers=10) as executor:
        for i in range(0, total_reviews, BATCH_SIZE):
            batch = reviews[i:i + BATCH_SIZE]
            future_to_review = {executor.submit(process_single_review, review): review for review in batch}
            
            for future in as_completed(future_to_review):
                result = future.result()
                if result:
                    review_id, sentiment_label, sentiment_score, keyword, summary, row_time = result
                    total_time += row_time
                    try:
                        # Update the Review table with the results
                        cursor.execute(
                            "UPDATE Review SET sentiment_label = ?, sentiment_score = ?, keyword = ?, sentiment_summary = ? WHERE review_id = ?",
                            (sentiment_label, sentiment_score, keyword, summary, review_id)
                        )
                        connection.commit()

                        # Log to MLflow
                        mlflow.log_metric("processed_review_id", review_id)
                        mlflow.log_metric("sentiment_score", sentiment_score)
                        mlflow.log_param(f"sentiment_label_review_{review_id}", sentiment_label)

                    except Exception as e:
                        logging.error(f"Error updating review_id {review_id}: {e}")

    cursor.close()
    connection.close()

    execution_time = time.time() - start_time
    avg_time_per_row = total_time / total_reviews if total_reviews > 0 else 0
    mlflow.log_metric("execution_time_in_seconds", execution_time)
    mlflow.log_metric("average_time_per_row", avg_time_per_row)
    logging.info(f"Processing completed in {execution_time:.2f} seconds. Average time per row: {avg_time_per_row:.2f} seconds.")

    mlflow.end_run()


Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\sentiment_processing.py


#### SSIS_execution.py

In [1]:
%%writefile 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\ssis_execution.py'
import subprocess
import os

def run_ssis_package(package_name):
    package_path = os.path.join(os.path.dirname(__file__), 'ssis_packages', package_name)
    dtexec_command = f'dtexec /f "{package_path}"'
    
    try:
        result = subprocess.run(dtexec_command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("SSIS package ran successfully.")
        print(result.stdout.decode())
    except subprocess.CalledProcessError as e:
        print(f"Failed to run SSIS package: {package_name}")
        print(e.stderr.decode())

if __name__ == "__main__":
    # Example: Running both SSIS packages
    run_ssis_package('OLTP_to_OLAP_DimensionTables.dtsx')
    run_ssis_package('OLTP_to_OLAP_FactRev.dtsx')


Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\ssis_execution.py


#### db_connection_azure.py

In [15]:
%%writefile 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_connection_azure.py'
# app/db_connection_azure.py
import pyodbc

def get_azure_database_connection():
    """Returns connection to the Azure SQL Database (sifters) containing both OLTP and OLAP tables."""
    connection = pyodbc.connect(
        "Driver={ODBC Driver 17 for SQL Server};"  # Updated driver name
        'Server=tcp:sifters.database.windows.net,1433;'
        'Database=sifters;'  # Main database containing both OLTP and OLAP tables
        'Uid=sifter_login;'  # Azure username
        'Pwd=Sentiment@1990;'  # Replace with the actual password
        'Encrypt=yes;'
        'TrustServerCertificate=no;'
        'Connection Timeout=30;'
    )
    return connection

def get_azure_master_connection():
    """Returns connection to the master database on Azure for administrative tasks."""
    connection = pyodbc.connect(
        "Driver={ODBC Driver 17 for SQL Server};"  # Updated driver name
        'Server=tcp:sifters.database.windows.net,1433;'
        'Database=master;'
        'Uid=sifter_login;'  # Azure username
        'Pwd=Sentiment@1990;'  # Replace with the actual password
        'Encrypt=yes;'
        'TrustServerCertificate=no;'
        'Connection Timeout=30;'
    )
    return connection


Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_connection_azure.py


#### db_setup_azure.py

In [16]:
%%writefile 'D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_setup_azure.py'
# app/db_setup_azure.py
from app.db_connection_azure import get_azure_database_connection

def create_oltp_and_olap_tables():
    """Creates the OLTP and OLAP tables in the same Azure database if they don't exist."""
    connection = get_azure_database_connection()
    cursor = connection.cursor()

    # Creating OLTP Tables
    print("Creating OLTP tables...")
    
    cursor.execute("""
        IF OBJECT_ID('Product', 'U') IS NULL
        CREATE TABLE Product (
            product_id INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL UNIQUE,
            product_name NVARCHAR(255) NOT NULL,
            product_type NVARCHAR(100)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('Reviewer', 'U') IS NULL
        CREATE TABLE Reviewer (
            reviewer_id INT IDENTITY(1,1) PRIMARY KEY,
            reviewer_name NVARCHAR(255) NOT NULL,
            reviewer_location NVARCHAR(255),
            country NVARCHAR(255),
            region NVARCHAR(255)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('Review', 'U') IS NULL
        CREATE TABLE Review (
            review_id INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL,
            product_id INT NOT NULL,
            reviewer_id INT NOT NULL,
            helpful_votes INT,
            total_votes INT,
            rating INT CHECK (rating BETWEEN 1 AND 5),
            review_title NVARCHAR(255),
            review_date DATE,
            review_text NVARCHAR(MAX),
            helpful_ratio AS (CAST(helpful_votes AS DECIMAL(5,2)) / NULLIF(total_votes, 0)) PERSISTED,
            sentiment_label NVARCHAR(50),
            sentiment_summary NVARCHAR(MAX),
            sentiment_score DECIMAL(5,2),
            keyword NVARCHAR(255),
            FOREIGN KEY (product_id) REFERENCES Product(product_id),
            FOREIGN KEY (reviewer_id) REFERENCES Reviewer(reviewer_id)
        )
    """)

    # Creating OLAP Tables
    print("Creating OLAP Dimension and Fact tables...")
    
    cursor.execute("""
        IF OBJECT_ID('DimProduct', 'U') IS NULL
        CREATE TABLE DimProduct (
            ProductKey INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL UNIQUE,
            product_name NVARCHAR(255),
            product_type NVARCHAR(100)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimReviewer', 'U') IS NULL
        CREATE TABLE DimReviewer (
            ReviewerKey INT IDENTITY(1,1) PRIMARY KEY,
            reviewer_name NVARCHAR(255),
            reviewer_location NVARCHAR(255),
            country NVARCHAR(255),
            region NVARCHAR(255)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimSentiment', 'U') IS NULL
        CREATE TABLE DimSentiment (
            SentimentKey INT IDENTITY(1,1) PRIMARY KEY,
            SentimentLabel NVARCHAR(50),
            SentimentSummary NVARCHAR(MAX),
            SentimentScore DECIMAL(5,2)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimDate', 'U') IS NULL
        CREATE TABLE DimDate (
            DateKey INT IDENTITY(1,1) PRIMARY KEY,
            ReviewDate DATE NOT NULL,
            Year INT,
            Quarter INT,
            Month INT,
            Day INT,
            Weekday INT,
            IsWeekend BIT
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimKeyword', 'U') IS NULL
        CREATE TABLE DimKeyword (
            KeywordKey INT IDENTITY(1,1) PRIMARY KEY,
            Keyword NVARCHAR(255)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('FactReview', 'U') IS NULL
        CREATE TABLE FactReview (
            FactKey INT IDENTITY(1,1) PRIMARY KEY,
            ProductKey INT NOT NULL,
            ReviewerKey INT NOT NULL,
            SentimentKey INT NOT NULL,
            DateKey INT NOT NULL,
            KeywordKey INT,
            HelpfulVotes INT,
            TotalVotes INT,
            HelpfulRatio AS (CAST(HelpfulVotes AS DECIMAL(5,2)) / NULLIF(TotalVotes, 0)) PERSISTED,
            Rating INT CHECK (Rating BETWEEN 1 AND 5),
            ReviewTitle NVARCHAR(255),
            ReviewText NVARCHAR(MAX),
            ReviewDate DATE,
            Year AS YEAR(ReviewDate) PERSISTED,
            Quarter AS (MONTH(ReviewDate) - 1) / 3 + 1 PERSISTED,
            Month AS MONTH(ReviewDate) PERSISTED,
            Day AS DAY(ReviewDate) PERSISTED,
            IsPositiveReview AS (
                CASE WHEN Rating > 3 THEN 1 ELSE 0 END
            ) PERSISTED,
            FOREIGN KEY (ProductKey) REFERENCES DimProduct(ProductKey),
            FOREIGN KEY (ReviewerKey) REFERENCES DimReviewer(ReviewerKey),
            FOREIGN KEY (SentimentKey) REFERENCES DimSentiment(SentimentKey),
            FOREIGN KEY (DateKey) REFERENCES DimDate(DateKey),
            FOREIGN KEY (KeywordKey) REFERENCES DimKeyword(KeywordKey)
        )
    """)

    cursor.commit()
    print("OLTP and OLAP tables created successfully.")
    cursor.close()
    connection.close()


if __name__ == "__main__":
    try:
        connection = get_azure_database_connection()
        print("Connection to Azure SQL Database successful.")
        connection.close()
    except pyodbc.InterfaceError as e:
        print(f"Connection failed: {e}")


Overwriting D:\D1\Depi\new_project\Sentiment_Sifters\sentiment_sifters_project\app\db_setup_azure.py


#### Combined CSV

In [17]:
# import pyodbc
# import pandas as pd

# # Database connection function
# def get_oltp_connection():
#     """Connect to the OLTP database."""
#     conn = pyodbc.connect(
#         'DRIVER={ODBC Driver 17 for SQL Server};'
#         'SERVER=DESKTOP-4U8DFLK;'  # Replace with your server name
#         'DATABASE=sentiment_sifters;'  # Replace with your OLTP database name
#         'UID=sifter_login;'  # Replace with your username
#         'PWD=sifter_login;'  # Replace with your password
#     )
#     return conn

# # Retrieve data from each table with custom prefixes
# def fetch_data_with_prefix(connection, table_name, prefix):
#     """Fetches data from the specified table and adds a prefix to columns."""
#     query = f"SELECT * FROM {table_name}"
#     data = pd.read_sql(query, connection)
#     data = data.add_prefix(f"{prefix}_")  # Add prefix to column headers
#     return data

# # Summing numeric columns and combining all data
# def create_combined_csv():
#     """Fetches data from all tables, sums numeric columns, and saves as CSV."""
#     conn = get_oltp_connection()
    
#     # Fetch data from tables with prefixes
#     product_data = fetch_data_with_prefix(conn, 'Product', 'product')
#     reviewer_data = fetch_data_with_prefix(conn, 'Reviewer', 'reviewer')
#     review_data = fetch_data_with_prefix(conn, 'Review', 'review')

#     # Concatenate tables horizontally, keeping original headers with prefixes
#     combined_data = pd.concat([product_data, reviewer_data, review_data], axis=1)

#     # Summing numeric columns
#     numeric_columns = combined_data.select_dtypes(include=['number']).columns
#     total_row = pd.DataFrame(combined_data[numeric_columns].sum()).T
#     total_row.index = ['Total']  # Add a label for the summation row

#     # Append the summation row to the combined data
#     combined_data = pd.concat([combined_data, total_row], ignore_index=False)

#     # Save to CSV
#     combined_data.to_csv('combined_oltp_data.csv', index=False)
#     print("Combined data saved as 'combined_oltp_data.csv'")
    
#     # Close the connection
#     conn.close()

# # Run the function
# create_combined_csv()


#### testing

In [1]:
import pyodbc

def get_azure_database_connection():
    """Returns connection to the Azure SQL Database (sifters) containing both OLTP and OLAP tables."""
    connection = pyodbc.connect(
        "Driver={ODBC Driver 17 for SQL Server};"  # Updated driver name
        "Server=tcp:sifters.database.windows.net,1433;"
        "Database=sifters;"
        "Uid=sifter_login;"
        "Pwd=Sentiment@1990;"
        "Encrypt=yes;"
        "TrustServerCertificate=no;"
        "Connection Timeout=30;"
    )
    return connection


def create_oltp_and_olap_tables():
    """Creates the OLTP and OLAP tables in the same Azure database if they don't exist."""
    connection = get_azure_database_connection()
    cursor = connection.cursor()

    # Creating OLTP Tables
    print("Creating OLTP tables...")
    
    cursor.execute("""
        IF OBJECT_ID('Product', 'U') IS NULL
        CREATE TABLE Product (
            product_id INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL UNIQUE,
            product_name NVARCHAR(255) NOT NULL,
            product_type NVARCHAR(100)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('Reviewer', 'U') IS NULL
        CREATE TABLE Reviewer (
            reviewer_id INT IDENTITY(1,1) PRIMARY KEY,
            reviewer_name NVARCHAR(255) NOT NULL,
            reviewer_location NVARCHAR(255),
            country NVARCHAR(255),
            region NVARCHAR(255)
        )
    """)
    
    cursor.execute("""
        IF OBJECT_ID('Review', 'U') IS NULL
        CREATE TABLE Review (
            review_id INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL,
            product_id INT NOT NULL,
            reviewer_id INT NOT NULL,
            helpful_votes INT,
            total_votes INT,
            rating INT CHECK (rating BETWEEN 1 AND 5),
            review_title NVARCHAR(255),
            review_date DATE,
            review_text NVARCHAR(MAX),
            helpful_ratio AS (CAST(helpful_votes AS DECIMAL(5,2)) / NULLIF(total_votes, 0)) PERSISTED,
            sentiment_label NVARCHAR(50),
            sentiment_summary NVARCHAR(MAX),
            sentiment_score DECIMAL(5,2),
            keyword NVARCHAR(255),
            FOREIGN KEY (product_id) REFERENCES Product(product_id),
            FOREIGN KEY (reviewer_id) REFERENCES Reviewer(reviewer_id)
        )
    """)

    # Creating OLAP Tables
    print("Creating OLAP Dimension and Fact tables...")
    
    cursor.execute("""
        IF OBJECT_ID('DimProduct', 'U') IS NULL
        CREATE TABLE DimProduct (
            ProductKey INT IDENTITY(1,1) PRIMARY KEY,
            asin NVARCHAR(20) NOT NULL UNIQUE,
            product_name NVARCHAR(255),
            product_type NVARCHAR(100)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimReviewer', 'U') IS NULL
        CREATE TABLE DimReviewer (
            ReviewerKey INT IDENTITY(1,1) PRIMARY KEY,
            reviewer_name NVARCHAR(255),
            reviewer_location NVARCHAR(255),
            country NVARCHAR(255),
            region NVARCHAR(255)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimSentiment', 'U') IS NULL
        CREATE TABLE DimSentiment (
            SentimentKey INT IDENTITY(1,1) PRIMARY KEY,
            SentimentLabel NVARCHAR(50),
            SentimentSummary NVARCHAR(MAX),
            SentimentScore DECIMAL(5,2)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimDate', 'U') IS NULL
        CREATE TABLE DimDate (
            DateKey INT IDENTITY(1,1) PRIMARY KEY,
            ReviewDate DATE NOT NULL,
            Year INT,
            Quarter INT,
            Month INT,
            Day INT,
            Weekday INT,
            IsWeekend BIT
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('DimKeyword', 'U') IS NULL
        CREATE TABLE DimKeyword (
            KeywordKey INT IDENTITY(1,1) PRIMARY KEY,
            Keyword NVARCHAR(255)
        )
    """)

    cursor.execute("""
        IF OBJECT_ID('FactReview', 'U') IS NULL
        CREATE TABLE FactReview (
            FactKey INT IDENTITY(1,1) PRIMARY KEY,
            ProductKey INT NOT NULL,
            ReviewerKey INT NOT NULL,
            SentimentKey INT NOT NULL,
            DateKey INT NOT NULL,
            KeywordKey INT,
            HelpfulVotes INT,
            TotalVotes INT,
            HelpfulRatio AS (CAST(HelpfulVotes AS DECIMAL(5,2)) / NULLIF(TotalVotes, 0)) PERSISTED,
            Rating INT CHECK (Rating BETWEEN 1 AND 5),
            ReviewTitle NVARCHAR(255),
            ReviewText NVARCHAR(MAX),
            ReviewDate DATE,
            Year AS YEAR(ReviewDate) PERSISTED,
            Quarter AS (MONTH(ReviewDate) - 1) / 3 + 1 PERSISTED,
            Month AS MONTH(ReviewDate) PERSISTED,
            Day AS DAY(ReviewDate) PERSISTED,
            IsPositiveReview AS (
                CASE WHEN Rating > 3 THEN 1 ELSE 0 END
            ) PERSISTED,
            FOREIGN KEY (ProductKey) REFERENCES DimProduct(ProductKey),
            FOREIGN KEY (ReviewerKey) REFERENCES DimReviewer(ReviewerKey),
            FOREIGN KEY (SentimentKey) REFERENCES DimSentiment(SentimentKey),
            FOREIGN KEY (DateKey) REFERENCES DimDate(DateKey),
            FOREIGN KEY (KeywordKey) REFERENCES DimKeyword(KeywordKey)
        )
    """)

    cursor.commit()
    print("OLTP and OLAP tables created successfully.")
    cursor.close()
    connection.close()


if __name__ == "__main__":
    create_oltp_and_olap_tables()



Creating OLTP tables...
Creating OLAP Dimension and Fact tables...
OLTP and OLAP tables created successfully.


In [14]:
import pyodbc
print(pyodbc.drivers())


['SQL Server', 'ODBC Driver 17 for SQL Server', 'Microsoft Access Driver (*.mdb, *.accdb)', 'Microsoft Excel Driver (*.xls, *.xlsx, *.xlsm, *.xlsb)', 'Microsoft Access Text Driver (*.txt, *.csv)', 'Microsoft Access dBASE Driver (*.dbf, *.ndx, *.mdx)']


#### Post Process

In [1]:
import pyodbc

# Establishing the database connection
conn = pyodbc.connect(
    'Driver={ODBC Driver 17 for SQL Server};'
    'Server=DESKTOP-4U8DFLK;'
    'Database=sentiment_warehouse;'
    'Uid=sifter_login;'
    'Pwd=YourPasswordHere;'
    'Trusted_Connection=yes;'
)
cursor = conn.cursor()

# Define tables and their primary key columns for null ID removal
tables = {
    "DimProduct": "ProductKey",
    "DimReviewer": "ReviewerKey",
    "DimSentiment": "SentimentKey",
    "DimDate": "DateKey",
    "DimKeyword": "KeywordKey",
    "FactReview": ["ProductKey", "ReviewerKey", "SentimentKey", "DateKey", "KeywordKey"]  # Multiple keys in FactReview
}

# Iterate over each table and remove rows with null ID columns
for table, key_column in tables.items():
    if isinstance(key_column, list):
        # FactReview case with multiple keys
        condition = " OR ".join([f"{col} IS NULL" for col in key_column])
    else:
        # Single key case for dimensions
        condition = f"{key_column} IS NULL"
    
    delete_query = f"DELETE FROM {table} WHERE {condition}"
    
    try:
        cursor.execute(delete_query)
        conn.commit()
        print(f"Removed rows with null IDs from {table}.")
    except pyodbc.Error as e:
        print(f"Error processing table {table}: {e}")

# Close the connection
cursor.close()
conn.close()


Removed rows with null IDs from DimProduct.
Removed rows with null IDs from DimReviewer.
Removed rows with null IDs from DimSentiment.
Removed rows with null IDs from DimDate.
Removed rows with null IDs from DimKeyword.
Removed rows with null IDs from FactReview.


In [2]:

import pyodbc
import pandas as pd

# Establishing the database connection
conn = pyodbc.connect(
    'Driver={ODBC Driver 17 for SQL Server};'
    'Server=DESKTOP-4U8DFLK;'
    'Database=sentiment_warehouse;'
    'Uid=sifter_login;'
    'Pwd=YourPasswordHere;'
    'Trusted_Connection=yes;'
)
cursor = conn.cursor()

# Define tables and their primary key columns
tables = {
    "DimProduct": "ProductKey",
    "DimReviewer": "ReviewerKey",
    "DimSentiment": "SentimentKey",
    "DimDate": "DateKey",
    "DimKeyword": "KeywordKey",
    "FactReview": ["ProductKey", "ReviewerKey", "SentimentKey", "DateKey", "KeywordKey"]  # Multiple keys in FactReview
}

# Results dictionary to store issues found
results = {
    "Table": [],
    "Issue": [],
    "Affected Rows": []
}

# Check each table for null or duplicate IDs
for table, key_column in tables.items():
    # Check for null IDs
    if isinstance(key_column, list):
        # FactReview with multiple keys
        null_check = " OR ".join([f"{col} IS NULL" for col in key_column])
        duplicate_check = ", ".join(key_column)
    else:
        # Single key for dimensions
        null_check = f"{key_column} IS NULL"
        duplicate_check = key_column
    
    # Check for null IDs
    null_query = f"SELECT COUNT(*) FROM {table} WHERE {null_check}"
    cursor.execute(null_query)
    null_count = cursor.fetchone()[0]
    if null_count > 0:
        results["Table"].append(table)
        results["Issue"].append("Null IDs")
        results["Affected Rows"].append(null_count)

    # Check for duplicate IDs
    duplicate_query = f"""
    SELECT COUNT(*) FROM (
        SELECT {duplicate_check}, COUNT(*) as cnt FROM {table}
        GROUP BY {duplicate_check}
        HAVING COUNT(*) > 1
    ) as duplicates
    """
    cursor.execute(duplicate_query)
    duplicate_count = cursor.fetchone()[0]
    if duplicate_count > 0:
        results["Table"].append(table)
        results["Issue"].append("Duplicate IDs")
        results["Affected Rows"].append(duplicate_count)

# Close the connection
cursor.close()
conn.close()

# Display results
results_df = pd.DataFrame(results)
if results_df.empty:
    print("No null or duplicate IDs found.")
else:
    print("Issues found:")
    print(results_df)


Issues found:
        Table          Issue  Affected Rows
0  FactReview  Duplicate IDs           5459


In [3]:
import pyodbc

# Establishing the database connection
conn = pyodbc.connect(
    'Driver={ODBC Driver 17 for SQL Server};'
    'Server=DESKTOP-4U8DFLK;'
    'Database=sentiment_warehouse;'
    'Uid=sifter_login;'
    'Pwd=YourPasswordHere;'
    'Trusted_Connection=yes;'
)
cursor = conn.cursor()

# Define tables and their primary key columns
tables = {
    "DimProduct": "ProductKey",
    "DimReviewer": "ReviewerKey",
    "DimSentiment": "SentimentKey",
    "DimDate": "DateKey",
    "DimKeyword": "KeywordKey",
    "FactReview": ["ProductKey", "ReviewerKey", "SentimentKey", "DateKey", "KeywordKey"]  # Composite key for FactReview
}

# Removing duplicates
for table, key_column in tables.items():
    if isinstance(key_column, list):
        # Composite key case for FactReview
        key_columns_str = ", ".join(key_column)
        row_identifier = " + '-' + ".join([f"CAST({col} AS NVARCHAR)" for col in key_column])
    else:
        # Single key case for dimension tables
        key_columns_str = key_column
        row_identifier = f"CAST({key_column} AS NVARCHAR)"
    
    delete_query = f"""
    WITH DuplicateCTE AS (
        SELECT *, ROW_NUMBER() OVER(PARTITION BY {key_columns_str} ORDER BY {key_columns_str}) AS RowNum
        FROM {table}
    )
    DELETE FROM {table}
    WHERE {key_columns_str} IN (
        SELECT {key_columns_str}
        FROM DuplicateCTE
        WHERE RowNum > 1
    )
    """
    
    # Executing the delete query
    try:
        cursor.execute(delete_query)
        conn.commit()
        print(f"Duplicates removed from {table} (if any).")
    except Exception as e:
        print(f"An error occurred while processing {table}: {e}")

# Close the connection
cursor.close()
conn.close()
print("Duplicate removal completed.")


Duplicates removed from DimProduct (if any).
Duplicates removed from DimReviewer (if any).
Duplicates removed from DimSentiment (if any).
Duplicates removed from DimDate (if any).
Duplicates removed from DimKeyword (if any).
An error occurred while processing FactReview: ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]An expression of non-boolean type specified in a context where a condition is expected, near ','. (4145) (SQLExecDirectW)")
Duplicate removal completed.


In [10]:
import pyodbc

# Initialize variables for connection and cursor
connection = None
cursor = None

try:
    # Establish the database connection
    connection = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};SERVER=DESKTOP-4U8DFLK;DATABASE=sentiment_warehouse;UID=sifter_login;PWD=sifter_login'
    )
    cursor = connection.cursor()
    
    # Execute a sample query on FactReview
    query = "SELECT * FROM FactReview"  # Adjust this query as needed
    cursor.execute(query)
    results = cursor.fetchall()
    print("Query executed successfully.")
    
    # Process results if needed
    for row in results:
        print(row)  # Replace this with your actual data processing logic

    # Commit transaction if there were any updates
    connection.commit()

except pyodbc.ProgrammingError as e:
    print(f"Programming error occurred: {e}")

except pyodbc.Error as e:
    print(f"An error occurred with the database operation: {e}")

finally:
    # Close cursor and connection if they were successfully created
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()
    print("Database connection closed.")


Query executed successfully.
(1, 1, 1, 1, 1, 1, None, None, None, 1, None, None, datetime.date(2006, 3, 10), 2006, 1, 3, 10, 0)
(2, 1, 2, 1, 2, 2, None, None, None, 1, None, None, datetime.date(2006, 1, 13), 2006, 1, 1, 13, 0)
(3, 2, 3, 3, 3, 3, None, None, None, 2, None, None, datetime.date(2006, 11, 3), 2006, 4, 11, 3, 0)
(4, 2, 4, 1, 4, 4, None, None, None, 1, None, None, datetime.date(2006, 7, 30), 2006, 3, 7, 30, 0)
(5, 3, 5, 1, 5, 5, None, None, None, 1, None, None, datetime.date(2005, 4, 20), 2005, 2, 4, 20, 0)
(6, 4, 6, 1, 6, 6, None, None, None, 1, None, None, datetime.date(2006, 2, 25), 2006, 1, 2, 25, 0)
(7, 5, 7, 1, 7, 7, None, None, None, 2, None, None, datetime.date(2006, 8, 7), 2006, 3, 8, 7, 0)
(8, 6, 8, 1, 8, 8, None, None, None, 1, None, None, datetime.date(2006, 3, 3), 2006, 1, 3, 3, 0)
(9, 7, 9, 1, 9, 9, None, None, None, 1, None, None, datetime.date(2005, 4, 25), 2005, 2, 4, 25, 0)
(10, 8, 10, 1, 10, 10, None, None, None, 2, None, None, datetime.date(2006, 8, 26), 