In [3]:
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, Float, DateTime, ForeignKey, text
from snowflake.sqlalchemy import URL
import os
from dotenv import load_dotenv
import logging
import sys

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

# Load environment variables
load_dotenv()

# Get Snowflake connection details from environment variables
def get_snowflake_engine():
    logger.info("Attempting to create Snowflake engine.")
    try:
        engine = create_engine(URL(
            user=os.getenv("MY_SNOWFLAKE_USER"),
            password=os.getenv("MY_SNOWFLAKE_PASSWORD"),
            account=os.getenv("MY_SNOWFLAKE_ACCOUNT"),
            database=os.getenv("SNOWFLAKE_DATABASE"),
            schema=os.getenv("SNOWFLAKE_SCHEMA"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
            role=os.getenv("SNOWFLAKE_ROLE"),
        ))
        logger.info("Snowflake engine created successfully.")
        return engine
    except Exception as e:
        logger.error("Error creating Snowflake engine: %s", e)
        sys.exit(1)

def __run_query(query, engine = get_snowflake_engine(), update = False):
    connection = engine.connect()
    logging.info("Connected to Snowflake")
    
    try:
        # connection.execute(text("USE ASG_4_P2.PUBLIC;"))
        # logging.info("Switched to ASG_4_P2.PUBLIC")
        
        # Make parameterized query such that no DELETE / UPDATE queries can be run
        query = text(query)
        logger.info(f"Executing query: {query}")
        result = connection.execute(query).fetchall()
        if update:
            connection.commit()
            logging.info("Transaction committed.")
        try:
            result_df = pd.DataFrame(result)
            logging.info("Query executed successfully and results fetched")
            return result_df
        except Exception as e:
            logging.error(f"Error converting query results to DataFrame: {e}")
            return None
    except Exception as e:
        logging.error(f"Error executing query: {e}")
        return None
    finally:
        connection.close()
        logging.info("Connection closed")

def update_churn_flag(churned_cust_ids, non_churned_customer_ids):
    update_query_churn = f"""
    UPDATE "Customers"
    SET churn_flag = TRUE
    WHERE customer_id IN ({', '.join(f"'{customer_id}'" for customer_id in churned_cust_ids)});
    """
    __run_query(update_query_churn, update=True)
    
    update_query_non_churn = f"""
    UPDATE "Customers"
    SET churn_flag = TRUE
    WHERE customer_id IN ({', '.join(f"'{customer_id}'" for customer_id in non_churned_customer_ids)});
    """
    __run_query(update_query_non_churn, update=True)

    

  functions.register_function("flatten", flatten)
2024-08-10 19:51:31,279 - INFO - Attempting to create Snowflake engine.
2024-08-10 19:51:31,324 - INFO - Snowflake engine created successfully.


In [44]:

customers = __run_query('SELECT * FROM "Customers";')
orders = __run_query('SELECT * FROM "Orders";')

2024-08-04 11:36:20,290 - INFO - Snowflake Connector for Python Version: 3.12.0, Python Version: 3.10.12, Platform: Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.35
2024-08-04 11:36:20,301 - INFO - Connecting to GLOBAL Snowflake domain
2024-08-04 11:36:20,303 - INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2024-08-04 11:36:21,079 - INFO - Number of results in first chunk: 1
2024-08-04 11:36:21,255 - INFO - Number of results in first chunk: 1
2024-08-04 11:36:21,256 - INFO - Connected to Snowflake
2024-08-04 11:36:21,579 - INFO - Number of results in first chunk: 0
2024-08-04 11:36:24,162 - INFO - Query executed successfully and results fetched
2024-08-04 11:36:24,339 - INFO - Number of results in first chunk: 1
2024-08-04 11:36:24,339 - INFO - Connection closed
2024-08-04 11:

# Calculate Churn

In [4]:
def calculate_churn_rate(orders, customers, churn_period_days=200):
    # Convert order_purchase_timestamp to datetime
    orders['order_purchase_timestamp'] = pd.to_datetime(orders['order_purchase_timestamp'])

    # Determine the reference date (e.g., the latest order date in the dataset)
    reference_date = orders['order_purchase_timestamp'].max()

    # Calculate the last purchase date for each customer
    last_purchase_date = orders.groupby('customer_id')['order_purchase_timestamp'].max().reset_index()

    # Identify churned customers: those who haven't purchased within the churn period
    churned_customers = last_purchase_date[
        last_purchase_date['order_purchase_timestamp'] < reference_date - pd.Timedelta(days=churn_period_days)
    ]

    # Identify non-churned customers: those who have purchased within the churn period
    non_churned_customers = last_purchase_date[
        last_purchase_date['order_purchase_timestamp'] >= reference_date - pd.Timedelta(days=churn_period_days)
    ]

    # Convert customer IDs to lists
    churned_customer_ids = churned_customers['customer_id'].tolist()
    non_churned_customer_ids = non_churned_customers['customer_id'].tolist()

    # Count the total number of unique customers
    total_customers = customers['customer_id'].nunique()

    # Count the number of churned customers
    num_churned_customers = len(churned_customer_ids)

    # Calculate churn rate
    churn_rate = (num_churned_customers / total_customers) * 100

    # Output the results
    logger.info(f"Total Customers: {total_customers}")
    logger.info(f"Churned Customers: {num_churned_customers}")
    logger.info(f"Churn Rate: {churn_rate:.2f}%")

    return churned_customer_ids, non_churned_customer_ids

In [50]:
churned_cust_ids, non_churned_customer_ids = calculate_churn_rate(orders, customers)

2024-08-04 11:55:48,648 - INFO - Total Customers: 99441
2024-08-04 11:55:48,649 - INFO - Churned Customers: 66569
2024-08-04 11:55:48,649 - INFO - Churn Rate: 66.94%


In [53]:
update_churn_flag(churned_cust_ids, non_churned_customer_ids)

2024-08-04 12:01:49,235 - INFO - Snowflake Connector for Python Version: 3.12.0, Python Version: 3.10.12, Platform: Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.35
2024-08-04 12:01:49,236 - INFO - Connecting to GLOBAL Snowflake domain
2024-08-04 12:01:49,237 - INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2024-08-04 12:01:49,883 - INFO - Number of results in first chunk: 1
2024-08-04 12:01:50,025 - INFO - Number of results in first chunk: 1
2024-08-04 12:01:50,026 - INFO - Connected to Snowflake
2024-08-04 12:02:03,469 - INFO - Number of results in first chunk: 1
2024-08-04 12:02:03,470 - INFO - Transaction committed.
2024-08-04 12:02:03,471 - INFO - Query executed successfully and results fetched
2024-08-04 12:02:03,683 - INFO - Number of results in first chunk: 1
2024-08-0

# Get Churn Status by cust_id

In [11]:
#Query to get the churn flag for a customer
def get_churn_flag(customer_id):
    sql_query = f'''select churn_flag from "Customers" where customer_id = '{customer_id}';'''
    return __run_query(sql_query)

#Query to get the min and max price of a product
def __get_min_max_price(product_id):
    sql_query = f'''SELECT 
        MAX(price) AS max_price,
        MIN(price) AS min_price
    FROM 
        "Order_Items"
    GROUP BY 
        product_id
    having product_id = '{product_id}';'''
    return __run_query(sql_query)

#Function to get the price for product according to churn flag
def get_price(product_id, churn_flag):
    min_max_price = __get_min_max_price(product_id)
    if churn_flag:
        return min_max_price['min_price'][0]
    else:
        return min_max_price['max_price'][0]



In [13]:
customer_id = "00012a2ce6f8dcda20d059ce98491703"
print(get_price("4244733e06e7ecb4970a6e2683c13e61", get_churn_flag(customer_id)['churn_flag'][0]))

2024-08-06 00:01:23,386 - INFO - Connected to Snowflake
2024-08-06 00:01:23,388 - INFO - Executing query: select churn_flag from "Customers" where customer_id = '00012a2ce6f8dcda20d059ce98491703';
2024-08-06 00:01:23,740 - INFO - Number of results in first chunk: 1
2024-08-06 00:01:23,742 - INFO - Query executed successfully and results fetched
2024-08-06 00:01:23,834 - INFO - Number of results in first chunk: 1
2024-08-06 00:01:23,835 - INFO - Connection closed
2024-08-06 00:01:23,836 - INFO - Connected to Snowflake
2024-08-06 00:01:23,836 - INFO - Executing query: SELECT 
        MAX(price) AS max_price,
        MIN(price) AS min_price
    FROM 
        "Order_Items"
    GROUP BY 
        product_id
    having product_id = '4244733e06e7ecb4970a6e2683c13e61';
2024-08-06 00:01:23,941 - INFO - Number of results in first chunk: 1
2024-08-06 00:01:23,943 - INFO - Query executed successfully and results fetched
2024-08-06 00:01:24,032 - INFO - Number of results in first chunk: 1
2024-08-06

55.9


## Update the products table

In [25]:
import os
from dotenv import load_dotenv
import requests
import base64
import json

# Load environment variables from .env file
load_dotenv()

# Get the environment variables
CLIENT_ID = os.getenv('EBAY_APP_ID')
CLIENT_SECRET = os.getenv('EBAY_CERT_ID')
OAUTH_URL = 'https://api.ebay.com/identity/v1/oauth2/token'

# Encode the client ID and client secret
credentials = base64.b64encode(f'{CLIENT_ID}:{CLIENT_SECRET}'.encode('utf-8')).decode('utf-8')

headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    'Authorization': f'Basic {credentials}'
}

data = {
    'grant_type': 'client_credentials',
    'scope': 'https://api.ebay.com/oauth/api_scope'
}

response = requests.post(OAUTH_URL, headers=headers, data=data)
if response.status_code == 200:
    access_token = response.json()['access_token']
    # print(f'Access Token: {access_token}')
else:
    print(f'Error: {response.status_code}')
    print(response.json())



In [30]:
import requests
import json

def search_ebay_items(params, access_token=access_token):
    endpoint = 'https://api.ebay.com/buy/browse/v1/item_summary/search'
    
    # Set up the request headers
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    
    # Make the API request
    response = requests.get(endpoint, headers=headers, params=params)
    
    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()
        items = data.get('itemSummaries', [])

        q_param = params.get('q', 'results').replace(' ', '_')
        file_name = f'data_ebay/ebay_browse_search_results_{q_param}_2.json'
        
        # Print and save the response
        # with open(file_name, 'w') as file:
        #     json.dump(items, file, indent=4)
        #     print(f'Saved {len(items)} items to {file_name}')
        print(len(items))
        return items
    else:
        print(f'Error: {response.status_code}')
        print(response.json())

def get_category_id_by_name(category_name, access_token=access_token):
    # Define the endpoint and headers
    url = 'https://api.ebay.com/commerce/taxonomy/v1/category_tree/0/get_category_suggestions'
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    
    # Define the query parameters
    params = {
        'q': category_name  # The name of the category you want to search for
    }
    
    # Make the GET request
    response = requests.get(url, headers=headers, params=params)
    
    # Check the response status and parse the response
    if response.status_code == 200:
        data = response.json()
        # formatted_json = json.dumps(data, indent=4)
        # print(formatted_json)
        categories = data.get('categorySuggestions')
        
        # Print out category names and IDs
        for category in categories:
            print(f"Category Name: {category['category']['categoryName']}, Category ID: {category['category']['categoryId']}")
        return categories
    else:
        print(f"Error: {response.status_code}, {response.text}")
        return None
    
def call_ebay_api_by_price(q, min_price, max_price):
    params = {
        'q': q,
        'filter': f'price:[{min_price}..{max_price}]',
        'limit': 1
    }
    search_ebay_items(params)

In [12]:
categories = __run_query('SELECT DISTINCT PRODUCT_CATEGORY_NAME_ENGLISH  FROM "Product_Category_Translation";')['product_category_name_english'].tolist()

2024-08-12 03:51:45,099 - INFO - Connected to Snowflake
2024-08-12 03:51:45,102 - INFO - Executing query: SELECT DISTINCT PRODUCT_CATEGORY_NAME_ENGLISH  FROM "Product_Category_Translation";
2024-08-12 03:51:45,203 - INFO - Number of results in first chunk: 71
2024-08-12 03:51:45,204 - INFO - Query executed successfully and results fetched
2024-08-12 03:51:45,263 - INFO - Number of results in first chunk: 1
2024-08-12 03:51:45,264 - INFO - Connection closed


In [34]:

print(call_ebay_api_by_price(i, 70, 80))


Saved 1 items to data_ebay/ebay_browse_search_results_construction_tools_construction_2.json
1
None


In [19]:
categories[1]

'computers_accessories'