In [1]:
import findspark
# findspark.init puts the Python libraries of the given Spark installation on
# sys.path, which is why it is called before any pyspark import.
findspark.init('/opt/spark-3.4.3/')

In [2]:
from pyspark import SparkConf, SparkContext

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.feature import VectorAssembler

In [4]:
from pyspark.ml import Pipeline

In [34]:
from pyspark.sql import SparkSession

# Maven coordinate of the Kafka source/sink connector (Scala 2.12 build, version 3.4.3).
kafka_connector_pkg = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.3"

# Configure the session step by step: YARN as master, the application name,
# and the connector package listed in spark.jars.packages.
session_builder = SparkSession.builder.master("yarn")
session_builder = session_builder.appName('maxb_lab04_kafka')
session_builder = session_builder.config("spark.jars.packages", kafka_connector_pkg)

# Reuses an already running session if there is one, otherwise starts a new one.
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [6]:
def kill_all_streams():
    """Stop every active streaming query of the global ``spark`` session.

    One line is printed per stopped query, containing the description of the
    query's first source. If the query has not reported any progress yet
    (``lastProgress`` is ``None``) or lists no sources, the query name, or
    failing that its id, is printed instead, so a freshly started stream
    can still be stopped.

    Returns:
        None
    """
    for query in spark.streams.active:
        # lastProgress stays None until the first progress update arrives;
        # indexing it directly would raise before stop() is ever called.
        progress = query.lastProgress or {}
        sources = progress.get("sources") or []
        fallback = query.name or query.id
        desc = sources[0].get("description", fallback) if sources else fallback
        query.stop()
        print(f"Остановлен стрим: {desc}")
    return None

In [7]:
def write_kafka(data, topic="maksim.burdasov"):
    """Serialise each row of ``data`` to JSON and write it to a Kafka topic.

    Args:
        data: Spark DataFrame; all of its columns are packed into one JSON
            document per row.
        topic: Name of the destination Kafka topic.

    Returns:
        None. The batch write is triggered by the call.
    """
    # One JSON string per row, stored in the `value` column of the Kafka record.
    json_value = F.to_json(F.struct(F.col('*'))).alias('value')
    records = data.select(json_value).withColumn('topic', F.lit(topic))

    # Sink settings: broker address and target topic.
    sink_options = {
        "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
        "topic": topic,
    }

    kafka_writer = records.write.format('kafka').options(**sink_options)
    kafka_writer.save()

### Загрузка обученных моделей

In [8]:
from pyspark.ml.classification import RandomForestClassificationModel

In [9]:
# HDFS directory that holds the persisted models.
my_folder = "hdfs://spark-master-1.newprolab.com:8020/user/maksim.burdasov/"

gender_model_nm = "lab04_model_gender_rf_v2"
age_model_nm = "lab04_model_age_rf_v2"

# Load both random-forest classifiers from <folder><model name>.
gender_model, age_model = (
    RandomForestClassificationModel.load(my_folder + model_nm)
    for model_nm in (gender_model_nm, age_model_nm)
)

                                                                                

### Чтение из Kafka

In [14]:
# Settings for reading the input topic.
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
    "subscribe": "input_maksim.burdasov",
    # "startingOffsets": "latest",  # comment out in batch mode
    # "maxOffsetsPerTrigger": "25000",
    "failOnDataLoss": "False"
}

# Connect to the topic: apply the settings one by one, then load as a batch DataFrame.
kafka_reader = spark.read.format("kafka")
for option_name, option_value in read_kafka_params.items():
    kafka_reader = kafka_reader.option(option_name, option_value)

kafka_sdf = kafka_reader.load()

In [15]:
# kafka_sdf.count()

25/05/06 22:54:01 WARN admin.AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

5000

In [16]:
### JSON parsing

# Schema of one log record; `visits` is read as a plain string first and
# decoded with its own schema in a second pass.
log_schema = StructType([
    StructField('uid', StringType()),
    StructField('visits', StringType()),
])

# Schema of the content of the `visits` field.
visits_schema = ArrayType(
    StructType([
        StructField('url', StringType()),
        StructField('timestamp', StringType()),
    ])
)

parsed_sdf = (
    kafka_sdf
    # Pass 1: decode the Kafka `value` bytes into the outer record.
    .select(
        F.from_json(F.col('value').cast('string'), log_schema).alias('user_json')
    )
    # Pass 2: keep the user id and decode the nested visits array.
    .select(
        F.col('user_json.uid').alias('uid'),
        F.from_json(
            F.col('user_json.visits').cast('string'),
            visits_schema
        ).alias('visits_json'),
    )
    # Selecting a field of an array of structs yields an array of that field.
    .withColumn('urls', F.col('visits_json.url'))
    .withColumn('timestamps', F.col('visits_json.timestamp'))
)

df = parsed_sdf.select('uid', 'urls', 'timestamps')

In [17]:
# The parsed frame is reused by both the URL and the timestamp feature cells,
# so mark it for caching.
df = df.cache()
# Preview one record, one field per line, with long values truncated.
df.show(n=1, truncate=True, vertical=True)

25/05/06 22:54:05 WARN admin.AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 25:>                                                         (0 + 1) / 1]

-RECORD 0--------------------------
 uid        | bd7a30e1-a25d-4cb... 
 urls       | [http://www.inter... 
 timestamps | [1419775945781, 1... 
only showing top 1 row



                                                                                

### Подготовка данных для инференса

In [23]:
# Column with the number of visited sites (length of the `urls` array).
df = df.withColumn('urls_cnt', F.size(F.col('urls')))

In [24]:
### URL-list vectorisation pipeline

# Join each user's URLs into one space-separated string. The result is kept in
# a separate frame so that `df.urls` remains an array and the earlier
# `F.size('urls')` cell can still be re-run.
urls_text_df = df.withColumn('urls', F.concat_ws(" ", 'urls'))

# Extract tokens from the URL string.
tokenizer = RegexTokenizer(
    inputCol="urls",
    outputCol="tokens_raw",
    pattern=r"\b[\w]{2,}\b",
    gaps=False,  # the pattern matches the tokens themselves, not the gaps between them
    toLowercase=True
)

url_stopwords = [
    # Generic URL service words
    'http', 'https', 'www', 'com', 'ru', 'net', 'org', 'html', 'php', 'asp', 'aspx', 'jsp',
    'utm', 'referrer', 'ref', 'source', 'click', 'id', 'page', 'index', 'feed', 'menu',
    'api', 'track', 'trackid', 'session', 'sid', 'token', 'auth', 'access', 'key', 'lang',
    'language', 'query', 'search', 'rpt', 'clid', 'utm_campaign', 'utm_medium', 'utm_source',
    'utm_content', 'utm_term', 'fbclid', 'gclid', 'mc_cid', 'mc_eid',

    # Frequent parameters and service words
    'view', 'item', 'category', 'product', 'offer', 'promo', 'redirect',
    'default', 'main',

    # Generic words with little informational value for clustering
    'type', 'action', 'mode', 'sessionid', 'userid', 'user',

    # Common abbreviations and words found in URLs
    'www1', 'www2', 'www3', 'mobile', 'm', 'amp', 'cdn', 'static', 'cache',

    # Frequently occurring short words
    'to', 'in', 'on', 'at', 'by', 'of', 'and', 'or', 'for', 'with', 'from'
]

# Remove uninformative words: Spark's default English list plus the URL-specific
# list above. dict.fromkeys drops repeated entries while keeping first-seen order.
custom_stopwords = list(
    dict.fromkeys(StopWordsRemover.loadDefaultStopWords("english") + url_stopwords)
)
stopwords_remover = StopWordsRemover(
    inputCol="tokens_raw",
    outputCol="tokens",
    stopWords=custom_stopwords
)

# Term frequencies
tf_vectorizer = HashingTF(numFeatures=1024, inputCol="tokens", outputCol="tf_features")

# Inverse document frequencies
# NOTE(review): IDF is fitted on the inference batch below, so its weights may
# differ from the ones the loaded models were trained with. Confirm, and prefer
# loading the training-time PipelineModel if one was saved.
idf_counter = IDF(inputCol='tf_features', outputCol='tfidf_features')

pipeline = Pipeline(stages=[tokenizer, stopwords_remover, tf_vectorizer, idf_counter])
url_ftrs_df = pipeline.fit(urls_text_df).transform(urls_text_df)

# Keep the identifier and the URL-derived feature columns.
url_ftrs_df = (
    url_ftrs_df
    .select(
        # Identifier
        'uid',
        # Feature columns
        'urls_cnt',
        'tfidf_features'
    )
)

url_ftrs_df = url_ftrs_df.cache()
url_ftrs_df.show(1)

[Stage 27:>                                                         (0 + 1) / 1]

+--------------------+--------+--------------------+
|                 uid|urls_cnt|      tfidf_features|
+--------------------+--------+--------------------+
|bd7a30e1-a25d-4cb...|    2000|(1024,[0,1,2,3,4,...|
+--------------------+--------+--------------------+
only showing top 1 row



                                                                                

In [25]:
### Feature preparation from visit timestamps

# One row per (uid, timestamp). No sort is applied: the aggregates computed
# below do not depend on row order, so an orderBy here would only add a shuffle.
df_exploded = (
    df
    .select('uid', F.explode(F.col('timestamps')).alias('ts'))
    # The raw value is divided by 1000 before conversion, i.e. it is presumably
    # epoch milliseconds (confirm against the producer).
    .withColumn("ts", F.from_unixtime(F.col("ts") / 1000))
    # Separate columns used by the aggregates.
    .withColumn("date", F.to_date("ts"))
    .withColumn("hour", F.hour("ts"))
)

# Visit flags per quarter of the day: flag column -> inclusive hour range.
day_quarters = {
    "night_flg": (0, 5),
    "morning_flg": (6, 11),
    "afternoon_flg": (12, 17),
    "evening_flg": (18, 23),
}

for flag_col, (first_hour, last_hour) in day_quarters.items():
    in_quarter = (F.col("hour") >= first_hour) & (F.col("hour") <= last_hour)
    df_exploded = df_exploded.withColumn(
        flag_col,
        F.when(in_quarter, F.lit(1)).otherwise(0)
    )

# Per-user aggregation: hour statistics, average visits per active day and the
# share (in percent) of visits falling into each quarter of the day.
time_ftrs_df = df_exploded.groupBy("uid").agg(
    F.mean(F.col('hour')).alias('mean_hour'),
    F.median(F.col('hour')).alias('median_hour'),
    (F.count('*') / F.count_distinct(F.col('date'))).alias('avg_urls_per_day'),
    *[
        (F.sum(F.col(flag_col)) * 100.0 / F.count('*')).alias(flag_col.replace('_flg', '_pct'))
        for flag_col in day_quarters
    ],
)

# Show the result
time_ftrs_df = time_ftrs_df.cache()
time_ftrs_df.show(1, True, True)



-RECORD 0--------------------------------
 uid              | 7302e78a-ec04-47e... 
 mean_hour        | 16.083333333333332   
 median_hour      | 16.0                 
 avg_urls_per_day | 22.90909090909091    
 night_pct        | 0.3968253968253968   
 morning_pct      | 18.5515873015873     
 afternoon_pct    | 41.964285714285715   
 evening_pct      | 39.08730158730159    
only showing top 1 row



                                                                                

In [26]:
# Build the dataset with all features: URL features on the left, time features
# attached by user id (users without time features are kept).
union_df = url_ftrs_df.join(time_ftrs_df, on='uid', how='left').cache()

union_df.show(n=1, truncate=True, vertical=True)

-RECORD 0--------------------------------
 uid              | 0108d217-e476-493... 
 urls_cnt         | 3                    
 tfidf_features   | (1024,[99,124,215... 
 mean_hour        | 12.0                 
 median_hour      | 12.0                 
 avg_urls_per_day | 3.0                  
 night_pct        | 0.0                  
 morning_pct      | 0.0                  
 afternoon_pct    | 100.0                
 evening_pct      | 0.0                  
only showing top 1 row



In [27]:
### Put all features into a single vector

# Per-user time-of-day aggregates.
time_feature_cols = [
    "mean_hour",
    "median_hour",
    "avg_urls_per_day",
    "night_pct",
    "morning_pct",
    "afternoon_pct",
    "evening_pct",
]

# Columns are concatenated in this order: visit count, TF-IDF vector, time aggregates.
feature_cols = ["urls_cnt", "tfidf_features", *time_feature_cols]

# NOTE(review): the left join can leave NULL time features for users without
# timestamps; VectorAssembler's default handleInvalid setting would presumably
# raise on such rows. Confirm whether that can happen with real input.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
final_df = assembler.transform(union_df)

final_df.show(n=1, truncate=True, vertical=True)

-RECORD 0--------------------------------
 uid              | 0108d217-e476-493... 
 urls_cnt         | 3                    
 tfidf_features   | (1024,[99,124,215... 
 mean_hour        | 12.0                 
 median_hour      | 12.0                 
 avg_urls_per_day | 3.0                  
 night_pct        | 0.0                  
 morning_pct      | 0.0                  
 afternoon_pct    | 100.0                
 evening_pct      | 0.0                  
 features         | (1032,[0,100,125,... 
only showing top 1 row



In [28]:
# Only the identifier and the assembled feature vector are passed to the models.
infer_df = final_df.select(F.col('uid'), F.col('features'))

infer_df.show(n=3)

+--------------------+--------------------+
|                 uid|            features|
+--------------------+--------------------+
|0108d217-e476-493...|(1032,[0,100,125,...|
|0192cc54-559c-4c8...|(1032,[0,33,34,55...|
|019acd5e-be9a-4cd...|(1032,[0,2,10,25,...|
+--------------------+--------------------+
only showing top 3 rows



### Инференс

In [29]:
# Class index 0 is rendered as 'F', any other prediction as 'M'.
gender_label = F.when(F.col('prediction') == 0, 'F').otherwise('M').alias('gender')

# Score with the gender model and keep only uid, features and the label, so
# that the next model can add its own prediction columns.
gen_preds = gender_model.transform(infer_df).select('uid', 'features', gender_label)

gen_preds.show(n=1)

+--------------------+--------------------+------+
|                 uid|            features|gender|
+--------------------+--------------------+------+
|0108d217-e476-493...|(1032,[0,100,125,...|     F|
+--------------------+--------------------+------+
only showing top 1 row



Словарь маппинга возрастов:

```python
mapping_dict = {
    0: '18-24', 
    1: '25-34', 
    2: '35-44', 
    3: '45-54', 
    4: '>=55'
}
```


In [30]:
age_preds = age_model.transform(gen_preds)

# Translate the predicted class index into an age-bucket label; an index
# outside 0..4 yields NULL because the chain has no otherwise().
age_label = (
    F.when(F.col('prediction') == 0, '18-24')
    .when(F.col('prediction') == 1, '25-34')
    .when(F.col('prediction') == 2, '35-44')
    .when(F.col('prediction') == 3, '45-54')
    .when(F.col('prediction') == 4, '>=55')
)

preds = age_preds.select('uid', 'gender', age_label.alias('age'))

In [31]:
# Preview one prediction row without truncating the values.
preds.show(n=1, truncate=False)

+------------------------------------+------+-----+
|uid                                 |gender|age  |
+------------------------------------+------+-----+
|0108d217-e476-493d-8c81-a9744f12451a|F     |25-34|
+------------------------------------+------+-----+
only showing top 1 row



### Запись в Kafka

In [36]:
# Publish the predictions to the default output topic of write_kafka.
write_kafka(data=preds)

### Остановка контекста

In [35]:
# Shut down the SparkContext after the predictions have been written.
sc.stop()