In [None]:
pip install keybert sentence-transformers torch tqdm nltk psycopg2-binary sqlalchemy pandas

In [None]:
import json
import os

import numpy as np
import pandas as pd
import psycopg2
import torch
from sqlalchemy import create_engine, text
from tqdm import tqdm

# Database connection
# The connection string contains credentials, so it is read from the environment
# instead of being stored in the notebook.
# NOTE(review): the password that used to be committed here should be rotated.
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
    raise RuntimeError(
        "DATABASE_URL is not set; export the PostgreSQL connection string "
        "before running this notebook"
    )

# Create SQLAlchemy engine
engine = create_engine(DATABASE_URL)

# Check GPU availability
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    device = 'cuda'
else:
    print("Using CPU")
    device = 'cpu'

# Fetch data from PostgreSQL database
print("Fetching data from silver.silver_translated...")
try:
    df = pd.read_sql_query("SELECT * FROM silver.silver_translated", engine)
except Exception as e:
    print(f"Database connection failed: {e}")
    # Every later cell reads `df`; stop here instead of failing later with a NameError.
    raise
print(f"Dataset loaded from database: {df.shape}")
print("Database connection successful!")
    

In [None]:
# Report the loaded columns, the compute device and the row count
print(
    "Dataset loaded successfully",
    f"Columns: {[*df.columns]}",
    f"Processing device: {device}",
    f"Total reviews from database: {len(df)}",
    sep="\n",
)

In [None]:
# Show the first five rows of the source table as the cell output
print("Sample data from silver.silver_translated:")
df.head(n=5)

In [None]:
# Count null entries in the two translated review columns
review_null_counts = df[['Positive Review Translated', 'Negative Review Translated']].isna().sum()
missing_pos = review_null_counts['Positive Review Translated']
missing_neg = review_null_counts['Negative Review Translated']
print(f"Missing positive reviews: {missing_pos}")
print(f"Missing negative reviews: {missing_neg}")
print("Data quality check completed")

In [None]:
# Replace missing review text with empty strings in both review columns
for review_col in ('Positive Review Translated', 'Negative Review Translated'):
    df[review_col] = df[review_col].fillna('')
print("Data preprocessing completed")

In [None]:
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer
import time

# Build the sentence-embedding model on the selected device and wrap it in KeyBERT;
# both objects are created once and reused by the extraction function.
print("Loading models...")
start_time = time.time()
embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
kw_model = KeyBERT(model=embedding_model)
model_load_seconds = time.time() - start_time
print(f"Models loaded in {model_load_seconds:.2f} seconds")

# Keyword extraction function (uses the module-level `kw_model`)
def extract_semantic_phrases_batch(texts, batch_size=100):
    """Extract up to three KeyBERT keyphrases for every entry of ``texts``.

    Args:
        texts: Sized iterable of review texts. Entries that are not strings,
            or whose stripped length is 10 characters or fewer, are skipped.
        batch_size: Number of texts covered by one progress-bar step. Each
            text is still passed to ``kw_model.extract_keywords`` on its own.

    Returns:
        A list aligned with ``texts``. Each element is a list of keyphrase
        strings; it is empty for skipped texts and for texts whose extraction
        raised an exception.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # Keep (original position, text) pairs so results can be written back in place.
    valid_items = [
        (position, text)
        for position, text in enumerate(texts)
        if isinstance(text, str) and len(text.strip()) > 10
    ]

    print(f"Processing {len(valid_items)} valid texts out of {len(texts)} total")

    all_results = [[] for _ in range(len(texts))]
    failure_count = 0
    first_error = None

    for start in tqdm(range(0, len(valid_items), batch_size), desc="Processing batches"):
        for position, text in valid_items[start:start + batch_size]:
            try:
                keywords = kw_model.extract_keywords(
                    text,
                    keyphrase_ngram_range=(1, 2),
                    stop_words='english',
                    use_maxsum=False,
                    nr_candidates=10,
                    top_n=3
                )
                # Each keyword is a (phrase, score) pair; only the phrase is kept.
                all_results[position] = [kw[0] for kw in keywords]
            except Exception as e:
                # Best effort: one bad text must not abort the run, but the
                # failure is counted so it does not go unnoticed.
                failure_count += 1
                if first_error is None:
                    first_error = e

    if failure_count:
        print(f"Keyword extraction failed for {failure_count} texts (first error: {first_error!r})")

    return all_results

# Run keyphrase extraction over both review columns, timing each pass
extraction_runs = {}
for polarity, source_col, target_col in (
    ("positive", 'Positive Review Translated', 'Semantic Phrases Pos'),
    ("negative", 'Negative Review Translated', 'Semantic Phrases Neg'),
):
    print(f"Processing {polarity} reviews...")
    start_time = time.time()
    texts_for_run = df[source_col].tolist()
    df[target_col] = extract_semantic_phrases_batch(texts_for_run)
    elapsed_for_run = time.time() - start_time
    print(f"{polarity.capitalize()} reviews processed in {elapsed_for_run:.2f} seconds")
    extraction_runs[polarity] = (texts_for_run, elapsed_for_run)

# Keep the names the original cell exposed
positive_texts, pos_time = extraction_runs["positive"]
negative_texts, neg_time = extraction_runs["negative"]

# Summary statistics: how many rows ended up without any phrase
missing_pos = sum(len(phrases) == 0 for phrases in df['Semantic Phrases Pos'])
missing_neg = sum(len(phrases) == 0 for phrases in df['Semantic Phrases Neg'])
print(f"Empty semantic phrases in positive reviews: {missing_pos}")
print(f"Empty semantic phrases in negative reviews: {missing_neg}")
print(f"Total processing time: {pos_time + neg_time:.2f} seconds")

# Release cached GPU memory once extraction is done
if device == 'cuda':
    torch.cuda.empty_cache()
    print("GPU cache cleared")

In [None]:
# Confirm that the phrase columns were added to the frame
semantic_phrase_columns = [col for col in df.columns if 'Semantic Phrases' in col]
print(f"Dataset shape after semantic extraction: {df.shape}")
print(f"Semantic phrase columns: {semantic_phrase_columns}")

In [None]:
# Print the extracted phrases for those of the first ten rows that have positive phrases
preview_columns = [
    'Positive Review Translated', 'Semantic Phrases Pos',
    'Negative Review Translated', 'Semantic Phrases Neg',
]
sample_results = df[preview_columns].head(10)
for idx, row in sample_results.iterrows():
    pos_phrases = row['Semantic Phrases Pos']
    if len(pos_phrases) == 0:
        continue  # rows without positive phrases are not shown at all
    print(f"Row {idx}:")
    print(f"Positive phrases: {pos_phrases}")
    neg_phrases = row['Semantic Phrases Neg']
    if len(neg_phrases) > 0:
        print(f"Negative phrases: {neg_phrases}")
    print("-" * 50)

In [None]:
import re
import nltk
from collections import Counter

# Download required NLTK data; fall back to a small built-in stop-word list
# when the corpora cannot be loaded.
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('names', quiet=True)
    from nltk.corpus import stopwords, names
    stop_words = set(stopwords.words('english'))
    name_list = set(names.words())
except Exception as exc:
    # `except Exception` (not a bare `except:`) lets KeyboardInterrupt and
    # SystemExit propagate; the reason for the fallback is reported.
    print("NLTK data not available, using basic filtering")
    print(f"  reason: {exc!r}")
    stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
    name_list = set()

# Additional hotel-specific stop words to filter
# Generic hospitality words; extract_and_clean_tokens drops any token found in this set.
hotel_stopwords = {
    'hotel', 'room', 'stay', 'night', 'day', 'time', 'place', 'location', 'area', 'staff', 'service', 
    'people', 'guest', 'customer', 'visitor', 'person', 'man', 'woman', 'guy', 'lady', 'someone', 
    'anyone', 'everyone', 'everything', 'something', 'anything', 'nothing'
}

# Comprehensive tourism services and interests vocabulary
# Whitelist passed to filter_tourism_tokens: only tokens present here survive the
# tourism-relevance filter. It is a set, so the entries repeated below
# ('walking', 'transportation', 'disappointing') collapse to a single member.
# NOTE(review): 'cafe' and 'wine' are listed in categorize_tourism_tokens' dining
# terms but are absent here, so they are filtered out before categorization — confirm intent.
tourism_vocabulary = {
    # Accommodation & Room Features
    'accommodation', 'suite', 'apartment', 'villa', 'cottage', 'cabin', 'chalet', 'hostel', 'guesthouse', 'resort',
    'bedroom', 'bathroom', 'kitchen', 'balcony', 'terrace', 'patio', 'garden', 'pool', 'jacuzzi', 'spa',
    'wifi', 'internet', 'television', 'minibar', 'refrigerator', 'airconditioning', 'heating', 'fireplace',
    'bed', 'pillow', 'mattress', 'linen', 'towel', 'amenities', 'toiletries', 'hairdryer', 'safe', 'wardrobe',
    
    # Hotel Services & Facilities
    'reception', 'concierge', 'housekeeping', 'maintenance', 'security', 'valet', 'bellhop', 'porter',
    'checkin', 'checkout', 'reservation', 'booking', 'availability', 'upgrade', 'complimentary', 'inclusive',
    'restaurant', 'dining', 'breakfast', 'lunch', 'dinner', 'buffet', 'menu', 'cuisine', 'bar', 'lounge',
    'gym', 'fitness', 'sauna', 'massage', 'wellness', 'relaxation', 'treatment', 'therapy',
    'conference', 'meeting', 'business', 'events', 'wedding', 'banquet', 'catering',
    'parking', 'garage', 'transportation', 'shuttle', 'airport', 'transfer', 'taxi', 'rental',
    
    # Tourism Activities & Attractions
    'sightseeing', 'tour', 'excursion', 'adventure', 'exploration', 'hiking', 'walking', 'cycling', 'biking',
    'museum', 'gallery', 'exhibition', 'theater', 'cinema', 'entertainment', 'nightlife', 'shopping',
    'beach', 'ocean', 'sea', 'lake', 'river', 'mountain', 'forest', 'park', 'nature', 'landscape',
    'historic', 'cultural', 'heritage', 'architecture', 'monument', 'castle', 'church', 'temple',
    'festival', 'event', 'celebration', 'carnival', 'market', 'fair', 'concert', 'performance',
    
    # Experience & Service Quality
    'comfortable', 'luxury', 'elegant', 'modern', 'traditional', 'authentic', 'unique', 'spectacular',
    'clean', 'spacious', 'cozy', 'quiet', 'peaceful', 'relaxing', 'convenient', 'accessible',
    'friendly', 'helpful', 'professional', 'courteous', 'attentive', 'responsive', 'efficient',
    'delicious', 'tasty', 'fresh', 'quality', 'variety', 'selection', 'choice', 'option',
    'expensive', 'affordable', 'reasonable', 'value', 'price', 'cost', 'budget', 'cheap',
    'recommend', 'satisfaction', 'experience', 'memorable', 'enjoyable', 'pleasant', 'disappointing',
    
    # Location & Geography
    # ('walking' also appears under activities; 'transportation' also under services)
    'downtown', 'center', 'district', 'neighborhood', 'vicinity', 'nearby', 'walking', 'distance',
    'view', 'scenery', 'panoramic', 'overlooking', 'facing', 'waterfront', 'beachfront', 'hillside',
    'accessibility', 'transportation', 'connection', 'proximity', 'convenience', 'central',
    
    # Negative Aspects (Important for feedback)
    # ('disappointing' also appears under experience & service quality)
    'problem', 'issue', 'complaint', 'dissatisfied', 'unhappy', 'poor', 'worst', 'terrible',
    'dirty', 'noisy', 'crowded', 'outdated', 'broken', 'damaged', 'faulty',
    'rude', 'unfriendly', 'unprofessional', 'slow', 'delayed', 'cancelled', 'overbooked',
    'overpriced', 'overrated', 'disappointing', 'unacceptable', 'uncomfortable'
}

def extract_and_clean_tokens(phrase_lists, excluded_stopwords=None, domain_stopwords=None,
                             excluded_names=None):
    """Collect the distinct, meaningful lowercase word tokens found in ``phrase_lists``.

    Args:
        phrase_lists: Iterable whose elements are lists of phrase strings.
            Elements that are not lists, and phrases that are not strings,
            are ignored.
        excluded_stopwords: Words to discard. Defaults to the notebook-level
            ``stop_words`` set.
        domain_stopwords: Further words to discard. Defaults to the
            notebook-level ``hotel_stopwords`` set.
        excluded_names: Personal names to discard, matched case-insensitively.
            Defaults to the notebook-level ``name_list`` set.

    Returns:
        Alphabetically sorted list of unique tokens that are at least three
        letters long and appear in none of the exclusion sets.
    """
    if excluded_stopwords is None:
        excluded_stopwords = stop_words
    if domain_stopwords is None:
        domain_stopwords = hotel_stopwords
    if excluded_names is None:
        excluded_names = name_list

    # Tokens are always lowercase, while NLTK's names corpus stores names
    # capitalised ("Mary"), so the names must be lowered before comparing;
    # otherwise the name filter never matches anything.
    # NOTE(review): this also drops common nouns that double as given names.
    lowered_names = {name.lower() for name in excluded_names}
    blocked = set(excluded_stopwords) | set(domain_stopwords) | lowered_names

    unique_tokens = set()
    for phrase_list in phrase_lists:
        if not isinstance(phrase_list, list):
            continue
        for phrase in phrase_list:
            if isinstance(phrase, str):
                # The pattern only yields runs of a-z, so no separate
                # digit/alphabetic checks are needed afterwards.
                unique_tokens.update(re.findall(r'\b[a-z]+\b', phrase.lower()))

    return sorted(
        token for token in unique_tokens
        if len(token) >= 3 and token not in blocked
    )

# Build the cleaned token vocabularies for both phrase columns
print("Extracting and cleaning tokens...")
positive_tokens, negative_tokens = (
    extract_and_clean_tokens(df[phrase_col])
    for phrase_col in ('Semantic Phrases Pos', 'Semantic Phrases Neg')
)
print(f"Positive tokens extracted: {len(positive_tokens)}")
print(f"Negative tokens extracted: {len(negative_tokens)}")

In [None]:
# Keep only the tokens that belong to the tourism vocabulary
def filter_tourism_tokens(token_list, tourism_vocab):
    """Return the sorted tokens of ``token_list`` whose lowercase form is in ``tourism_vocab``."""
    return sorted(
        candidate for candidate in token_list
        if candidate.lower() in tourism_vocab
    )

def categorize_tourism_tokens(tokens):
    """Group tokens by tourism service type.

    Each token is compared, lowercased, against the category term sets in the
    order listed below and is appended to the first category that contains
    it. Tokens that match no category are dropped.

    Returns:
        Dict with the keys 'accommodation', 'services', 'activities',
        'dining', 'quality', 'location' and 'issues', each mapping to the list
        of matching tokens in input order.
    """
    # Ordered (category, terms) pairs; the order fixes both lookup priority
    # and the key order of the returned dict.
    category_terms = (
        ('accommodation', {'accommodation', 'suite', 'apartment', 'villa', 'bedroom', 'bathroom', 'kitchen', 'balcony', 'pool', 'spa', 'bed', 'amenities'}),
        ('services', {'reception', 'concierge', 'housekeeping', 'checkin', 'checkout', 'reservation', 'parking', 'transportation', 'shuttle'}),
        ('activities', {'sightseeing', 'tour', 'excursion', 'museum', 'beach', 'hiking', 'shopping', 'entertainment', 'nightlife'}),
        ('dining', {'restaurant', 'dining', 'breakfast', 'lunch', 'dinner', 'buffet', 'cuisine', 'bar', 'cafe', 'wine', 'delicious'}),
        ('quality', {'comfortable', 'luxury', 'clean', 'spacious', 'friendly', 'professional', 'quality', 'recommend', 'satisfaction'}),
        ('location', {'downtown', 'center', 'nearby', 'view', 'scenery', 'accessibility', 'convenient', 'central'}),
        ('issues', {'problem', 'complaint', 'dirty', 'noisy', 'broken', 'rude', 'slow', 'overpriced', 'disappointing'}),
    )

    categories = {label: [] for label, _ in category_terms}

    for token in tokens:
        lowered = token.lower()
        for label, terms in category_terms:
            if lowered in terms:
                categories[label].append(token)
                break  # first matching category wins

    return categories

# Restrict both vocabularies to tourism terms, then group them by category
print("Filtering tokens for tourism relevance...")
tourism_positive_tokens = filter_tourism_tokens(positive_tokens, tourism_vocabulary)
tourism_negative_tokens = filter_tourism_tokens(negative_tokens, tourism_vocabulary)

print(f"Tourism-relevant positive tokens: {len(tourism_positive_tokens)} out of {len(positive_tokens)}")
print(f"Tourism-relevant negative tokens: {len(tourism_negative_tokens)} out of {len(negative_tokens)}")

pos_categories = categorize_tourism_tokens(tourism_positive_tokens)
neg_categories = categorize_tourism_tokens(tourism_negative_tokens)

# One report per polarity; empty categories are not printed
for heading, grouped_tokens in (("POSITIVE", pos_categories), ("NEGATIVE", neg_categories)):
    print(f"\n=== {heading} TOKEN CATEGORIES ===")
    for category, tokens in grouped_tokens.items():
        if tokens:
            print(f"{category.upper()}: {len(tokens)} tokens - {tokens[:10]}")

In [None]:
# Apply tourism filtering to dataframe rows
def extract_tourism_tokens_per_row(phrase_list, valid_tourism_tokens):
    """Return the tourism-relevant tokens found in one row's phrase list.

    Args:
        phrase_list: List of phrase strings for a single review. Anything
            that is not a list yields an empty result; non-string phrases
            inside the list are ignored.
        valid_tourism_tokens: Iterable of allowed tokens, compared
            case-insensitively.

    Returns:
        Sorted list of the distinct lowercase tokens from ``phrase_list`` that
        are in ``valid_tourism_tokens``.
    """
    if not isinstance(phrase_list, list):
        return []

    # Build the lowercase lookup set once per call instead of once per token.
    allowed = {token.lower() for token in valid_tourism_tokens}

    row_tokens = set()
    for phrase in phrase_list:
        if isinstance(phrase, str):
            row_tokens.update(
                token for token in re.findall(r'\b[a-z]+\b', phrase.lower())
                if token in allowed
            )

    return sorted(row_tokens)

# Add per-row tourism token columns, each restricted to its polarity's vocabulary
print("Creating final tourism-filtered token columns...")
df['Tourism_Tokens_Pos'] = df['Semantic Phrases Pos'].apply(
    extract_tourism_tokens_per_row, args=(tourism_positive_tokens,)
)
df['Tourism_Tokens_Neg'] = df['Semantic Phrases Neg'].apply(
    extract_tourism_tokens_per_row, args=(tourism_negative_tokens,)
)

# Totals across all rows
tourism_pos_count = sum(map(len, df['Tourism_Tokens_Pos']))
tourism_neg_count = sum(map(len, df['Tourism_Tokens_Neg']))

print("\n=== FINAL TOURISM TOKEN RESULTS ===")
print(f"Tourism-filtered positive tokens: {tourism_pos_count}")
print(f"Tourism-filtered negative tokens: {tourism_neg_count}")
print(f"Average positive tourism tokens per review: {tourism_pos_count / len(df):.2f}")
print(f"Average negative tourism tokens per review: {tourism_neg_count / len(df):.2f}")

In [None]:
# Filter out reviews with empty tourism token lists
print("=== FILTERING EMPTY TOURISM TOKEN REVIEWS ===")

# Check current dataset statistics
reviews_before = len(df)
print(f"Dataset before filtering: {reviews_before} reviews")

# Compute the emptiness masks once and reuse them for counting and filtering
pos_is_empty = df['Tourism_Tokens_Pos'].apply(len) == 0
neg_is_empty = df['Tourism_Tokens_Neg'].apply(len) == 0
no_tokens_at_all = pos_is_empty & neg_is_empty

empty_pos_tokens = pos_is_empty.sum()
empty_neg_tokens = neg_is_empty.sum()
both_empty = no_tokens_at_all.sum()

print(f"Reviews with empty positive tourism tokens: {empty_pos_tokens}")
print(f"Reviews with empty negative tourism tokens: {empty_neg_tokens}")
print(f"Reviews with both empty tourism token lists: {both_empty}")

# Keep a review when at least one of its two token lists is non-empty
df_filtered = df[~no_tokens_at_all].copy()
reviews_after = len(df_filtered)

# Guard the ratios so an empty input or an empty result prints 0 instead of raising
retention_pct = (reviews_after / reviews_before) * 100 if reviews_before else 0.0

print(f"\nDataset after filtering: {reviews_after} reviews")
print(f"Removed {reviews_before - reviews_after} reviews with no tourism tokens")
print(f"Retention rate: {retention_pct:.1f}%")

# Update statistics for filtered dataset
filtered_pos_count = sum(len(tokens) for tokens in df_filtered['Tourism_Tokens_Pos'])
filtered_neg_count = sum(len(tokens) for tokens in df_filtered['Tourism_Tokens_Neg'])
avg_pos_tokens = filtered_pos_count / reviews_after if reviews_after else 0.0
avg_neg_tokens = filtered_neg_count / reviews_after if reviews_after else 0.0

print("\n=== FILTERED DATASET STATISTICS ===")
print(f"Total positive tourism tokens: {filtered_pos_count}")
print(f"Total negative tourism tokens: {filtered_neg_count}")
print(f"Average positive tokens per review: {avg_pos_tokens:.2f}")
print(f"Average negative tokens per review: {avg_neg_tokens:.2f}")

# Show distribution of token counts
pos_token_counts = df_filtered['Tourism_Tokens_Pos'].apply(len)
neg_token_counts = df_filtered['Tourism_Tokens_Neg'].apply(len)

print("\nPositive token distribution:")
print(f"  Min: {pos_token_counts.min()}, Max: {pos_token_counts.max()}")
print(f"  Mean: {pos_token_counts.mean():.2f}, Median: {pos_token_counts.median():.1f}")

print("Negative token distribution:")
print(f"  Min: {neg_token_counts.min()}, Max: {neg_token_counts.max()}")
print(f"  Mean: {neg_token_counts.mean():.2f}, Median: {neg_token_counts.median():.1f}")

# Update df to the filtered version (later cells read `df`)
df = df_filtered
print(f"\nDataset updated to filtered version with {len(df)} informative reviews")

In [None]:
# Look at the negative tourism tokens of reviews labelled -1 (only when the label column exists)
if 'sentiment classification' in df.columns:
    labelled_negative = df['sentiment classification'] == -1
    negative_sentiment = df.loc[labelled_negative, 'Tourism_Tokens_Neg']
    non_empty_neg = [tokens for tokens in negative_sentiment if len(tokens) != 0]
    print(f"Negative sentiment reviews with tourism tokens: {len(non_empty_neg)}")
    if non_empty_neg:
        print("Sample negative tourism tokens:", non_empty_neg[:5])

In [None]:
# Print the tourism tokens of the first fifteen rows, skipping rows with none
print("=== SAMPLE TOURISM-FILTERED RESULTS ===")
sample_rows = df[['Tourism_Tokens_Pos', 'Tourism_Tokens_Neg']].head(15)
for idx, row in sample_rows.iterrows():
    pos_tokens = row['Tourism_Tokens_Pos']
    neg_tokens = row['Tourism_Tokens_Neg']
    if len(pos_tokens) == 0 and len(neg_tokens) == 0:
        continue
    print(f"Row {idx}:")
    if len(pos_tokens) > 0:
        print(f"  Positive Tourism Tokens: {pos_tokens}")
    if len(neg_tokens) > 0:
        print(f"  Negative Tourism Tokens: {neg_tokens}")
    print()

In [None]:
# Prepare data for database ingestion with target schema
print("=== PREPARING DATA FOR TARGET SCHEMA ===")

# Source column -> target column, in target-table order
source_to_target = (
    ('City', 'city'),
    ('Hotel Name', 'hotel_name'),
    ('Reviewer Name', 'reviewer_name'),
    ('Reviewer Nationality', 'reviewer_nationality'),
    ('Duration', 'duration'),
    ('Check-in Date', 'check_in_date'),
    ('Review Date', 'review_date'),
    ('Travel Type', 'travel_type'),
    ('Room Type', 'room_type'),
    ('Positive Review Translated', 'positive_review'),
    ('Negative Review Translated', 'negative_review'),
    ('sentiment classification', 'sentiment_classification'),
)

# Build the final frame from the required columns only, under their target names
final_df = pd.DataFrame({target: df[source] for source, target in source_to_target})

# Convert tourism tokens to PostgreSQL TEXT[] format
def convert_to_postgres_array(token_list):
    """Render a list of strings as a PostgreSQL array literal.

    Args:
        token_list: List of token strings.

    Returns:
        A string such as ``{"pool","spa"}``, or ``None`` when ``token_list``
        is not a list or is empty (stored as SQL NULL by the caller).
    """
    if not isinstance(token_list, list) or len(token_list) == 0:
        return None
    quoted_tokens = []
    for token in token_list:
        # Inside a double-quoted array element PostgreSQL expects backslash
        # escapes for both backslashes and double quotes; doubling the quote
        # is not valid array syntax. Backslashes are escaped first so the
        # ones added for quotes are not escaped again.
        escaped_token = token.replace('\\', '\\\\').replace('"', '\\"')
        quoted_tokens.append(f'"{escaped_token}"')
    return '{' + ','.join(quoted_tokens) + '}'

# Serialize both token-list columns into array-literal strings on the final frame
for target_col, source_col in (
    ('positive_tokens', 'Tourism_Tokens_Pos'),
    ('negative_tokens', 'Tourism_Tokens_Neg'),
):
    final_df[target_col] = df[source_col].apply(convert_to_postgres_array)

print(f"Final dataset prepared for target schema: {final_df.shape}")
print(f"Target columns: {list(final_df.columns)}")

# Verify data types and content
print("\nData type verification:")
for col, col_dtype in final_df.dtypes.items():
    print(f"  {col}: {col_dtype}")

# Show the first non-null array literal of each polarity, if there is one
print("\nSample token arrays:")
non_null_pos = final_df['positive_tokens'].dropna()
non_null_neg = final_df['negative_tokens'].dropna()
if not non_null_pos.empty:
    print(f"  Positive tokens sample: {non_null_pos.iloc[0]}")
if not non_null_neg.empty:
    print(f"  Negative tokens sample: {non_null_neg.iloc[0]}")

In [None]:
# Create target table schema and ingest data
print("=== DATABASE INGESTION WITH TARGET SCHEMA ===")

# Table definition for gold.final_reviews
create_table_sql = """
CREATE TABLE gold.final_reviews (
    id SERIAL PRIMARY KEY,
    city TEXT,
    hotel_name TEXT,
    reviewer_name TEXT,
    reviewer_nationality TEXT,
    duration TEXT,
    check_in_date TEXT,
    review_date TEXT,
    travel_type TEXT,
    room_type TEXT,
    positive_review TEXT,
    negative_review TEXT,
    sentiment_classification INTEGER,
    positive_tokens TEXT[],
    negative_tokens TEXT[],
    inserted_at TIMESTAMP DEFAULT now()
)
"""

# Parameterized INSERT; the token columns receive array-literal strings
insert_sql = """
INSERT INTO gold.final_reviews (
    city, hotel_name, reviewer_name, reviewer_nationality, duration,
    check_in_date, review_date, travel_type, room_type, positive_review,
    negative_review, sentiment_classification, positive_tokens, negative_tokens
) VALUES (
    :city, :hotel_name, :reviewer_name, :reviewer_nationality, :duration,
    :check_in_date, :review_date, :travel_type, :room_type, :positive_review,
    :negative_review, :sentiment_classification, :positive_tokens, :negative_tokens
)
"""

# Columns bound by insert_sql, in statement order
insert_columns = [
    'city', 'hotel_name', 'reviewer_name', 'reviewer_nationality', 'duration',
    'check_in_date', 'review_date', 'travel_type', 'room_type', 'positive_review',
    'negative_review', 'sentiment_classification', 'positive_tokens', 'negative_tokens',
]

batch_size = 1000
total_rows = len(final_df)

try:
    # A single transaction covers the DDL and every insert. PostgreSQL DDL is
    # transactional, so if anything fails the DROP is rolled back and the
    # previous table is left intact instead of an empty or half-filled one.
    with engine.begin() as conn:
        # Create gold schema if it doesn't exist
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS gold"))
        print("✅ Gold schema created/verified")

        # Drop existing table if it exists
        conn.execute(text("DROP TABLE IF EXISTS gold.final_reviews"))
        print("✅ Existing table dropped")

        # Create table with exact target schema
        conn.execute(text(create_table_sql))
        print("✅ Target table created with proper schema")

        # Ingest data using manual INSERT for proper TEXT[] handling
        print("Ingesting processed data into gold.final_reviews...")

        for i in tqdm(range(0, total_rows, batch_size), desc="Inserting batches"):
            batch = final_df.iloc[i:i+batch_size][insert_columns]
            # Missing values become None so they are stored as SQL NULL rather than NaN
            batch_data = (
                batch.astype(object)
                .where(batch.notna(), None)
                .to_dict(orient="records")
            )
            conn.execute(text(insert_sql), batch_data)

    # Verify ingestion
    with engine.connect() as conn:
        result = conn.execute(text("SELECT COUNT(*) FROM gold.final_reviews"))
        count = result.scalar()
        print(f"✅ Data ingestion completed!")
        print(f"✅ Records ingested: {count}")

        # Verify schema matches target
        schema_result = conn.execute(text("""
            SELECT column_name, data_type, is_nullable 
            FROM information_schema.columns 
            WHERE table_schema = 'gold' AND table_name = 'final_reviews'
            ORDER BY ordinal_position
        """))

        print(f"✅ Target schema verification:")
        for col_name, data_type, nullable in schema_result.fetchall():
            print(f"   {col_name}: {data_type} ({'NULL' if nullable == 'YES' else 'NOT NULL'})")

except Exception as e:
    # Keep the processed data even when the database load fails
    print(f"❌ Database ingestion failed: {e}")
    print("Saving to local CSV as backup...")
    final_df.to_csv('backup_final_reviews.csv', index=False)
    print("✅ Backup saved to backup_final_reviews.csv")

In [None]:
# Final project summary with target schema
# NOTE(review): these lines are printed unconditionally; they do not check
# whether the ingestion cell above actually succeeded.
print("=== TOURISM PROCESSING WITH TARGET SCHEMA COMPLETE ===")
print("✅ Data fetched from silver.silver_translated")
print("✅ Semantic phrase extraction and tourism token filtering applied")
print("✅ Data mapped to target schema with proper column names")
print("✅ Tourism tokens converted to PostgreSQL TEXT[] format")
print("✅ Target table created with exact schema specification")
print("✅ Data ingested into gold.final_reviews with proper data types")

print("\nTarget Schema Summary:")
print("- Source: silver.silver_translated")
print("- Destination: gold.final_reviews")
# gold.final_reviews has 16 columns: id + 12 data columns + 2 token arrays + inserted_at
print("- Schema: 16 columns (id, 12 data columns, positive_tokens[], negative_tokens[], inserted_at)")
print(f"- Records processed: {len(final_df)}")
print("- Token arrays: PostgreSQL TEXT[] format")

print("\nColumn Mapping:")
print("- City → city (TEXT)")
print("- Hotel Name → hotel_name (TEXT)")
print("- Positive Review Translated → positive_review (TEXT)")
print("- Negative Review Translated → negative_review (TEXT)")
print("- Tourism_Tokens_Pos → positive_tokens (TEXT[])")
print("- Tourism_Tokens_Neg → negative_tokens (TEXT[])")
print("- sentiment classification → sentiment_classification (INTEGER)")

In [None]:
# Final verification of target schema compliance
print("=== TARGET SCHEMA COMPLIANCE VERIFICATION ===")

# Becomes True only when every verification query ran without error
verification_ok = False

try:
    with engine.connect() as conn:
        # Verify exact schema match
        verification_query = """
        SELECT 
            column_name,
            data_type,
            CASE WHEN column_default LIKE 'nextval%' THEN 'SERIAL' ELSE data_type END as display_type,
            is_nullable,
            column_default
        FROM information_schema.columns 
        WHERE table_schema = 'gold' AND table_name = 'final_reviews'
        ORDER BY ordinal_position
        """

        result = conn.execute(text(verification_query))
        columns = result.fetchall()

        print("✅ Final table schema verification:")
        for col_name, data_type, display_type, nullable, default in columns:
            default_info = f" DEFAULT {default}" if default else ""
            print(f"   {col_name}: {display_type.upper()}{default_info}")

        # Verify token array functionality.
        # Every selected expression is an aggregate: mixing COUNT(*) with a bare
        # array_length(...) and no GROUP BY is rejected by PostgreSQL, so the
        # array lengths are reported as maxima.
        token_test = conn.execute(text("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(positive_tokens) as records_with_pos_tokens,
                COUNT(negative_tokens) as records_with_neg_tokens,
                MAX(array_length(positive_tokens, 1)) as max_pos_length,
                MAX(array_length(negative_tokens, 1)) as max_neg_length
            FROM gold.final_reviews 
            WHERE positive_tokens IS NOT NULL OR negative_tokens IS NOT NULL
        """))

        token_stats = token_test.fetchone()
        if token_stats:
            print(f"\n✅ Token array verification:")
            print(f"   Total records: {token_stats[0]}")
            print(f"   Records with positive tokens: {token_stats[1]}")
            print(f"   Records with negative tokens: {token_stats[2]}")
            if token_stats[3]:
                print(f"   Longest positive token array length: {token_stats[3]}")
            if token_stats[4]:
                print(f"   Longest negative token array length: {token_stats[4]}")

        # Test array querying capability
        array_query_test = conn.execute(text("""
            SELECT positive_tokens[1] as first_positive_token
            FROM gold.final_reviews 
            WHERE positive_tokens IS NOT NULL 
            LIMIT 1
        """))

        first_token = array_query_test.scalar()
        if first_token:
            print(f"   Array indexing test successful: '{first_token}'")

    verification_ok = True

except Exception as e:
    print(f"❌ Schema verification failed: {e}")
finally:
    # Close engine connection
    engine.dispose()
    print("\n✅ Database connections closed")

# Only claim success when verification actually passed
if verification_ok:
    print("✅ ETL Pipeline with target schema completed successfully!")
    print("✅ Ready for tourism analytics with PostgreSQL TEXT[] token arrays")
else:
    print("❌ ETL pipeline finished, but schema verification did not pass; see the error above")