# imports

In [6]:
import pandas as pd
import numpy as np
import copy
from tqdm import tqdm
from pyspark.sql import Window
from pyspark.sql.functions import col, sum, when, count, max
from dump import spark
from datetime import datetime, timedelta, date
from pyspark.sql import HiveContext, types
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

import tqdm
import requests
from bs4 import BeautifulSoup
import re
import json 

from sklearn.metrics import roc_auc_score, average_precision_score

# assets

In [None]:
assets = pd.read_csv('/usr/home/ymmorozov/urls/assets.csv')
necessary_columns = ['ts','dt','asset_id','ad_id','asset_type','asset_role_name', 'url', 'content']
mask = assets['asset_role_name'].isin(['title', 'description', 'click_url'])
assets = assets[mask][necessary_columns].sort_values(by=['ad_id'])
#assets.head(5)

In [8]:
assets = assets[mask][necessary_columns].sort_values(by=['ts'])

a1 = assets[assets['asset_role_name'] == 'click_url'].groupby('ad_id').tail(1).ad_id.unique()
a2 = assets[assets['asset_role_name'] == 'description'].groupby('ad_id').tail(1).ad_id.unique()
a3 = assets[assets['asset_role_name'] == 'title'].groupby('ad_id').tail(1).ad_id.unique()

print( len(a1),  len(a2),  len(a3))
len(set(a1) & set(a2) & set(a3))

1597 1155 1165


  assets = assets[mask][necessary_columns].sort_values(by=['ts'])


1143

In [9]:
a1 = assets[assets['asset_role_name'] == 'click_url'].groupby('ad_id').tail(1)[['ad_id', 'ts', 'url']]
a1 = a1.rename(columns={"ts": "click_url_ts"})

In [10]:
a2 = assets[assets['asset_role_name'] == 'description'].groupby('ad_id').tail(1)[['ad_id', 'ts', 'content']]
a2 = a2.rename(columns={"ts": "description_ts", 'content' : 'description_content'})

In [11]:
a3 = assets[assets['asset_role_name'] == 'title'].groupby('ad_id').tail(1)[['ad_id', 'ts', 'content']]
a3 = a3.rename(columns={"ts": "title_ts", 'content': 'title_content'})

In [None]:
joined = a1.join(a2.set_index('ad_id'), on='ad_id').join(a3.set_index('ad_id'), on='ad_id')
#joined.head(5)

In [13]:
joined.ad_id.unique().shape

(1597,)

In [14]:
joined.to_csv('ad_df.csv')

# Spark

In [None]:
try:
    spark.context.stop()
    spark_task.stop_context()
    spark_task = None
except (AttributeError, NameError):
    pass


spark_config = {
    'spark.speculation': 'True',
    'spark.sql.shuffle.partitions': '512',
    'spark.driver.extraJavaOptions': '-XX:MaxPermSize=384m',
    'spark.executor.extraJavaOptions': '-XX:MaxPermSize=384m',
    'spark.locality.wait': '3',
    'spark.locality.wait.process': '3',
    'spark.locality.wait.node': '3',
    'spark.locality.wait.rack': '3',
    'spark.master': 'yarn',
    'spark.sql.parquet.compression.codec': 'gzip',
    'spark.jars': ######
    'spark.yarn.queue': 'HungerGames',
    'spark.app.name': 'experiments',
    'spark.driver.memory': '8g',
    'spark.executor.memory': '24g',
    'spark.executor.cores': '4',
    'spark.executor.instances': '35',
    'spark.driver.maxResultSize': '20g',
    'spark.sql.execution.arrow.pyspark.enabled': 'true',
    'spark.sql.execution.arrow.pyspark.fallback.enabled': 'true',
    'hive.exec.dynamic.partition.mode': 'nonstrict',
    'hive.exec.dynamic.partition': 'true',
}

sc = spark.context.start(spark_config)
sqlContext = HiveContext(sc)
spark = SparkSession(sc)

# join

In [16]:
requests = spark.table("ods_pdsp.requests")
requests = requests.select('dt', 'bid_id', 'request_page_url')
requests

DataFrame[dt: string, bid_id: string, request_page_url: string]

In [17]:
events = spark.table("ods_pdsp.events")
events = events.select('dt', 'bid_id', 'ad_id', 'campaign_id', 'impression_flg', 'click_flg')
events

DataFrame[dt: string, bid_id: string, ad_id: int, campaign_id: int, impression_flg: boolean, click_flg: boolean]

In [18]:
ad_df = pd.read_csv('ad_df.csv')

values_list = ad_df['ad_id'].unique().tolist()

In [20]:
ad_df['click_url_ts'].min()

'2024-01-31 13:57:33'

In [34]:
min_dt = str(datetime.strptime( ad_df['click_url_ts'].min(), "%Y-%m-%d %H:%M:%S").date() )
max_dt = str(datetime.strptime( ad_df['click_url_ts'].max(), "%Y-%m-%d %H:%M:%S").date() )
print(min_dt, max_dt)

2024-01-31 2024-04-23


In [None]:
ev = events.filter((col("dt") >= min_dt) & (col("dt") <= max_dt) & col("ad_id").isin(values_list)).drop('dt')
re = requests.filter((col("dt") >= min_dt) & (col("dt") <= max_dt)).drop('dt')

re.join(ev, on='bid_id').groupBy(['campaign_id', "ad_id", 'request_page_url']).agg(
    sum(when(col("click_flg") == True, 1).otherwise(0)).alias("clicks_count"),
    sum(when(col("impression_flg") == True, 1).otherwise(0)).alias("impression_count")
).write.saveAsTable("ymmorozov_clicks_data")

# sizes

In [None]:
df_impres_100 = spark.table("ymmorozov_clicks_data").filter(col("impression_count") > 100).toPandas().sort_values(by='clicks_count')
df_impres_100['ctr'] = df_impres_100['clicks_count'] / df_impres_100['impression_count']
df_impres_100 = df_impres_100.sort_values(by='ctr')

#df_impres_100

In [None]:
urls_to_pars1 = df_impres_100['request_page_url'].unique().tolist()

with open('urls_to_pars1.pkl', 'wb') as f:
    pickle.dump(urls_to_pars1, f)

In [None]:
ad_to_pars_id = df_impres_100['ad_id'].unique().tolist()

ad_df = pd.read_csv('ad_df.csv')
ad_to_pars = ad_df[ad_df['ad_id'].isin(ad_to_pars_id)][['ad_id', 'url']]

ad_to_pars.to_csv('ad_to_pars.csv')

# html по url

собирал тексты с сайтов вот таким кодом

In [None]:
import tqdm
import json
import requests

def fetch_html(url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return True, response.text 
    except requests.RequestException as e:
        return False, None 

count_true = 0
htmls = {}
errors = []

for url in tqdm.tqdm(urls_to_pars1):
    boo, resp = fetch_html(url)
    if not boo:
        errors.append(url)
    else:
        count_true += 1
        htmls[url] = resp

print('true  count:\t', count_true)
print('erros count:\t', len(errors))

In [None]:
# html реклам
with open("htmls1.json", "w") as f:
    json.dump(htmls, f)
    
# html сайтов 
with open("ad_htmls.json", "w") as f:
    json.dump(htmls, f)

# парсинг текста из html

постобрабатывал текста таким кодом

In [None]:
import json

with open("htmls1.json", "r") as f:
    htmls1 = json.load(f)

In [None]:
def pars_html(html):
    try:      
        soup = BeautifulSoup(html, 'html.parser')
        re_  = re.sub('\n+', '\n', re.sub(' +', ' ', soup.get_text()))
        
        return True, re_
    except:
        return False, None 


count_true = 0
parsed_htmls = {}
errors_id = []

for url in tqdm.tqdm(htmls1.keys()):
    try:
        boo, pars = pars_html(htmls1[url])
        if boo:
            parsed_htmls[url] = pars
            count_true += 1
        if not boo:
            errors_id.append(url)
    except:
        errors_id.append(url)

print(count_true)
print(errors_id)

In [None]:
# текста реклам
with open("ad_parsed.json", "w") as f:
    json.dump(parsed_htmls, f)

# текста сайтов
with open("urls_parsed.json", "w") as f:
    json.dump(parsed_htmls, f)

# fasttext

fasttext векторизует каждое слово в тексте и суммирует полученные вектора (нужно было попробовать mean, что-то забыл про это)

In [None]:
import gensim

def fasttext_encode(df, model, column_to_encode):
    
    def f(text):
        separate_words = re.findall(r'\b[\w-]+\b', text)
        encodes = model[separate_words]
        return encodes.sum(axis=0).tolist() 

    encode_udf = udf(f, ArrayType(FloatType()))
    #encode_udf(df['parsed_html'])
    df = df.withColumn('vector', encode_udf(df[column_to_encode]))
    df = df.drop(column_to_encode)
    return df


# BERT 

In [None]:
# саму модельку грузим

import torch
from transformers import AutoTokenizer, AutoModel

tokenizer = AutoTokenizer.from_pretrained("DeepPavlov/rubert-base-cased")
model = AutoModel.from_pretrained("DeepPavlov/rubert-base-cased").to('cuda')

У этой модельки ограничение на длинну контекста 512 токенов. Рассматривал 4 способа векторизации: 
* embed_bert_cls - подаём модель 512 первых букв и берём CLS токен
* embed_bert_cls2 - подаём модель 512 первых токенов и берём CLS токен
* embed_bert_cls3 - режим текст на куски 512 токенов, получаем их CLS и берём сумму 
* embed_bert_cls4 - режим текст на куски 512 токенов, получаем их CLS и берём mean 

In [None]:
def embed_bert_cls(text, model, tokenizer):
    temp_text = text[:512]
    t = tokenizer(temp_text, padding=True, truncation=True, return_tensors='pt')
    with torch.no_grad():
        model_output = model(**{k: v.to(model.device) for k, v in t.items()})
    embeddings = model_output.last_hidden_state[:, 0, :]
    embeddings = torch.nn.functional.normalize(embeddings)
    return embeddings[0].cpu().numpy()

In [None]:
def embed_bert_cls2(text, model, tokenizer):
    t = tokenizer(text, padding=True, truncation=True, return_tensors='pt')
    with torch.no_grad():
        model_output = model(**{k: v[:, :512].to(model.device) for k, v in t.items()})
    embeddings = model_output.last_hidden_state[:, 0, :]
    embeddings = torch.nn.functional.normalize(embeddings)
    return embeddings[0].cpu().numpy()

print(embed_bert_cls('привет мир! Как житуха, как дела?', model, tokenizer).shape )

In [None]:
def embed_bert_cls3(text, model, tokenizer, max_length=512):
    tokens = tokenizer(text, padding=False, truncation=False, return_tensors='pt')
    input_ids = tokens['input_ids'][0]
    attention_mask = tokens['attention_mask'][0]
    
    num_chunks = (len(input_ids) + max_length - 1) // max_length  # вычисляем количество частей
    
    embeddings = []
    with torch.no_grad():
        for i in range(num_chunks):
            chunk_input_ids = input_ids[i * max_length: (i + 1) * max_length].unsqueeze(0).to(model.device)
            chunk_attention_mask = attention_mask[i * max_length: (i + 1) * max_length].unsqueeze(0).to(model.device)
            
            model_output = model(input_ids=chunk_input_ids, attention_mask=chunk_attention_mask)
            chunk_embedding = model_output.last_hidden_state[:, 0, :]
            embeddings.append(chunk_embedding)
    
    # Суммируем все полученные CLS токены и нормализуем
    total_embedding = torch.sum(torch.stack(embeddings), dim=0)
    total_embedding = torch.nn.functional.normalize(total_embedding)
    
    return total_embedding[0].cpu().numpy()

# Пример использования
text = "Твой текст здесь"
embedding = embed_bert_cls3(text, model, tokenizer)
print(embedding.shape)

In [None]:
def embed_bert_cls4(text, model, tokenizer, max_length=512):
    tokens = tokenizer(text, padding=False, truncation=False, return_tensors='pt')
    input_ids = tokens['input_ids'][0]
    attention_mask = tokens['attention_mask'][0]
    
    num_chunks = (len(input_ids) + max_length - 1) // max_length  # вычисляем количество частей
    
    embeddings = []
    with torch.no_grad():
        for i in range(num_chunks):
            chunk_input_ids = input_ids[i * max_length: (i + 1) * max_length].unsqueeze(0).to(model.device)
            chunk_attention_mask = attention_mask[i * max_length: (i + 1) * max_length].unsqueeze(0).to(model.device)
            
            model_output = model(input_ids=chunk_input_ids, attention_mask=chunk_attention_mask)
            chunk_embedding = model_output.last_hidden_state[:, 0, :]
            embeddings.append(chunk_embedding)
    
    # Суммируем все полученные CLS токены и нормализуем
    total_embedding = torch.mean(torch.stack(embeddings), dim=0)
    total_embedding = torch.nn.functional.normalize(total_embedding)
    
    return total_embedding[0].cpu().numpy()

# Пример использования
text = "Твой текст здесь Твой текст здесь Твой текст здесь"
embedding = embed_bert_cls4(text, model, tokenizer)
print(embedding.shape)


In [None]:
import pandas as pd
from tqdm import tqdm, tqdm_pandas
from sklearn.metrics.pairwise import cosine_similarity as cos_sim
import warnings
warnings.filterwarnings('ignore')


tqdm.pandas()


st_to_encode = 'site_text' # имя столбца по которому будет строиться эмбединг сайта 
ad_to_encode = 'ad_text'   # имя столбца по которому будет строиться эмбединг рекламы
st_encodes = 'site_vec'    # имя столбца в котором будет храниться вектор сайта
ad_encodes = 'ad_vec'      # имя столбца в котором будет храниться вектор рекламы
metric_name = 'cos_sim'    # имя метрики метрики

f = embed_bert_cls4        # функция которая будет выдавать эмбединги 

df[st_encodes] = None
df.loc[df[st_to_encode].notna(), st_encodes] = df[df[st_to_encode].notna()][st_to_encode].progress_apply(lambda x: np.array(f(x, model, tokenizer)))


df[ad_encodes] = None
df.loc[df[ad_to_encode].notna(), ad_encodes] = df[df[ad_to_encode].notna()][ad_to_encode].progress_apply(lambda x: np.array(f(x, model, tokenizer)))


df[metric_name] = None
df.loc[df[st_encodes].notna() & df[ad_encodes].notna(), metric_name] = df[df[st_encodes].notna() & df[ad_encodes].notna()][[st_encodes, ad_encodes]].apply(lambda x: cos_sim([x[0]], [x[1]])[0][0], axis=1)
df.head(1)

# tf-idf

In [None]:
ad_text_to_encode_column_name   = 'ad_og_content'
site_text_to_encode_column_name = 'site_og_content'

ad_encode_column_name   = 'ad_tf_idf_og'
site_encode_column_name = 'site_tf_idf_og'



site_dict = ddf[['site_id', site_text_to_encode_column_name]].drop_duplicates().set_index('site_id')[site_text_to_encode_column_name].to_dict()
ad_dict = ddf[['ad_id', ad_text_to_encode_column_name]].drop_duplicates().set_index('ad_id')[ad_text_to_encode_column_name].to_dict()

corpus = list(site_dict.values()) + list(ad_dict.values())

vectorizer = TfidfVectorizer(use_idf=True, smooth_idf=False, norm=None)
idf_matrix = vectorizer.fit_transform(corpus)
idf_values = vectorizer.idf_

list_idf_vec = [i for i in idf_matrix]

site_list_len = len(list(site_dict.values()))
ad_list_len   = len(list(ad_dict.values()))

site_vec = list_idf_vec[:site_list_len]
ad_vec   = list_idf_vec[site_list_len:]

site_id_vec_dict = {key: value for key, value in zip(site_dict.keys(), site_vec)}
ad_id_vec_dict   = {key: value for key, value in zip(ad_dict.keys(), ad_vec)}



ddf[ad_encode_column_name] = None
ddf[site_encode_column_name] = None

ddf[ad_encode_column_name] = ddf.ad_id.apply(lambda x: ad_id_vec_dict[x])
ddf[site_encode_column_name] = ddf.site_id.apply(lambda x: site_id_vec_dict[x])

# eval

In [2]:
df = pd.read_json('/usr/home/ymmorozov/urls/data_select/new_metrics.json') 

In [9]:
ddf = df[df.cos_sim3.notna()]

In [8]:
scores_col_name = 'cos_sim3' # имя колонки явл являющаяся оценкой моделью релевантности рекламы сайту 


ddf = ddf[['ad_id', 'site_id', "impression_count", 'clicks_count', scores_col_name]]

ddf['click_flg'] = ddf.apply(lambda x: [1] * int(x[3]) + [0] * int(x[2] - x[3]), axis=1)
ddf = ddf.explode('click_flg')



y_true = ddf.click_flg.tolist()
y_scores = ddf[scores_col_name].tolist()

auc_roc = roc_auc_score(y_true, y_scores)
auc_pr = average_precision_score(y_true, y_scores)

print(auc_roc, auc_pr) 

0.5252636875500968 0.002783120198903399


In [89]:
def evaluate(df, scores_col_name, true_col_name=None):
    ddf = df[df[scores_col_name].notna()]
    
    if true_col_name is None: 
        ddf = ddf[['ad_id', 'site_id', "impression_count", 'clicks_count', scores_col_name]]
        ddf['click_flg'] = ddf.apply(lambda x: [1] * int(x[3]) + [0] * int(x[2] - x[3]), axis=1)
        ddf = ddf.explode('click_flg')
        true_col_name = 'click_flg'
    else: 
        ddf = ddf[ddf[true_col_name].notna()]
        ddf = ddf[['ad_id', 'site_id', true_col_name, scores_col_name]]

    
    def auc_count(temp_df, temp_scores, temp_true): 
        y_true = temp_df[temp_true].tolist()
        y_scores = temp_df[temp_scores].tolist()
        auc_roc = roc_auc_score(y_true, y_scores)
        auc_pr = average_precision_score(y_true, y_scores)
        return auc_roc, auc_pr 

    ddf['true_order'] = list(range(ddf.shape[0]))
    ddf['reverse_order'] = list(range(ddf.shape[0]))[::-1]
    ordinary_auc_roc, ordinary_auc_pr = auc_count(ddf, scores_col_name, true_col_name)
    max_auc_roc, max_auc_pr = auc_count(ddf, 'true_order', true_col_name)
    min_auc_roc, min_auc_pr = auc_count(ddf, 'reverse_order', true_col_name)
    
    roc_border = (min_auc_roc + (1 - max_auc_roc)) / 2
    normalized_auc_roc = (ordinary_auc_roc - roc_border) / (1 - 2 * roc_border)

    res = {
        'auc_roc' : {
            'ordinary' : ordinary_auc_roc,
            'max' : max_auc_roc,
            'min' : min_auc_roc,
            'normalized' : normalized_auc_roc,
        },
        'auc_pr' : {
            'ordinary' : ordinary_auc_pr,
            'max' : max_auc_pr,
            'min' : min_auc_pr,
        },
    }
    return res

In [90]:
ddf = df[df.cos_sim3.notna()]
evaluate(ddf, 'cos_sim3')

{'auc_roc': {'ordinary': 0.5252636875500968,
  'max': 0.7896248996726905,
  'min': 0.21037510032730955,
  'normalized': 0.5436144951256741},
 'auc_pr': {'ordinary': 0.002783120198903399,
  'max': 0.009733642307554781,
  'min': 0.0014592577487050986}}

In [None]:
"""
def evaluate_by_groups(df, scores_col_name, true_col_name=None):
    
    ddf = df[df[scores_col_name].notna()]
    
    if true_col_name is None: 
        ddf = ddf[['ad_id', 'site_id', "impression_count", 'clicks_count', scores_col_name]]
        ddf['click_flg'] = ddf.apply(lambda x: [1] * int(x[3]) + [0] * int(x[2] - x[3]), axis=1)
        ddf = ddf.explode('click_flg')
        true_col_name = 'click_flg'
    else: 
        ddf = ddf[ddf[true_col_name].notna()]
        ddf = ddf[['ad_id', 'site_id', true_col_name, scores_col_name]]
        
        
        
    def subgroup_auc(temp_df, temp_scores, temp_true):
        if temp_df[temp_true].sum() == 0 or temp_df.shape[0] == 0:
            return None
        metrict = evaluate(temp_df, temp_scores, temp_true)
        return pd.Series({'impression_count' : len(y_true),
                          'ord_auc_roc': metrict['auc_roc']['ordinary'],
                          'max_auc_roc': metrict['auc_roc']['max'],
                          'min_auc_roc': metrict['auc_roc']['min'],
                          'norm_auc_roc': metrict['auc_roc']['normalized'],

                          'ord_auc_pr': metrict['auc_pr']['ordinary'],
                          'max_auc_pr': metrict['auc_pr']['max'],
                          'min_auc_pr': metrict['auc_pr']['min'],
                            })  
    
    
    result = ddf.groupby('ad_id').apply(lambda x: subgroup_auc(x, scores_col_name, true_col_name) ).reset_index()
    return result
    
res = evaluate_by_groups(df, 'cos_sim3')
res
"""