In [1]:
import mysql.connector
import os
import requests
from dotenv import load_dotenv
import time
import asyncio
import aiohttp
import nest_asyncio  # This allows nesting of event loops in Jupyter

# Jupyter already runs an event loop; patch asyncio so it can be re-entered
nest_asyncio.apply()

# Read settings from a .env file (if any) into the process environment
load_dotenv()

# MySQL connection settings; host and port fall back to local defaults,
# the remaining values are None when the variable is unset
mysql_host = os.environ.get("MYSQL_HOST", "127.0.0.1")
mysql_port = int(os.environ.get("MYSQL_PORT", 3306))
mysql_user = os.environ.get("MYSQL_USER")
mysql_password = os.environ.get("MYSQL_PASSWORD")
mysql_database = os.environ.get("MYSQL_DATABASE")

# Root of the Universalis v2 API; request URLs are built by appending to it,
# so the trailing slash matters
UNIVERSALIS_BASE = "https://universalis.app/api/v2/"

# Server whose market data is fetched (adjust as needed)
SERVER = "Louisoix"

# Connect to MySQL
try:
    conn = mysql.connector.connect(
        host=mysql_host,
        port=mysql_port,
        user=mysql_user,
        password=mysql_password,
        database=mysql_database,
    )
    cursor = conn.cursor()
    print("Connected to MySQL successfully.")
except mysql.connector.Error as err:
    print(f"Connection failed: {err}")
    # Raise SystemExit directly rather than calling exit(): exit() is a helper
    # the `site` module installs for interactive use, and called without an
    # argument it reports success (status 0) even though the connection failed.
    raise SystemExit(1) from err

# Load every item_id from the items table
# (presumably the table only holds marketable items -- verify against the schema)
cursor.execute("SELECT item_id FROM items")
item_ids = [row[0] for row in cursor.fetchall()]

# Step 1: Asynchronous function to fetch and store market data and sales history
async def fetch_and_store_data(session, item_id):
    """Fetch one item's Universalis data and write it to MySQL.

    Requests ``{UNIVERSALIS_BASE}{SERVER}/{item_id}``, then:

    * upserts the price of the first returned listing, together with the
      endpoint's last upload time, into ``market_data``;
    * inserts every entry of ``recentHistory`` into ``sales_history``
      (``INSERT IGNORE``, so rows the table rejects as duplicates are skipped).

    The function is best-effort: a failed request, an unexpected payload or a
    database error is reported with ``print`` and the item is skipped, so one
    bad item never aborts the whole batch.

    Note: the ``cursor``/``conn`` calls are synchronous and block the event
    loop while they run. Because there is no ``await`` between the first
    ``execute`` and the ``commit``/``rollback``, one item's writes are never
    interleaved with another item's on the shared connection.

    Args:
        session: An open ``aiohttp.ClientSession`` used for the HTTP request.
        item_id: Item identifier as stored in the ``items`` table.

    Returns:
        None.
    """
    url = f"{UNIVERSALIS_BASE}{SERVER}/{item_id}"

    # Phase 1: download and parse. Everything is extracted before any SQL is
    # issued so a malformed payload cannot leave half an item's rows pending
    # in the shared transaction.
    try:
        async with session.get(url) as response:
            if response.status != 200:
                print(f"Skipping item {item_id}: HTTP {response.status}")
                return
            data = await response.json()

        market_row = None
        listings = data.get("listings") or []
        if listings:
            # Universalis reports lastUploadTime in milliseconds, whereas
            # FROM_UNIXTIME expects seconds (sale timestamps below are
            # already in seconds).
            last_upload_seconds = data["lastUploadTime"] / 1000
            market_row = (
                item_id,
                listings[0]["pricePerUnit"],
                last_upload_seconds,
            )

        sales_rows = [
            (item_id, sale["pricePerUnit"], sale["quantity"], sale["timestamp"])
            for sale in data.get("recentHistory") or []
        ]
    except (
        aiohttp.ClientError,
        asyncio.TimeoutError,
        AttributeError,
        IndexError,
        KeyError,
        TypeError,
        ValueError,
    ) as err:
        print(f"Skipping item {item_id}: {type(err).__name__}: {err}")
        return

    # Phase 2: write to MySQL, all-or-nothing for this item.
    try:
        if market_row is not None:
            cursor.execute("""
                INSERT INTO market_data (item_id, price, last_upload_time)
                VALUES (%s, %s, FROM_UNIXTIME(%s))
                ON DUPLICATE KEY UPDATE price = VALUES(price), last_upload_time = VALUES(last_upload_time)
            """, market_row)

        for sale_row in sales_rows:
            cursor.execute("""
                INSERT IGNORE INTO sales_history (item_id, price, quantity, timestamp)
                VALUES (%s, %s, %s, FROM_UNIXTIME(%s))
            """, sale_row)

        conn.commit()
    except mysql.connector.Error as err:
        print(f"Database write failed for item {item_id}: {err}")
        try:
            # Discard this item's partial writes so the next commit does not
            # pick them up.
            conn.rollback()
        except mysql.connector.Error as rollback_err:
            print(f"Rollback failed for item {item_id}: {rollback_err}")

# Step 2: Asynchronous function to process multiple items concurrently
async def fetch_and_process_in_parallel(max_concurrency=8):
    """Fetch and store data for every ID in ``item_ids``, a few at a time.

    All items share one ``aiohttp.ClientSession``. A semaphore caps how many
    requests are in flight at once; without it, one request per item would be
    started simultaneously.

    Args:
        max_concurrency: Maximum number of items processed at the same time.
            NOTE(review): the default of 8 is meant to respect Universalis'
            published connection limits -- confirm against the current API
            documentation.

    Raises:
        ValueError: If ``max_concurrency`` is less than 1.
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    limiter = asyncio.Semaphore(max_concurrency)

    async def _bounded_fetch(session, item_id):
        # Hold a semaphore slot for the duration of one item's processing.
        async with limiter:
            await fetch_and_store_data(session, item_id)

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            *(_bounded_fetch(session, item_id) for item_id in item_ids)
        )

# Step 3: Run the asyncio event loop directly in Jupyter (using nest_asyncio)
await fetch_and_process_in_parallel()

# Close MySQL connection
cursor.close()
conn.close()
print("Done updating Universalis market data.")


Connected to MySQL successfully.
Done updating Universalis market data.
