In [3]:
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import json

# Replace with your actual Keepa API key
API_KEY = '1o6o8o0bd5ko9o6o635rs85jqj0no7g1upls93u8tbtt83sdt5gak24m593s6967'
API_URL = 'https://api.keepa.com/product?key={}&domain=1&asin={}'

def fetch_keepa_data(asin):
    response = requests.get(API_URL.format(API_KEY, asin))
    if response.status_code != 200:
        raise ValueError(f"Failed to fetch data from Keepa API, status code: {response.status_code}")
    return response.json()

def process_price_history(product_data):
    csv_data = product_data.get('csv', [])
    offer_price_data = csv_data[3] if len(csv_data) > 3 and csv_data[3] is not None else []
    if not offer_price_data:
        raise ValueError("Offer price data is missing or not at the expected index.")
    
    base_date = datetime(2011, 1, 1)
    offer_price_dates = [base_date + timedelta(minutes=30 * i) for i in range(0, len(offer_price_data), 2)]
    offer_price_values = [offer_price_data[i] for i in range(1, len(offer_price_data), 2)]
    
    df_prices = pd.DataFrame({
        'Date': offer_price_dates,
        'OfferPrice': offer_price_values
    })
    return df_prices

def detect_castle_top_pattern(series, window=336):
    if len(series) < window:
        return [0] * len(series)
    pattern_detected = [0] * window
    for i in range(window, len(series) - window):
        is_castle_top = all(series[j] > series[j - 1] and series[j] > series[j + 1] for j in range(i - window // 2, i + window // 2, 2))
        pattern_detected.append(int(is_castle_top))
    pattern_detected.extend([0] * (len(series) - len(pattern_detected)))
    return pattern_detected

def detect_flat_lines(series, min_length=960, max_length=1440):
    if len(series) < min_length:
        return [0] * len(series)
    flat_line = [0] * len(series)
    count = 0
    for i in range(1, len(series)):
        if series[i] == series[i-1]:
            count += 1
        else:
            if min_length <= count <= max_length:
                for j in range(i - count, i):
                    flat_line[j] = 1
            count = 0
    return flat_line

def analyze_data(asin):
    keepa_data = fetch_keepa_data(asin)
    if 'products' not in keepa_data or len(keepa_data['products']) == 0:
        raise ValueError("No products found")
    
    product_data = keepa_data['products'][0]
    df_prices = process_price_history(product_data)
    
    df_prices['CastleTopPattern'] = detect_castle_top_pattern(df_prices['OfferPrice'])
    df_prices['FlatLine'] = detect_flat_lines(df_prices['OfferPrice'])
    
    if len(df_prices) >= 336:
        df_prices['Drop7Days'] = df_prices['OfferPrice'].diff(periods=-336)
        df_prices['LargeDrop7Days'] = (df_prices['Drop7Days'] >= 4) & (df_prices['Drop7Days'] < 7)
        df_prices['ExtraLargeDrop7Days'] = df_prices['Drop7Days'] >= 7
    else:
        df_prices['Drop7Days'] = 0
        df_prices['LargeDrop7Days'] = False
        df_prices['ExtraLargeDrop7Days'] = False

    if len(df_prices) >= 96:
        df_prices['Drop2Days'] = df_prices['OfferPrice'].diff(periods=-96)
        df_prices['LargeDrop2Days'] = df_prices['Drop2Days'] >= 4
        df_prices['ExtraLargeDrop2Days'] = df_prices['Drop2Days'] >= 7
    else:
        df_prices['Drop2Days'] = 0
        df_prices['LargeDrop2Days'] = False
        df_prices['ExtraLargeDrop2Days'] = False
    
    df_prices['IsSeasonal'] = df_prices['Date'].apply(lambda x: x.month in [10, 11, 12])
    df_prices['SeasonalDrop'] = df_prices['LargeDrop7Days'] & df_prices['IsSeasonal']
    df_prices['NonSeasonalDrop'] = df_prices['LargeDrop7Days'] & ~df_prices['IsSeasonal']
    
    risk_factors = []
    total_score, individual_scores = assign_score(df_prices, product_data, risk_factors)
    response = {
        "total_score": int(total_score),
        "individual_scores": individual_scores,
        "risk_factors": risk_factors
    }
    
    return response

def assign_score(df, product_data, risk_factors):
    scores = {
        'large_drop_7days': 20 if df['LargeDrop7Days'].sum() == 1 else 30 if 2 <= df['LargeDrop7Days'].sum() <= 4 else 40 if df['LargeDrop7Days'].sum() >= 5 else 0,
        'large_drop_2days': 25 if df['LargeDrop2Days'].sum() == 1 else 35 if 2 <= df['LargeDrop2Days'].sum() <= 4 else 45 if df['LargeDrop2Days'].sum() >= 5 else 0,
        'extra_large_drop_7days': 50 if df['ExtraLargeDrop7Days'].sum() == 1 else 55 if 2 <= df['ExtraLargeDrop7Days'].sum() <= 4 else 0,
        'extra_large_drop_2days': 80 if df['ExtraLargeDrop2Days'].sum() == 1 else 85 if df['ExtraLargeDrop2Days'].sum() > 1 else 0,
        'castle_top_pattern': 5 if df['CastleTopPattern'].sum() > 0 else 0,
        'seasonal_drop': -15 if df['SeasonalDrop'].sum() > 0 else 0,
        'non_seasonal_drop': 10 if df['NonSeasonalDrop'].sum() > 0 else 0
    }

    individual_scores = {key: val for key, val in scores.items() if val != 0}
    total_score = sum(individual_scores.values())

    for key, value in individual_scores.items():
        risk_factors.append({"factor": key, "severity": value})

    return min(total_score, 95), individual_scores

# Example Usage
asin = 'B00ABQD6OK'  # Replace with your ASIN
result = analyze_data(asin)
print(json.dumps(result, indent=2))


{
  "total_score": 95,
  "individual_scores": {
    "large_drop_2days": 45,
    "extra_large_drop_2days": 85
  },
  "risk_factors": [
    {
      "factor": "large_drop_2days",
      "severity": 45
    },
    {
      "factor": "extra_large_drop_2days",
      "severity": 85
    }
  ]
}


In [8]:
# Importing necessary libraries
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import json

# Keepa API credentials
API_KEY = '1o6o8o0bd5ko9o6o635rs85jqj0no7g1upls93u8tbtt83sdt5gak24m593s6967'
API_URL = 'https://api.keepa.com/product?key={}&domain=1&asin={}'

# Fetch data from Keepa API
def fetch_keepa_data(asin):
    response = requests.get(API_URL.format(API_KEY, asin))
    if response.status_code != 200:
        raise ValueError(f"Failed to fetch data from Keepa API, status code: {response.status_code}")
    return response.json()

# Process price history from Keepa data
def process_price_history(product_data):
    csv_data = product_data.get('csv', [])
    offer_price_data = csv_data[3] if len(csv_data) > 3 and csv_data[3] is not None else []
    if not offer_price_data:
        raise ValueError("Offer price data is missing or not at the expected index.")
    
    base_date = datetime(2011, 1, 1)
    offer_price_dates = [base_date + timedelta(minutes=30 * i) for i in range(0, len(offer_price_data), 2)]
    offer_price_values = [offer_price_data[i] for i in range(1, len(offer_price_data), 2)]
    
    df_prices = pd.DataFrame({
        'Date': offer_price_dates,
        'OfferPrice': offer_price_values
    })
    return df_prices

# Detect patterns
def detect_castle_top_pattern(series, window=336):
    if len(series) < window:
        return [0] * len(series)
    pattern_detected = [0] * window
    for i in range(window, len(series) - window):
        is_castle_top = all(series[j] > series[j - 1] and series[j] > series[j + 1] for j in range(i - window // 2, i + window // 2, 2))
        pattern_detected.append(int(is_castle_top))
    pattern_detected.extend([0] * (len(series) - len(pattern_detected)))
    return pattern_detected

def detect_flat_lines(series, min_length=960, max_length=1440):
    if len(series) < min_length:
        return [0] * len(series)
    flat_line = [0] * len(series)
    count = 0
    for i in range(1, len(series)):
        if series[i] == series[i-1]:
            count += 1
        else:
            if min_length <= count <= max_length:
                for j in range(i - count, i):
                    flat_line[j] = 1
            count = 0
    return flat_line

# Analyze data
def analyze_data(asin):
    keepa_data = fetch_keepa_data(asin)
    if 'products' not in keepa_data or len(keepa_data['products']) == 0:
        raise ValueError("No products found")
    
    product_data = keepa_data['products'][0]
    df_prices = process_price_history(product_data)
    
    df_prices['CastleTopPattern'] = detect_castle_top_pattern(df_prices['OfferPrice'])
    df_prices['FlatLine'] = detect_flat_lines(df_prices['OfferPrice'])
    
    if len(df_prices) >= 17520:  # 365 days * 48 half-hour intervals
        df_prices['Drop365DaysForward'] = df_prices['OfferPrice'].diff(periods=-17520)
        df_prices['Drop365DaysBackward'] = df_prices['OfferPrice'].diff(periods=17520)

        df_prices['LargeDrop365DaysForward'] = (df_prices['Drop365DaysForward'] >= 4) & (df_prices['Drop365DaysForward'] < 7)
        df_prices['ExtraLargeDrop365DaysForward'] = df_prices['Drop365DaysForward'] >= 7

        df_prices['LargeDrop365DaysBackward'] = (df_prices['Drop365DaysBackward'] >= 4) & (df_prices['Drop365DaysBackward'] < 7)
        df_prices['ExtraLargeDrop365DaysBackward'] = df_prices['Drop365DaysBackward'] >= 7
    else:
        df_prices['Drop365DaysForward'] = 0
        df_prices['LargeDrop365DaysForward'] = False
        df_prices['ExtraLargeDrop365DaysForward'] = False

        df_prices['Drop365DaysBackward'] = 0
        df_prices['LargeDrop365DaysBackward'] = False
        df_prices['ExtraLargeDrop365DaysBackward'] = False
    
    df_prices['IsSeasonal'] = df_prices['Date'].apply(lambda x: x.month in [10, 11, 12])
    df_prices['SeasonalDropForward'] = df_prices['LargeDrop365DaysForward'] & df_prices['IsSeasonal']
    df_prices['NonSeasonalDropForward'] = df_prices['LargeDrop365DaysForward'] & ~df_prices['IsSeasonal']
    df_prices['SeasonalDropBackward'] = df_prices['LargeDrop365DaysBackward'] & df_prices['IsSeasonal']
    df_prices['NonSeasonalDropBackward'] = df_prices['LargeDrop365DaysBackward'] & ~df_prices['IsSeasonal']

    df_prices['NewListing'] = (datetime.now() - df_prices['Date'].min()).days < 60
    df_prices['ConsistentFlatLines'] = df_prices['FlatLine'].rolling(30).sum() >= 30
    
    risk_factors = []
    total_score, individual_scores = assign_score(df_prices, product_data, risk_factors)
    response = {
        "total_score": int(total_score),
        "individual_scores": individual_scores,
        "risk_factors": risk_factors
    }
    
    return response

# Assign scores
def assign_score(df, product_data, risk_factors):
    scores = {
        'forward_quick_drops': 35 if df['LargeDrop365DaysForward'].sum() <= 4 else 0,
        'forward_extra_large_drops': 20 if df['ExtraLargeDrop365DaysForward'].sum() > 0 else 0,
        'backward_quick_drops': 35 if df['LargeDrop365DaysBackward'].sum() <= 4 else 0,
        'backward_extra_large_drops': 20 if df['ExtraLargeDrop365DaysBackward'].sum() > 0 else 0,
        'extra_long_flat_lines': 25 if df['ConsistentFlatLines'].sum() > 0 else 0,
        'new_listing': -10 if df['NewListing'].sum() > 0 else 0,
        'seasonal_drop_forward': -15 if df['SeasonalDropForward'].sum() > 0 else 0,
        'non_seasonal_drop_forward': 10 if df['NonSeasonalDropForward'].sum() > 0 else 0,
        'seasonal_drop_backward': -15 if df['SeasonalDropBackward'].sum() > 0 else 0,
        'non_seasonal_drop_backward': 10 if df['NonSeasonalDropBackward'].sum() > 0 else 0
    }

    individual_scores = {key: val for key, val in scores.items() if val != 0}
    total_score = sum(individual_scores.values())

    for key, value in individual_scores.items():
        risk_factors.append({"factor": key, "severity": value})

    return min(total_score, 95), individual_scores

# Test cases
test_asins = ['B00ABQD6OK', 'B0D9YYCBSJ', 'B083Y69Z5C', 'B097TVQ1LY']  # Replace with actual ASINs for testing
results = []
# Iterate through each ASIN and test the function
for asin in test_asins:
    try:
        # Call the analyze_data function
        result = analyze_data(asin)
        results.append({"ASIN": asin, "Result": result})
        # Pretty-print the result for the current ASIN
        print(f"Results for ASIN: {asin}")
        print(json.dumps(result, indent=2))
    except Exception as e:
        # Append any errors encountered
        results.append({"ASIN": asin, "Error": str(e)})
        print(f"Error for ASIN {asin}: {str(e)}")

# Final list of results
print("Summary of Results:")
print(json.dumps(results, indent=2))


Results for ASIN: B00ABQD6OK
{
  "total_score": 95,
  "individual_scores": {
    "forward_quick_drops": 35,
    "forward_extra_large_drops": 20,
    "backward_quick_drops": 35,
    "backward_extra_large_drops": 20
  },
  "risk_factors": [
    {
      "factor": "forward_quick_drops",
      "severity": 35
    },
    {
      "factor": "forward_extra_large_drops",
      "severity": 20
    },
    {
      "factor": "backward_quick_drops",
      "severity": 35
    },
    {
      "factor": "backward_extra_large_drops",
      "severity": 20
    }
  ]
}
Results for ASIN: B0D9YYCBSJ
{
  "total_score": 70,
  "individual_scores": {
    "forward_quick_drops": 35,
    "backward_quick_drops": 35
  },
  "risk_factors": [
    {
      "factor": "forward_quick_drops",
      "severity": 35
    },
    {
      "factor": "backward_quick_drops",
      "severity": 35
    }
  ]
}
Results for ASIN: B083Y69Z5C
{
  "total_score": 70,
  "individual_scores": {
    "forward_quick_drops": 35,
    "backward_quick_drops"