In [26]:
import pymysql
import requests
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import time
import logging
import boto3
import pendulum
import pytz
import os, shutil
from io import StringIO
from tqdm import tqdm
import swifter


from keys import *


In [28]:
# today = datetime.now().strftime("%Y%m%d")
today = "20240826"
types = {'Name':'str','Code':'str'}
code_df = pd.read_csv("./code.csv", dtype=types).rename(columns={"Name":'name',"Code":"stock_code"})

In [29]:
df_types = {'stck_bsop_date' : "str",
'stck_clpr' : "int",
'stck_oprc' : "int",
'stck_hgpr' : "int",
'stck_lwpr' : "int",
'acml_vol' : "int",
'acml_tr_pbmn' : "int",
'flng_cls_code' : "int",
'prtt_rate' : "float",
'mod_yn' : "str",
'prdy_vrss_sign' : "int",
'prdy_vrss' : "int",
'revl_issu_reas' : "str",
'hts_avls' : "int",
'prdy_vol' : "int",
'stock_code' : "str"}
# s3랑 연결 설정
access = aws_access_key() # git 올릴 때를 위한 암호화 
secret = aws_secret()

s3 = boto3.client(
    's3',
    aws_access_key_id= access, 
    aws_secret_access_key= secret,
    region_name='ap-northeast-2'
)

bucket_name = 'antsdatalake'
folder = 'once_time/' 

# rds와 연결
user = 'ants'
password = rds_password()
host= end_point()
port = 3306
database = 'datawarehouse'
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}")

# conn = pymysql.connect(host=host, user=user, passwd=password, db=database)
# cursor = conn.cursor()

df = pd.read_csv(f"~/airflow/stock_data/data/{today}.csv", dtype=df_types)
df.drop(columns=['stck_oprc','acml_vol', 'stck_hgpr','stck_lwpr','acml_vol','acml_tr_pbmn','flng_cls_code','prtt_rate','mod_yn','prdy_vrss_sign','prdy_vrss','revl_issu_reas'], inplace=True)
df.rename(columns={"stck_bsop_date":"date","stck_clpr":"closing_price","hts_avls":"hts_total","prdy_vol":"prev_trading"}, inplace=True)
df = pd.merge(df, code_df, on = 'stock_code', how='left')

df_columns = ['stock_code', 'name', 'date', 'closing_price', 'hts_total', 'prev_trading']
upload_df = df[df_columns]
upload_df['date'] = pd.to_datetime(upload_df['date'], format="%Y%m%d",errors='coerce')

upload_df.to_sql('once_time', index=False, if_exists="append", con=engine)

2716

In [36]:
from tqdm import tqdm

# 데이터베이스에서 데이터를 읽어옴
select_sql = "SELECT stock_code, date, closing_price FROM `once_time` WHERE `date` >= CURDATE() - INTERVAL 180 DAY;"
rds_df = pd.read_sql(select_sql, con=engine)

rds_df['date'] = pd.to_datetime(rds_df['date'])
rds_df = rds_df.sort_values(by=['stock_code', 'date'])
print("read 완료")

# 업데이트할 날짜
target_date = pd.Timestamp(f'{today}')

if rds_df.empty:
    print(f"No data available up to {target_date}")
else:
    if 'closing_price' in rds_df.columns:
        # tqdm을 사용하여 진행률을 표시하며 이동 평균 계산
        tqdm.pandas(desc="Calculating Moving Averages")
        
        rds_df['MA5'] = rds_df.groupby('stock_code')['closing_price'].progress_apply(lambda x: x.rolling(window=5, min_periods=1).mean()).reset_index(level=0, drop=True).round(1)
        rds_df['MA20'] = rds_df.groupby('stock_code')['closing_price'].progress_apply(lambda x: x.rolling(window=20, min_periods=1).mean()).reset_index(level=0, drop=True).round(1)
        rds_df['MA60'] = rds_df.groupby('stock_code')['closing_price'].progress_apply(lambda x: x.rolling(window=60, min_periods=1).mean()).reset_index(level=0, drop=True).round(1)
        rds_df['MA120'] = rds_df.groupby('stock_code')['closing_price'].progress_apply(lambda x: x.rolling(window=120, min_periods=1).mean()).reset_index(level=0, drop=True).round(1)
        
        # 특정 날짜의 데이터만 업데이트
        for index, row in rds_df[rds_df['date'] == target_date].iterrows():
            update_sql = """
                UPDATE `once_time` 
                SET MA5 = %s, MA20 = %s, MA60 = %s, MA120 = %s
                WHERE stock_code = %s AND `date` = %s
            """
            with engine.connect() as connection:
                connection.execute(update_sql, (
                    row['MA5'], row['MA20'], row['MA60'], row['MA120'],
                    row['stock_code'], row['date']
                ))
            print(row)
            
print("update 완료")


read 완료


Calculating Moving Averages: 100%|██████████| 2716/2716 [00:00<00:00, 4296.89it/s]


TypeError: incompatible index of inserted column with frame index