### Schedule Generation

In [65]:
# List the files currently present in the A_Uni dataset directory.
! ls ../../Dataset_Helping/A_Uni/

A_Uni_Base_JSON.json			 A_Uni_Structured.json
A_Uni_Structured_Generated_step_1.jsonl  A_Uni_Structured.jsonl
A_Uni_Structured_Generated_step_2.jsonl


In [66]:
import ast
import json
import random

# names.txt holds a Python literal (a collection of names) that may span
# several lines: strip every line, join them with spaces and parse the result.
with open("../../Dataset_Helping/names.txt", "r") as file:
    names = ' '.join(line.strip() for line in file)
    names_list = list(ast.literal_eval(names))

# Load the per-category shopping data. Each top-level key is a shopping type;
# its value is later read through the keys "high_priced", "low_priced" and
# "items_to_buy".
with open("../../Dataset_Helping/A_Uni/A_Uni_Base_JSON.json", "r") as f:
    shopping_data = json.load(f)

In [67]:

# Seed Python's global `random` generator: every shuffle / choice / sample /
# randint call in this cell draws from it, so a fixed seed makes the generated
# dataset repeatable for the same input files.
random.seed(42)


def generate_tuples_for_category(category_data, shopping_type):
    """
    Build one (shopping_type, item_to_buy, high_priced, low_priced) tuple per
    brand pair of a category.

    `category_data` is a dict with the keys "high_priced", "low_priced" and
    "items_to_buy". The entry at position p of "high_priced" is always paired
    with the entry at position p of "low_priced"; the positions are visited in
    a random order and each one gets an item drawn at random (repeats are
    possible) from "items_to_buy".

    Returns a list with as many tuples as there are "high_priced" entries.
    """
    order = list(range(len(category_data["high_priced"])))
    random.shuffle(order)
    # Tuple elements are evaluated left to right, so the item is drawn before
    # the two brands are looked up, once per shuffled position.
    return [
        (
            shopping_type,
            random.choice(category_data["items_to_buy"]),
            category_data["high_priced"][pos],
            category_data["low_priced"][pos],
        )
        for pos in order
    ]

def _find_partner_price(k, low, high, max_k_over_l, max_l_over_k, taken):
    """
    Try up to 600 random draws to find a partner price for the anchor price `k`.

    Each draw picks an integer in [low, high] and rounds it down to a multiple
    of 10. A candidate `l` is accepted when
      * k / l and l / k are both unchanged by rounding to two decimals,
      * k / l < max_k_over_l and l / k < max_l_over_k,
      * l != k, and
      * neither (k, l) nor (l, k) is already in `taken`.

    Returns the accepted price, or None when all 600 draws were rejected.
    """
    for _ in range(600):
        l = (random.randint(low, high) // 10) * 10
        k_div_l = k / l
        l_div_k = l / k
        ratios_are_clean = round(k_div_l, 2) == k_div_l and round(l_div_k, 2) == l_div_k
        if ratios_are_clean and k_div_l < max_k_over_l and l_div_k < max_l_over_k and l != k:
            if (k, l) not in taken and (l, k) not in taken:
                return l
    return None


def find_valid_list_prices(num_items=30, max_tries=10, max_price=3000):
    """
    Generate two aligned price lists of length `num_items`.

    For every even i in range(0, num_items, 2), taken in random order, anchor
    prices k are scanned from 100*(i+1)-50 up to (but excluding) 100*(i+1)+50
    in steps of 10 until one of them gets a partner price l:
      * k above max_price / 2: l is drawn from [100, k], with k/l < 5 and l/k < 6;
      * k below max_price / 2: l is drawn from [k, max_price], with both ratios < 3;
      * k exactly max_price / 2 is skipped.
    Each match is stored in both orientations, (k, l) and (l, k). An attempt
    succeeds when exactly `num_items` entries were collected; they are then
    shuffled and split into the two returned lists, so position p of the first
    list and position p of the second list always form one stored pair.

    Parameters:
        num_items: required length of each returned list. Every match adds two
            entries, so an odd value can never be satisfied.
        max_tries: accepted for backward compatibility but not consulted; the
            number of attempts is fixed at 20.
        max_price: upper bound for the partner draw of low anchor prices, and
            the value whose half separates the two search regimes.

    Returns:
        (list_price_1, list_price_2)

    Raises:
        RuntimeError: when none of the 20 attempts produced exactly
            `num_items` entries.
    """
    attempts = 20
    half_price = max_price / 2

    for _attempt in range(attempts):
        base_price_1 = list(range(0, num_items, 2))
        random.shuffle(base_price_1)

        prices = []
        for i in base_price_1:
            k_min = 100 * (i + 1) - 50
            k_max = 100 * (i + 1) + 50

            for k in range(k_min, k_max, 10):
                if k > half_price:
                    partner = _find_partner_price(k, 100, k, 5, 6, prices)
                elif k < half_price:
                    partner = _find_partner_price(k, k, max_price, 3, 3, prices)
                else:
                    # k == max_price / 2 belongs to neither regime.
                    continue
                if partner is not None:
                    prices.append((k, partner))
                    prices.append((partner, k))
                    break

        if len(prices) == num_items:
            random.shuffle(prices)
            list_price_1 = [first for first, _second in prices]
            list_price_2 = [second for _first, second in prices]
            return list_price_1, list_price_2

    # The failure path must not touch list_price_1 / list_price_2: they are
    # only bound on success, so reading them here would raise
    # UnboundLocalError and hide the real error.
    raise RuntimeError("Failed to generate valid list prices")


# Number of users to generate.
NUM_USERS = 50

# Draw NUM_USERS distinct user names.
users = random.sample(names_list, NUM_USERS)

# ---------------------------
# Generate tuples per category.
# ---------------------------
category_tuples = {}
for shopping_type, data in shopping_data.items():
    category_tuples[shopping_type] = generate_tuples_for_category(data, shopping_type)

# Flat pool of every category's tuples. It is the same for all users, so it is
# built once instead of once per user.
all_tuples = []
for shopping_type in category_tuples:
    all_tuples.extend(category_tuples[shopping_type])


def _format_factor(value):
    """Round to two decimals and return an int when the result is a whole number."""
    value = round(value, 2)
    if int(value) == value:
        value = int(value)
    return value


def _describe_purchase(item_to_buy, high_brand, low_brand, price_1, price_2):
    """
    Describe one purchase that was paid at `price_1`, with `price_2` as the
    price of the alternative brand.

    When price_1 > price_2 the item was bought from `high_brand`, otherwise
    from `low_brand`. In both cases the first sentence states the price of the
    brand that was NOT bought (price_2) and the second sentence relates the
    bought brand to it: as a percentage when the price ratio is below 2, as a
    multiple otherwise.

    Returns (bought, final_shopping, high_price_brand, low_price_brand), where
    `bought` is a tuple of three sentences and the two *_price_brand values are
    (brand, price) tuples.
    """
    if price_1 > price_2:
        bought_brand, other_brand, direction = high_brand, low_brand, "more"
        ratio = price_1 / price_2
    else:
        bought_brand, other_brand, direction = low_brand, high_brand, "less"
        ratio = price_2 / price_1

    if ratio < 2:
        # Percentage difference, always relative to the stated price_2.
        amount = _format_factor((abs(price_1 - price_2) / price_2) * 100)
        unit = "percent"
    else:
        amount = _format_factor(ratio)
        unit = "times"

    bought = (
        f"{item_to_buy} from {other_brand} is {price_2}",
        f"{item_to_buy} from {bought_brand} is {amount} {unit} {direction} expensive than {item_to_buy} from {other_brand}",
        f"bought {item_to_buy} from {bought_brand}",
    )
    final_shopping = f"{item_to_buy} - {bought_brand}"
    # high_brand always carries the larger price and low_brand the smaller one,
    # matching the sentences above in both branches.
    high_price_brand = (high_brand, max(price_1, price_2))
    low_price_brand = (low_brand, min(price_1, price_2))
    return bought, final_shopping, high_price_brand, low_price_brand


# ---------------------------
# Generate Dataset for 50 Users
# ---------------------------
# For each user, randomly select num_items tuples from the combined pool and
# attach one generated price pair to each of them.
user_dataset = {}
num_items = 30
max_price = 3000
for user in users:
    user_dataset[user] = {"user": user, "shopping_info": []}
    selected_tuples = random.sample(all_tuples, num_items)  # 30 tuples per user.
    # max_price is passed by keyword: the second positional slot of
    # find_valid_list_prices is max_tries, not max_price.
    list_price_1, list_price_2 = find_valid_list_prices(num_items, max_price=max_price)

    # Conversation partners are drawn from every name except the user's own.
    other_names = [u for u in names_list if u != user]

    for idx, tup in enumerate(selected_tuples):
        # tup is (shopping_type, item_to_buy, high_priced, low_priced)
        shopping_type, item_to_buy, high_brand, low_brand = tup

        bought, item_was_bought, high_price_brand, low_price_brand = _describe_purchase(
            item_to_buy, high_brand, low_brand, list_price_1[idx], list_price_2[idx]
        )
        user_2 = random.choice(other_names)
        shopping_info = {"user_2": user_2,
                         "shopping_type": shopping_type,
                         "item_to_buy": item_to_buy,
                         "high_price_brand": high_price_brand,
                         "low_price_brand": low_price_brand,
                         "bought": bought,
                         "final_price": list_price_1[idx],
                         "final_shopping": item_was_bought}

        user_dataset[user]["shopping_info"].append(shopping_info)



In [68]:
# Preview the first generated user: the name, the record's top-level keys,
# then the record itself as the cell output.
first_user = list(user_dataset)[0]
print("Tuples for", first_user)
first_record = user_dataset[users[0]]
print(first_record.keys())
first_record

Tuples for Kaimana
dict_keys(['user', 'shopping_info'])


{'user': 'Kaimana',
 'shopping_info': [{'user_2': 'Yolanthe',
   'shopping_type': 'Furniture',
   'item_to_buy': 'bookshelf',
   'high_price_brand': ('Poltrona Frau', 1700),
   'low_price_brand': ("Bob's Discount Furniture", 850),
   'bought': ("bookshelf from Bob's Discount Furniture is 850",
    "bookshelf from Poltrona Frau is 2 times more expensive than bookshelf from Bob's Discount Furniture",
    'boughtbookshelf from Poltrona Frau'),
   'final_price': 1700,
   'final_shopping': 'bookshelf - Poltrona Frau'},
  {'user_2': 'Yuna',
   'shopping_type': 'Electronics',
   'item_to_buy': 'smart TV',
   'high_price_brand': ('Apple', 2900),
   'low_price_brand': ('Xiaomi', 1450),
   'bought': ('smart TV from Xiaomi is 1450',
    'smart TV from Apple is 2 times more expensive than smart TV from Xiaomi',
    'boughtsmart TV from Apple'),
   'final_price': 2900,
   'final_shopping': 'smart TV - Apple'},
  {'user_2': 'Quadee',
   'shopping_type': 'Sports Equipment',
   'item_to_buy': 'soccer 

In [69]:
# Write the whole dataset as one pretty-printed JSON document keyed by user name.
output_file = '../../Dataset_Helping/A_Uni/A_Uni_Structured.json'
with open(output_file, 'w') as f:
    f.write(json.dumps(user_dataset, indent=4))

# Write the same data as JSON Lines: one {"user", "shopping_info"} object per line.
output_file = '../../Dataset_Helping/A_Uni/A_Uni_Structured.jsonl'
with open(output_file, 'w') as f:
    for user, data in user_dataset.items():
        line = {'user': user, 'shopping_info': data['shopping_info']}
        json.dump(line, f)
        f.write('\n')


## Merging Conversations and Structured Data

In [1]:
import json 
def load_jsonl(path):
    """Read a JSON Lines file and return a list with one decoded value per line."""
    with open(path, 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f]


# Structured per-user shopping records.
structured_data_path = "../../Dataset_Helping/A_Uni/A_Uni_Structured.jsonl"
structured_data = load_jsonl(structured_data_path)

print(f"Loaded {len(structured_data)} records from structured data file")

# Generated conversations.
# NOTE(review): the saved run of this cell ends in FileNotFoundError for this
# path, and the directory listing at the top of the notebook shows
# ..._Generated_step_1.jsonl and ..._Generated_step_2.jsonl but no
# ..._Generated_conversation.jsonl; confirm which file is the intended input.
generated_data_path = "../../Dataset_Helping/A_Uni/A_Uni_Structured_Generated_conversation.jsonl"
generated_data = load_jsonl(generated_data_path)

print(f"Loaded {len(generated_data)} records from generated data file")


Loaded 50 records from structured data file


FileNotFoundError: [Errno 2] No such file or directory: '../../Dataset_Helping/A_Uni/A_Uni_Structured_Generated_conversation.jsonl'

Structure:

user_ID - user - user_2 - conversation - extra_info - question - answer

conversation is a list of 10 utterances
extra_info is a dictionary with keys ['shopping_type', 'item_to_buy', 'high_price_brand', 'low_price_brand']
the question has the following format: What did 'user' buy that had a price of $'final_price'?


In [71]:
# structured_data[i]

In [72]:
# Inspect a single structured record. The same `i` is used both as the user
# index and as the position inside that user's shopping_info list.
i = 0 

structured_data[i]['shopping_info'][i]

{'user_2': 'Yolanthe',
 'shopping_type': 'Furniture',
 'item_to_buy': 'bookshelf',
 'high_price_brand': ['Poltrona Frau', 1700],
 'low_price_brand': ["Bob's Discount Furniture", 850],
 'bought': ["bookshelf from Bob's Discount Furniture is 850",
  "bookshelf from Poltrona Frau is 2 times more expensive than bookshelf from Bob's Discount Furniture",
  'boughtbookshelf from Poltrona Frau'],
 'final_price': 1700,
 'final_shopping': 'bookshelf - Poltrona Frau'}

In [None]:
import ast
from datetime import datetime, timedelta
import random

dataset = []

# Every conversation is stamped with a random day within 2024.
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 12, 31)

# NOTE(review): no random seed is set in this cell, so the generated dates and
# times differ from run to run.

# The sanity check below expects exactly this many shopping items per user.
ITEMS_PER_USER = 30
# Number of distinct minutes drawn per conversation; a conversation with more
# utterances than this fails with IndexError when the timestamps are attached.
MINUTES_PER_CONVERSATION = 10

# Flat position of the current (user, item) pair. generated_data is assumed to
# hold one conversation per pair in this same order (TODO: confirm against the
# generation step).
x = 0
for i in range(len(structured_data)):
    user_ID = i
    user = structured_data[i]['user']
    shopping_info_list = structured_data[i]['shopping_info']

    for j in range(len(shopping_info_list)):
        user_2 = shopping_info_list[j]['user_2']
        assert i*ITEMS_PER_USER + j == x, (i*ITEMS_PER_USER + j, x)

        # Use the flat position that the assert just validated. Indexing with
        # i*20 + j would hand the same conversations to consecutive users.
        raw_conversation = generated_data[x]

        # The '-' placeholder has to be detected on the raw value, before it
        # is parsed and rebuilt into a list of tuples.
        if raw_conversation == '-':
            print(raw_conversation, x)
            raise Exception("conversation is '-'")
        x += 1

        # The raw value is a string holding a Python literal; backslashes are
        # stripped before it is parsed.
        utterances = ast.literal_eval(raw_conversation.replace('\\', ''))

        random_days = random.randint(0, (end_date - start_date).days)
        message_date = start_date + timedelta(days=random_days)
        message_date = message_date.strftime("%Y-%m-%d")
        hour = random.randint(8, 17)
        # Distinct, ascending minutes keep the utterances in chronological
        # order within the chosen hour.
        minute = sorted(random.sample(range(0, 60), MINUTES_PER_CONVERSATION))

        conversation = [
            (f"{message_date} {hour:02d}:{minute[s]:02d}", utterance[0], utterance[1])
            for s, utterance in enumerate(utterances)
        ]

        question = f"What did {user} buy that had a price of ${shopping_info_list[j]['final_price']}?"
        dataset.append({
            "user_ID": user_ID,
            "user": user,
            "user_2": user_2,
            "conversation": conversation,
            "extra_info": {k:v for k,v in shopping_info_list[j].items() if k in ['shopping_type', 'item_to_buy', 'high_price_brand', 'low_price_brand']},
            "question": question,
            "answer": shopping_info_list[j]['final_shopping']
        })


In [89]:
# Persist the merged dataset as JSON Lines, one record per line;
# ensure_ascii=False writes non-ASCII characters as-is instead of \u escapes.
with open('../../Data/A_Uni.jsonl', 'w', encoding='utf-8') as f:
    for item in dataset:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')