# 1 Building the feature space

To solve the task, let's build the following 8 conceptual features:
1. Search Stream - position (the placement of a context ad quite likely influences whether it gets clicked)
2. Search Stream - histCTR (I assume the existing click-probability estimate can serve as a good baseline for classification; including it in the model can be viewed as combining the existing naive approach with mine)
3. deviceId or agentOSId (the notebook uses `UserAgentOSID` from UserInfo; it serves as a kind of categorical proxy for how well-off the user is, which may support some far-reaching conclusions about clicks on context ads)
4. A set of binary match flags for Params between AdsInfo and SearchInfo
5. A similarity measure between Title from AdsInfo and SearchQuery from SearchInfo
6. Binary flags for category agreement between AdsInfo and SearchInfo
7. Binary flags for location level agreement between AdsInfo and SearchInfo
8. A binary IsUserLoggedOn flag

## Process the files and store them as Parquet

In [12]:
!ls 

AdsInfo.tsv   MHW3.ipynb	    trainSearchStream.tsv  VisitsStreamRaw.tsv
Category.tsv  SearchInfo.tsv	    Untitled.ipynb
Location.tsv  testSearchStream.tsv  UserInfo.tsv


In [2]:
! pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [9]:
! hdfs dfs -ls /user/avito/data/

Found 1 items
-rw-r--r--   3 ubuntu hadoop 13180996366 2022-02-21 15:24 /user/avito/data/VisitsStreamRaw.tsv


In [11]:
%%time
! hdfs dfs -put AdsInfo.tsv /user/avito/data/
! hdfs dfs -put Category.tsv /user/avito/data/
! hdfs dfs -put Location.tsv /user/avito/data/
! hdfs dfs -put SearchInfo.tsv /user/avito/data/
! hdfs dfs -put trainSearchStream.tsv /user/avito/data/
! hdfs dfs -put testSearchStream.tsv /user/avito/data/
! hdfs dfs -put UserInfo.tsv /user/avito/data/

put: `/user/avito/data/Category.tsv': File exists
put: `/user/avito/data/Location.tsv': File exists
CPU times: user 13.8 s, sys: 3.19 s, total: 17 s
Wall time: 14min 3s


In [12]:
! sudo -u hdfs hdfs balancer 

2022-03-15 00:32:56,249 INFO balancer.Balancer: namenodes  = [hdfs://rc1a-dataproc-m-5qpakob61ma7t4ug.mdb.yandexcloud.net]
2022-03-15 00:32:56,253 INFO balancer.Balancer: parameters = Balancer.BalancerParameters [BalancingPolicy.Node, threshold = 10.0, max idle iteration = 5, #excluded nodes = 0, #included nodes = 0, #source nodes = 0, #blockpools = 0, run during upgrade = false]
2022-03-15 00:32:56,253 INFO balancer.Balancer: included nodes = []
2022-03-15 00:32:56,253 INFO balancer.Balancer: excluded nodes = []
2022-03-15 00:32:56,253 INFO balancer.Balancer: source nodes = []
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved  NameNode
2022-03-15 00:32:56,258 INFO balancer.NameNodeConnector: getBlocks calls for hdfs://rc1a-dataproc-m-5qpakob61ma7t4ug.mdb.yandexcloud.net will be rate-limited to 20 per second
2022-03-15 00:32:57,569 INFO balancer.Balancer: dfs.namenode.get-blocks.max-qps = 20 (default=20)
2022-03-15 00:32:57,570 INFO balance

In [15]:
! hdfs dfs -ls /user/avito/data/

Found 8 items
-rw-r--r--   3 ubuntu hadoop  5350670676 2022-03-15 00:22 /user/avito/data/AdsInfo.tsv
-rw-r--r--   3 ubuntu hadoop         752 2022-03-15 00:18 /user/avito/data/Category.tsv
-rw-r--r--   3 ubuntu hadoop       58976 2022-03-15 00:18 /user/avito/data/Location.tsv
-rw-r--r--   3 ubuntu hadoop  9469373867 2022-03-15 00:28 /user/avito/data/SearchInfo.tsv
-rw-r--r--   3 ubuntu hadoop   104614699 2022-03-15 00:32 /user/avito/data/UserInfo.tsv
-rw-r--r--   3 ubuntu hadoop 13180996366 2022-02-21 15:24 /user/avito/data/VisitsStreamRaw.tsv
-rw-r--r--   3 ubuntu hadoop   557829937 2022-03-15 00:32 /user/avito/data/testSearchStream.tsv
-rw-r--r--   3 ubuntu hadoop  5648044032 2022-03-15 00:32 /user/avito/data/trainSearchStream.tsv


In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
sc = pyspark.SparkContext(appName="lsml-mhw-3")

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-03-15 19:43:16,310 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

In [18]:
ads_info = se.read.csv("/user/avito/data/AdsInfo.tsv", header=True, inferSchema=True, sep='\t')
category = se.read.csv("/user/avito/data/Category.tsv", header=True, inferSchema=True, sep='\t')
location = se.read.csv("/user/avito/data/Location.tsv", header=True, inferSchema=True, sep='\t')
search_info = se.read.csv("/user/avito/data/SearchInfo.tsv", header=True, inferSchema=True, sep='\t')
user_info = se.read.csv("/user/avito/data/UserInfo.tsv", header=True, inferSchema=True, sep='\t')
visits_stream_no_header = se.read.csv("/user/avito/data/VisitsStreamRaw.tsv", header=False, inferSchema=True, sep='\t')
train_search_stream = se.read.csv("/user/avito/data/trainSearchStream.tsv", header=True, inferSchema=True, sep='\t')
test_search_stream = se.read.csv("/user/avito/data/testSearchStream.tsv", header=True, inferSchema=True, sep='\t')

In [20]:
ads_info.printSchema()
category.printSchema()
location.printSchema()
search_info.printSchema()
user_info.printSchema()
visits_stream_no_header.printSchema()
train_search_stream.printSchema()
test_search_stream.printSchema()

root
 |-- AdID: integer (nullable = true)
 |-- LocationID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- Params: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Title: string (nullable = true)
 |-- IsContext: integer (nullable = true)

root
 |-- CategoryID: integer (nullable = true)
 |-- Level: integer (nullable = true)
 |-- ParentCategoryID: integer (nullable = true)
 |-- SubcategoryID: integer (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Level: integer (nullable = true)
 |-- RegionID: integer (nullable = true)
 |-- CityID: integer (nullable = true)

root
 |-- SearchID: integer (nullable = true)
 |-- SearchDate: string (nullable = true)
 |-- IPID: integer (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- IsUserLoggedOn: integer (nullable = true)
 |-- SearchQuery: string (nullable = true)
 |-- LocationID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- SearchParams: string (n

In [21]:
! hdfs dfs -mkdir -p /user/avito/parquet

In [22]:
ads_info.write.parquet("/user/avito/parquet/ads_info.parquet")
category.write.parquet("/user/avito/parquet/category.parquet")
location.write.parquet("/user/avito/parquet/location.parquet")
search_info.write.parquet("/user/avito/parquet/search_info.parquet")
user_info.write.parquet("/user/avito/parquet/user_info.parquet")
visits_stream_no_header.write.parquet("/user/avito/parquet/visits_stream_no_header.parquet")
train_search_stream.write.parquet("/user/avito/parquet/train_search_stream.parquet")
test_search_stream.write.parquet("/user/avito/parquet/test_search_stream.parquet")

In [4]:
ads_info = se.read.parquet('/user/avito/parquet/ads_info.parquet')
category = se.read.parquet('/user/avito/parquet/category.parquet')
location = se.read.parquet('/user/avito/parquet/location.parquet')
search_info = se.read.parquet('/user/avito/parquet/search_info.parquet')
user_info = se.read.parquet('/user/avito/parquet/user_info.parquet')
visits_stream_no_header = se.read.parquet('/user/avito/parquet/visits_stream_no_header.parquet')
train_search_stream = se.read.parquet('/user/avito/parquet/train_search_stream.parquet')
test_search_stream = se.read.parquet('/user/avito/parquet/test_search_stream.parquet')

ads_info.registerTempTable("ads_info")
category.registerTempTable("category")
location.registerTempTable("location")
search_info.registerTempTable("search_info")
user_info.registerTempTable("user_info")
visits_stream_no_header.registerTempTable("visits_stream_no_header")
train_search_stream.registerTempTable("train_search_stream")
test_search_stream.registerTempTable("test_search_stream")

                                                                                

Download Vowpal Wabbit

In [None]:
! sudo wget http://finance.yendor.com/ML/VW/Binaries/vw-8.20190624 -O /usr/bin/vw
! sudo chmod +x /usr/bin/vw
! sudo chown ubuntu /usr/bin/vw
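Vowpal Wabbit reads plain-text examples of the form `label |namespace feature[:value] ...`. A minimal sketch, assuming the feature columns assembled later (`f_pos`, `f_ctr`, `f_OS_cat`, `f_log`) and a hypothetical helper `to_vw_line`, of how one row could be serialized, with `IsClick` mapped to -1/1 as VW's `--loss_function logistic` expects:

```python
def to_vw_line(target, f_pos, f_ctr, f_os_cat, f_log):
    """Serialize one training row into a VW text example (hypothetical helper)."""
    # VW's logistic loss expects labels in {-1, 1}, so map IsClick 0/1 accordingly
    label = 1 if target == 1 else -1
    # Numeric features carry an explicit value after ':'
    numeric = f"|num pos:{f_pos} ctr:{f_ctr}"
    # Categorical features become indicator features by embedding the value in the name
    categorical = f"|cat os_{f_os_cat} logged_{f_log}"
    return f"{label} {numeric} {categorical}"


print(to_vw_line(0, 1, 0.001804, 20, 1))
# -1 |num pos:1 ctr:0.001804 |cat os_20 logged_1
```

Feature names must not contain spaces, `|` or `:`, which is why categorical values are glued to a prefix with `_`.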

In [None]:
! sudo apt-get update -y && sudo apt-get install graphviz -y
! pip install numpy pandas scikit-learn dateparser pandarallel ipywidgets catboost graphviz
! /opt/conda/bin/jupyter nbextension enable --py widgetsnbextension

Let's look at the columns and check how relevant the chosen features are

In [17]:
se.sql("""
    SELECT *
    FROM train_search_stream
    WHERE ObjectType=3
    LIMIT 10
""").toPandas()

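|   | SearchID | AdID | Position | ObjectType | HistCTR | IsClick |
|---|---|---|---|---|---|---|
| 0 | 2 | 11441863 | 1 | 3 | 0.001804 | 0 |
| 1 | 2 | 22968355 | 7 | 3 | 0.004723 | 0 |
| 2 | 3 | 212187 | 7 | 3 | 0.029701 | 0 |
| 3 | 3 | 34084553 | 1 | 3 | 0.0043 | 0 |
| 4 | 4 | 20653823 | 1 | 3 | 0.003049 | 0 |
| 5 | 5 | 11219482 | 1 | 3 | 0.043897 | 0 |
| 6 | 5 | 13375896 | 7 | 3 | 0.001563 | 0 |
| 7 | 6 | 6303835 | 1 | 3 | 0.007044 | 0 |
| 8 | 6 | 28312593 | 7 | 3 | 0.002199 | 0 |
| 9 | 8 | 24728248 | 1 | 3 | 0.003647 | 0 |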


In [12]:
se.sql("""
    SELECT Position, COUNT(Position)
    FROM train_search_stream
    WHERE ObjectType=3
    GROUP BY Position
    LIMIT 10
""").toPandas()

                                                                                

|   | Position | count(Position) |
|---|---|---|
| 0 | 1 | 55689797 |
| 1 | 7 | 42485004 |


In [30]:
se.sql("""
    SELECT IsContext, COUNT(IsContext)
    FROM ads_info
    GROUP BY IsContext
    ORDER BY COUNT(IsContext) DESC
    LIMIT 10
""").toPandas()

|   | IsContext | count(IsContext) |
|---|---|---|
| 0 | 0 | 36864728 |
| 1 | 1 | 28570 |


In [14]:
se.sql("""
    SELECT Position, COUNT(Position), COUNT(ai.IsContext)  -- COUNT counts non-NULL rows; SUM(ai.IsContext) would count the context ads themselves
    FROM train_search_stream tss
        inner join ads_info ai on tss.AdID = ai.AdID
    WHERE ObjectType=3
    GROUP BY Position
    LIMIT 10
""").toPandas()

2022-03-15 13:05:51,131 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

|   | Position | count(Position) | count(IsContext) |
|---|---|---|---|
| 0 | 1 | 55689797 | 55689797 |
| 1 | 7 | 42485004 | 42485004 |


In [27]:
se.sql("""
    SELECT UserAgentOSID, COUNT(UserAgentOSID)
    FROM user_info
    GROUP BY UserAgentOSID
    ORDER BY COUNT(UserAgentOSID) DESC
    LIMIT 10
""").toPandas()

|   | UserAgentOSID | count(UserAgentOSID) |
|---|---|---|
| 0 | 20 | 2412273 |
| 1 | 43 | 585163 |
| 2 | 7 | 464987 |
| 3 | 35 | 270792 |
| 4 | 30 | 254897 |
| 5 | 9 | 133989 |
| 6 | 15 | 55087 |
| 7 | 2 | 36441 |
| 8 | 44 | 34838 |
| 9 | 14 | 11244 |


In [33]:
se.sql("""
    SELECT CategoryID, COUNT(CategoryID)
    FROM ads_info
    GROUP BY CategoryID
    ORDER BY COUNT(CategoryID) DESC
    LIMIT 10
""").toPandas()

                                                                                

|   | CategoryID | count(CategoryID) |
|---|---|---|
| 0 | 34 | 5568133 |
| 1 | 22 | 3916136 |
| 2 | 54 | 2847511 |
| 3 | 60 | 2467065 |
| 4 | 43 | 1955900 |
| 5 | 250005 | 1577031 |
| 6 | 38 | 1526317 |
| 7 | 50 | 1396914 |
| 8 | 47 | 1362431 |
| 9 | 42 | 1345012 |


In [32]:
se.sql("""
    SELECT CategoryID, COUNT(CategoryID)
    FROM search_info
    GROUP BY CategoryID
    ORDER BY COUNT(CategoryID) DESC
    LIMIT 10
""").toPandas()

                                                                                

|   | CategoryID | count(CategoryID) |
|---|---|---|
| 0 | 34 | 12162432 |
| 1 | 22 | 11899437 |
| 2 | 38 | 10287657 |
| 3 | 12 | 8727169 |
| 4 | 60 | 7700443 |
| 5 | 47 | 7626132 |
| 6 | 0 | 7360264 |
| 7 | 41 | 5301616 |
| 8 | 50 | 4805718 |
| 9 | 26 | 4001764 |


Let's work out an approach for measuring the similarity between the ad title and the search query, and between the search parameters and the ad parameters

In [21]:
se.sql("""
    SELECT ai.Params, si.SearchParams
    FROM train_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
    LIMIT 10
""").toPandas()

                                                                                

|   | Params | SearchParams |
|---|---|---|
| 0 | {110:'Верхняя одежда', 178:'Для мальчиков'} | {119:'86-92 см (1-2 года)', 110:'Верхняя одежд... |
| 1 | {110:'Верхняя одежда', 178:'Для мальчиков'} | {119:'86-92 см (1-2 года)', 110:'Верхняя одежд... |
| 2 | {104:'Ювелирные изделия'} | {104:'Ювелирные изделия'} |
| 3 | {104:'Ювелирные изделия'} | {104:'Ювелирные изделия'} |
| 4 | {127:'Детская мебель'} |  |
| 5 | {127:'Постельные принадлежности'} |  |
| 6 | {5:'Запчасти', 598:'Для автомобилей'} |  |
| 7 | {5:'Запчасти', 598:'Для автомобилей'} |  |
| 8 | {5:'Запчасти', 598:'Для автомобилей'} |  |
| 9 | {5:'Запчасти', 598:'Для автомобилей'} |  |


In [50]:
# Count the parameter keys shared by an ad and a search

dic_a = {110:'Верхняя одежда', 178:'Для мальчиков'}
dic_b = {119:'86-92 см (1-2 года)', 110:'Верхняя одежда', 178:'Для мальчиков'}

len(dic_a.keys() & dic_b.keys())

2
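Since `Params` and `SearchParams` look like Python dict literals, one alternative to the regex route is `ast.literal_eval`. A minimal sketch with hypothetical helpers `parse_params` and `shared_param_keys`, assuming non-empty values are well-formed literals; anything that fails to parse is treated as "no parameters" (an assumption, not something the data guarantees):

```python
import ast


def parse_params(text):
    """Parse a string such as "{110:'Верхняя одежда'}" into a dict; empty/NULL -> {}."""
    if not text:
        return {}
    try:
        parsed = ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError):
        # Assumption: a malformed string is treated as "no parameters"
        return {}
    return parsed if isinstance(parsed, dict) else {}


def shared_param_keys(ad_params, search_params):
    """Number of parameter keys present in both the ad and the search."""
    return len(parse_params(ad_params).keys() & parse_params(search_params).keys())


print(shared_param_keys("{110:'Верхняя одежда', 178:'Для мальчиков'}",
                        "{119:'86-92 см (1-2 года)', 110:'Верхняя одежда', 178:'Для мальчиков'}"))  # 2
```

Like `extract_keys`, such a function could be registered for SQL with `se.udf.register("shared_param_keys", shared_param_keys, "int")`, so the dictionaries are parsed on the executors instead of being collected to the driver.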

In [5]:
pre_params = se.sql("""
    SELECT tss.AdID, tss.SearchID, ai.Params, si.SearchParams
    FROM train_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
""").toPandas()

2022-03-15 15:08:57,047 WARN memory.TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
2022-03-15 15:09:02,473 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:02,708 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:03,361 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:03,361 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:03,361 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:03,581 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:04,222 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:04,222 WARN memory.TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
2022-03-15 15:09:0

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:45129)

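I spent two hours trying to figure out how to intersect the keys of these dictionaries (the cell above crashed as well, most likely because `.toPandas()` without a `LIMIT` pulls the entire join onto the driver). Let's try regular expressions instead.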

In [9]:
import re

def extract_keys(text):
    """
    Extract the parameter keys (the integers before each ':') with a regular expression
    """
    if not text:
        return ""
    regexp = r'(\d+):'
    return ' '.join(re.findall(regexp, text))

se.udf.register("extract_keys", extract_keys, "string")

<function __main__.extract_keys(text)>

In [8]:
pre_params = se.sql("""
    SELECT tss.AdID, tss.SearchID, extract_keys(ai.Params) as ai_params, extract_keys(si.SearchParams) as si_params
    FROM train_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1 and ai.Params IS NOT NULL and si.SearchParams IS NOT NULL
    LIMIT 10
""").toPandas()

                                                                                

In [12]:
pre_params

|   | AdID | SearchID | ai_params | si_params |
|---|---|---|---|---|
| 0 | 27429113 | 148 | 110 178 | 119 110 178 |
| 1 | 27819278 | 148 | 110 178 | 119 110 178 |
| 2 | 25678701 | 463 | 104 | 104 |
| 3 | 35474830 | 463 | 104 | 104 |
| 4 | 27318390 | 1088 | 127 | 127 |
| 5 | 14142705 | 1342 | 83 175 | 83 175 85 |
| 6 | 34105752 | 1580 | 181 | 181 |
| 7 | 8152267 | 1580 | 181 | 181 |
| 8 | 11204789 | 1645 | 110 178 | 178 |
| 9 | 15861848 | 1645 | 110 178 | 178 |


Hooray!
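With the keys extracted as space-separated strings, feature 4 reduces to set arithmetic. A sketch with a hypothetical helper `key_overlap`, assuming we want both the raw number of shared keys and a Jaccard ratio:

```python
def key_overlap(ai_keys, si_keys):
    """Return (shared key count, Jaccard ratio) for two space-separated key strings."""
    ad = set((ai_keys or "").split())
    search = set((si_keys or "").split())
    shared = len(ad & search)
    union = len(ad | search)
    # Guard against two empty parameter sets
    jaccard = shared / union if union else 0.0
    return shared, jaccard


print(key_overlap("110 178", "119 110 178"))  # (2, 0.6666666666666666)
print(key_overlap("110 178", "178"))          # (1, 0.5)
```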

## Assemble the dataset

In [5]:
# features 1-2
se.sql("""
    SELECT AdID, SearchID, Position as f_pos, HistCTR as f_ctr
    FROM train_search_stream
    WHERE ObjectType=3
""").registerTempTable("initial_features")

In [6]:
# feature 3
se.sql("""
SELECT SearchID, UserAgentOSID as f_OS_cat
FROM user_info ui
    join search_info si on ui.UserID = si.UserID
""").registerTempTable("OS_features")

In [10]:
# feature 4
se.sql("""
    SELECT tss.AdID, tss.SearchID, extract_keys(ai.Params) as f_ai_params, extract_keys(si.SearchParams) as f_si_params
    FROM train_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1 and ai.Params IS NOT NULL and si.SearchParams IS NOT NULL
""").registerTempTable("params_features")

In [11]:
# feature 5
se.sql("""
    SELECT tss.AdID, tss.SearchID, ai.Title as f_title, si.SearchQuery as f_query
    FROM train_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
""").registerTempTable("title_features")

In [12]:
# feature 6
se.sql("""
    SELECT tss.AdID, tss.SearchID, ai.CategoryID as f_ai_cat, si.CategoryID as f_si_cat
    FROM train_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
""").registerTempTable("category_features")

In [13]:
# feature 7
se.sql("""
    SELECT tss.AdID, tss.SearchID, ai.LocationID as f_ai_loc, si.LocationID as f_si_loc
    FROM train_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
""").registerTempTable("location_features")

In [14]:
# feature 8
se.sql("""
    SELECT tss.AdID, tss.SearchID,  si.IsUserLoggedOn as f_log
    FROM train_search_stream tss
        join search_info si on tss.SearchID = si.SearchID
    WHERE tss.ObjectType=3
""").registerTempTable("login_features")

In [17]:
datadet_df = se.sql("""
SELECT IsClick as target, tss.AdID as f_AdID, tss.SearchID as f_SearchID, *
FROM
    train_search_stream tss
    join initial_features if on if.AdID = tss.AdID AND if.SearchID = tss.SearchID
    join OS_features osf on osf.SearchID = tss.SearchID
    join params_features pf on pf.AdID = tss.AdID AND pf.SearchID = tss.SearchID
    join title_features tf on tf.AdID = tss.AdID AND tf.SearchID = tss.SearchID
    join category_features cf on cf.AdID = tss.AdID AND cf.SearchID = tss.SearchID
    join location_features locf on locf.AdID = tss.AdID AND locf.SearchID = tss.SearchID
    join login_features logf on logf.AdID = tss.AdID AND logf.SearchID = tss.SearchID
WHERE 
    tss.ObjectType=3
""")


In [37]:
datadet_df.limit(5).toPandas()

2022-03-15 16:24:25,577 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2022-03-15 16:30:14,400 ERROR executor.Executor: Exception in task 1.0 in stage 48.0 (TID 2322)
java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)

Py4JJavaError: An error occurred while calling o162.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 47.0 failed 1 times, most recent failure: Lost task 13.0 in stage 47.0 (TID 2320, rc1a-dataproc-c-0430dmrwee1whrn9.mdb.yandexcloud.net, executor driver): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:539)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3450)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3447)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:539)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


2022-03-15 16:30:38,067 WARN scheduler.TaskSetManager: Lost task 12.0 in stage 47.0 (TID 2319, rc1a-dataproc-c-0430dmrwee1whrn9.mdb.yandexcloud.net, executor driver): TaskKilled (Stage cancelled)


In [18]:
datadet_df

DataFrame[target: int, f_AdID: int, f_SearchID: int, SearchID: int, AdID: int, Position: int, ObjectType: int, HistCTR: double, IsClick: int, AdID: int, SearchID: int, f_pos: int, f_ctr: double, SearchID: int, f_OS_cat: int, AdID: int, SearchID: int, f_ai_params: string, f_si_params: string, AdID: int, SearchID: int, f_title: string, f_query: string, AdID: int, SearchID: int, f_ai_cat: int, f_si_cat: int, AdID: int, SearchID: int, f_ai_loc: int, f_si_loc: int, AdID: int, SearchID: int, f_log: int]


In [19]:
cols = datadet_df.columns
non_features_c = [
    c for c in cols
    if not (c == 'target' or c.startswith('f_'))
]
non_features_c

['SearchID',
 'AdID',
 'Position',
 'ObjectType',
 'HistCTR',
 'IsClick',
 'AdID',
 'SearchID',
 'SearchID',
 'AdID',
 'SearchID',
 'AdID',
 'SearchID',
 'AdID',
 'SearchID',
 'AdID',
 'SearchID',
 'AdID',
 'SearchID']


In [61]:
datadet_df.drop(*non_features_c).write.parquet("/user/avito/parquet/dataset.parquet")

2022-03-15 18:31:27,057 ERROR memory.TaskMemoryManager: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@2e373df1
java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:260)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:136)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSor

Py4JJavaError: An error occurred while calling o175.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:287)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:847)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 37.0 failed 1 times, most recent failure: Lost task 31.0 in stage 37.0 (TID 2007, rc1a-dataproc-c-0430dmrwee1whrn9.mdb.yandexcloud.net, executor driver): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:539)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 33 more
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:539)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


The write did not fit on disk (`No space left on device`), so delete the partially written `dataset.parquet`:

In [67]:
!hdfs dfs -rm -r /user/avito/parquet/dataset.parquet || true
!hdfs dfs -ls /user/avito/parquet

Found 8 items
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 00:54 /user/avito/parquet/ads_info.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 00:54 /user/avito/parquet/category.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 00:54 /user/avito/parquet/location.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 00:58 /user/avito/parquet/search_info.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 01:07 /user/avito/parquet/test_search_stream.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 01:07 /user/avito/parquet/train_search_stream.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 00:58 /user/avito/parquet/user_info.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-03-15 01:04 /user/avito/parquet/visits_stream_no_header.parquet


In [40]:
cols

['target',
 'SearchID',
 'AdID',
 'Position',
 'ObjectType',
 'HistCTR',
 'IsClick',
 'AdID',
 'SearchID',
 'f_pos',
 'f_ctr',
 'SearchID',
 'f_OS_cat',
 'AdID',
 'SearchID',
 'f_ai_params',
 'f_si_params',
 'AdID',
 'SearchID',
 'f_title',
 'f_query',
 'AdID',
 'SearchID',
 'f_ai_cat',
 'f_si_cat',
 'AdID',
 'SearchID',
 'f_ai_loc',
 'f_si_loc',
 'AdID',
 'SearchID',
 'f_log']
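Every feature table carries its own `AdID`/`SearchID`, and the final query uses `SELECT *`, so the joined frame repeats the key columns: in the list above `SearchID` occurs 8 times and `AdID` 7 times. In Spark, referring to such a name afterwards fails with an "ambiguous reference" error, so it is safer to select the engineered features explicitly. A small sketch over the printed `cols` list; the `f_` prefix is the notebook's own naming convention, while `duplicated_columns` and `feature_columns` are hypothetical helpers:

```python
from collections import Counter

# Column list of the joined DataFrame, copied from the output above
cols = ['target', 'SearchID', 'AdID', 'Position', 'ObjectType', 'HistCTR', 'IsClick',
        'AdID', 'SearchID', 'f_pos', 'f_ctr',
        'SearchID', 'f_OS_cat',
        'AdID', 'SearchID', 'f_ai_params', 'f_si_params',
        'AdID', 'SearchID', 'f_title', 'f_query',
        'AdID', 'SearchID', 'f_ai_cat', 'f_si_cat',
        'AdID', 'SearchID', 'f_ai_loc', 'f_si_loc',
        'AdID', 'SearchID', 'f_log']


def duplicated_columns(columns):
    """Names that occur more than once, mapped to how often they occur."""
    return {name: n for name, n in Counter(columns).items() if n > 1}


def feature_columns(columns, prefix='f_'):
    """Engineered features in their original order, each kept once."""
    return list(dict.fromkeys(c for c in columns if c.startswith(prefix)))


print(duplicated_columns(cols))  # {'SearchID': 8, 'AdID': 7}
print(feature_columns(cols))
```

The resulting list could then be passed to Spark, e.g. `df.select('target', *feature_columns(cols))`, which keeps one copy of each feature and none of the repeated keys.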

In [26]:
import re

# \w is Unicode-aware for str in Python 3, so Cyrillic words in Avito titles
# and queries are kept (the previous [a-zA-Z0-9_]+ pattern dropped all of them).
WORD_PATTERN = re.compile(r"\w+")


def normalize_text(raw_text):
    # Title / SearchQuery can be NULL: treat missing text as empty
    words = WORD_PATTERN.findall((raw_text or '').lower())
    return ' '.join(words)


def convert_to_vw_class(data):
    # VW's logistic loss expects labels in {-1, 1}
    target = 1 if data['target'] == 1 else -1

    f_title = normalize_text(data['f_title'])
    f_query = normalize_text(data['f_query'])

    # One namespace per feature group. Placeholders must be in braces,
    # otherwise the literal names (e.g. "f_pos") end up in every line.
    # Namespace letters are reused later by --ngram (t, q) and --interactions (bc, de, lm).
    # Assumes extract_keys already returns space-separated, VW-safe tokens.
    template = (
        "{target} |p pos_{f_pos} |h f_ctr:{f_ctr} |o os_{f_OS_cat} "
        "|d {f_ai_params} |e {f_si_params} |t {f_title} |q {f_query} "
        "|c cat_{f_ai_cat} |b cat_{f_si_cat} |l loc_{f_ai_loc} |m loc_{f_si_loc} |g log_{f_log}"
    )
    return template.format(
        target=target,
        f_pos=data['f_pos'],
        f_ctr=data['f_ctr'],
        f_OS_cat=data['f_OS_cat'],
        f_ai_params=data['f_ai_params'],
        f_si_params=data['f_si_params'],
        f_title=f_title,
        f_query=f_query,
        f_ai_cat=data['f_ai_cat'],
        f_si_cat=data['f_si_cat'],
        f_ai_loc=data['f_ai_loc'],
        f_si_loc=data['f_si_loc'],
        f_log=data['f_log'],
    )
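VW's text format gives `|`, `:` and whitespace a special meaning (namespace start, `name:value`, feature separator), so a raw value that contains any of them can corrupt the line. A minimal sketch of a hypothetical helper, `to_vw_line`, that sanitizes tokens and assembles a line from a namespace-to-features mapping in the same layout as `convert_to_vw_class`; the example values are made up for illustration:

```python
import re

# Characters VW treats as syntax: whitespace, namespace bar, name:value colon
_VW_SPECIAL = re.compile(r"[\s|:]+")


def vw_token(value):
    """Turn an arbitrary value into a single VW-safe feature name."""
    return _VW_SPECIAL.sub('_', str(value).strip())


def to_vw_line(label, namespaces):
    """Build '<label> |ns feat ... |ns name:value ...'.

    `namespaces` maps a namespace name either to a list of categorical
    tokens or to a dict of numeric name -> value pairs.
    """
    parts = [str(label)]
    for ns, feats in namespaces.items():
        if isinstance(feats, dict):
            body = ' '.join(f"{vw_token(k)}:{float(v)}" for k, v in feats.items())
        else:
            body = ' '.join(vw_token(f) for f in feats)
        parts.append(f"|{ns} {body}")
    return ' '.join(parts)


line = to_vw_line(-1, {
    'p': ['pos_1'],
    'h': {'f_ctr': 0.004},
    't': ['продам', 'велосипед'],
    'l': ['Москва область'],
})
print(line)  # -1 |p pos_1 |h f_ctr:0.004 |t продам велосипед |l Москва_область
```

Only the first character of each namespace name matters for `--interactions`, so single-letter namespaces as in the notebook keep the later flags readable.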

In [21]:
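def write_vw_class(dataframe, filename):
    # pandas path only: `parallel_apply` comes from pandarallel
    # (from pandarallel import pandarallel; pandarallel.initialize()).
    # For Spark DataFrames use .rdd.map(convert_to_vw_class) instead.
    with open(filename, 'w', encoding='utf-8') as f:
        for vw_record in dataframe.parallel_apply(convert_to_vw_class, axis=1):
            f.write(vw_record + '\n')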

In [27]:
train, val = (
    datadet_df
    .randomSplit([0.8, 0.2], 2022)
)

In [34]:
(
    val.drop(*non_features_c)
    .rdd.map(convert_to_vw_class)
    .coalesce(1)
    .saveAsTextFile('/user/avito/val.vw')
)

2022-03-15 20:18:19,749 ERROR executor.Executor: Exception in task 1.0 in stage 84.0 (TID 3013)
java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:539)
	at org.apache.spark.sql.execution.Uns

Py4JJavaError: An error occurred while calling o171.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1552)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1552)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1538)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1538)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:549)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 84.0 failed 1 times, most recent failure: Lost task 1.0 in stage 84.0 (TID 3013, rc1a-dataproc-c-0430dmrwee1whrn9.mdb.yandexcloud.net, executor driver): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:539)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2167)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 50 more
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:539)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [33]:
!hdfs dfs -rm -r /user/avito/data/VisitsStreamRaw.tsv
!hdfs dfs -rm -r /user/avito/data/testSearchStream.tsv
!hdfs dfs -rm -r /user/avito/data/trainSearchStream.tsv

Deleted /user/avito/data/VisitsStreamRaw.tsv
Deleted /user/avito/data/testSearchStream.tsv
Deleted /user/avito/data/trainSearchStream.tsv


In [32]:
!hdfs dfs -ls /user/avito/data

Found 8 items
-rw-r--r--   3 ubuntu hadoop  5350670676 2022-03-15 00:22 /user/avito/data/AdsInfo.tsv
-rw-r--r--   3 ubuntu hadoop         752 2022-03-15 00:18 /user/avito/data/Category.tsv
-rw-r--r--   3 ubuntu hadoop       58976 2022-03-15 00:18 /user/avito/data/Location.tsv
-rw-r--r--   3 ubuntu hadoop  9469373867 2022-03-15 00:28 /user/avito/data/SearchInfo.tsv
-rw-r--r--   3 ubuntu hadoop   104614699 2022-03-15 00:32 /user/avito/data/UserInfo.tsv
-rw-r--r--   3 ubuntu hadoop 13180996366 2022-02-21 15:24 /user/avito/data/VisitsStreamRaw.tsv
-rw-r--r--   3 ubuntu hadoop   557829937 2022-03-15 00:32 /user/avito/data/testSearchStream.tsv
-rw-r--r--   3 ubuntu hadoop  5648044032 2022-03-15 00:32 /user/avito/data/trainSearchStream.tsv


In [None]:
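# convert_to_vw_class maps one Row to one VW line; write_vw_class is the pandas variant
(
    train.drop(*non_features_c)
    .rdd.map(convert_to_vw_class)
    .coalesce(1)
    .saveAsTextFile('/user/avito/train.vw')
)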

The test dataset is assembled the same way:

In [38]:
# features 1-2
se.sql("""
    SELECT AdID, SearchID, Position as f_pos, HistCTR as f_ctr
    FROM test_search_stream
    WHERE ObjectType=3
""").registerTempTable("initial_features_test")

# feature 3
se.sql("""
SELECT SearchID, UserAgentOSID as f_OS_cat
FROM user_info ui
    join search_info si on ui.UserID = si.UserID
""").registerTempTable("OS_features_test")

# feature 4
se.sql("""
    SELECT tss.AdID, tss.SearchID, extract_keys(ai.Params) as f_ai_params, extract_keys(si.SearchParams) as f_si_params
    FROM test_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1 and ai.Params IS NOT NULL and si.SearchParams IS NOT NULL
""").registerTempTable("params_features_test")

# feature 5
se.sql("""
    SELECT tss.AdID, tss.SearchID, ai.Title as f_title, si.SearchQuery as f_query
    FROM test_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
""").registerTempTable("title_features_test")

# feature 6
se.sql("""
    SELECT tss.AdID, tss.SearchID, ai.CategoryID as f_ai_cat, si.CategoryID as f_si_cat
    FROM test_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
""").registerTempTable("category_features_test")

# feature 7
se.sql("""
    SELECT tss.AdID, tss.SearchID, ai.LocationID as f_ai_loc, si.LocationID as f_si_loc
    FROM test_search_stream tss
        join ads_info ai on tss.AdID = ai.AdID
        join search_info si on tss.SearchID = si.SearchID
    WHERE ai.IsContext=1
""").registerTempTable("location_features_test")

# feature 8
se.sql("""
    SELECT tss.AdID, tss.SearchID,  si.IsUserLoggedOn as f_log
    FROM test_search_stream tss
        join search_info si on tss.SearchID = si.SearchID
    WHERE tss.ObjectType=3
""").registerTempTable("login_features_test")

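# LEFT JOINs keep every test impression: a prediction is needed for each test row,
# including rows without Params/SearchParams or without a matching user.
# login_features_test is keyed by (AdID, SearchID); joining it on SearchID alone
# would duplicate rows for searches with several context ads.
test_data = se.sql("""
SELECT *
FROM
    test_search_stream tss
    left join initial_features_test if on if.AdID = tss.AdID AND if.SearchID = tss.SearchID
    left join OS_features_test osf on osf.SearchID = tss.SearchID
    left join params_features_test pf on pf.AdID = tss.AdID AND pf.SearchID = tss.SearchID
    left join title_features_test tf on tf.AdID = tss.AdID AND tf.SearchID = tss.SearchID
    left join category_features_test cf on cf.AdID = tss.AdID AND cf.SearchID = tss.SearchID
    left join location_features_test locf on locf.AdID = tss.AdID AND locf.SearchID = tss.SearchID
    left join login_features_test logf on logf.AdID = tss.AdID AND logf.SearchID = tss.SearchID
WHERE
    tss.ObjectType=3
""")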

In [None]:
test_data.limit(5).toPandas()

In the end I could not finish task 1 and move on to task 2, because the jobs kept running out of space (`No space left on device`). I don't understand how to add storage to the cluster or where exactly it should be allocated, so I ask for at least partial credit for the work done and for the described logic of the next steps, which shows how the solution would be finalized.
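Judging by the stack traces, the bottleneck was disk rather than RAM: the exception is raised in `BypassMergeSortShuffleWriter` / `DiskBlockObjectWriter`, i.e. while Spark writes shuffle files to its local scratch directory (`spark.local.dir`, `/tmp` by default). The traces also report `executor driver`, which suggests Spark ran in local mode; under YARN the cluster manager's `LOCAL_DIRS` overrides this setting. A minimal sketch, assuming the node has a larger mounted disk: the hypothetical helper `pick_scratch_dir` picks the candidate directory with the most free space, which could then be passed as `spark.local.dir`:

```python
import shutil
import tempfile


def pick_scratch_dir(candidates, min_free_gb=0.0):
    """Return the candidate directory with the most free space.

    Raises RuntimeError if even the best candidate has less than
    `min_free_gb` gigabytes free.
    """
    free = {path: shutil.disk_usage(path).free for path in candidates}
    best = max(free, key=free.get)
    free_gb = free[best] / 1024 ** 3
    if free_gb < min_free_gb:
        raise RuntimeError(f"only {free_gb:.1f} GB free on {best}")
    return best


# Hypothetical candidates: list the mount points that actually exist on the node
candidates = [tempfile.gettempdir(), '.']
scratch = pick_scratch_dir(candidates)
print(scratch, shutil.disk_usage(scratch).free // 1024 ** 3, 'GB free')

# spark.local.dir is read when the SparkContext starts, so it has to be set
# on a fresh session (stop the existing one first), e.g.:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.config('spark.local.dir', scratch).getOrCreate()
```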

# 2 Training logistic regression

Essentially, the classification code from the seminar can be reused here as is.
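The seminar setup trains with `--loss_function logistic` on labels in {-1, 1} (as written by `convert_to_vw_class`) and predicts probabilities with `--link=logistic`. A short numerical check that VW's logistic loss on the raw margin f, log(1 + exp(-y·f)), equals the usual log loss on the sigmoid probability with the 0/1 label obtained by the same mapping as `read_target_from_vw`; the function names are illustrative:

```python
import math


def vw_logistic_loss(label_pm1, margin):
    """Loss minimised with --loss_function logistic, label in {-1, 1}."""
    return math.log1p(math.exp(-label_pm1 * margin))


def sigmoid(margin):
    """Transformation applied by --link=logistic at prediction time."""
    return 1.0 / (1.0 + math.exp(-margin))


def log_loss_01(label_01, prob):
    """Binary cross-entropy with a {0, 1} label, as in sklearn's log_loss."""
    return -(label_01 * math.log(prob) + (1 - label_01) * math.log(1 - prob))


for y in (-1, 1):
    for f in (-2.0, 0.0, 3.5):
        y01 = (y + 1) // 2  # same -1/1 -> 0/1 mapping as read_target_from_vw
        assert math.isclose(vw_logistic_loss(y, f), log_loss_01(y01, sigmoid(f)))
print('logistic loss on margins == log loss on sigmoid probabilities')
```

So the log loss computed on the validation predictions measures exactly the quantity the model optimizes during training.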

In [25]:
%%time

! vw --final_regressor avito.model.bin '/user/avito/train.vw' \
    --loss_function logistic \
    --learning_rate 20.0 \
    --bit_precision 23 \
    --passes 10 \
    --cache -k

final_regressor = avito.model.bin
Num weight bits = 23
learning rate = 20
initial_t = 0
power_t = 0.5
decay_learning_rate = 1
Error: can't open: /user/avito/train.vw.cache.writingerrno = unknown

finished run
number of examples = 0
weighted example sum = 0.000000
weighted label sum = 0.000000
average loss = undefined (no holdout)
total feature number = 0
can't open: /user/avito/train.vw.cache.writingerrno = unknown
vw (io_buf.h:143): can't open: /user/avito/train.vw.cache.writingerrno = unknown
CPU times: user 14.7 ms, sys: 4.6 ms, total: 19.3 ms
Wall time: 519 ms
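The `can't open: /user/avito/train.vw.cache.writing` error most likely comes from `vw` being a local binary: it looks for `/user/avito/train.vw` on the local filesystem and, with `--cache`, tries to create the cache file next to it in a directory that does not exist locally. In addition, `saveAsTextFile` writes an HDFS *directory* of `part-*` files rather than a single file. `hdfs dfs -getmerge` concatenates those parts into one local file. A minimal sketch; `getmerge_cmd` and `fetch_vw_file` are hypothetical helpers:

```python
import subprocess


def getmerge_cmd(hdfs_dir, local_path):
    """hdfs CLI call that concatenates the part-* files of an HDFS dir into one local file."""
    return ['hdfs', 'dfs', '-getmerge', hdfs_dir, local_path]


def fetch_vw_file(hdfs_dir, local_path):
    # check=True raises CalledProcessError instead of leaving vw with a missing file
    subprocess.run(getmerge_cmd(hdfs_dir, local_path), check=True)
    return local_path


# On the cluster (requires the hdfs CLI on PATH and enough local disk):
# fetch_vw_file('/user/avito/train.vw', 'train.vw')
# fetch_vw_file('/user/avito/val.vw', 'val.vw')
# then point vw at the local file: ! vw --final_regressor avito.model.bin train.vw ... --cache -k
```

With a local `train.vw`, `--cache` writes its cache file in the current directory next to it.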


In [None]:
%%time

# val.vw: local copy of /user/avito/val.vw (hdfs dfs -getmerge); vw reads only local files
! vw val.vw \
    --link=logistic \
    --testonly \
    --initial_regressor avito.model.bin \
    --predictions logistic_predictions.txt

In [None]:
%%time

# val.vw: local copy of /user/avito/val.vw (hdfs dfs -getmerge); vw reads only local files
! vw val.vw \
    --binary \
    --testonly \
    --initial_regressor avito.model.bin \
    --predictions predictions.txt

In [37]:
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score


def calc_class_metrics(predictions_filename, answers_filename):
    def read_target_from_vw(vw_record):
        # VW labels are -1/1; map them to 0/1 for sklearn
        return int((float(vw_record.split(' ')[0]) + 1) / 2)

    # "> 0.5" turns both --binary output (-1/1) and probabilities into 0/1
    with open(predictions_filename, 'r') as f:
        y_pred = np.array([int(float(value) > 0.5) for value in f.readlines()])

    with open(answers_filename, 'r', encoding='utf-8') as f:
        y_expected = np.array([read_target_from_vw(value) for value in f.readlines()])

    print('accuracy=', accuracy_score(y_expected, y_pred))
    print('precision=', precision_score(y_expected, y_pred))
    print('recall=', recall_score(y_expected, y_pred))
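Clicks on context ads are rare, so threshold metrics on `--binary` predictions need care: a model that never predicts a click already scores a high accuracy. A toy illustration (the 1-in-100 click rate is made up, not measured on this dataset) that computes the same three metrics by hand:

```python
def confusion_counts(y_true, y_pred):
    """True/false positives and negatives for 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    return tp, fp, fn, tn


def class_metrics(y_true, y_pred):
    tp, fp, fn, tn = confusion_counts(y_true, y_pred)
    accuracy = (tp + tn) / len(y_true)
    # no predicted positives: sklearn's precision_score also returns 0 here (with a warning)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return accuracy, precision, recall


y_true = [1] + [0] * 99     # toy data: one click in 100 impressions
never_click = [0] * 100     # constant "no click" baseline
print(class_metrics(y_true, never_click))  # (0.99, 0.0, 0.0)
```

This is why the probabilistic log loss is the more informative number for a CTR model.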

In [None]:
calc_class_metrics('predictions.txt', 'val.vw')  # local copy of /user/avito/val.vw (hdfs dfs -getmerge)

In [36]:
import numpy as np
from sklearn.metrics import log_loss

def calc_log_loss(predictions_filename, answers_filename):
    def read_target_from_vw(vw_record):
        # VW labels are -1/1; map them to 0/1 for sklearn
        return int((float(vw_record.split(' ')[0]) + 1) / 2)

    # expects probabilities, i.e. predictions made with --link=logistic
    with open(predictions_filename, 'r') as f:
        y_pred = np.array([float(value) for value in f.readlines()])

    with open(answers_filename, 'r', encoding='utf-8') as f:
        y_expected = np.array([read_target_from_vw(value) for value in f.readlines()])

    print('log_loss=', log_loss(y_expected, y_pred))

In [None]:
calc_log_loss('logistic_predictions.txt', 'val.vw')  # local copy of /user/avito/val.vw (hdfs dfs -getmerge)

Let's add richer features: word 2-grams for titles and queries, plus pairwise interactions of the one-hot encoded categorical features for the locations, categories and params of each query-ad pair.
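Conceptually, `--ngram t2` adds word bigrams inside namespace `t` (title) on top of the single words, and `--interactions bc` adds one crossed feature for every pair of features from namespaces `b` (search category) and `c` (ad category); `de` and `lm` do the same for params and locations. VW builds these internally from feature hashes, so the sketch below only illustrates which combinations are generated; the helper names and the `_` / `*` separators are made up:

```python
from itertools import product


def word_ngrams(tokens, n=2):
    """Single words plus contiguous k-grams for k = 2..n."""
    grams = list(tokens)
    for k in range(2, n + 1):
        grams += ['_'.join(tokens[i:i + k]) for i in range(len(tokens) - k + 1)]
    return grams


def interactions(ns_a, ns_b):
    """One crossed feature per pair (a, b), with a from ns_a and b from ns_b."""
    return [f"{a}*{b}" for a, b in product(ns_a, ns_b)]


print(word_ngrams(['детский', 'велосипед', 'новый']))
# ['детский', 'велосипед', 'новый', 'детский_велосипед', 'велосипед_новый']
print(interactions(['cat_5'], ['cat_7']))       # ['cat_5*cat_7']
print(interactions(['brand', 'size'], ['brand']))  # ['brand*brand', 'size*brand']
```

The crosses let a linear model learn, for example, that an ad from one category shown on a search in another category is clicked less often, which single one-hot features cannot express.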

In [None]:
%%time

# train.vw: local copy of /user/avito/train.vw (hdfs dfs -getmerge); vw reads only local files
! vw --final_regressor avito.model.bin train.vw \
    --loss_function logistic \
    --learning_rate 20.0 \
    --bit_precision 23 \
    --passes 10 \
    --ngram t2 \
    --ngram q2 \
    --interactions bc \
    --interactions de \
    --interactions lm \
    --cache -k

In [None]:
%%time

# val.vw: local copy of /user/avito/val.vw (hdfs dfs -getmerge); vw reads only local files
! vw val.vw \
    --link=logistic \
    --testonly \
    --initial_regressor avito.model.bin \
    --predictions logistic_predictions.txt

In [None]:
%%time

# val.vw: local copy of /user/avito/val.vw (hdfs dfs -getmerge); vw reads only local files
! vw val.vw \
    --binary \
    --testonly \
    --initial_regressor avito.model.bin \
    --predictions predictions.txt

In [None]:
%%time

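# test.vw: local copy of /user/avito/test.vw (hdfs dfs -getmerge); vw reads only local files.
# --link=logistic writes click probabilities, which log loss needs, instead of -1/1 labels.
! vw test.vw \
    --link=logistic \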
    --testonly \
    --initial_regressor avito.model.bin \
    --predictions submissions.txt
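For a submission, each prediction has to be matched back to its test row. One way that fits VW's input format, assuming the test lines are written with a tag (text after the label that starts with `'`, e.g. `'12345 |p pos_1 ...`): vw then typically writes `<prediction> <tag>` on each line of the predictions file. The hypothetical helper below turns such lines into a CSV; the `ID,IsClick` header is an assumption to check against the competition's sample submission, and the input lines are toy values:

```python
import csv
import io


def predictions_to_submission(pred_lines, out, header=('ID', 'IsClick')):
    """Convert vw '<probability> <tag>' lines into CSV rows 'tag,probability'."""
    writer = csv.writer(out)
    writer.writerow(header)
    for line in pred_lines:
        prob, tag = line.split()
        writer.writerow([tag, prob])


buf = io.StringIO()
predictions_to_submission(['0.012 1', '0.300 2'], buf)
print(buf.getvalue())

# On real output: with open('submissions.txt') as f, open('submission.csv', 'w', newline='') as out:
#     predictions_to_submission(f, out)
```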