In [6]:
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
import pymysql , subprocess , time
from sshtunnel import SSHTunnelForwarder
from sqlalchemy.exc import SQLAlchemyError
import settings

# 現在の作業ディレクトリの絶対パスを取得
import os
current_directory = os.getcwd()

# SSH接続情報
ssh_host = settings.SSH_HOST
ssh_port = settings.SSH_PORT  # デフォルトのSSHポート
ssh_user = settings.SSH_USER
ssh_key = current_directory + settings.SSH_KEY
print(ssh_host , ssh_port , ssh_user , ssh_key)

#port
local_port = 3307  # 任意のローカルポート
remote_port = 3306  # リモートMySQLサーバーのポート

# データベースの接続情報を設定
db_user = settings.DB_USER
db_password = settings.DB_PASSWORD
db_host = settings.DB_HOST
db_port = settings.DB_PORT
db_name = settings.DB_NAME


# SSHトンネルを作成
ssh_command = [
    'ssh',
    '-i', ssh_key,
    '-L', f'{local_port}:127.0.0.1:{remote_port}',
    '-N',  # コマンドを実行せずにフォアグラウンドで実行
    '-f',  # バックグラウンドで実行
    '-p', str(ssh_port),
    f'{ssh_user}@{ssh_host}'
]

# 集計期間とサイト
collect_term=[["2023-01-01" , "2024-11-30" ]]
siteid_list=[443, 427 , 486 , 423 , 477 ,483 , 484 , 486]

#CV データの取得
subscribe_query = text("""
    SELECT *
    FROM swan_analyze.analyze_ppv_all
    WHERE public_flg =1 AND site_id=:site_id AND date >=:start_day AND date <=:end_day
    """)

try:
    # SSHトンネルの開始
    subprocess.run(ssh_command, check=True)
    print(f"SSH tunnel established on local port {local_port}")
    
    # SSHトンネルが確立されるまで少し待つ
    time.sleep(5)

    # データベース接続エンジンの作成
    engine = create_engine(f'mysql+pymysql://{db_user}:{db_password}@127.0.0.1:{local_port}/{db_name}')
    print("Database connection established.")
    
    #ここでエンジンを使用してデータベース操作を行います
    with engine.connect() as connection:
        for d in collect_term[0:1]:
            for s in siteid_list[5:6]:
                print(s , d[0] , d[1])
                
                # CVデータ集計
                result = connection.execute(subscribe_query , {"site_id":s , "start_day":d[0] , "end_day":d[1]})
                print(result)

except SQLAlchemyError as e:
    print(f"An error occurred: {e}")
    

columns = result.keys()

# クエリの結果をPandas DataFrameに変換
df = pd.DataFrame(result.fetchall(), columns=columns)
#df.tail(50)


52.199.34.55 22 wwwuser /work/mobweb_key1.pem
[['2025-01-01', '2025-01-31'], ['2024-12-01', '2024-12-31'], ['2024-11-01', '2024-11-30'], ['2024-10-01', '2024-10-31'], ['2024-09-01', '2024-09-30'], ['2024-08-01', '2024-08-31'], ['2024-07-01', '2024-07-31']]


Host key verification failed.


CalledProcessError: Command '['ssh', '-i', '/work/mobweb_key1.pem', '-L', '3307:127.0.0.1:3306', '-N', '-f', '-p', '22', 'wwwuser@52.199.34.55']' returned non-zero exit status 255.

In [15]:
#エラーをチェック
print(df.isnull().sum())
print(len(df))


NameError: name 'df' is not defined

In [3]:
#### 付け加えカラム（別データ　）
#全購入金額
df_total_price = df[['member_id' , 'price']].groupby('member_id').sum().reset_index()
#print(df_total_price)

#商品購入リスト
data_item = df.groupby('member_id')['menuid'].apply(list).reset_index()
#print(data_item)


#日付　リスト
data_inflow = df.groupby('member_id')['start_date'].apply(list).reset_index()
#print(data_inflow)

#別データをマージ
df_add = pd.merge(df , df_total_price, on=['member_id'] , how='outer')
df_add = pd.merge(df_add , data_item, on=['member_id'] , how='outer')
df_add = pd.merge(df_add , data_inflow, on=['member_id'] , how='outer')

#### data 整理####
# 列名を変更
df_add = df_add.rename(columns={
    'price_y': 'total_sale',
    'menuid_y':'ppv_history' ,
    'start_date_y':'date_history' ,
    })

#uniq personal_data
df_uiq = df_add.drop_duplicates(subset=['member_id'])

#必要なカラムのみ抽出
column_list = ['site_id', 'member_id' , 'total_sale' , 'ppv_history', 'date_history'] #,'date_history' ]
df_uiq = df_uiq[column_list]

#6000円以上の人を抽出
df_o6000 = df_uiq.query(' 6000 < total_sale ') 


print(df_o6000.tail(50) , len(df_o6000) )


      site_id                         member_id  total_sale  \
7570      483  ec19ae36a9fd4af9cfdd1b602994cc55       30900   
7583      483  ec4a787381fa9277828e6e17365fbce9        8500   
7586      483  ec4db839b29659c804bc5a835814d1bb       14500   
7592      483  ec6b9b8fab9ed3bf15cf5c97c0613546        9000   
7596      483  ec919cb134c42ee153006fbab5fb532f       12600   
7605      483  ecb6ae9dbcf2026a1f08c48c43d8409f       44200   
7631      483  eddbf44d686cc3d85dd8de1b9c59e6a0        7000   
7635      483  eddfcc7a417552ad272d908df8c0fb7d        8700   
7643      483  ee28b68c39945141903940344f62efe3        7500   
7646      483  ee45c4175f4725e82ec3a0910217bff4        6200   
7653      483  eeb8a81b319088272da0ba07e4d1dbeb        9500   
7657      483  eec0fd4eb66f16f6a6d74f3840a69032        8500   
7668      483  efb13c38f8666422530de370ea0423f5       13500   
7674      483  f05b84f366256c82f2918f188692571b        8800   
7682      483  f1111a0d83a65658692660f4ace1a288        

In [4]:
df_o6000.to_csv('hayatomo_6000_oh.csv' ,  encoding='utf-8_sig')