In [1]:
import requests
import pandas as pd
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv



In [2]:
load_dotenv()  # Load environment variables from .env file


True

In [3]:

# --- EXTRACTION ---
print("=== EXTRACTION PHASE ===")
base_url = "https://data-liart.vercel.app/data"
all_products = []
page = 1
has_more = True

print("Extracting data from API...")
while has_more:
    response = requests.get(f"{base_url}?page={page}")
    data = response.json()

    # Use the correct key: 'data'
    all_products.extend(data.get('data', []))

    has_more = data.get('hasMore', False)
    page += 1
    print(f"Fetched page {page-1}. Total products so far: {len(all_products)}")

print(f"Extraction complete. Total products: {len(all_products)}")
df = pd.DataFrame(all_products)


=== EXTRACTION PHASE ===
Extracting data from API...
Fetched page 1. Total products so far: 999
Extraction complete. Total products: 999


In [4]:

# --- TRANSFORMATION ---
print("\n=== TRANSFORMATION PHASE ===")
df_clean = df.copy()
df_clean = df_clean.drop_duplicates()



=== TRANSFORMATION PHASE ===


# 1. Handle missing values using the CORRECT column names

In [5]:

df_clean['product_num_ratings'] = df_clean['product_num_ratings'].fillna(0).astype(int)
df_clean['product_star_rating'] = df_clean['product_star_rating'].fillna(0)



# 2. CRITICAL FIX: Clean and convert the 'product_price' column from string to float

In [6]:
# The price might have symbols like '$', '£', '€' or commas ','
def clean_price(price_value):
    if pd.isna(price_value):
        return 0.0
    # If it's already a number, return it
    if isinstance(price_value, (int, float)):
        return float(price_value)
    # If it's a string, remove any non-digit characters except the decimal point
    price_str = str(price_value)
    # Remove currency symbols, commas, and any other non-numeric characters (except .)
    cleaned_str = ''.join(char for char in price_str if char.isdigit() or char == '.')
    # Convert the cleaned string to a float. If it's empty, return 0.
    try:
        return float(cleaned_str) if cleaned_str else 0.0
    except ValueError:
        return 0.0


In [7]:

# Apply the cleaning function to the price column
df_clean['product_price'] = df_clean['product_price'].apply(clean_price)
print("Cleaned product_price column. Sample prices:", df_clean['product_price'].head().tolist())


Cleaned product_price column. Sample prices: [469.0, 370.0, 1799.0, 94.0, 1699.0]


# 3. Define a simple exchange rate dictionary (Assumes prices are in local currency)

In [8]:

# Note: We need to know the currency from the country. Let's map countries to currencies.
country_to_currency = {
    'US': 'USD', 'CA': 'CAD', 'UK': 'GBP', 'DE': 'EUR', 'FR': 'EUR',
    'IT': 'EUR', 'ES': 'EUR', 'NL': 'EUR', 'AU': 'AUD', 'IN': 'INR', 'JP': 'JPY'
}
exchange_rates = {'USD': 1.0, 'CAD': 0.73, 'GBP': 1.22, 'EUR': 1.07, 'AUD': 0.65, 'INR': 0.012, 'JPY': 0.0067}


# 4. Create Derived Variables

In [9]:

# First, get the currency based on the country
df_clean['product_currency'] = df_clean['country'].map(country_to_currency)



In [10]:
# Now calculate price in USD (This will now work because product_price is a number)
df_clean['price_usd'] = df_clean.apply(
    lambda row: row['product_price'] * exchange_rates.get(row['product_currency'], 1),
    axis=1
)
df_clean['revenue_estimate'] = df_clean['price_usd'] * df_clean['product_num_ratings']

def rating_bucket(rating):
    if rating >= 4.5:
        return 'Excellent'
    elif rating >= 3.5:
        return 'High'
    elif rating >= 2.0:
        return 'Medium'
    else:
        return 'Low'

df_clean['rating_bucket'] = df_clean['product_star_rating'].apply(rating_bucket)
# Avoid division by zero by adding 1 to the denominator
df_clean['review_density'] = df_clean['product_num_ratings'] / (df_clean['price_usd'] + 1)


In [11]:

# Simple category extraction from title
def extract_category(title):
    if pd.isna(title):
        return 'Other'
    title_lower = title.lower()
    if 'game' in title_lower:
        return 'Gaming'
    elif 'security' in title_lower or 'antivirus' in title_lower or 'vpn' in title_lower:
        return 'Security'
    elif 'office' in title_lower or 'microsoft' in title_lower:
        return 'Productivity'
    elif 'photo' in title_lower or 'adobe' in title_lower:
        return 'Creative'
    else:
        return 'Other'

df_clean['category'] = df_clean['product_title'].apply(extract_category)


In [12]:

# Simple brand extraction (assumes brand is first word in title)
df_clean['brand'] = df_clean['product_title'].str.split().str[0]


In [13]:

# Map countries to regions
region_map = {
    'US': 'North America', 'CA': 'North America',
    'UK': 'Europe', 'DE': 'Europe', 'FR': 'Europe', 'IT': 'Europe', 'ES': 'Europe', 'NL': 'Europe',
    'AU': 'Oceania',
    'IN': 'Asia', 'JP': 'Asia'
}
df_clean['country_region'] = df_clean['country'].map(region_map).fillna('Other')

print("Transformation complete. Sample of new calculated columns:")
print(df_clean[['product_price', 'product_currency', 'price_usd', 'rating_bucket']].head())


Transformation complete. Sample of new calculated columns:
   product_price product_currency  price_usd rating_bucket
0          469.0              INR      5.628          High
1          370.0              INR      4.440          High
2         1799.0              INR     21.588     Excellent
3           94.0              INR      1.128          High
4         1699.0              INR     20.388          High


In [14]:

# --- LOADING ---
print("\n=== LOADING PHASE ===")
# Build the database connection string from environment variables
db_connection_url = f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"



=== LOADING PHASE ===


In [15]:

# Create SQLAlchemy engine
engine = create_engine(db_connection_url)


In [16]:

# Define a table name
table_name = 'amazon_bestsellers'

print("Loading data into PostgreSQL...")
# Use `if_exists='replace'` for the first run. Change to `'append'` for updates.
df_clean.to_sql(table_name, engine, if_exists='replace', index=False)
print(f"Successfully loaded {len(df_clean)} records into table '{table_name}'.")
print("ETL Pipeline completed successfully! 🎉")


Loading data into PostgreSQL...
Successfully loaded 999 records into table 'amazon_bestsellers'.
ETL Pipeline completed successfully! 🎉


In [17]:

# --- ANALYSIS ---
print("\n=== SQL ANALYSIS IN PYCHARM (POSTGRESQL COMPATIBLE) ===")



=== SQL ANALYSIS IN PYCHARM (POSTGRESQL COMPATIBLE) ===


In [18]:

# Create a list of the analysis queries with PostgreSQL-compatible syntax
analysis_queries = {
    "1. Top 5 Highest Revenue Products by Country": """
        SELECT product_title, country, revenue_estimate
        FROM amazon_bestsellers
        ORDER BY revenue_estimate DESC
        LIMIT 5;
    """,

    "2. Average Product Price per Category": """
        SELECT category, ROUND(AVG(price_usd)::numeric, 2) AS avg_price_usd
        FROM amazon_bestsellers
        GROUP BY category
        ORDER BY avg_price_usd DESC;
    """,

    "3. Products Count per Rating Bucket": """
        SELECT rating_bucket, COUNT(*) AS product_count
        FROM amazon_bestsellers
        GROUP BY rating_bucket
        ORDER BY product_count DESC;
    """,

    "4. Top 5 Products with Highest Review Density": """
        SELECT product_title, review_density
        FROM amazon_bestsellers
        WHERE price_usd > 0
        ORDER BY review_density DESC
        LIMIT 5;
    """,

    "5. Average Star Rating per Brand (with multiple products)": """
        SELECT brand, ROUND(AVG(product_star_rating)::numeric, 2) AS avg_rating
        FROM amazon_bestsellers
        GROUP BY brand
        HAVING COUNT(*) > 1
        ORDER BY avg_rating DESC;
    """,

    "6. Total Revenue Estimate by Country Region": """
        SELECT country_region, ROUND(SUM(revenue_estimate)::numeric, 2) AS total_revenue
        FROM amazon_bestsellers
        GROUP BY country_region
        ORDER BY total_revenue DESC;
    """,

    "7. Top 5 Most Expensive Products": """
        SELECT product_title, country, price_usd
        FROM amazon_bestsellers
        ORDER BY price_usd DESC
        LIMIT 5;
    """
}


In [19]:

# Execute each query and display results
for query_name, query_sql in analysis_queries.items():
    print(f"\n📊 {query_name}")
    print("-" * 60)

    try:
        # Execute the query and load results into a DataFrame
        result_df = pd.read_sql_query(query_sql, engine)

        # Display the results
        if not result_df.empty:
            print(result_df.to_string(index=False))
        else:
            print("No results found.")

    except Exception as e:
        print(f"Error executing query: {e}")

print("\n✅ All analysis completed successfully!")



📊 1. Top 5 Highest Revenue Products by Country
------------------------------------------------------------
                                                                                                                product_title country  revenue_estimate
                                                                     Microsoft Office Home 2024 | Aktivierungscode per E-Mail      DE      246366227.77
Microsoft 365 Family | 1 Jahr | bis zu 6 Personen | Office Apps mit KI | bis zu 6 TB Cloudspeicher | Amazon Abonnement mit au      DE      223780774.77
      Microsoft 365 Single | 1 Jahr | 1 Person | Office Apps mit KI | 1 TB Cloudspeicher | Abo mit automatischer Verlängerung      DE      174503422.77
      WISO Steuer 2025 (für Steuerjahr 2024) Für Windows, Mac, Smartphones und Tablets | Digitaler Download | Amazon Exklusiv      DE       56870296.70
     WISO Steuer 2025 (für Steuerjahr 2024)|Für Windows, Mac, Smartphones und Tablets|frustfreie Verpackung | Amazon Exklusiv      

In [20]:

# --- PANDAS ANALYSIS (ALTERNATIVE) ---
print("\n=== PANDAS ANALYSIS (ALTERNATIVE) ===")

# 2. Average product price per category (using pandas)
avg_price_by_category = df_clean.groupby('category')['price_usd'].mean().round(2)
print("\nAverage Price by Category:")
print(avg_price_by_category.sort_values(ascending=False))

# 3. Count products in each rating bucket
rating_bucket_count = df_clean['rating_bucket'].value_counts()
print("\nProducts per Rating Bucket:")
print(rating_bucket_count)

# 6. Total revenue by region
revenue_by_region = df_clean.groupby('country_region')['revenue_estimate'].sum().round(2)
print("\nTotal Revenue by Region:")
print(revenue_by_region.sort_values(ascending=False))

print("\n🎉 ETL Pipeline and Analysis completely finished!")


=== PANDAS ANALYSIS (ALTERNATIVE) ===

Average Price by Category:
category
Creative        8298.00
Productivity    3112.52
Security        2188.56
Other           1506.05
Gaming           959.51
Name: price_usd, dtype: float64

Products per Rating Bucket:
rating_bucket
High         671
Excellent    228
Medium        64
Low           36
Name: count, dtype: int64

Total Revenue by Region:
country_region
Europe           2.092738e+09
Other            4.120061e+07
North America    1.763607e+07
Asia             7.173317e+06
Oceania          8.781538e+05
Name: revenue_estimate, dtype: float64

🎉 ETL Pipeline and Analysis completely finished!
