In [1]:
import pandas as pd
import re
import json
import boto3
import awswrangler as wr
import concurrent.futures
from math import ceil

In [None]:
import boto3

# Report the library versions in use: pandas first, then boto3
for library in (pd, boto3):
    print(library.__version__)

## Extract Data

In [3]:
# Path of the raw CSV file. Full dataset (uncomment to use it):
# dataset_dir = '../dataset_large/amazon_co-ecommerce_sample.csv'

# Smaller sample, for anyone who just wants to practice:
dataset_dir = '../dataset/sample_raw_productos_por_categoria_10_total_1343_productos.csv'

# The first line is the header row; fields are comma-separated and double-quoted
df = pd.read_csv(
    dataset_dir,
    sep=',',
    quotechar='"',
    header=0,
)

## Transform Data

In [4]:
# Columns that are not carried forward into the cleaned dataset
columns_to_drop = [
    "customers_who_bought_this_item_also_bought",
    "items_customers_buy_after_viewing_this_item",
    "customer_questions_and_answers",
    "number_of_answered_questions",
    "sellers",
    "description",
]
df = df.drop(columns=columns_to_drop)

In [None]:
# Number of distinct (non-null) values in amazon_category_and_sub_category.
# Series.nunique() gives this directly; grouping the column by itself is not needed.
distinct_counts = df['amazon_category_and_sub_category'].nunique()

# Display the result
print(distinct_counts)

In [6]:
# Split "Category > Sub > Sub-sub ..." into one column per hierarchy level
category_and_subcategories = df['amazon_category_and_sub_category'].str.split(' > ', expand=True)

# Number of hierarchy levels found (level 0 is the top-level category)
num_subcategories = category_and_subcategories.shape[1]

# Levels 1..n-1 become subcategory_1..subcategory_{n-1}.
# Level 0 is skipped here: it is the category, not a subcategory.
for level in range(1, num_subcategories):
    df[f'subcategory_{level}'] = category_and_subcategories[level]

# Later cells read subcategory_1, so make sure it exists even when no row
# has a subcategory; it is left empty rather than copied from the category.
if num_subcategories < 2:
    df['subcategory_1'] = None

# Assign the first level as the 'category'
df['category'] = category_and_subcategories[0]

# Drop the original combined column
df = df.drop(["amazon_category_and_sub_category"], axis=1)

In [7]:
# Rows without a category or a first-level subcategory are labelled "others"
for column in ("category", "subcategory_1"):
    df[column] = df[column].fillna("others")

In [None]:
# Number of distinct (non-null) values in subcategory_1.
# Series.nunique() gives this directly; grouping the column by itself is not needed.
distinct_counts = df['subcategory_1'].nunique()

# Display the result
print(distinct_counts)

In [None]:
# Lists every subcategory_1 value as the index. The column is grouped by itself,
# so each group holds a single distinct value and every entry shown is 1.
subcategory_groups = df.groupby('subcategory_1')['subcategory_1']
subcategory_groups.nunique()

In [None]:
# Preview the first five rows after the category split
df.head(5)

In [11]:
# Keep only the rating itself by stripping the literal " out of 5 stars" text
df['average_review_rating'] = df['average_review_rating'].str.replace(
    ' out of 5 stars', '', regex=False
)

In [12]:
# Strip the GBP currency symbol from the price text
df['price'] = df['price'].str.replace('£', '', regex=False)

In [13]:
# Keep only the first run of digits in "number_available_in_stock",
# which drops trailing text such as " new"
stock_digits_pattern = r'(\d+)'
df['number_available_in_stock'] = df['number_available_in_stock'].str.extract(
    stock_digits_pattern, expand=False
)

In [14]:
# Some product_information values carry scraped page script, e.g.
# "...Customer Reviews amznJQ.onReady(..."; drop "amznJQ" and everything after it
df['product_information'] = df['product_information'].str.replace(r'amznJQ.*', '', regex=True)

# Same clean-up for "product_description": drop "#productDescription" and everything after it.
# The pattern needs ".*" for that; a bare trailing "*" only repeats the final "n"
# and would remove the marker alone, leaving the garbage that follows it.
df['product_description'] = df['product_description'].str.replace(r'#productDescription.*', '', regex=True)

In [15]:
# Reviews can be very long, so keep only the first 600 characters of each
df['customer_reviews'] = df['customer_reviews'].str[:600]

In [16]:
# Expose the "uniq_id" column under the name "product_id"
df = df.rename({'uniq_id': 'product_id'}, axis='columns')

In [None]:
# Number of products in the 'Pencils' subcategory.
# .get() returns 0 instead of raising KeyError when the subcategory is absent
# (for example when a small sample dataset is loaded).
df['subcategory_1'].value_counts().get('Pencils', 0)

### Begin: Creamos subsets para pruebas, enfocados en usuarios con poca RAM, CPU, etc.

> No podemos hacer un df.head(), ya que los datos están ordenados. Por ej, los primeros 1000 rows son Trenes

In [18]:
def sample_n_per_category(df, category_column, n, random_state=None):
    """Return up to `n` randomly sampled rows for each value of `category_column`.

    Args:
        df: DataFrame to sample from.
        category_column: Name of the column whose values define the groups.
        n: Maximum number of rows to keep per group; groups with fewer rows
            are kept whole.
        random_state: Optional seed (or random state object) forwarded to
            `DataFrame.sample`. Pass a fixed value to get the same subset on
            every run; the default `None` keeps the sampling unseeded.

    Returns:
        A new DataFrame with a fresh 0..k-1 index. Groups appear in the order
        their value first occurs in `df`. Rows whose category is null are not
        included. An empty DataFrame with the same columns is returned when
        there is nothing to sample.
    """
    # sort=False keeps groups in order of first appearance
    sampled_data = [
        group.sample(min(len(group), n), random_state=random_state)
        for _, group in df.groupby(category_column, sort=False)
    ]

    # pd.concat raises on an empty list, so handle "no groups" explicitly
    if not sampled_data:
        return df.iloc[0:0].reset_index(drop=True)

    return pd.concat(sampled_data, ignore_index=True)

In [19]:
# Maximum number of rows to keep per subcategory
N = 100

# Build the subset by sampling each subcategory_1 group
subset_df = sample_n_per_category(df, 'subcategory_1', N)
total_rows = subset_df.shape[0]

# Write it locally; N and the resulting row count are embedded in the file name
output_path = f"../dataset/subset_productos_por_categoria_{N}_total_{total_rows}_productos.csv"
subset_df.to_csv(output_path, index=False)

# Show how many rows each subcategory ended up with
subcategory_counts = subset_df['subcategory_1'].value_counts()
print("\nSubcategory distribution:", subcategory_counts, sep="\n")

### End: Creamos subsets para pruebas, enfocados en usuarios con poca RAM, CPU, etc.

---
## Writing data

> Creamos un nuevo DF, acorde al número de productos que queremos cargar en el Knowledge Base

In [20]:
# Path of the subset CSV to load into the Knowledge Base.
# NOTE: the subset cell embeds N and the row count in the file name it writes,
# so this name must match a file that actually exists in ../dataset.
dataset_dir = '../dataset/subset_productos_por_categoria_100_total_4476_productos.csv'

# The first line is the header row; fields are comma-separated and double-quoted
df = pd.read_csv(
    dataset_dir,
    sep=',',
    quotechar='"',
    header=0,
)

---
> Opcional: Creamos una lista con las distintas subcategorías, para añadirlas a nuestra streamlit app:

In [21]:
# Collect the unique 'subcategory_1' values as a plain list, then order them
distinct_values = df['subcategory_1'].unique().tolist()
distinct_values.sort()

# Helper that splits a list into consecutive groups of `chunk_size` items
def chunk_list(lst, chunk_size):
    """Return consecutive slices of `lst`, each at most `chunk_size` items long.

    The last slice is shorter when `len(lst)` is not a multiple of
    `chunk_size`; an empty `lst` yields an empty list.
    """
    chunks = []
    for start in range(0, len(lst), chunk_size):
        chunks.append(lst[start:start + chunk_size])
    return chunks

# Group the distinct values five per output line
chunked_values = chunk_list(distinct_values, 5)

# One indented line per group, each value wrapped in double quotes
formatted_rows = [
    '    ' + ", ".join(f'"{value}"' for value in chunk)
    for chunk in chunked_values
]

# Write a bracketed, comma-separated listing to a local file;
# every line except the last one ends with a comma
with open('subcategory_1_values.txt', 'w') as file:
    file.write('[\n')
    if formatted_rows:
        file.write(',\n'.join(formatted_rows) + '\n')
    file.write(']')

---

> Escribimos datos a S3, a partir del nuevo DF cargado

In [22]:
# Destination in S3: bucket name, and the key prefix all output files are written under
s3_bucket = 'genai-carlos-contreras-bucket-data-quarks-labs-oregon-01'
s3_key = 'datasets/demo_kb/knowledge-base-ecommerce-s3-001/v1'

In [23]:
# boto3 S3 client, used further down to upload the metadata JSON files
s3_client = boto3.client("s3")

#### Importante:

- Este método usa paralelismo. El número de threads se puede cambiar para datasets mayores.

- Aquí hacemos el chunking manualmente, almacenando un archivo por subcategory_1 (columna creada por nosotros)

- Análisis hecho: 10 rows por archivo parece llegar al límite de Chunk Size para Titan Embeddings V2

In [24]:
def write_csv_to_s3(args):
    """Write one (category, subcategory) slice of `df` to S3 as chunked CSV files.

    For every chunk of at most `num_rows_per_file` rows this writes
    `<category>_<subcategory>_<part>.csv` under `s3://<s3_bucket>/<s3_key>/`
    and a sibling `<file>.metadata.json` object holding the category,
    subcategory, part number and total number of parts.

    Reads the notebook-level `df` (source rows) and `s3_client` (metadata
    upload); both must be defined before this function is called.

    Args:
        args: Tuple `(category, sub_category, s3_bucket, s3_key,
            num_rows_per_file)`, packed as one argument so the function can be
            used with `executor.map`.

    Returns:
        Always None. Any exception is caught and printed together with the
        category/subcategory being processed, so one failing task does not
        stop the others.
    """
    category, sub_category, s3_bucket, s3_key, num_rows_per_file = args

    try:
        # Build file-name-safe versions of the names: commas, whitespace,
        # ampersands and apostrophes become underscores. This is inside the try
        # so that a non-string value is reported instead of failing unnoticed.
        file_category = re.sub(r"[,\s&']", "_", category)
        file_subcategory = re.sub(r"[,\s&']", "_", sub_category)

        # Rows belonging to this category/subcategory pair
        subset = df[(df['category'] == category) & (df['subcategory_1'] == sub_category)]

        # Calculate the number of files needed
        num_rows = len(subset)
        num_files = ceil(num_rows / num_rows_per_file)

        # Create the files
        for i in range(num_files):
            start_row = i * num_rows_per_file
            end_row = min((i + 1) * num_rows_per_file, num_rows)
            file_name = f"{file_category}_{file_subcategory}_{i+1}.csv"
            full_path_file_name = f"s3://{s3_bucket}/{s3_key}/{file_name}"

            # Write the CSV file
            # Optional, without WR: subset.iloc[start_row:end_row].to_csv(full_path_file_name, index=False)
            df_output = subset.iloc[start_row:end_row]
            wr.s3.to_csv(df_output, full_path_file_name, index=False)

            # Metadata describing this chunk, stored next to the CSV
            file_metadata = {
                "metadataAttributes": {
                    "category" : category,
                    "subcategory_1" : sub_category,
                    "file_part" : i+1,
                    "total_files" : num_files
                }
            }

            # Metadata file key: the CSV file name plus ".metadata.json"
            s3_metadata_file = f"{s3_key}/{file_name}.metadata.json"

            # Write JSON metadata to S3. The response is deliberately discarded,
            # as it's too much for logging/printing
            s3_client.put_object(Bucket=s3_bucket, Key=s3_metadata_file, Body=json.dumps(file_metadata))

        return None

    except Exception as e:
        # Name the failing task: with many threads a bare message cannot be traced back
        print(f"Error while writing data for category '{category}' / subcategory '{sub_category}': {e}")
        return None

In [None]:
# Define params. Reduce threads according to the environment and dataset size
num_rows_per_file = 10
local_number_of_threads = 30

# One task per distinct (category, subcategory_1) pair; built before the pool starts
tasks = [
    (category, sub_category, s3_bucket, s3_key, num_rows_per_file)
    for category, sub_category in df[['category', 'subcategory_1']].drop_duplicates().itertuples(index=False)
]

with concurrent.futures.ThreadPoolExecutor(max_workers=local_number_of_threads) as executor:
    # executor.map only re-raises a worker's exception when its result is retrieved,
    # so consume the iterator; otherwise a failed task would go unnoticed.
    upload_results = list(executor.map(write_csv_to_s3, tasks))