In [55]:
import asyncio
import os
import time
import random
import string
import json
import traceback

from common.database_v2 import make_connection, make_engine
from common.gdb_helper_v2 import GDBHelper
from common.helpers import MakeTimedUniqueId
from modules.application.models import Profile, Account

from stocktwits.stocktwits import Stocktwits

In [56]:
session = make_connection()
gdb = GDBHelper(session)

query = gdb.query(Account).filter(
    Account.created.is_(True),
    Account.verified.is_(True),
    Profile.status == 'Inactive'
).join(
    Profile, Account.profile_id == Profile.profile_id
).limit(1)
account = await gdb.one_or_none(query)

if account is None:
    print('===> no accounts to start activity')

if account is not None:
        
    print('start account activity', account.fullname)
    
    profile: Profile = await gdb.get(Profile, account.profile_id)

    # create stocktwits account
    app = Stocktwits(
        account.account_id, 'Stocktwits', 
        proxy_type=profile.proxy_type, proxy_address=profile.proxy_address,
        username=account.username, password=account.password, fullname=account.fullname, 
        avatar=profile.avatar, bio=profile.bio)

    await gdb.close()



start account activity Charming Wolf


In [57]:
await app.initialize()

login_url = "https://stocktwits.com/signin"
await app.login(login_url)

StockTwits login page opened successfully.
Username entered.




Password entered.
Login button clicked.
Current URL after login: https://stocktwits.com/
Login successful


In [44]:
search_bar = await app.driver.find_element("xpath", "//input[@name='desktopSearch' and @placeholder='Search stocks, crypto, and people']", timeout=60)
await search_bar.click()

ticker_to_search = 'BURU'
print(f"Searching for ticker: {ticker_to_search}")
await search_bar.send_keys(ticker_to_search)

result_element = await app.driver.find_element("xpath", "//div[contains(@class, 'List_list__VdL2T')]//div[contains(@class, 'Item_item__CUnwa')]", timeout=20)
await result_element.click()

Searching for ticker: GOOGL


In [59]:

cond1 = "//div[contains(concat(' ', normalize-space(@class), ' '), ' WatchButton_container')]"
cond2 = "//button[contains(concat(' ', normalize-space(@class), ' '), ' WatchButton_button')]"
watchlist_button = await app.driver.find_element("xpath", f"{cond1}{cond2}", timeout=10)

button_text = await watchlist_button.text
button_text = button_text.strip()
print(f"watchlist button text: {button_text}")


watchlist button text: Watch


In [52]:
button_text = (await watchlist_button.text).strip()
if button_text == 'Watch':
    print('click to add to watchlist')
    await watchlist_button.click()


click to add to watchlist


In [82]:
posts = await app.driver.find_elements("xpath", "//div[contains(concat(' ', normalize-space(@class), ' '), ' StreamMessage_clickable')]", timeout=10)
print(f"Found {len(posts)} posts in the current scroll view.")

random_post = random.choice(posts)
# print(f"random post: {await random_post.text}")

link_a = await random_post.find_element("xpath", ".//span[starts-with(@class, 'StreamMessage_username')]/..", timeout=10)
await app.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", link_a, timeout=10)

print(f"Clicked on user: {await link_a.text}")
await link_a.click()


Found 89 posts in the current scroll view.
Clicked on user: stabbed


In [85]:
cond1 = "//div[contains(concat(' ', normalize-space(@class), ' '), ' UserPageHeader_actions')]"
cond2 = "//button[contains(concat(' ', normalize-space(@class), ' '), ' FollowButton_button')]"
follow_button = await app.driver.find_element("xpath", f"{cond1}{cond2}", timeout=20)

button_text = await follow_button.text
button_text = button_text.strip()
print(f"Follow button text: {button_text}")

await follow_button.click()


Follow button text: Follow


In [54]:

logo_element = await app.driver.find_element("id", "sidebar_logo_id", timeout=20)
await logo_element.click()


In [86]:
# Shut down the Stocktwits client created earlier in the notebook.
await app.close()