## Amazon Bestseller ETL Pipeline

In [51]:
import pandas as pd
import requests
import time

### Step 1: Extract - Getting the Data from API

In [52]:
# Extract data from API
api_url = "https://data-liart.vercel.app/data"
response = requests.get(api_url)
data = response.json()

In [53]:
# Convert to DataFrame
amazon_df = pd.DataFrame(data['data'])

In [54]:
# Basic info about the data
amazon_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 999 entries, 0 to 998
Data columns (total 12 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   Unnamed: 0           999 non-null    int64  
 1   rank                 999 non-null    int64  
 2   asin                 999 non-null    object 
 3   product_title        999 non-null    object 
 4   product_price        945 non-null    object 
 5   product_star_rating  969 non-null    float64
 6   product_num_ratings  969 non-null    float64
 7   product_url          999 non-null    object 
 8   product_photo        999 non-null    object 
 9   rank_change_label    0 non-null      object 
 10  country              999 non-null    object 
 11  page                 999 non-null    int64  
dtypes: float64(2), int64(3), object(7)
memory usage: 93.8+ KB


In [55]:
amazon_df.shape

(999, 12)

In [56]:
amazon_df.isnull().sum()

Unnamed: 0               0
rank                     0
asin                     0
product_title            0
product_price           54
product_star_rating     30
product_num_ratings     30
product_url              0
product_photo            0
rank_change_label      999
country                  0
page                     0
dtype: int64

In [57]:
# Extract all data with pagination
def fetch_all_data():
    all_data = []
    page = 1

    while True:
        response = requests.get(f"https://data-liart.vercel.app/data?page={page}")
        data = response.json()

        if not data.get('data'):
            break

        all_data.extend(data['data'])
        page += 1
        time.sleep(0.5)

        if page > 10:
            break
    return all_data

In [58]:
# Extract all data
all_data = fetch_all_data()
amazon_df = pd.DataFrame(all_data)

In [59]:
all_data

[{'Unnamed: 0': 0,
  'rank': 1,
  'asin': 'B073VKKNN9',
  'product_title': 'Kaspersky | Premium - Total Security (Ultimate Security) | 1 Device | 1 Year | Email Delivery in 1 Hour',
  'product_price': '₹469.00',
  'product_star_rating': 4.3,
  'product_num_ratings': 13324.0,
  'product_url': 'https://www.amazon.in/dp/B073VKKNN9',
  'product_photo': 'https://images-eu.ssl-images-amazon.com/images/I/71wvxMSor0L._AC_UL900_SR900,600_.jpg',
  'rank_change_label': None,
  'country': 'IN',
  'page': 1},
 {'Unnamed: 0': 1,
  'rank': 2,
  'asin': 'B07PQZJ6Y8',
  'product_title': 'K7 Security K7, Total Security, 1 User, 1 Year, Email Delivery In 30 Min, No Cd',
  'product_price': '₹370.00',
  'product_star_rating': 4.4,
  'product_num_ratings': 2291.0,
  'product_url': 'https://www.amazon.in/dp/B07PQZJ6Y8',
  'product_photo': 'https://images-eu.ssl-images-amazon.com/images/I/51HB079HbPS._AC_UL900_SR900,600_.jpg',
  'rank_change_label': None,
  'country': 'IN',
  'page': 1},
 {'Unnamed: 0': 2,
  

In [60]:
amazon_df['country'].unique()

array(['IN', 'US', 'CA', 'AU', 'DE', 'FR', 'IT', 'ES', 'JP', 'MX'],
      dtype=object)

In [61]:
amazon_df.isnull().sum()

Unnamed: 0                0
rank                      0
asin                      0
product_title             0
product_price           540
product_star_rating     300
product_num_ratings     300
product_url               0
product_photo             0
rank_change_label      9990
country                   0
page                      0
dtype: int64

In [62]:
amazon_df.shape

(9990, 12)

In [63]:
# Check for duplicates
amazon_df['asin'].nunique()

961

In [64]:
# Remove duplicates based on ASIN + country(same product in same country)
amazon_df = amazon_df.drop_duplicates(subset=['asin', 'country'])

In [65]:
amazon_df.shape

(999, 12)

In [66]:
# Save extracted data
amazon_df.to_csv('extracted_amazon_data.csv', index=False)

In [67]:
# Check data quality
amazon_df.isnull().sum()

Unnamed: 0               0
rank                     0
asin                     0
product_title            0
product_price           54
product_star_rating     30
product_num_ratings     30
product_url              0
product_photo            0
rank_change_label      999
country                  0
page                     0
dtype: int64

In [68]:
amazon_df['country'].value_counts()

country
IN    100
CA    100
IT    100
AU    100
DE    100
FR    100
JP    100
ES    100
MX    100
US     99
Name: count, dtype: int64

## Transformation

In [69]:
import pandas as pd
import numpy as np
import re

In [70]:
# Load the extracted data
amazon_df = pd.read_csv('extracted_amazon_data.csv')

In [71]:
amazon_df.shape

(999, 12)

In [72]:
amazon_df.head()

Unnamed: 0.1,Unnamed: 0,rank,asin,product_title,product_price,product_star_rating,product_num_ratings,product_url,product_photo,rank_change_label,country,page
0,0,1,B073VKKNN9,Kaspersky | Premium - Total Security (Ultimate...,₹469.00,4.3,13324.0,https://www.amazon.in/dp/B073VKKNN9,https://images-eu.ssl-images-amazon.com/images...,,IN,1
1,1,2,B07PQZJ6Y8,"K7 Security K7, Total Security, 1 User, 1 Year...",₹370.00,4.4,2291.0,https://www.amazon.in/dp/B07PQZJ6Y8,https://images-eu.ssl-images-amazon.com/images...,,IN,1
2,2,3,B0D1KL34JM,Microsoft Office 2021 Professional - Lifetime ...,"₹1,799.00",4.5,388.0,https://www.amazon.in/dp/B0D1KL34JM,https://images-eu.ssl-images-amazon.com/images...,,IN,1
3,3,4,B07B9YYLGG,"Bitdefender - 1 Device,1 Year - Mobile Securit...",₹94.00,4.1,9630.0,https://www.amazon.in/dp/B07B9YYLGG,https://images-eu.ssl-images-amazon.com/images...,,IN,1
4,4,5,B073VLGMZ4,"McAfee Total Protection 2025 | 1 Device, 3 Yea...","₹1,699.00",4.4,5783.0,https://www.amazon.in/dp/B073VLGMZ4,https://images-eu.ssl-images-amazon.com/images...,,IN,1


In [73]:
# Drop irrelevant columns
amazon_df = amazon_df.drop(columns=['Unnamed: 0', 'rank_change_label', 'page'])

In [74]:
amazon_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 999 entries, 0 to 998
Data columns (total 9 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   rank                 999 non-null    int64  
 1   asin                 999 non-null    object 
 2   product_title        999 non-null    object 
 3   product_price        945 non-null    object 
 4   product_star_rating  969 non-null    float64
 5   product_num_ratings  969 non-null    float64
 6   product_url          999 non-null    object 
 7   product_photo        999 non-null    object 
 8   country              999 non-null    object 
dtypes: float64(2), int64(1), object(6)
memory usage: 70.4+ KB


### Clean `product_price` column
Cleans `product_price` by removing currency symbols

In [75]:
# Clean price and extract numeric values
def clean_price(price_str):
    if pd.isna(price_str):
        return np.nan
    
    price_str = price_str.strip()

    #Handle Eurpean format: replace comma with dot for decimal
    if  '€' in price_str:
        cleaned = re.sub(r'[₹$€\xa0]', '', price_str)
        cleaned = cleaned.replace(',', '.')
    elif '￥' in price_str:
        cleaned = re.sub(r'[￥]', '', price_str)
        cleaned = cleaned.replace(',', '')
    else:
        cleaned = re.sub(r'[₹$,]', '', price_str)
        cleaned = cleaned.replace(',', '')

    try:
        return float(cleaned)
    except:
        return np.nan

In [76]:
amazon_df['cleaned_price'] = amazon_df['product_price'].apply(clean_price)

Create `price_usd` (convert to USD)

In [77]:
# Approximate exchange rates
exchange_rates = {
    'IN': 0.012,  # INR to USD (1 INR = 0.012 USD)
    'US': 1.0,    # USD to USD
    'UK': 1.25,   # GBP to USD
    'CA': 0.74,   # CAD to USD
    'AU': 0.66,   # AUD to USD
    'DE': 1.08,   # EUR to USD
    'FR': 1.08,   # EUR to USD
    'IT': 1.08,   # EUR to USD
    'ES': 1.08,   # EUR to USD
    'JP': 0.0067, # JPY to USD

}

In [78]:
def convert_to_usd(row):
    if pd.isna(row['cleaned_price']):
        return np.nan
    
    country = row['country']
    rate = exchange_rates.get(country, 1.0)
    return row['cleaned_price'] * rate
amazon_df['price_usd'] = amazon_df.apply(convert_to_usd, axis=1)

In [79]:
amazon_df.head()

Unnamed: 0,rank,asin,product_title,product_price,product_star_rating,product_num_ratings,product_url,product_photo,country,cleaned_price,price_usd
0,1,B073VKKNN9,Kaspersky | Premium - Total Security (Ultimate...,₹469.00,4.3,13324.0,https://www.amazon.in/dp/B073VKKNN9,https://images-eu.ssl-images-amazon.com/images...,IN,469.0,5.628
1,2,B07PQZJ6Y8,"K7 Security K7, Total Security, 1 User, 1 Year...",₹370.00,4.4,2291.0,https://www.amazon.in/dp/B07PQZJ6Y8,https://images-eu.ssl-images-amazon.com/images...,IN,370.0,4.44
2,3,B0D1KL34JM,Microsoft Office 2021 Professional - Lifetime ...,"₹1,799.00",4.5,388.0,https://www.amazon.in/dp/B0D1KL34JM,https://images-eu.ssl-images-amazon.com/images...,IN,1799.0,21.588
3,4,B07B9YYLGG,"Bitdefender - 1 Device,1 Year - Mobile Securit...",₹94.00,4.1,9630.0,https://www.amazon.in/dp/B07B9YYLGG,https://images-eu.ssl-images-amazon.com/images...,IN,94.0,1.128
4,5,B073VLGMZ4,"McAfee Total Protection 2025 | 1 Device, 3 Yea...","₹1,699.00",4.4,5783.0,https://www.amazon.in/dp/B073VLGMZ4,https://images-eu.ssl-images-amazon.com/images...,IN,1699.0,20.388


### Create `revenue_estimate`

In [80]:
amazon_df['revenue_estimate'] = amazon_df['price_usd'] * amazon_df['product_num_ratings']

### Create `rating_bucket`

In [81]:
# Categorize rating into buckets
def categorize_rating(rating):
    if pd.isna(rating):
        return 'No Rating'
    elif rating <= 2.0:
        return 'Low'
    elif rating <= 3.5:
        return 'Medium'
    elif rating <= 4.5:
        return 'High'
    else:
        return 'Excellent'
amazon_df['rating_bucket'] = amazon_df['product_star_rating'].apply(categorize_rating)
    

### Create `review_density`

In [82]:
amazon_df['review_density'] = amazon_df['product_num_ratings'] / amazon_df['price_usd']
amazon_df['review_density'] = amazon_df['review_density'].replace([np.inf, -np.inf], np.nan)

### Create `category` (extract from product_title)

In [83]:
# Extract product category from title
def extract_category(title):
    if pd.isna(title):
        return 'Other'
    
    title = title.lower()

    if any(word in title for word in ['antivirus', 'security', 'norton', 'mcafee', 'kaspersky', 'bitdefender', 'firewall']):
        return 'Security Software'
    elif any(word in title for word in ['office', 'word', 'excel', 'powerpoint', 'microsoft 365', 'ms office']):
        return 'Office Software'
    elif any(word in title for word in ['photo', 'video', 'adobe', 'photoshop', 'illustrator', 'premiere', 'lightroom']):
        return 'Creative Software'
    elif any(word in title for word in ['vpn', 'virtual private network', 'mullvad']):
        return 'VPN Software'
    elif any(word in title for word in ['cloud', 'aws', 'azure', 'google cloud', 'dropbox']):
        return 'Cloud Software'
    elif any(word in title for word in ['game', 'gaming', 'steam', 'epic games', 'origin']):
        return 'Gaming Software'
    elif any(word in title for word in ['utility', 'cleaner', 'ccleaner', 'defragment', 'disk']):
        return 'Utility Software'
    elif any(word in title for word in ['windows', 'win10', 'win11', 'ubuntu', 'macos']):
        return 'Operating System'   
    elif any(word in title for word in ['development', 'ide', 'visual studio', 'eclipse', 'pycharm', 'intellij']):  
        return 'Development Software'
    elif any(word in title for word in ['app store', 'gift card']):
        return 'Digital Credits'
    elif any(word in title for word in ['subscription', 'monthly', 'annual']):
        return 'Subscription Services'
    elif any(word in title for word in ['autocad', 'drafting', 'design']):
        return 'Design Software'
    else:
        return 'Other Software'
    
amazon_df['category'] = amazon_df['product_title'].apply(extract_category)

### Create `brand` (extract from product_title)

In [84]:
# Extract brand from  product title
def extract_brand(title):
    if pd.isna(title):
        return 'Other'
    
    title = title.lower()

    # Check for common brands
    brands = {
        'microsoft': 'Microsoft',
        'norton': 'Norton',
        'mcafee': 'McAfee',
        'kaspersky': 'Kaspersky',
        'bitdefender': 'Bitdefender',
        'turbotax': 'TurboTax',
        'h&r block': 'H&R Block',
        'adobe': 'Adobe',
        'quick heal': 'Quick Heal',
        'webroot': 'Webroot',
        'mullvad': 'Mullvad',
        'apple': 'Apple',
        'eset': 'ESET',
        'k7': 'K7 Security'
    }

    for brand_key, brand_name in brands.items():
        if brand_key in title:
            return brand_name

    return 'Other Brand'
amazon_df['brand'] = amazon_df['product_title'].apply(extract_brand)


### Create `country_region`

In [85]:
# Map country code to region
def map_country_to_region(country):
    regions = {
        'US': 'North America',
        'CA': 'North America',
        'UK': 'Europe',
        'DE': 'Europe',
        'FR': 'Europe',
        'IT': 'Europe',
        'ES': 'Europe',
        'IN': 'Asia',
        'JP': 'Asia',
        'AU': 'Oceania',
        'BR': 'South America',
        'MX': 'North America'
    }
    return regions.get(country, 'Other Region')
amazon_df['country_region'] = amazon_df['country'].apply(map_country_to_region)


In [86]:
amazon_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 999 entries, 0 to 998
Data columns (total 17 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   rank                 999 non-null    int64  
 1   asin                 999 non-null    object 
 2   product_title        999 non-null    object 
 3   product_price        945 non-null    object 
 4   product_star_rating  969 non-null    float64
 5   product_num_ratings  969 non-null    float64
 6   product_url          999 non-null    object 
 7   product_photo        999 non-null    object 
 8   country              999 non-null    object 
 9   cleaned_price        945 non-null    float64
 10  price_usd            945 non-null    float64
 11  revenue_estimate     915 non-null    float64
 12  rating_bucket        999 non-null    object 
 13  review_density       915 non-null    float64
 14  category             999 non-null    object 
 15  brand                999 non-null    obj

### Handling missing values

In [87]:
amazon_df.isnull().sum()

rank                    0
asin                    0
product_title           0
product_price          54
product_star_rating    30
product_num_ratings    30
product_url             0
product_photo           0
country                 0
cleaned_price          54
price_usd              54
revenue_estimate       84
rating_bucket           0
review_density         84
category                0
brand                   0
country_region          0
dtype: int64

In [88]:
# Fill missing product_star_rating with median by category
amazon_df['product_star_rating'] = amazon_df.groupby('category')['product_star_rating'].transform(lambda x: x.fillna(x.median()))

In [89]:
# Fill missing num_ratings with median by category
amazon_df['product_num_ratings'] = amazon_df.groupby('category')['product_num_ratings'].transform(lambda x: x.fillna(x.median()))

In [90]:
# Recalculate derived variables after filling missing values
amazon_df['revenue_estimate'] = amazon_df['price_usd'] * amazon_df['product_num_ratings']
amazon_df['review_density'] = amazon_df['product_num_ratings'] / amazon_df['price_usd']
amazon_df['review_density'] = amazon_df['review_density'].replace([np.inf, -np.inf], np.nan)
amazon_df['rating_bucket'] = amazon_df['product_star_rating'].apply(categorize_rating)

### Transformation Summarry

In [91]:
# Final shape
amazon_df.shape

(999, 17)

In [92]:
# Missing values after cleaning
amazon_df.isnull().sum()

rank                    0
asin                    0
product_title           0
product_price          54
product_star_rating     0
product_num_ratings     0
product_url             0
product_photo           0
country                 0
cleaned_price          54
price_usd              54
revenue_estimate       54
rating_bucket           0
review_density         54
category                0
brand                   0
country_region          0
dtype: int64

In [93]:
# Check problematic prices
print("Sample of missing price_usd where product_price exists:")
mask = amazon_df['price_usd'].isna() & amazon_df['product_price'].notna()
print(amazon_df[mask]['product_price'].head(10).tolist())

Sample of missing price_usd where product_price exists:
[]


In [94]:
# Save transformed data
amazon_df.to_csv('transformed_amazon_data.csv', index=False)

### Loading

In [95]:
from sqlalchemy import create_engine
import psycopg2

In [96]:
# Load transformed data
amazon_df = pd.read_csv('transformed_amazon_data.csv')
print(f"Loading {amazon_df.shape[0]} records to PostgreSQL database")


Loading 999 records to PostgreSQL database


In [97]:
# Database connection 
conn_string = 'postgresql://avnadmin:AVNS_dkpx47HSLApO3Wbk3-J@pg-204f0187-os679736-e1d8.c.aivencloud.com:22022/defaultdb?sslmode=require'
engine = create_engine(conn_string)

In [98]:
# Load data into PostgreSQL
amazon_df.to_sql('amazon_data', engine, if_exists='replace', index=False)

999

In [99]:
# Verify the load
verification_query = """ 
SELECT 
    COUNT(*) as total_records,
    COUNT(DISTINCT country) as unique_countries,
    COUNT(DISTINCT category) as unique_categories,
    COUNT(DISTINCT brand) as unique_brands,
    AVG(price_usd) as avg_price_usd,
    AVG(product_star_rating) as avg_rating
FROM amazon_data
WHERE price_usd IS NOT NULL;
"""
result = pd.read_sql(verification_query, engine)

In [100]:
result.head()

Unnamed: 0,total_records,unique_countries,unique_categories,unique_brands,avg_price_usd,avg_rating
0,945,10,12,15,235.445672,4.136878
