# Part 1
Data is already in hdfs, we will use it

In [1]:
! hdfs dfs -ls /user/hw2/data

Found 7 items
-rw-r--r--   1 ubuntu hadoop  3214004224 2023-03-02 19:23 /user/hw2/data/AdsInfo.tsv
-rw-r--r--   1 ubuntu hadoop  9469373867 2023-02-11 12:57 /user/hw2/data/SearchInfo.tsv
-rw-r--r--   1 ubuntu hadoop   104614699 2023-02-13 10:30 /user/hw2/data/UserInfo.tsv
-rw-r--r--   1 ubuntu hadoop     1413261 2023-03-03 10:01 /user/hw2/data/test_AdsInfo.tsv
-rw-r--r--   1 ubuntu hadoop      807058 2023-03-03 10:01 /user/hw2/data/test_SearchInfo.tsv
-rw-r--r--   1 ubuntu hadoop      237259 2023-03-03 10:01 /user/hw2/data/test_trainSearchStream.tsv
-rw-r--r--   1 ubuntu hadoop 11023566785 2023-02-11 22:54 /user/hw2/data/trainSearchStream.tsv


In [11]:
import pandas as pd
pd.set_option('display.max_columns', 500)

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="lsml-mhw-3")

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
se = SparkSession(sc)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-03-04 19:09:44,963 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
2023-03-04 19:09:57,700 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
2023-03-04 19:09:57,720 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpo

In [5]:
data = se.read.option("mode", "DROPMALFORMED").option('sep', '\t').csv("/user/hw2/data/trainSearchStream.tsv", header=True, inferSchema=True)
data.printSchema()

searchinfo = se.read.option("mode", "DROPMALFORMED").option('sep', '\t').csv("/user/hw2/data/SearchInfo.tsv", header=True, inferSchema=True)
searchinfo.printSchema()

adsinfo = se.read.option("mode", "DROPMALFORMED").option('sep', '\t').csv("/user/hw2/data/AdsInfo.tsv", header=True, inferSchema=True)
adsinfo.printSchema()

                                                                                

root
 |-- SearchID: integer (nullable = true)
 |-- AdID: integer (nullable = true)
 |-- Position: integer (nullable = true)
 |-- ObjectType: integer (nullable = true)
 |-- HistCTR: double (nullable = true)
 |-- IsClick: integer (nullable = true)



                                                                                

root
 |-- SearchID: integer (nullable = true)
 |-- SearchDate: string (nullable = true)
 |-- IPID: integer (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- IsUserLoggedOn: integer (nullable = true)
 |-- SearchQuery: string (nullable = true)
 |-- LocationID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- SearchParams: string (nullable = true)





root
 |-- AdID: integer (nullable = true)
 |-- LocationID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- Params: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Title: string (nullable = true)
 |-- IsContext: integer (nullable = true)



                                                                                

In [6]:
! hdfs dfs -rm -r /user/hw2/parquet
! hdfs dfs -mkdir -p /user/hw2/parquet

data.write.parquet("/user/hw2/parquet/data.parquet")
data_parquet = se.read.parquet("/user/hw2/parquet/data.parquet")
data_parquet.printSchema()
data = data_parquet

adsinfo.write.parquet("/user/hw2/parquet/adsinfo.parquet")
adsinfo_parquet = se.read.parquet("/user/hw2/parquet/adsinfo.parquet")
adsinfo_parquet.printSchema()
adsinfo = adsinfo_parquet

searchinfo.write.parquet("/user/hw2/parquet/searchinfo.parquet")
searchinfo_parquet = se.read.parquet("/user/hw2/parquet/searchinfo.parquet")
searchinfo_parquet.printSchema()
searchinfo = searchinfo_parquet

Deleted /user/hw2/parquet


                                                                                

root
 |-- SearchID: integer (nullable = true)
 |-- AdID: integer (nullable = true)
 |-- Position: integer (nullable = true)
 |-- ObjectType: integer (nullable = true)
 |-- HistCTR: double (nullable = true)
 |-- IsClick: integer (nullable = true)



                                                                                

root
 |-- AdID: integer (nullable = true)
 |-- LocationID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- Params: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Title: string (nullable = true)
 |-- IsContext: integer (nullable = true)



                                                                                

root
 |-- SearchID: integer (nullable = true)
 |-- SearchDate: string (nullable = true)
 |-- IPID: integer (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- IsUserLoggedOn: integer (nullable = true)
 |-- SearchQuery: string (nullable = true)
 |-- LocationID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- SearchParams: string (nullable = true)



In [7]:
data.registerTempTable("searchstream")
adsinfo.registerTempTable("adsinfo")
searchinfo.registerTempTable("searchinfo")

In [8]:
searchstreamfeatures = se.sql("""
    SELECT SearchID as search_id, AdID as f_cat_ad_id, Position as f_num_position, HistCTR as f_num_hist_ctr, IsClick as target
    FROM searchstream
    WHERE ObjectType = 3
""")

searchinfofeatures = se.sql("""
    SELECT SearchID as search_id, SearchQuery as f_cat_search_query, UserID as f_cat_user_id, SearchParams as f_cat_search_params, CategoryID as f_cat_search_category
    FROM searchinfo
""")

adsinfofeatures = se.sql("""
    SELECT AdID as f_cat_ad_id, CategoryID as f_cat_ad_category, Title as f_cat_ad_title, Price as f_num_ad_price
    FROM adsinfo
""")

Предобрабатываем тексты

In [9]:
import string
import re

allowed = set(string.ascii_letters + string.digits + " ," +
              u"АаБбВвГгДдЕеЁёЖжЗзИиЙйКкЛлМмНнОоПпРрСсТтУуФфХхЦцЧчШшЩщЪъЫыЬьЭэЮюЯя")

def slugify_ohe(text):
    if not text:
        return ""
    text = "".join([ch for ch in text.lower() if ch in allowed])
    return text

f_slugify_ohe = se.udf.register("slugify_ohe", slugify_ohe, "string")

def parse_search_params(text):
    if not text:
        return ""
    pattern = re.compile(r"('.+?')")
    params = pattern.findall(text)
    return " ".join(["".join([ch for ch in s.lower() if ch in allowed]).replace(" ", "_")
            for s in pattern.findall(text)])

f_parse_search_params = se.udf.register("parse_search_params", parse_search_params, "string")



adsinfofeatures.select(
    "f_cat_ad_id", "f_cat_ad_category", f_slugify_ohe(adsinfofeatures.f_cat_ad_title).alias("f_cat_ad_title"),
    "f_num_ad_price").registerTempTable("adsinfofeatures")
adsinfofeatures = se.sql("""
    SELECT *
    FROM adsinfofeatures
""")

searchinfofeatures.select(
    "search_id", f_slugify_ohe(searchinfofeatures.f_cat_search_query).alias("f_cat_search_query"),
    "f_cat_user_id", f_parse_search_params(searchinfofeatures.f_cat_search_params).alias("f_cat_search_params"),
    "f_cat_search_category").registerTempTable("searchinfofeatures")
searchinfofeatures = se.sql("""
    SELECT *
    FROM searchinfofeatures
""")

searchstreamfeatures.registerTempTable("searchstreamfeatures")

In [10]:
bias = searchstreamfeatures.select('search_id',
                                   F.when(F.col('search_id') == F.col('search_id'), 1).alias('f_bias')).distinct()
bias.registerTempTable("bias")

2023-03-04 16:05:45,094 WARN sql.Column: Constructing trivially true equals predicate, ''search_id = 'search_id'. Perhaps you need to use aliases.


In [11]:
bias.select("*").limit(10).toPandas()

                                                                                

Unnamed: 0,search_id,f_bias
0,4360759,1
1,4360862,1
2,4361152,1
3,4361362,1
4,4361517,1
5,4361591,1
6,4361951,1
7,4362155,1
8,4362323,1
9,4362540,1


In [12]:
searchinfofeatures.select("*").limit(10).toPandas()

                                                                                

Unnamed: 0,search_id,f_cat_search_query,f_cat_user_id,f_cat_search_params,f_cat_search_category
0,1,,3640266,,5
1,2,,769304,,50
2,3,,640089,,12
3,4,,3573776,обувь женская_одежда 38,22
4,5,,320674,,1
5,6,,1665156,,27
6,7,,3434614,,500001
7,8,,905821,,12
8,9,,1106541,,4
9,10,,3310798,,12


In [13]:
searchstreamfeatures.select("*").limit(10).toPandas()

                                                                                

Unnamed: 0,search_id,f_cat_ad_id,f_num_position,f_num_hist_ctr,target
0,2,11441863,1,0.001804,0
1,2,22968355,7,0.004723,0
2,3,212187,7,0.029701,0
3,3,34084553,1,0.0043,0
4,4,20653823,1,0.003049,0
5,5,11219482,1,0.043897,0
6,5,13375896,7,0.001563,0
7,6,6303835,1,0.007044,0
8,6,28312593,7,0.002199,0
9,8,24728248,1,0.003647,0


In [14]:
adsinfofeatures.select("*").limit(10).toPandas()

                                                                                

Unnamed: 0,f_cat_ad_id,f_cat_ad_category,f_cat_ad_title,f_num_ad_price
0,1,43,"toyota estima, 1993",160000.0
1,2,34,передние брызговики форд фокус 2 родные,750.0
2,3,53,дровокол,18000.0
3,4,57,продам ходули складные,1500.0
4,5,34,поворотник r carina 20317,800.0
5,6,34,ваз дверь 2106 передняя левая,0.0
6,7,26,видео,30.0
7,8,22,дизайнерское платье,7000.0
8,9,47,светодиодный конструктор lite brix подиум,2460.0
9,10,54,"3к квартира, 92 м, 44 эт",7000000.0


In [20]:
dataset_df = searchstreamfeatures.join(searchinfofeatures,
                                       ["search_id"], "left")\
            .join(adsinfofeatures, ["f_cat_ad_id"], "left")\
            .join(bias, ["search_id"], "left")
dataset_df = dataset_df.drop(adsinfofeatures.f_cat_ad_id)

assert dataset_df.count() == searchstreamfeatures.count()

                                                                                

In [21]:
dataset_df.count(), searchstreamfeatures.count()

                                                                                

(190157735, 190157735)

In [22]:
cols = dataset_df.columns

non_features_c = [
    c for c in cols
    if not (c == 'target' or c.startswith('f_'))
]

se.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
dataset_df.drop(*non_features_c).write.parquet("/user/hw2/parquet/dataset.parquet")
dataset_fd = se.read.parquet('/user/hw2/parquet/dataset.parquet')

                                                                                

In [24]:
f_real_cols = [f for f in dataset_fd.columns if f.startswith('f') and not f.startswith('f_cat') and f != 'f_bias']
f_cat_cols = [f for f in dataset_fd.columns if f.startswith('f') and (f.startswith('f_cat') or f == 'f_bias')]

exps_mean = [
    F.mean(c).alias('{}_mean'.format(c))
    for c in f_real_cols
]

exps_dev = [
    F.stddev(c).alias('{}_dev'.format(c))
    for c in f_real_cols
]

norm_f = dataset_fd.select(*exps_mean, *exps_dev).rdd.take(1)
norm_dict = norm_f[0].asDict()
print(norm_dict)

exps = [
    (
        (F.col(c) - norm_dict["{}_mean".format(c)]) / (1 + norm_dict["{}_dev".format(c)])
    ).alias(c)
    for c in f_real_cols
]

dataset_fd.select("target", *f_cat_cols, *exps).limit(10).toPandas()

                                                                                

{'f_num_position_mean': 3.5966107873550346, 'f_num_hist_ctr_mean': 0.010295077818140446, 'f_num_ad_price_mean': 19687.08405264681, 'f_num_position_dev': 2.972755824078931, 'f_num_hist_ctr_dev': 0.015561120810695718, 'f_num_ad_price_dev': 141027.54172019535}


Unnamed: 0,target,f_cat_ad_id,f_cat_search_query,f_cat_user_id,f_cat_search_params,f_cat_search_category,f_cat_ad_category,f_cat_ad_title,f_bias,f_num_position,f_num_hist_ctr,f_num_ad_price
0,0,26654369,платье,70934,платья_и_юбки женская_одежда 4648_l,22,,,1,-0.653604,-0.008303,
1,0,29080446,платье,70934,платья_и_юбки женская_одежда 4648_l,22,,,1,0.856682,-0.003335,
2,0,25863656,,2842273,туризм,5,,,1,0.856682,0.003995,
3,0,34514394,,2842273,туризм,5,,,1,-0.653604,0.025578,
4,0,22884453,,3478915,,22,,,1,-0.653604,0.019101,
5,0,36419838,,3478915,,22,,,1,0.856682,-0.00657,
6,0,32660272,,1634102,мелкая_кухонная_техника для_кухни,27,,,1,-0.653604,-0.003375,
7,0,3843783,,1634102,мелкая_кухонная_техника для_кухни,27,27.0,мультиварка redmond rmcm4502,1,0.856682,-0.008886,-0.10415
8,0,20297160,,1313625,часы,11,11.0,наручные часы timex t49828,1,-0.653604,0.016191,-0.090528
9,0,6942172,,1313625,часы,11,11.0,наручные часы citizen as202011h,1,0.856682,0.001687,0.15779


In [25]:
train_df, test_df = dataset_fd.select("target", *f_cat_cols, *exps).randomSplit([0.8, 0.2], 432)
train_df.write.parquet("/user/hw2/parquet/train_df.parquet")
test_df.write.parquet("/user/hw2/parquet/test_df.parquet")

                                                                                

In [23]:
train = train_df.rdd.map(lambda row : f"{row.target - (1 - row.target)}" + 
         "".join([f" |{cat_col} {getattr(row, cat_col)}"
                  for cat_col in f_cat_cols if (
                      getattr(row, cat_col) is not None and getattr(row, cat_col) not in ["", "NaN", "None"])]) + 
         "".join([f" |{real_col}:{getattr(row, real_col)}" for real_col in f_real_cols if (
                      getattr(row, real_col) is not None and getattr(row, real_col) not in ["", "NaN", "None"])])).cache()
test = test_df.rdd.map(lambda row : f"{row.target - (1 - row.target)}" + 
         "".join([f" |{cat_col} {getattr(row, cat_col)}"
                  for cat_col in f_cat_cols if (
                      getattr(row, cat_col) is not None and getattr(row, cat_col) not in ["", "NaN", "None"])]) + 
         "".join([f" |{real_col}:{getattr(row, real_col)}" for real_col in f_real_cols if (
                      getattr(row, real_col) is not None and getattr(row, real_col) not in ["", "NaN", "None"])])).cache()

In [None]:
! hdfs dfs -rm -r /user/vw_train_data
! hdfs dfs -rm -r /user/vw_test_data
train.saveAsTextFile("/user/vw_train_data")
test.saveAsTextFile("/user/vw_test_data")

In [None]:
# now, do the same with test dataset

test_data = se.read.option("mode", "DROPMALFORMED").option('sep', '\t').csv("/user/hw2/data/testSearchStream.tsv", header=True, inferSchema=True)
test_data.printSchema()

test_data.write.parquet("/user/hw2/parquet/test_data.parquet")
test_data_parquet = se.read.parquet("/user/hw2/parquet/test_data.parquet")
test_data_parquet.printSchema()
test_data = test_data_parquet

test_data.registerTempTable("test_searchstream")

test_searchstreamfeatures = se.sql("""
    SELECT SearchID as search_id, AdID as f_cat_ad_id, Position as f_num_position, HistCTR as f_num_hist_ctr
    FROM test_searchstream
""")
test_searchstreamfeatures.registerTempTable("test_searchstreamfeatures")

test_bias = test_searchstreamfeatures.select('search_id',
                                   F.when(F.col('search_id') == F.col('search_id'), 1).alias('f_bias')).distinct()
test_bias.registerTempTable("test_bias")

test_dataset_df = test_searchstreamfeatures.join(searchinfofeatures,
                                       ["search_id"], "left")\
            .join(adsinfofeatures, ["f_cat_ad_id"], "left")\
            .join(test_bias, ["search_id"], "left")
test_dataset_df = test_dataset_df.drop(adsinfofeatures.f_cat_ad_id)

assert test_dataset_df.count() == test_searchstreamfeatures.count()
# print(f"Number of rows: {test_dataset_df.count()}")

cols = test_dataset_df.columns

test_dataset_df.drop(*non_features_c).write.parquet("/user/hw2/parquet/test_dataset.parquet")
test_dataset_fd = se.read.parquet('/user/hw2/parquet/test_dataset.parquet')

# print("Norm dict", norm_dict)

exps = [
    (
        (F.col(c) - norm_dict["{}_mean".format(c)]) / (1 + norm_dict["{}_dev".format(c)])
    ).alias(c)
    for c in f_real_cols
]

# print("First 10 rows of dataset", test_dataset_fd.select(*f_cat_cols, *exps).limit(10).toPandas())

test_dataset_fd.select(*f_cat_cols, *exps).write.parquet("/user/hw2/parquet/real_test_df.parquet")

real_test = test_dataset_fd.select(*f_cat_cols, *exps).rdd.map(lambda row : f"-1" + 
         "".join([f" | {cat_col} {getattr(row, cat_col)}" for cat_col in f_cat_cols if getattr(row, cat_col) != ""]) + 
         "".join([f" | {real_col}:{getattr(row, real_col)}" for real_col in f_real_cols])).cache()
         
! hdfs dfs -rm -r /user/vw_real_test_data
real_test.saveAsTextFile("/user/vw_real_test_data")

# Part 2

In [None]:
! sudo wget http://finance.yendor.com/ML/VW/Binaries/vw-8.20190624 -O /usr/bin/vw
! sudo chmod +x /usr/bin/vw
! sudo chown ubuntu /usr/bin/vw
! sudo apt-get update -y && sudo apt-get install graphviz -y
! pip install numpy pandas scikit-learn dateparser pandarallel ipywidgets catboost graphviz
! /opt/conda/bin/jupyter nbextension enable --py widgetsnbextension

In [26]:
# Проверяем, что vw работает
! vw --help | head

Num weight bits = 18
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = 
num sources = 1
driver:
  --onethread           Disable parse thread
VW options:
  --ring_size arg (=256, ) size of example ring
  --strict_parse           throw on malformed examples
Update options:
  -l [ --learning_rate ] arg Set learning rate
  --power_t arg              t power value
  --decay_learning_rate arg  Set Decay factor for learning_rate between passes
  --initial_t arg            initial t value


In [27]:
# dont have time for whole dataset, sorryy (
# ! hdfs dfs -cat /user/vw_test_data/* > test.vw
! hdfs dfs -cat /user/vw_train_data/part-00000 > train.vw



In [36]:
! hdfs dfs -cat /user/vw_test_data/part-00000 > test.vw

[Stage 7:==>(23 + 2) / 25][Stage 8:=> (11 + 4) / 25][Stage 9:>   (0 + 0) / 25]

In [28]:
! vw --final_regressor clicks.model.bin \
    -d train.vw \
    --loss_function logistic \
    --learning_rate 20.0 \
    --bit_precision 23 \
    --passes 5 \
    --cache -k

final_regressor = clicks.model.bin
Num weight bits = 23
learning rate = 20
initial_t = 0
power_t = 0.5
decay_learning_rate = 1
creating cache_file = train.vw.cache
Reading datafile = train.vw
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.693147 0.693147            1            1.0  -1.0000   0.0000       22
0.420946 0.148745            2            2.0  -1.0000  -1.8302       20
0.211014 0.001083            4            4.0  -1.0000  -6.6177       20
0.105840 0.000666            8            8.0  -1.0000  -7.3022       20
0.053190 0.000539           16           16.0  -1.0000  -7.5376       20
0.026823 0.000455           32           32.0  -1.0000  -7.4487       20
0.013898 0.000974           64           64.0  -1.0000  -8.5800       20
0.007161 0.000423          128          128.0  -1.0000 -12.9176       20
0.003589 0.000018          256          256.0  -1.0000 -13.795



0.000000 0.000000      4194304      4194304.0  -1.0000 -15.7833       11
0.000018 0.000018      8388608      8388608.0  -1.0000 -21.3361       23 h
0.000123 0.000228     16777216     16777216.0  -1.0000 -17.1289       24 h

finished run
number of examples per pass = 5488368
passes used = 4
weighted example sum = 21953472.000000
weighted label sum = -21689128.000000
average loss = 0.000027 h
best constant = -5.106538
best constant's loss = 0.036783
total feature number = 391510608


In [37]:
! vw --binary --testonly --initial_regressor clicks.model.bin --predictions clicks_bin.predictions.txt test.vw

only testing
predictions = clicks_bin.predictions.txt
Num weight bits = 23
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = test.vw
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
1.000000 1.000000            1            1.0  -1.0000   1.0000       22
1.000000 1.000000            2            2.0  -1.0000   1.0000       20
1.000000 1.000000            4            4.0  -1.0000   1.0000       20
1.000000 1.000000            8            8.0  -1.0000   1.0000       20
1.000000 1.000000           16           16.0  -1.0000   1.0000       20
1.000000 1.000000           32           32.0  -1.0000   1.0000       20
1.000000 1.000000           64           64.0  -1.0000   1.0000       22
1.000000 1.000000          128          128.0  -1.0000   1.0000       19
1.000000 1.000000          256          256.0  -1.0000   1.0000       23
1.000000 1.00000

[Stage 7:==>(23 + 2) / 25][Stage 8:=> (13 + 4) / 25][Stage 9:>   (0 + 0) / 25]

0.999998 1.000000      2097152      2097152.0  -1.0000   1.0000       18
0.999998 0.999999      4194304      4194304.0  -1.0000   1.0000       13

finished run
number of examples = 6098186
weighted example sum = 6098186.000000
weighted label sum = -6024758.000000
average loss = 0.993978
best constant = -0.987959
best constant's loss = 0.023937
total feature number = 108753043
