In [1]:
import os
import psutil
import re
import subprocess
import threading
import time
import json
import requests
import pandas as pd
from appium import webdriver
from selenium.webdriver.common.actions.pointer_input import PointerInput
from selenium.webdriver.common.actions.action_builder import ActionBuilder
from selenium.webdriver.common.actions import interaction
from selenium.webdriver.common.action_chains import ActionChains
from appium.webdriver.common.appiumby import AppiumBy
from appium.options.common.base import AppiumOptions
from appium import webdriver
from appium.options.android import UiAutomator2Options
from time import sleep
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from datetime import datetime
import queue
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

ldconsole_path = 'ldconsole.exe'

def find_process_by_port(port):
    for conn in psutil.net_connections(kind='inet'):
        if conn.laddr.port == port:
            try:
                return psutil.Process(conn.pid).pid
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
    return None

coin_data_queue = queue.Queue()
coin_file_lock = threading.Lock()
COIN_FILENAME = "coin_balance.xlsx"
coin_consumer_thread:threading.Thread = None

def coin_balance_consumer():
    while True:
        instance_name, package_name, balance = coin_data_queue.get()
        if instance_name is None:
            break
        save_coin_balance(instance_name, package_name, balance, COIN_FILENAME)
        coin_data_queue.task_done()

def save_to_queue(instance_name: str, package_name: str, balance: str):
    coin_data_queue.put((instance_name, package_name, balance))

def save_coin_balance(instance_name: str, package_name: str, balance: str, filename: str = "coin_balance.xlsx"):
    package_column = f"{instance_name} - Package Name"
    balance_column = f"{instance_name} - Balance"
    updated_at_column = f"{instance_name} - Updated At"
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with coin_file_lock:
        if os.path.exists(filename):
            df = pd.read_excel(filename)
        else:
            df = pd.DataFrame()
        if package_column not in df.columns:
            df[package_column] = pd.Series(dtype=object)
            df[balance_column] = pd.Series(dtype=object)
            df[updated_at_column] = pd.Series(dtype=object)
        if package_name in df[package_column].values:
            row_index = df[df[package_column] == package_name].index[0]
            df.at[row_index, balance_column] = float(balance)
            df.at[row_index, updated_at_column] = current_time
        else:
            num_entries_in_instance = df[package_column].count()
            df.at[num_entries_in_instance, package_column] = package_name
            df.at[num_entries_in_instance, balance_column] = float(balance)
            df.at[num_entries_in_instance, updated_at_column] = current_time
        df.to_excel(filename, index=False)

def start_consumer_thread():
    global coin_consumer_thread
    coin_consumer_thread = threading.Thread(target=coin_balance_consumer, daemon=True)
    coin_consumer_thread.start()

def stop_consumer_thread():
    global coin_consumer_thread
    coin_data_queue.put((None, None, None))
    coin_consumer_thread.join()


def list_ldplayer_instances():
    command = f'"{ldconsole_path}" list2'
    result = subprocess.run(command, stdout=subprocess.PIPE, shell=True)
    instances = result.stdout.decode().splitlines()
    with open('config.json') as f:
        config = json.load(f)
        config['instances_index'] = config['instances_index'].split(',')

    appium_port = 4723
    system_port = 8200
    adb_port = 5556

    instance_names = []
    for line in instances:
        line = line.split(',')
        if line[0] not in config['instances_index']:
            continue
        instance_names.append({
            "index": line[0],
            "name": line[1],
            "status": line[5] != "-1",
            "appium_port": appium_port,
            "system_port": system_port,
            "adb_port": adb_port,
        })
        appium_port += 1
        system_port += 1
        adb_port += 1
    return instance_names


def launch_ldplayer_instance_by_name(instance_name, adb_port):
    subprocess.Popen(f'"{ldconsole_path}" launch --name {instance_name} --adb-port {adb_port}')
    # subprocess.Popen(f'"{ldconsole_path}" launch --name {instance_name}', creationflags=0x00000008)

def launch_ldplayer_instance_by_index(instance_index, adb_port):
    subprocess.Popen(f'"{ldconsole_path}" launch --index {instance_index} --adb-port {adb_port}')
    # subprocess.Popen(f'"{ldconsole_path}" launch --index {instance_index}', creationflags=0x00000008)

def quit_ldplayer_instance_by_name(instance_name):
    subprocess.Popen(f'"{ldconsole_path}" quit --name {instance_name}')

def quit_ldplayer_instance_by_index(instance_index):
    subprocess.Popen(f'"{ldconsole_path}" quit --index {instance_index}')

def list_adb_devices(adb_port):
    result = subprocess.run(f'adb devices', stdout=subprocess.PIPE, shell=True)
    devices = result.stdout.decode().splitlines()
    device_ids = [line.split('\t')[0] for line in devices if '\tdevice' in line]
    return device_ids

def wait_for_new_LDPlayer_instance_to_appear_as_a_device(adb_port, timeout=60):
    start_time = time.time()
    logging.info("Waiting for the new LDPlayer instance to appear as a device...")
    initial_devices = set(list_adb_devices(adb_port))
    while time.time() - start_time < timeout:
        current_devices = set(list_adb_devices(adb_port))
        new_devices = current_devices - initial_devices
        if new_devices:
            device_id = list(new_devices)[0]
            logging.info(f"LDPlayer instance is running with device ID: {device_id}")
            return device_id
        time.sleep(2)
    raise Exception('time out waiting for LDPlayer instance to launch')

# Function to wait for device to be fully booted
def wait_for_device_ready(device_id, adb_port, timeout=60):
    start_time = time.time()
    while time.time() - start_time < timeout:
        # Check if device is online and fully booted
        boot_completed = subprocess.run(f'adb -s {device_id} shell getprop sys.boot_completed',
                                        stdout=subprocess.PIPE, shell=True)
        if '1' in boot_completed.stdout.decode().strip():
            logging.info(f"Device {device_id} is fully booted.")
            return True
        logging.info(f"Waiting for device {device_id} to boot...")
        time.sleep(5)  # Wait for 5 seconds before checking again
    raise Exception('device could not be ready in time')

def list_installed_plato(device_id, adb_port):
    result = subprocess.run(f'adb -s {device_id} shell pm list packages',
                                        stdout=subprocess.PIPE, shell=True)
    packages = result.stdout.decode().splitlines()

    return [x.replace('package:', '') for x in packages if x.startswith('package:com.plato.')]

# Function to start Appium session on a specific device
def start_appium_session(appium_port, system_port, adb_port, device_id, packageName, app_activity):
    desired_caps = {
        'platformName': 'Android',                   # Platform name (Android)
        # 'platformVersion': '11.0',                   # Replace with your emulator's Android version
        'deviceName': device_id,
        'systemPort': system_port,
        'udid': device_id,
        # 'adbPort': adb_port,
        # 'app': '/path/to/your/app.apk',              # Replace with the actual APK path if installing
        'appPackage': packageName,                  # Replace with your app's package name
        'appActivity': app_activity,                # Replace with your app's main activity
        'automationName': 'UiAutomator2',            # Automation engine
        'autoGrantPermissions': True,                # Auto grant permissions on app install
        'noReset': True,                             # Do not reset app state after each session
        'fullReset': False,                          # Do not uninstall app between sessions
        'newCommandTimeout': 600,                    # Timeout in seconds
        'adbExecTimeout': 30000,                    # Timeout in milliseconds
        # 'avd': 'Your_AVD_Name',                      # AVD (Android Virtual Device) name if using a specific emulator
    }

    driver = webdriver.Remote(f'http://localhost:{appium_port}', options=UiAutomator2Options().load_capabilities(desired_caps))
    return driver

def run_appium_server(appium_server_port):
    while 1:
        command = f'start cmd /c "appium --relaxed-security -p {appium_server_port} && exit"'
        process = subprocess.Popen(command, shell=True)
        server_url = f'http://localhost:{appium_server_port}/status'
        start_time = time.time()
        timeout = 30
        while time.time() - start_time < timeout:
            try:
                response = requests.get(server_url)
                if response.status_code == 200:
                    logging.info("Appium server is running!")
                    return process
            except Exception:
                pass
            time.sleep(1)
    raise Exception("Appium server failed to start within the timeout period.")

def stop_appium_server(appium_server_port):
    pid = find_process_by_port(appium_server_port)
    if pid:
        try:
            process = psutil.Process(pid)
            process.terminate()
            process.wait(timeout=5)
            logging.info(f"Appium server on port {appium_server_port} has been stopped.")
        except Exception as e:
            logging.error(f"Failed to stop Appium server: {e}")
    else:
        logging.info(f"No process found on port {appium_server_port}")


In [2]:

def is_game_favorite(d: webdriver.Remote, game_name:str):
    home_tab = WebDriverWait(d, 10).until(EC.visibility_of_element_located(
        (By.ID, 'plato_tab_home')))
    home_tab.click()
    fs = WebDriverWait(d, 30).until(EC.visibility_of_element_located(
        (By.ID, 'favorites_recycler_view')))
    f = fs.find_elements(By.CLASS_NAME, 'android.view.ViewGroup')[0]
    c_f = f.find_element(By.ID, 'title_text_view').text
    if c_f.lower() == game_name.lower():
        return True
    return False

def select_game(d: webdriver.Remote, game_name: str):
    game_tab = WebDriverWait(d, 10).until(EC.visibility_of_element_located(
        (By.ID, 'plato_image_games')))
    game_tab.click()
    game_button = WebDriverWait(d, 10).until(EC.visibility_of_element_located(
        (By.ID, 'game_list_item_container')))
    found_games = []
    while 1:
        try:
            games_button = d.find_elements(
                By.ID, "game_list_item_container")
            found_new = False
            for game in games_button:
                game_title = game.find_element(By.ID, 'game_list_item_title').text
                if game_title not in found_games:
                    if game_name.lower() == game_title.lower():
                        game.click()
                        return True
                    found_games.append(game_title)
                    found_new = True
            if not found_new:
                break
            d.press_keycode(20)
            d.press_keycode(20)
            d.press_keycode(20)
        except Exception as e:
            pass
        # d.swipe(150, 400, 150, 100, 100)
    return False

def toggle_favorite(d:  webdriver.Remote):
    WebDriverWait(d, 10).until(EC.visibility_of_element_located(
        (AppiumBy.ACCESSIBILITY_ID, 'Added to Favorites'))).click()

def get_coins(d: webdriver.Remote):
    WebDriverWait(d, 10).until(EC.visibility_of_element_located(
        (By.ID, 'plato_tab_shop'))).click()
    balance = WebDriverWait(d, 10).until(EC.visibility_of_element_located(
        (By.ID, 'wallet_view_balance'))).text
    return balance



In [3]:
launch_instance_appium_server_lock = threading.Lock()

def launch_instance(instance: dict):
    instance_index = instance["index"]
    instance_name = instance["name"]
    instance_adb_port = instance["adb_port"]
    with launch_instance_appium_server_lock:
        while 1:
            try:
                logging.info(f"Launching LDPlayer instance: {instance_name}")
                launch_ldplayer_instance_by_index(instance_index, instance_adb_port)
                device_id = wait_for_new_LDPlayer_instance_to_appear_as_a_device(instance_adb_port)
                wait_for_device_ready(device_id, instance_adb_port)
                return device_id
            except Exception as e:
                logging.error(f"failed to launch LDPlayer instance: {instance_name} --------------")
                try:
                    quit_ldplayer_instance_by_index(instance_index)
                except:
                    pass


In [4]:
instances = list_ldplayer_instances()


In [5]:
instance = instances[1]
instance

{'index': '1',
 'name': 'B',
 'status': False,
 'appium_port': 4724,
 'system_port': 8201,
 'adb_port': 5557}

In [6]:

app_activity = 'com.playchat.ui.activity.MainActivity'
instance_index = instance["index"]
instance_name = instance["name"]
instance_appium_port = instance["appium_port"]
instance_system_port = instance["system_port"]
instance_adb_port = instance["adb_port"]
device_id = launch_instance(instance)
logging.info(f"Starting Appium Server for instance: {instance_name}")
run_appium_server(instance_appium_port)
installed_platos = list_installed_plato(device_id, instance_adb_port)


2025-01-16 19:29:35 - INFO - Launching LDPlayer instance: B
2025-01-16 19:29:35 - INFO - Waiting for the new LDPlayer instance to appear as a device...
2025-01-16 19:29:53 - INFO - LDPlayer instance is running with device ID: emulator-5556
2025-01-16 19:29:54 - INFO - Device emulator-5556 is fully booted.
2025-01-16 19:29:54 - INFO - Starting Appium Server for instance: B
2025-01-16 19:29:57 - INFO - Appium server is running!


In [7]:
package_name = installed_platos[0]
package_name

'com.plato.androia'

In [8]:
logging.info(f"Starting Appium session on the device on instance {instance_name} for {package_name} app" )
d = start_appium_session(instance_appium_port, instance_system_port, instance_adb_port, device_id, package_name, app_activity)


2025-01-16 19:30:05 - INFO - Starting Appium session on the device on instance B for com.plato.androia app


In [9]:
d.find_element(AppiumBy.ANDROID_UIAUTOMATOR,
    'new UiSelector().text("Close app")'
).click()

In [9]:
logging.info(f"launching app {package_name}")
d.activate_app(package_name)

2025-01-16 19:30:39 - INFO - launching app com.plato.androia


<appium.webdriver.webdriver.WebDriver (session="6b1ecb5b-e107-4d8a-87a4-8a9a6f3595e4")>

In [9]:
size = d.get_window_size()

x1 = int(size['width'] * 0.75)
y1 = int(size['height'] * 0.25)

x2 = int(size['width'] * 0.35)
y2 = int(size['height'] * 0.25)

d.swipe(x1, y1, x2, y2, 150)


<appium.webdriver.webdriver.WebDriver (session="601ff091-22ac-447e-b46a-c90cf9bea63a")>

In [14]:
d.find_element(By.XPATH, '//android.widget.TextView[@text="FEED"]').click()


In [34]:
while 1:
    flag = False
    try:
        recycler = d.find_element(By.ID, 'favorites_recycler_view')
    except:
        break
    for title_element in recycler.find_elements(By.ID, 'title_text_view'):
        if 'Cribbage'.lower() == title_element.text.lower().strip():
            x = title_element.location_in_view['x'] + title_element.size['width']//2
            y = title_element.location_in_view['y'] - 50
            d.tap([(x, y)])
            flag = True
            break
    if flag:
        break
    else:
        print("couldn't find")
        size = d.get_window_size()
        x1 = int(size['width'] * 0.75)
        y1 = int(size['height'] * 0.25)
        x2 = int(size['width'] * 0.35)
        y2 = int(size['height'] * 0.25)
        d.swipe(x1, y1, x2, y2, 150)
    

In [None]:
d.find_element(By.ID, 'favorites_recycler_view')

# 
# //android.widget.TextView[@text='2']

In [None]:
d.find_element(By.ID, "plato_conversation_time").text

# d.find_element(AppiumBy.XPATH, "//*[@resource-id='game_type_time' or @resource-id='plato_conversation_time']")


In [10]:
from appium import webdriver
from PIL import Image, ImageDraw
import io


In [11]:
screenshot = d.get_screenshot_as_png()


In [20]:

image = Image.open(io.BytesIO(screenshot))
width, height = image.size
x = int(width * 0.5)
y = int(height * 0.89)
pixel_color = image.getpixel((x, y))
yellow = (255, 255, 0)
blue = (22, 135, 239)
def color_distance(c1, c2):
    return sum((a - b) ** 2 for a, b in zip(c1, c2)) ** 0.5
distance_to_blue = color_distance(pixel_color, blue)
distance_to_blue

155.32224567009067

In [None]:
pixel_color

In [12]:
screenshot = d.get_screenshot_as_png()



In [18]:

image = Image.open(io.BytesIO(screenshot))

width, height = image.size

x = int(width * 0.55)
y = int(height * 0.6)  # Vertically centered

draw = ImageDraw.Draw(image)
marker_radius = 10  # Radius of the marker circle
draw.ellipse((x - marker_radius, y - marker_radius, x + marker_radius, y + marker_radius), outline="red", width=3)

# Optionally show the image directly in code
image.show()


In [94]:
from selenium.webdriver.remote.webelement import WebElement

WebDriverWait(d, 10).until(EC.visibility_of_element_located(
    (By.ID, 'enterable_item_title')))
found_matchmaking_buttons: list[WebElement] = []
while 1:
    found_new = False
    for matchmaking_title in d.find_elements(By.ID, "enterable_item_title"):
        try:
            txt = matchmaking_title.text
            if txt not in [x[1] for x in found_matchmaking_buttons]:
                found_matchmaking_buttons.append((matchmaking_title, txt))
                found_new = True
        except Exception as e:
            pass
    if not found_new:
        break
    d.press_keycode(20)
    d.press_keycode(20)
    d.press_keycode(20)
    d.press_keycode(20)
if found_matchmaking_buttons:
    found_matchmaking_buttons[-1][0].click()
WebDriverWait(d, 10).until(EC.visibility_of_element_located((By.ID, 'join_button'))).click()
while 1:
    try:
        el = d.find_element(By.ID, 'enterable_item_message')
        txt = el.text
        assert 'match made' in txt.lower()
        el.click()
        break
    except:
        pass
    sleep(0.5)
sleep(2)
WebDriverWait(d, 3*60).until(EC.visibility_of_element_located((By.ID, 'plato_chat_box_fake')))


In [None]:
game_name = 'Bingo'
found_games = []

games_button = d.find_elements(By.ID, "game_list_item_container")
found_new = False
for game in games_button:
    try:
        game_title = game.find_element(By.ID, 'game_list_item_title').text
        print(game_title)
        if game_title not in found_games:
            if game_name.lower() == game_title.lower():
                game.click()
                break
            found_games.append(game_title)
            found_new = True
    except:
        pass
found_new

In [None]:

d.press_keycode(20)
d.press_keycode(20)
d.press_keycode(20)
d.press_keycode(20)
d.press_keycode(20)

In [None]:
d.find_element(By.ID, "plato_image_people")

In [None]:

d.terminate_app(package_name)


In [9]:
d.quit()


In [19]:

d.quit()

stop_appium_server(instance_appium_port)
quit_ldplayer_instance_by_index(instance_index)

2025-01-16 19:33:18 - INFO - Appium server on port 4724 has been stopped.
