# Необходимые импорты и функции

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import Window
import os

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/21 00:23:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/21 00:23:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Объявление схемы данных для дальнейшего считывания JSON файлов и составления Data Frame.

In [3]:
schema = T.StructType([
    T.StructField("inn", T.StringType(), True),
    T.StructField("raw_cookie", T.ArrayType(T.MapType(T.StringType(),
                                                      T.StringType()))),
    T.StructField("event_type", T.StringType(), True),
    T.StructField("event_action", T.StringType(), True),
    T.StructField("data_value", T.StringType(), True),
    T.StructField("geocountry", T.StringType(), True),
    T.StructField("city", T.StringType(), True),
    T.StructField("user_os", T.StringType(), True),
    T.StructField("systemlanguage", T.StringType(), True),
    T.StructField("geoaltitude", T.StringType(), True),
    T.StructField("meta_platform", T.StringType(), True),
    T.StructField("screensize", T.StringType(), True),
    T.StructField("timestampcolumn", T.DateType(), True)
                       ])

Объявление обязательных переменных, успользуемых в функциях

In [4]:
PATH_TO_FILES = "data/json/"
DICT_MATCH_CODE = {"IOS": "IDFA", "Android": "GAID"}
filenames = sorted(os.listdir(PATH_TO_FILES), reverse=True)

Функция считывания JSON файла

In [5]:
def read_file_json(file_name, schema_json_file):
 return spark.read.format("json") \
            .load(f"{file_name}", schema=schema_json_file)

Функция объединения данных JSON файлов и удаления дубликатов в них. Удаление производится с помощью оконной функции rank. Если встречаются дубликаты по столбцу INN, оставляем запись с более поздней датой.

In [6]:
def union_and_dropduplicate(df1, df2):
    return (df1.union(df2).withColumn('rank', F.rank()
    .over(Window.partitionBy('inn')
    .orderBy(F.desc('timestampcolumn'))))
                          .filter('rank = 1')
                          .drop('rank'))

Функция считывания и создания таблицы из сгенерированных файлов

In [7]:
def initial_data(schema_json_file: T.StructType, count: int = len(filenames)):
    blank_df = spark.createDataFrame([], schema=schema_json_file)
    for i in filenames[:count]:
        json_file = read_file_json(f"{PATH_TO_FILES}{i}", schema_json_file)
        blank_df = union_and_dropduplicate(blank_df, json_file)
    return blank_df

Функция, предоставляющая возможность получить значение словоря по его ключу

In [8]:
def search_values_from_maptype(col_name, key):
    return F.expr(f"filter({col_name}, x -> x.key='{key}')")[0]["value"]

Функция преобразования соответствия ОС. Используется в витрине "G"

In [9]:
@F.udf
def match_code(x):
    return DICT_MATCH_CODE.get(x, None)

Чтобы не обращаться постоянно к функции, считывание данных занесем в переменную "initial_data_df"

In [10]:
initial_data_df = initial_data(schema).select("*")

# Витрина А

Данная витрина - это наша сгенерированная таблица без поля "INN".

In [11]:
data_mart_a = initial_data_df.drop('inn')
data_mart_a.show(5, truncate=False)



+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------------+----------------------------------------------------------------+-----------+---------+---------+--------------+-----------+-------------+----------+---------------+
|raw_cookie                                                                                                                                                                                                                                                                                                                                                                                      

                                                                                

# Витрина B

В данной витрине необходимо найти все "INN" пользователей. Для поля "ID" в данной витрине была примена оконная функция в качестве примера, в последующих витринах оконных функций использовано не будет из-за их долгой работы при сортировке данных.

In [12]:
# удалять cookie по INN

data_mart_b = initial_data_df.select(
    F.row_number().over(
        Window.orderBy(
            F.col('inn').desc()
            )
        ).alias("id"),
        "inn"
  )
data_mart_b.show(5)

25/08/21 00:24:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 0

+---+------------+
| id|         inn|
+---+------------+
|  1|999913627903|
|  2|999738417751|
|  3|999607920885|
|  4|999530476502|
|  5|999420201902|
+---+------------+
only showing top 5 rows



# Витрина С

В данной витрине необходимо собрать куки сайта _sa_cookie_a, которые необходимо извлечь из поля raw_cookies. Для этого использовалась функция "pyspark.sql.functions.expr", в которую передовалось сгенирированное значение из функции "search_values_from_maptype". Для генерации ID используется метод "monotonically_increasing_id". В последующий витринах используются эти функции.

In [13]:
data_mart_c = initial_data_df.select(
  (F.monotonically_increasing_id() + 1).alias("id"),
  search_values_from_maptype("raw_cookie", "_sa_cookie_a").alias("sa_cookie_a"))
data_mart_c.show(5)

+---+--------------------+
| id|         sa_cookie_a|
+---+--------------------+
|  1|SA1.b054f0e6-823b...|
|  2|SA1.4ca71a23-bbcd...|
|  3|SA1.e90d1455-90a7...|
|  4|SA1.2ccbfea7-6613...|
|  5|SA1.a451da49-aa8d...|
+---+--------------------+
only showing top 5 rows



# Витрина D

Тоже самое что и в витрине "В". Подразумевается, что к этому моменту поступили еще дополнительные JSON файлы, которые не пересекутся с витриной "В" при объединении.

In [14]:
data_mart_d = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    "inn"
)
data_mart_d.show(5, truncate=False)

+---+------------+
|id |inn         |
+---+------------+
|1  |000721449767|
|2  |000742955042|
|3  |001749892036|
|4  |001933019954|
|5  |002132457813|
+---+------------+
only showing top 5 rows



# Витрина Е

Необходимо найти телефон пользователя и захешировать его алгоритмом хешировани md5. Делается это с помощью встроеной функции "pyspark.sql.functions.md5"

In [15]:
data_mart_e = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("ID"),
    F.md5(search_values_from_maptype("raw_cookie", "user_phone"))
      .alias("hash_phone_md5")
)

data_mart_e.show(5, truncate=False)

+---+--------------------------------+
|ID |hash_phone_md5                  |
+---+--------------------------------+
|1  |105170ecec86e071c67e78310c566c28|
|2  |d20811454482a79cdc76426605cd84ea|
|3  |1eb868434588ad97675b2e535557a202|
|4  |a939f1684e30386d48350c0700bae051|
|5  |776d4d58d98ed74f0fe3bc0b254a6c32|
+---+--------------------------------+
only showing top 5 rows



# Витрина F

Тоже самое что и в витрине "E", только с полем "user_mail".

In [16]:
data_mart_f = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("ID"),
    F.md5(search_values_from_maptype("raw_cookie", "user_mail"))
      .alias("hash_email_md5")
)

data_mart_f.show(5, truncate=False)

+---+--------------------------------+
|ID |hash_email_md5                  |
+---+--------------------------------+
|1  |c7d6779455254d39a599902eb2941290|
|2  |352b3682878d2f82da2336c58a95b308|
|3  |998d3fc0721a82d5feafd0cebe584e45|
|4  |0579aebffd8596b706de97f5f109d535|
|5  |870eef13c4b08dea6e1678309b33dbe4|
+---+--------------------------------+
only showing top 5 rows



# Витрина G

Вытаскиваем user_uid из поля raw_cookie, заменяем значение в поле user_os через функцию "match_code". Создаем новую колонку match_code, принимающую одно из двух значений: GAID или IDFA в зависимости от ОС пользователя. 

In [17]:
data_mart_g = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("ID"),
    search_values_from_maptype("raw_cookie", "user_uid")
      .alias("user_uid"),
    match_code("user_os").alias("match_code")
)

data_mart_g.show(5, truncate=False)

+---+--------+----------+
|ID |user_uid|match_code|
+---+--------+----------+
|1  |5214552 |null      |
|2  |3994681 |null      |
|3  |6679326 |null      |
|4  |5083195 |null      |
|5  |0900790 |null      |
+---+--------+----------+
only showing top 5 rows



# Итоговая объединенная витрина

In [18]:
main_marts = (
    data_mart_a.select('raw_cookie','data_value').alias("df_a")
    .join(
        data_mart_d.alias("df_d"),
        on=(
            (F.md5(F.col("df_d.inn")) == F.col("df_a.data_value")) |
            (F.sha2(F.col("df_d.inn"), 256) == F.col("df_a.data_value"))
            ),
        how='right'
      )
    .join(
        data_mart_b.alias("df_b"),
        on=(
            (F.md5(F.col("df_b.inn")) == F.col("df_a.data_value")) |
            (F.sha2(F.col("df_b.inn"), 256) == F.col("df_a.data_value"))
           ),
        how="left"
      )
    .join(
        data_mart_e.alias("df_e"),
        on = (
           F.md5(search_values_from_maptype("raw_cookie",
                                "user_phone")) == F.col("df_e.hash_phone_md5")),
        how ="left"
      )
    .join(
        data_mart_f.alias('df_f'),
        on = (
            F.md5(search_values_from_maptype("raw_cookie",
                                 "user_mail")) == F.col("df_f.hash_email_md5")),
        how = "left"
      )
    .join(
        data_mart_g.alias("df_g"),
        on = (
              search_values_from_maptype("raw_cookie",
                                         "user_uid") == F.col("df_g.user_uid")),
        how = "left"
      )
    .join(
        data_mart_c.alias('df_c'),
        on = (search_values_from_maptype("raw_cookie",
                                  "_sa_cookie_a") == F.col("df_c.sa_cookie_a")),
        how = "left"
      )
    .select(F.col("df_g.user_uid"),
            F.col("df_d.inn").alias("inn"),
            F.col("df_a.data_value").alias("inn_hash"),
            F.col("df_e.hash_phone_md5"),
            F.col("df_f.hash_email_md5"),
            search_values_from_maptype("raw_cookie",
                                       "user_phone").alias("user_phone"),
            search_values_from_maptype("raw_cookie",
                                       "user_mail").alias("user_email"),
            search_values_from_maptype("raw_cookie",
                                       "org_uid").alias("org_uid"),
            F.col("df_d.id").alias("id_d"),
            F.col("df_c.id").alias("id_c"),
            F.col("df_b.id").alias("id_b"),
            F.col("df_e.id").alias("id_e"),
            F.col("df_f.id").alias("id_f"),
            F.col("df_g.id").alias("id_g"),
            F.array(
                search_values_from_maptype("raw_cookie", "_sa_cookie_a"),
                search_values_from_maptype("raw_cookie", "_fa_cookie_a"),
                search_values_from_maptype("raw_cookie", "_ym_cookie_c"),
                search_values_from_maptype("raw_cookie", "_fbp")
            ).alias("array_coockie")
     )
)
main_marts.show(10, truncate=False)

25/08/21 00:24:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 00:24:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/21 0

+--------+------------+----------------------------------------------------------------+--------------------------------+--------------------------------+-----------+------------------+-------+----+----+----+----+----+----+---------------------------------------------------------------------------------------------------------------------------------------+
|user_uid|inn         |inn_hash                                                        |hash_phone_md5                  |hash_email_md5                  |user_phone |user_email        |org_uid|id_d|id_c|id_b|id_e|id_f|id_g|array_coockie                                                                                                                          |
+--------+------------+----------------------------------------------------------------+--------------------------------+--------------------------------+-----------+------------------+-------+----+----+----+----+----+----+---------------------------------------------------------