In [None]:
%pip install numpy==2.4.2
%pip install python-dotenv==1.2.1
%pip install openai==2.16.0
%pip install readability-lxml==0.8.4.1 w3lib==2.4.0 httpx==0.28.1 beautifulsoup4==4.14.3 azure-ai-inference==1.0.0b9
%pip install sentencepiece==0.2.1 protobuf==6.33.5
%pip install sentence-transformers==5.2.2 tiktoken==0.12.0

In [None]:
import os
import json
import asyncio
import numpy  as np
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from dotenv   import load_dotenv
from logging  import Logger, getLogger
from openai   import OpenAI

import pandas as pd
from pyspark import StorageLevel
from pyspark.sql import types, Window
from pyspark.sql.functions import col
import pyspark.sql.functions as F

from pyspark.sql import SparkSession, DataFrame
from delta import configure_spark_with_delta_pip

from azure.ai.inference.models import SystemMessage, UserMessage
from sentence_transformers     import SentenceTransformer

from url_scraper import fetch_web

In [None]:
builder = SparkSession.builder\
            .config("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")\
            .config("spark.sql.parquet.output.committer.class", "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter")\
            .config("spark.mapreduce.fileoutputcommitter.marksuccessfuljobs","false")\
            .config("spark.sql.adaptive.enabled", True)\
            .config("spark.sql.shuffle.partitions", "auto")\
            .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100MB")\
            .config("spark.sql.adaptive.coalescePartitions.enabled", True)\
            .config("spark.sql.dynamicPartitionPruning.enabled", True)\
            .config("spark.sql.autoBroadcastJoinThreshold", "10MB")\
            .config("spark.sql.session.timeZone", "Asia/Tokyo")\
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
            .config("spark.databricks.delta.write.isolationLevel", "SnapshotIsolation")\
            .config("spark.databricks.delta.optimizeWrite.enabled", True)\
            .config("spark.databricks.delta.autoCompact.enabled", True)
            # Delta Lake 用の SQL コミットプロトコルを指定
            # Parquet 出力時のコミッタークラスを指定
            # Azure Blob Storage (ABFS) 用のコミッターファクトリを指定
            # '_SUCCESS'で始まるファイルを書き込まないように設定
            # AQE(Adaptive Query Execution)の有効化
            # パーティション数を自動で調整するように設定
            # シャッフル後の1パーティションあたりの最小サイズを指定
            # AQEのパーティション合成の有効化
            # 動的パーティションプルーニングの有効化
            # 小さいテーブルのブロードキャスト結合への自動変換をするための閾値調整
            # SparkSessionのタイムゾーンを日本標準時刻に設定
            # Delta Lake固有のSQL構文や解析機能を拡張モジュールとして有効化
            # SparkカタログをDeltaLakeカタログへ変更
            # Delta Lake書き込み時のアイソレーションレベルを「スナップショット分離」に設定
            # 書き込み時にデータシャッフルを行い、大きなファイルを生成する機能の有効化
            # 書き込み後に小さなファイルを自動で統合する機能の有効化


spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
# .env ファイルを読み込む
load_dotenv()

# 環境変数の取得
AI_FOUNDRY_ENDPOINT = os.environ.get("AI_FOUNDRY_ENDPOINT")
AI_FOUNDRY_API_KEY  = os.environ.get("AI_FOUNDRY_API_KEY")
AI_FOUNDRY_MODEL    = os.environ.get("AI_FOUNDRY_MODEL")
MAX_TOKENS          = os.environ.get("MAX_TOKENS")
TEMPERATURE         = os.environ.get("TEMPERATURE")
TOP_P               = os.environ.get("TOP_P")

# メモ：
# ウィジット経由でのパラメータの取得方法では、無条件にパラメータの型がStringに変換されてしまう
# そのため、受け取ったパラメータを適切に型変換する必要がある
MAX_TOKENS  = int(MAX_TOKENS)
TEMPERATURE = float(TEMPERATURE)
TOP_P       = float(TOP_P)


# 簡易デバッグ用
print(f'AI_FOUNDRY_ENDPOINT: {AI_FOUNDRY_ENDPOINT}')
print(f'AI_FOUNDRY_API_KEY:  {AI_FOUNDRY_API_KEY}')
print(f'AI_FOUNDRY_MODEL:    {AI_FOUNDRY_MODEL}')
print(f'MAX_TOKENS:          {MAX_TOKENS}')
print(f'TEMPERATURE:         {TEMPERATURE}')
print(f'TOP_P:               {TOP_P}')

In [None]:
logger    = getLogger(__name__)
semaphore = asyncio.Semaphore(10)

LP_URL   = "https://lp.br-lb.com/"

res_dict = await fetch_web(logger, semaphore, LP_URL)
print(res_dict)

In [None]:
messages  = []
messages.append(SystemMessage(content=(
				"あなたは商品LPの分析を行うマーケティングの専門家です。\n"
				"提供された商品情報を読み解き、この商品が「マッチする領域（適合）」と「マッチしない領域（不適合）」を明確に分類して抽出してください。\n\n"
				
				"【出力形式（厳守）】\n"
				"回答は必ず以下のJSON形式のみを出力してください。\n"
				"各単語をキーとし、その関連度の強さ（重み）を 0.0〜1.0 の数値で値として設定してください。\n"
				"Markdown記法（```json 等）は含めず、生のJSONテキストのみを返してください。\n\n"
					
				"""
				{
					"positive": {
						"places": {"単語": 0.9, "単語": 0.7},
						"scenes": {"単語": 0.8, "単語": 0.6},
						"traits": {"単語": 0.9}
					},
					"negative": {
						"places": {"単語": 0.9},
						"scenes": {"単語": 0.8},
						"traits": {"単語": 0.7}
					}
				}
				"""
				"\n\n"
				
				"【分析項目と定義】\n"
				"--- positive（適合）：商品が積極的に活用される文脈 ---\n"
				"・places（場所）：実際に使われる場所（※可能な限り指定マスターリストから選択）\n"
				"・scenes（場面）：利用状況やタイミング\n"
				"・traits（行動・心理）：ターゲットユーザーの特徴\n\n"
				
				"--- negative（不適合）：利用が想定されない、または不向きな文脈 ---\n"
				"・places（場所）：適さない場所、不要な場所（※リスト外の言葉も可）\n"
				"・scenes（場面）：機能が発揮できない状況\n"
				"・traits（行動・心理）：この商品を必要としない人の特徴\n\n"
				
				"【重み（スコア）の基準】\n"
				"・1.0に近いほど：その傾向が非常に強い、確信度が高い\n"
				"・0.0に近いほど：関連性が薄い\n"
				"・positiveの場合：適合度の高さ\n"
				"・negativeの場合：不適合度の高さ（明確に避けるべき度合い）"
			)))
messages.append(UserMessage(content=json.dumps(res_dict)))

llmClient = OpenAI(base_url=AI_FOUNDRY_ENDPOINT, api_key=AI_FOUNDRY_API_KEY)
response  = llmClient.chat.completions.create(
				messages=messages,
				tools=None,
				tool_choice=None,
				max_tokens=MAX_TOKENS,
				temperature=TEMPERATURE,
				top_p=TOP_P,
				model=AI_FOUNDRY_MODEL
			)

result_text = response.choices[0].message.content
analysis_data = json.loads(result_text)
analysis_data

In [None]:
LATEST_UPDATE_DATE   = datetime.now(tz=ZoneInfo('Asia/Tokyo')).replace(day=1) - timedelta(days=1)
SPECIFIED_START_DATE = LATEST_UPDATE_DATE.replace(day=1).strftime('%Y-%m-%d')
SPECIFIED_END_DATE   = LATEST_UPDATE_DATE.strftime('%Y-%m-%d')

VISIT_BEHAVIOR_TABLE = "adinte_datainfrastructure.master.relational_spot"
window_moving        = Window.partitionBy('adid')
sdf_visit_behav      = spark.read.table(VISIT_BEHAVIOR_TABLE)\
							.select(['adid', 'prefecture', 'city', 'countofcontact', 'start_date', 'end_date'])\
                            .filter(col('start_date') == SPECIFIED_START_DATE)\
                            .filter(col('end_date')   == SPECIFIED_END_DATE)\
                            .withColumn('location', F.concat_ws('', F.col('prefecture'), F.col('city')))\
                            .select( ['location', 'adid', 'countofcontact'])\
                            .groupBy(['location', 'adid'])\
                            .agg(F.sum('countofcontact').alias('countofcontact'))\
                            .withColumn('visit_frequency', col('countofcontact') / F.sum('countofcontact').over(window_moving))\
                            .filter(col('visit_frequency') != 1)\
                            .select( ['location', 'adid', 'visit_frequency'])\
                            .orderBy(['location', 'adid'])

sdf_visit_behav = sdf_visit_behav.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
sdf_visit_behav.display()

In [None]:
items            = list(analysis_data['positive']['places'].items())
lp_keywords      = [k for k, v in items]
lp_weights       = [v for k, v in items]
relational_spots = [row['location'] for row in sdf_visit_behav.select('location').dropDuplicates().collect()]

model            = SentenceTransformer('pkshatech/GLuCoSE-base-ja')
lp_vector        = np.array(lp_weights).reshape(1, -1)               # 1 × キーワード数M
lp_matrix        = model.encode(lp_keywords)                         # キーワード数M × 768
spots_matrix     = model.encode(relational_spots)                    # スポット数N × 768
lp_coefficient   = lp_vector @ lp_matrix @ spots_matrix.T            # LPのスポット係数
lp_coefficient

In [None]:
weights_data = list(zip(relational_spots, lp_coefficient.flatten().astype(float).tolist()))
schema = types.StructType([
    types.StructField('location', types.StringType(), True),
    types.StructField('weight',   types.DoubleType(), True)
])
sdf_weights = spark.createDataFrame(weights_data, schema)\
				.withColumns({
                    'mean'   : F.mean('weight').over(Window.partitionBy()),
                    'stddev' : F.stddev('weight').over(Window.partitionBy()),
				})\
				.withColumn('weight', (col('weight') - col('mean')) / col('stddev'))\
                .select(['location', 'weight'])

sdf_scored = sdf_visit_behav\
    			.join(sdf_weights, on='location', how='inner')\
    			.withColumn('weighted_score', F.col('visit_frequency') * F.col('weight'))\
                .groupBy('adid')\
    			.agg(F.sum('weighted_score').alias('final_score'))\
                .select(['adid', 'final_score'])\
                .orderBy('final_score')\
                .limit(10000)

sdf_scored.display()

In [None]:
REASON_THRESHOLD = 0.5
sdf_reason       = sdf_weights\
						.filter(col('weight') >= REASON_THRESHOLD)\
                        .join(sdf_visit_behav, on='location', how='inner')\
                        .select(['adid', 'location'])\
                        .groupBy('adid')\
                        .agg(F.concat_ws(', ', F.collect_set('location')).alias('reason'))\
                        .select(['adid', 'reason'])\
                		.orderBy('adid')\
                        .limit(10000)

sdf_reason.display()

In [None]:
# 不要なデータフレームのメモリ解放
sdf_visit_behav.unpersist()