## Импорты

In [28]:
import sys
import os
import warnings
# Suppress all Python warnings raised after this point.
warnings.filterwarnings('ignore')
import time
from pyspark.sql import SparkSession, DataFrameWriter
import pyspark.sql.functions as F

from pyspark.sql.functions import isnan, isnull

In [2]:
# Build (or reuse) a Spark session named "extract-transform" that runs
# against the "local" master with Hive support enabled.
spark = (
    SparkSession.builder
    .master("local")
    .appName("extract-transform")
    .enableHiveSupport()
    .getOrCreate()
)
# Display the session summary as the cell output.
spark

In [3]:
# Load the competition dataset from its parquet directory.
df = spark.read.load(
    'data_in/competition_data_final_pqt/',
    format="parquet",
)

In [23]:
# Print the column names and types of the loaded dataset.
df.printSchema()

root
 |-- region_name: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- cpe_manufacturer_name: string (nullable = true)
 |-- cpe_model_name: string (nullable = true)
 |-- url_host: string (nullable = true)
 |-- cpe_type_cd: string (nullable = true)
 |-- cpe_model_os_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- date: date (nullable = true)
 |-- part_of_day: string (nullable = true)
 |-- request_cnt: long (nullable = true)
 |-- user_id: long (nullable = true)



In [None]:
# Column names of df, matching the schema printed for it.
["region_name", "city_name", "cpe_manufacturer_name", "cpe_model_name", "url_host", "cpe_type_cd", "cpe_model_os_type", "price", "date", "part_of_day", "request_cnt", "user_id"]

In [4]:
# Register the DataFrame as a temporary SQL view named "mts" so it can be queried via spark.sql.
df.createOrReplaceTempView("mts")

In [6]:
# Load the target (label) data from its parquet file.
target = spark.read.load(
    'data_in/public_train.pqt',
    format="parquet",
)

In [8]:
# The target data gives the user_ids for which age and is_male are known.
target.printSchema()

root
 |-- age: double (nullable = true)
 |-- is_male: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [9]:
# Number of rows in the target data.
target.count()

270000

In [21]:
%%time
# isnan очень медленная функция
target.where(isnan(F.col("age"))).count()

CPU times: user 8.12 ms, sys: 5.57 ms, total: 13.7 ms
Wall time: 39.7 s


0

In [22]:
# Register the target DataFrame as a temporary SQL view named "target".
target.createOrReplaceTempView("target")

In [36]:
# Based on the EDA and the target data, build a reduced dataset to cut
# processing time: keep only a subset of the columns, and only rows whose
# user_id appears in the target view.
df_target = spark.sql(
    "SELECT region_name, city_name, url_host, date, part_of_day, request_cnt, user_id "
    "from mts "
    "where user_id in (select user_id from target)"
)

In [34]:
# Number of rows left after restricting the data to users present in target.
df_target.count()

210730732

In [37]:
%%time
# Сохраним не полные данные для дальнейшей обработоки
df_target.write.parquet(path="data_in/competition_data_only_target_pqt/", mode="overwrite")

CPU times: user 64.3 ms, sys: 43.1 ms, total: 107 ms
Wall time: 8min 57s


In [38]:
# Read the reduced dataset back from disk to verify what was written.
df_test = spark.read.load(
    "data_in/competition_data_only_target_pqt/",
    format="parquet",
)

In [39]:
# Print the schema of the data read back from parquet.
df_test.printSchema()

root
 |-- region_name: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- url_host: string (nullable = true)
 |-- date: date (nullable = true)
 |-- part_of_day: string (nullable = true)
 |-- request_cnt: long (nullable = true)
 |-- user_id: long (nullable = true)



In [40]:
# Check the number of records read back from disk.
df_test.count()

210730732