### 登入

In [None]:
import os
import requests

BASE_URL = 'http://localhost:1234'

def write_token(token):
    if not os.path.exists('token.txt'):
        with open('token.txt', 'w') as f:
            f.write(token)
    else:
        with open('token.txt', 'w') as f:
            f.write(token)

def login(user_name, pwd):
    try:
        response = requests.post(
            f'{BASE_URL}/users/login',
            json={
                'user_name': user_name,
                'pwd': pwd,
            }
        )
        response.raise_for_status()
        token = response.json().get('token', '')
        if token:
            write_token(token)
    except requests.exceptions.RequestException as err:
        print(err)
login(user_name='versus7063', pwd='a123456+')

## Def

In [None]:
import json
import time
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

BASE_URL = 'http://localhost:1234'

def read_token(file_path='./token.txt'):
    try:
        with open(file_path, 'r') as file:
            return file.read().strip()
    except IOError as err:
        print(f'Error reading token from file: {err}')
        return None
    

def get_headers(token):
    return {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

token = read_token()

def _make_get_request(endpoint, params=None):
    """共用的 HTTP GET 請求處理函數"""
    try:
        response = requests.get(f"{BASE_URL}/{endpoint}", headers=get_headers(token), params=params)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as err:
        print(f"Request failed: {err}")
        return None

def register(user_name, pwd, email):
    try:
        response = requests.post(
            f'{BASE_URL}/users/register',
            json={
                'user_name': user_name,
                'pwd': pwd,
                'email': email,
            }
        )
        response.raise_for_status()
        token = response.json().get('token', '')
        if token:
            write_token(token)
    except requests.exceptions.RequestException as err:
        print(err)
            
def get_all_momentum():
    return _make_get_request('marketIndex/momentum')

def get_momentum_by_range(days):
    return _make_get_request(f'marketIndex/momentum/range/{days}')
    
def get_market_weights():
    return _make_get_request('marketIndex/weights')

def parse_date(date_str):
    date_format = '%Y-%m-%d'
    try:
        date = datetime.strptime(date_str, date_format)
        return date.strftime(date_format)
    except ValueError:
        return None

def record_my_transactions(transactions):
    try:
        for stock_id, transaction_type, quantity, price, transaction_date in transactions:
            data = {
                'stock_id': stock_id,
                'transaction_type': transaction_type,
                'quantity': quantity,
                'price': price,
                'transaction_date': parse_date(transaction_date) or datetime.now().strftime('%Y-%m-%d')
            }
            response = requests.post(f'{BASE_URL}/transactions', headers=get_headers(token), json=data)
            if response.status_code in range(200, 300):
                print(f'Successfully inserted: {data}')
            else:
                print(f'Failed to insert: {data}, Status code: {response.status_code}, Response: {response.text}')
            time.sleep(2)
    except requests.exceptions.RequestException as err:
        print(err)

def get_my_portfolio():
    return _make_get_request('portfolio')

def update_my_portfolio(params):
    try:
        response = requests.put(f'{BASE_URL}/portfolio', headers=get_headers(token), json=params)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as err:
        print(err)

def get_technews_by_keyword(keyword):
    data =  _make_get_request(f"technews/search?keyword={keyword}")
    return data[::-1]
    
def getAllTechnews (page, size):
	if not isinstance(page, int) or not isinstance(size, int) or page <= 0 or size <= 0:
		print('Valid page & size required')
		return []

	params = {
		'page': page,
		'size': size
	}
	data =  _make_get_request('technews', params)
	return data[::-1]
    
def create_one_company_news (params):
	try:
		response = requests.post(f"{BASE_URL}/company-news", headers=get_headers(token), json=params)
		response.raise_for_status()
		return response.json()
	except requests.exceptions.RequestException as err:
		print(err)
    
def get_all_company_news (page, size):
	if not isinstance(page, int) or not isinstance(size, int) or page <= 0 or size <=0:
		print('Valid page & size required')
		return []

	params = {
		'page': page,
		'size': size
	}
	data =  _make_get_request('company-news', params)
	return data[::-1]
    
def get_all_company_news_by_keyword (keyword, page=1, size=10):
	if not isinstance(page, int) or not isinstance(size, int) or page <= 0 or size <=0:
		print('Valid page & size required')
		return
	return _make_get_request('/company-news/query', {
		'page': page,
		'size': size,
        'keyword': keyword
	})

def get_statement_by_symbol(symbol):
    return _make_get_request(f"statements/{symbol}")
    
def get_user_subscribe_news():
    return _make_get_request('subscribe/technews')
    
def user_subscribe_news(params):
    try:
        url = f"{BASE_URL}/subscribe/technews"
        response = requests.post(url, headers=get_headers(token), json=params)
        response.raise_for_status()
        data = response.json()
        return data
    except requests.exceptions.RequestException as err:
        print(err)
    
def print_news(news):
    for item in news:
        company_name = item.get('companyName', '')
        news_id = item.get('id', '--')
        title = item.get('title', '--')
        release_time = item.get('release_time', '--')
        web_url = item.get('web_url', '--')
        print(company_name)
        print(f"{news_id} : {title} {release_time} \n")
        if web_url:
            print(web_url)
        print('----------')
        

def draw_pie_chart(obj):
	labels = obj.keys()
	sizes = obj.values()
	colors = plt.get_cmap('Set2').colors
	explode = [0.1] * len(labels)

	plt.figure(figsize=(5, 5))
	plt.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', colors=colors, startangle=140)
	plt.title('Portfolio Distribution by Stock')
	plt.show()


def draw_line_chart(x_data, y_data, label, title, xlabel, ylabel, interval=10, figsize=(10, 6), color='#efb441'):
    plt.figure(figsize=figsize)
    plt.plot(x_data, y_data, marker='', linestyle='-', color=color, label=label)

    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)

    plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=interval))
    plt.gcf().autofmt_xdate()
    plt.grid(True)

    plt.legend()

    plt.tight_layout()
    plt.show()
    
def plot_pe_forward_vs_time(data):
    timestamps = []
    pe_forward_values = []
    
    for entry in data:
        created_at = entry.get('createdAt', '')
        pe_forward = entry.get('PE_Forward', None)
        
        if created_at and pe_forward is not None:
            timestamp = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%S.%fZ")
            timestamps.append(timestamp)
            
            pe_forward_values.append(float(pe_forward))
    
    draw_line_chart(timestamps, pe_forward_values, label='PE Forward', title='PE Forward vs Time', xlabel='Date', ylabel='PE Forward')


#### 註冊

In [None]:
register(user_name='', pwd='', email='')

In [None]:
res = get_market_weights()
res

### 取得動能指標

In [None]:
# my_data = get_all_momentum()

my_data = get_momentum_by_range(7)
print(my_data)

#### 畫出動能

In [None]:

import pandas as pd

df = pd.DataFrame(my_data)

df['createdAt'] = pd.to_datetime(df['createdAt'])
draw_line_chart(df['createdAt'], df['volume'], label='Momentum', title='Time Series Momentum', xlabel='Time', ylabel='Volume', interval=1)

### 記錄交易

In [None]:
transactions = [
    # ('AMZN', 'buy', 1, 180.53, '2025-04-15'),
	# ('AMZN', 'buy', 1, 181.08, '2025-04-15'),
	# ('NET', 'buy', 2, 108.28, '2025-04-15'),
	# ('GOOG', 'buy', 1, 159.74, '2025-04-15'),
	# ('GOOG', 'buy', 1, 159.91, '2025-04-15'),
]

In [None]:
record_my_transactions(transactions)

### MY Portfolio

In [None]:
my_portfolio = get_my_portfolio()

PORTFOLIO_USD = 14309

portfolio_values = {}
for item in my_portfolio:
    stock_id = item['stock_id']
    quantity = item['quantity']
    average_price = float(item['average_price'])  # 转换为浮点数
    total_value = quantity * average_price
    if stock_id in portfolio_values:
        portfolio_values[stock_id] += total_value
    else:
        portfolio_values[stock_id] = total_value
    
    print(f'{stock_id}: {quantity} * {average_price} = {total_value}'  + '\n')

# 计算投资组合中剩余的 USD 价值
portfolio_values['USD'] = PORTFOLIO_USD

portfolio_values = {k: v for k, v in portfolio_values.items() if v > 0}

sorted_portfolio_values = dict(sorted(portfolio_values.items(), key=lambda item: item[1], reverse=True))

draw_pie_chart(sorted_portfolio_values)

In [None]:

data = {
	'stock_id': 'MU',
	'average_price': 127.33
}
update_my_portfolio(data)

### search news by keyword

In [None]:
keyword = 'ai'

news = get_technews_by_keyword(keyword)

print_news(news)

In [None]:
all_news = getAllTechnews(page=1, size=5000)
print_news(all_news)

#### 搜索Company news

In [None]:
all_news = get_all_company_news(page=1, size=100)

print_news(all_news)

In [None]:
keyword = 'ai'
search_news = get_all_company_news_by_keyword(page=1, size=100, keyword=keyword)

print_news(search_news)

#### 讀取Html 轉成json資料

In [None]:
symbol = 'NVDA'

In [None]:
from bs4 import BeautifulSoup
from datetime import datetime, timezone

with open('index.html', 'r', encoding='utf-8') as file:
	html_content = file.read()

soup = BeautifulSoup(html_content, 'html.parser')

news = []

for row in soup.find_all('tr', class_='cursor-pointer has-label'):
	time_cell = row.find('td', align='right')
	time_text = time_cell.get_text(strip=True).replace('Today', '')

	if 'Today' in time_text:
		time_text = time_text.replace('Today', '')
		now = datetime.now(timezone.utc)
		date_str = now.date().isoformat()
	else:
		now = datetime.now(timezone.utc)
		date_str = now.date().isoformat()

	try:
		time_obj = datetime.strptime(time_text, '%I:%M%p')
		time_24hr = time_obj.strftime('%H:%M:%S')
	except ValueError:
		# 如果时间格式无法解析，使用默认时间
		time_24hr = '00:00:00'
		
	# 创建 MySQL 支持的时间字符串
	release_time = f"{date_str} {time_24hr}"

	link = row.find('a', class_='tab-link-news')
	title = link.get_text(strip=True)
	web_url = link['href']

	publisher = row.find('span').get_text(strip=True).strip('()')

	news_item = {
        "title": title,
        "symbol": symbol,
        "release_time": release_time,
        "publisher": publisher,
        "web_url": web_url
    }

	news.append(news_item)

print(json.dumps(news[0:2], indent=4))


#### 單筆寫入

In [None]:
from time import sleep

SLEEP_SECOND = 1 / 10
for item in news:
	res = create_one_company_news(item)
	print(res)
	sleep(SLEEP_SECOND)

#### 多筆寫入至資料庫 Company_news

In [None]:
url = BASE_URL + '/company-news/bulk'

response = requests.post(url, headers=headers, json=news)

print(f"Status Code: {response.status_code}")
print("Response Text:", response.text)

try:
    response_data = response.json()
    print(json.dumps(response_data, indent=2))
except ValueError as e:
    # 捕获并打印异常详细信息
    print(f"Failed to parse JSON: {e}")
    print("Response Text:", response.text)

#### 取得statement

In [None]:
response = get_statement_by_symbol('CRWD')
    
plot_pe_forward_vs_time(response[-500:])

In [None]:
# user_subscribe_news({'newsId': 80497})
data = get_user_subscribe_news()
data