In [1]:
import re
import sys
import time

import numpy as np
import pandas as pd
import pymysql
import requests
from sshtunnel import SSHTunnelForwarder

sys.path.append("../")
import datetime
import json
import urllib
from time import sleep

from vook_db_lambda.local_config import CLIENT_ME, ClientId, aff_id, pid, sid

Now: Local env


In [2]:
SSH_PKEY_PATH = "~/.ssh/vook-rails-ssh-key-dev.pem"
ng_ok_table = pd.read_csv("../data/input/query_ng_ok.csv")
size_id = 999
sleep_second = 1
WANT_ITEMS_RAKUTEN = [
    "itemName",
    "itemPrice",
    "itemUrl",
]
HITS_PER_PAGE = 30
req_params = {
    "applicationId": CLIENT_ME["APPLICATION_ID"],
    "affiliateId": CLIENT_ME["AFF_ID"],
    "format": "json",
    "formatVersion": "2",
    "keyword": "",
    "hits": HITS_PER_PAGE,
    "sort": "-itemPrice",
    "page": 0,
    "minPrice": 100,
}
REQ_URL = "https://app.rakuten.co.jp/services/api/IchibaItem/Search/20170706"
MAX_PAGE = 10
WANT_ITEMS_YAHOO = [
    "id",
    "name",
    "url",
    "price",
    "knowledge_id",
    "platform_id",
    "size_id",
    "created_at",
    "updated_at",
]
REQ_URL_CATE = "https://shopping.yahooapis.jp/ShoppingWebService/V3/itemSearch"

In [3]:
def get_ec2_config():
    return {
        "host_name": "3.114.154.242",
        "ec2_port": 22,
        "ssh_username": "ec2-user",
        "ssh_pkey": "~/.ssh/vook-rails-ssh-key.pem",
        "rds_end_point": "vook-rails-db.ctutkiavfpne.ap-northeast-1.rds.amazonaws.com",
        "rds_port": 3306,
    }


def read_sql_file(file_path):
    """
    指定されたファイルパスからSQLファイルを読み込み、その内容を文字列として返す。

    :param file_path: 読み込む.sqlファイルのパス
    :return: ファイルの内容を含む文字列
    """
    try:
        with open(file_path, "r") as file:
            return file.read()
    except IOError as e:
        # ファイルが開けない、見つからない、などのエラー処理
        return f"Error reading file: {e}"


def get_knowledges():
    config_ec2 = get_ec2_config()
    query = read_sql_file("../vook_db_lambda/sql/knowledges.sql")
    df_from_db = pd.DataFrame()
    with SSHTunnelForwarder(
        (config_ec2["host_name"], config_ec2["ec2_port"]),
        ssh_username=config_ec2["ssh_username"],
        ssh_pkey=config_ec2["ssh_pkey"],
        remote_bind_address=(
            config_ec2["rds_end_point"],
            config_ec2["rds_port"],
        ),
    ) as server:
        print(f"Local bind port: {server.local_bind_port}")
        conn = None
        try:
            conn = pymysql.connect(
                **get_rds_config(server.local_bind_port), connect_timeout=10
            )
            cursor = conn.cursor()
            cursor.execute(query)
            for (
                row
            ) in (
                cursor
            ):  # column1, column2, ...は取得したいカラム名に合わせて変更してください
                df_from_db = pd.concat(
                    [df_from_db, pd.DataFrame([row])], ignore_index=True
                )
            return df_from_db
        except pymysql.MySQLError as e:
            print(f"Error connecting to MySQL: {e}")
        finally:
            if conn is not None:
                conn.close()


def get_rds_config(port):
    return {
        "user": "root",
        "password": "rds-vook",
        "port": port,
        "host": "localhost",
        "database": "vook_web_v3_production",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }


def create_wort_list(df_from_db: pd.DataFrame, unit: str) -> list:
    """brand,line,knowledgeの連続2文字以上ワードかどうかを判定、修正する"""
    words = df_from_db[
        f"{unit}_name"
    ].values.copy()  # NOTE:copyしないと関数内部で_nameカラムが更新される。
    for row in np.arange(len(words)):
        word = words[row]
        words[row] = validate_input(word)
    return list(words)


def convertor(input_string, ng_ok_table):
    # 特定のワードが DataFrame に含まれているかどうかを確認し、行番号を表示
    row_indices = ng_ok_table.index[
        ng_ok_table.apply(lambda row: input_string in row.values, axis=1)
    ].tolist()
    if row_indices:
        output = ng_ok_table["corrected"][row_indices[0]]
        print(f"{input_string}を{output}に変換します")
        return output
    else:
        print(f"{input_string}は対応表に存在しません。")
        return input_string


def validate_input(input_string):
    """
    連続する2文字以上で構成されたワードのみをOKとし、単体1文字またはスペースの前後に単体1文字が含まれるワードをNGとするバリデータ関数
    """
    # 正規表現パターン: 単体1文字またはスペースの前後に単体1文字が含まれるワードを検出
    pattern_ng = re.compile(r"^[!-~]$|\s[!-~]$|^[!-~]\s")
    # 入力文字列がNGパターンに一致するか確認
    if not pattern_ng.search(input_string):
        return input_string
    else:
        # エラーワードがあればメッセージを吐き、convertor関数によって対応する
        print(f"エラーワード　{input_string}が存在しました:")
        return convertor(input_string, ng_ok_table)


def create_df_no_ng_keyword(
    df_from_db, words_knowledge_name, words_brand_name, words_line_name
):
    df_no_ng_keyword = pd.DataFrame(columns=df_from_db.columns)
    df_no_ng_keyword["knowledge_id"] = df_from_db["knowledge_id"].values
    df_no_ng_keyword["knowledge_name"] = words_knowledge_name
    df_no_ng_keyword["brand_name"] = words_brand_name
    df_no_ng_keyword["line_name"] = words_line_name
    return df_no_ng_keyword


def create_api_input() -> pd.DataFrame:
    # 知識情報の取得
    df_from_db = get_knowledges()
    # 対象のワードリスト作成
    words_brand_name = create_wort_list(df_from_db, "brand")
    words_line_name = create_wort_list(df_from_db, "line")
    words_knowledge_name = create_wort_list(df_from_db, "knowledge")
    # 修正版のテーブルを作成
    df_api_input = create_df_no_ng_keyword(
        df_from_db, words_knowledge_name, words_brand_name, words_line_name
    )
    return df_api_input

In [4]:
# APIのインプットデータ作成
df_api_input = create_api_input()

Local bind port: 52911
エラーワード　BIG Eが存在しました:
BIG EをBIGEに変換します
エラーワード　BIG Eが存在しました:
BIG EをBIGEに変換します
エラーワード　BIG Eが存在しました:
BIG EをBIGEに変換します
エラーワード　BIG Eが存在しました:
BIG EをBIGEに変換します


In [5]:
def DataFrame_maker_rakuten(keyword, platform_id, knowledge_id, size_id):
    """apiコールした結果からdataframeを出力する関数を定義"""
    cnt = 1
    df = pd.DataFrame(columns=WANT_ITEMS_RAKUTEN)
    req_params["page"] = cnt
    req_params["keyword"] = keyword
    while True:
        req_params["page"] = cnt
        res = requests.get(REQ_URL, req_params)
        res_code = res.status_code
        res = json.loads(res.text)
        if res_code != 200:
            print(
                f"""
            ErrorCode -> {res_code}\n
            Error -> {res['error']}\n
            Page -> {cnt}"""
            )
        else:
            if res["hits"] == 0:
                print("返ってきた商品数の数が0なので、ループ終了")
                break
            tmp_df = pd.DataFrame(res["Items"])[WANT_ITEMS_RAKUTEN]
            df = pd.concat([df, tmp_df], ignore_index=True)
        if cnt == MAX_PAGE:
            print("MAX PAGEに到達したので、ループ終了")
            break
        # logger.info(f"{cnt} end!")
        cnt += 1
        # リクエスト制限回避
        sleep(1)
        print("Finished!!")

    df["platform_id"] = platform_id
    df["knowledge_id"] = knowledge_id
    df["size_id"] = size_id
    df_main = df.rename(
        columns={"itemName": "name", "itemPrice": "price", "itemUrl": "url"}
    )
    df_main = df_main.reindex(
        columns=[
            "id",
            "name",
            "url",
            "price",
            "knowledge_id",
            "platform_id",
            "size_id",
        ]
    )
    print("price type before:", df_main["price"].dtype)
    df_main["price"] = df_main["price"].astype(int)
    print("price type after:", df_main["price"].dtype)
    run_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    df_main["created_at"] = run_time
    df_main["updated_at"] = run_time
    return df_main


def DataFrame_maker_yahoo(keyword, platform_id, knowledge_id, size_id):
    start_num = 1
    step = 100
    max_products = 1000

    params = {
        "appid": ClientId,
        "output": "json",
        "query": keyword,
        "sort": "-price",
        "affiliate_id": aff_id,
        "affiliate_type": "vc",
        "results": 100,  # NOTE: 100個ずつしか取得できない。
    }

    l_df = []
    for inc in range(0, max_products, step):
        params["start"] = start_num + inc
        df = pd.DataFrame(columns=WANT_ITEMS_YAHOO)
        res = requests.get(url=REQ_URL_CATE, params=params)
        res_cd = res.status_code
        if res_cd != 200:
            print("Bad request")
            break
        else:
            res = json.loads(res.text)
            if len(res["hits"]) == 0:
                print("If the number of returned items is 0, the loop ends.")
            print("Get Data")
            l_hit = []
            for h in res["hits"]:
                l_hit.append(
                    (
                        h["index"],
                        h["name"],
                        create_url_yahoo(h["url"]),
                        h["price"],
                        knowledge_id,
                        platform_id,
                        size_id,
                        # 現在の日付と時刻を取得 & フォーマットを指定して文字列に変換
                        datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                        # 現在の日付と時刻を取得 & フォーマットを指定して文字列に変換
                        datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                    )
                )
            df = pd.DataFrame(l_hit, columns=WANT_ITEMS_YAHOO)
            l_df.append(df)
    if not l_df:
        print("no df")
    else:
        return pd.concat(l_df, ignore_index=True)


def time_decorator(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__}の実行時間: {round(end_time - start_time)}秒")
        return result

    return wrapper


@time_decorator
def repeat_dataframe_maker(
    df_no_ng_keyword,
    platform_id,
    func,
    size_id=size_id,
    sleep_second=sleep_second,
):
    n_bulk = len(df_no_ng_keyword)
    df_bulk = pd.DataFrame()
    for i, n in enumerate(np.arange(n_bulk)):
        brand_name = df_no_ng_keyword.brand_name[n]
        line_name = df_no_ng_keyword.line_name[n]
        knowledge_name = df_no_ng_keyword.knowledge_name[n]
        query = f"{brand_name} {line_name} {knowledge_name} 中古"
        # query validatorが欲しい　半角1文字をなくす
        knowledge_id = df_no_ng_keyword.knowledge_id[n]
        print(f"{i:03}, 検索キーワード:[" + query + "]", "knowledge_id:", knowledge_id)
        output = func(query, platform_id, knowledge_id, size_id)
        df_bulk = pd.concat([df_bulk, output], ignore_index=True)
        sleep(sleep_second)
        # 開発環境では、1知識で試す
        # break
        # BIG Eでとめる
        if "Levi's 501 BIGE 中古" == query:
            break
    return df_bulk  # TODO:lambda実行でempty dataframe 原因調査から


def create_url_yahoo(aff_url):
    """アフィリエイトURLがリンク切れのため一時的に素のURLを返す"""
    return urllib.parse.unquote(aff_url.split("vc_url=")[1])

In [6]:
# df_bulkの作成
l_df_bulk = []
for platform_id, func in zip([1, 2], [DataFrame_maker_rakuten, DataFrame_maker_yahoo]):
    df_bulk = repeat_dataframe_maker(df_api_input, platform_id, func)
    l_df_bulk.append(df_bulk)

000, 検索キーワード:[Levi's 501 66前期 中古] knowledge_id: 5
Finished!!
Finished!!
Finished!!
Finished!!
Finished!!
返ってきた商品数の数が0なので、ループ終了
price type before: object
price type after: int64
001, 検索キーワード:[U.S.ARMY M-65 FIELD JACKET 1st 中古] knowledge_id: 8
Finished!!
返ってきた商品数の数が0なので、ループ終了
price type before: object
price type after: int64
002, 検索キーワード:[Levi's 501 BIGE 中古] knowledge_id: 10
Finished!!
Finished!!
Finished!!
Finished!!
Finished!!
Finished!!
Finished!!
Finished!!
Finished!!
返ってきた商品数の数が0なので、ループ終了
price type before: object
price type after: int64
repeat_dataframe_makerの実行時間: 21秒
000, 検索キーワード:[Levi's 501 66前期 中古] knowledge_id: 5
Get Data
Get Data
Get Data
Get Data
Get Data
If the number of returned items is 0, the loop ends.
Get Data
If the number of returned items is 0, the loop ends.
Get Data
If the number of returned items is 0, the loop ends.
Get Data
If the number of returned items is 0, the loop ends.
Get Data
Bad request
001, 検索キーワード:[U.S.ARMY M-65 FIELD JACKET 1st 中古] knowledge_id: 8


In [7]:
df_bulk = pd.concat(l_df_bulk, axis=0, ignore_index=True)

In [12]:
df_big_e = df_bulk[df_bulk["knowledge_id"] == 10].copy()

In [16]:
df_big_e["price"].quantile(0.10)

7248.000000000001