In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/nullb
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os

# Point the environment at the installed JDK and the unpacked Spark distribution.
# These are set before findspark.init() is called below.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark

# Called before the pyspark imports that follow; presumably this is what makes
# the pyspark package under SPARK_HOME importable -- confirm against findspark docs.
findspark.init()
from pyspark.sql import SparkSession

# Local-mode session; "local[*]" asks for as many worker threads as there are cores.
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Enable eager evaluation of DataFrames in the REPL/notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import Window
from google.colab import drive

# Mount Google Drive; the input JSON directory used later lives under this mount point.
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [None]:
# Explicit schema handed to the JSON reader. Every field is a nullable string
# except raw_cookie (an array of string->string maps, nullability left at the
# StructField default) and timestampcolumn (a date).
schema = (
    T.StructType()
    .add("inn", T.StringType(), True)
    .add("raw_cookie", T.ArrayType(T.MapType(T.StringType(), T.StringType())))
    .add("event_type", T.StringType(), True)
    .add("event_action", T.StringType(), True)
    .add("data_value", T.StringType(), True)
    .add("geocountry", T.StringType(), True)
    .add("city", T.StringType(), True)
    .add("user_os", T.StringType(), True)
    .add("systemlanguage", T.StringType(), True)
    .add("geoaltitude", T.StringType(), True)
    .add("meta_platform", T.StringType(), True)
    .add("screensize", T.StringType(), True)
    .add("timestampcolumn", T.DateType(), True)
)

In [None]:
# Directory on the mounted drive that holds the input JSON files.
PATH_TO_FILES = "/content/gdrive/MyDrive/data/json/"
# Lookup table used by the match_code UDF (user_os value -> code).
DICT_MATCH_CODE = {"IOS": "IDFA", "Android": "GAID"}
# Directory entries ordered by name, descending.
filenames = os.listdir(PATH_TO_FILES)
filenames.sort(reverse=True)

In [None]:
def read_file_json(file_name, schema_json_file):
    """Load one JSON source into a DataFrame using an explicit schema.

    Args:
        file_name: Path of the JSON file to load.
        schema_json_file: Schema passed to the reader.

    Returns:
        The DataFrame produced by the notebook-level ``spark`` session's
        JSON reader.
    """
    json_reader = spark.read.format("json")
    return json_reader.load(f"{file_name}", schema=schema_json_file)

In [None]:
def union_and_dropduplicate(df1, df2):
    """Union two DataFrames and keep only the newest rows for each ``inn``.

    Rows are ranked within each ``inn`` partition by ``timestampcolumn``
    descending, and only rank 1 is kept. ``rank`` gives tied rows the same
    rank, so several rows sharing the newest ``timestampcolumn`` for one
    ``inn`` all survive.

    Args:
        df1: First DataFrame.
        df2: Second DataFrame; ``union`` matches columns by position.

    Returns:
        The filtered union, without the helper ``rank`` column.
    """
    newest_first = Window.partitionBy('inn').orderBy(F.desc('timestampcolumn'))
    ranked = df1.union(df2).withColumn('rank', F.rank().over(newest_first))
    return ranked.filter('rank = 1').drop('rank')


In [None]:
def initial_data(schema_json_file: T.StructType, count: "int | None" = None):
    """Read the first ``count`` input files and fold them into one DataFrame.

    Starting from an empty DataFrame with the given schema, each file listed
    in ``filenames`` is read and merged in with ``union_and_dropduplicate``.

    Args:
        schema_json_file: Schema used for the empty seed frame and every file.
        count: Number of leading entries of ``filenames`` to process. ``None``
            (the default) means all of them. The default is resolved at call
            time, so it follows the current contents of ``filenames`` rather
            than its length when this function was defined.

    Returns:
        The accumulated DataFrame.
    """
    merged_df = spark.createDataFrame([], schema=schema_json_file)
    # Slicing with None as the stop value yields the whole list.
    for file_name in filenames[:count]:
        file_df = read_file_json(f"{PATH_TO_FILES}{file_name}", schema_json_file)
        merged_df = union_and_dropduplicate(merged_df, file_df)
    return merged_df

In [None]:
def search_values_from_maptype(col_name, key):
    """Build a column expression that looks up one entry of an array column.

    The generated SQL filters ``col_name`` down to the elements whose ``key``
    equals ``key``, takes the first remaining element, and reads its
    ``"value"`` entry.

    Args:
        col_name: Name of the array column, inserted verbatim into the SQL text.
        key: Key to search for, inserted verbatim inside single quotes, so it
            must come from trusted code and not from user input.

    Returns:
        A Column holding the ``"value"`` of the first matching element.
    """
    matching_entries = F.expr(f"filter({col_name}, x -> x.key='{key}')")
    first_match = matching_entries[0]
    return first_match["value"]

In [None]:
@F.udf
def match_code(x):
    """Translate a value via DICT_MATCH_CODE; values not in the table give None."""
    return DICT_MATCH_CODE.get(x)

In [None]:
# Base DataFrame for every mart below: all input files merged by initial_data.
# (A trailing select("*") would only re-project the same columns, so it is omitted.)
initial_data_df = initial_data(schema)

In [None]:
# Mart A: every column of the base frame except "inn".
data_mart_a = initial_data_df.drop("inn")
data_mart_a.show(n=5, truncate=False)

In [None]:
# Create a loop that reads the DataFrames by date; drop duplicates when
# unioning the days; remove cookies by INN.

# Mart B: "inn" plus a row number assigned over all rows ordered by "inn" descending.
data_mart_b = initial_data_df.select(
    F.row_number().over(Window.orderBy(F.desc('inn'))).alias("id"),
    F.col("inn"),
)
data_mart_b.show(5)


In [None]:
# Mart C: generated id (monotonically increasing id, shifted by one) plus the
# "_sa_cookie_a" value extracted from raw_cookie.
data_mart_c = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    search_values_from_maptype("raw_cookie", "_sa_cookie_a").alias(
        "sa_cookie_a"
    ),
)
data_mart_c.show(5)

In [None]:
# Mart D: generated id (monotonically increasing id, shifted by one) plus "inn".
data_mart_d = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    F.col("inn"),
)
data_mart_d.show(n=5, truncate=False)

In [None]:
# Mart E: generated ID plus the MD5 of the "user_phone" value from raw_cookie.
data_mart_e = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("ID"),
    F.md5(
        search_values_from_maptype("raw_cookie", "user_phone")
    ).alias("hash_phone_md5"),
)

data_mart_e.show(n=5, truncate=False)


In [None]:
# Mart F: generated ID plus the MD5 of the "user_mail" value from raw_cookie.
data_mart_f = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("ID"),
    F.md5(
        search_values_from_maptype("raw_cookie", "user_mail")
    ).alias("hash_email_md5"),
)

data_mart_f.show(n=5, truncate=False)

In [None]:
# Mart G: generated ID, the "user_uid" value from raw_cookie, and the code
# that the match_code UDF returns for user_os.
data_mart_g = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("ID"),
    search_values_from_maptype("raw_cookie", "user_uid").alias("user_uid"),
    match_code("user_os").alias("match_code"),
)

data_mart_g.show(n=5, truncate=False)

In [None]:
# Combined mart: links rows of mart A (raw_cookie, data_value) to the ids of
# marts D, B, E, F, G and C. Only df_a carries a "raw_cookie" column, so the
# unqualified "raw_cookie" references below resolve to df_a's column.
main_marts = (
    data_mart_a.select('raw_cookie','data_value').alias("df_a")
    # Right join: every df_d row is kept; df_a rows attach when data_value
    # equals the MD5 or the SHA-256 hex digest of df_d.inn.
    .join(
        data_mart_d.alias("df_d"),
        on=(
            (F.md5(F.col("df_d.inn")) == F.col("df_a.data_value")) |
            (F.sha2(F.col("df_d.inn"), 256) == F.col("df_a.data_value"))
            ),
        how='right'
      )
    # Same hash comparison, this time against df_b.inn, to pick up mart B's id.
    .join(
        data_mart_b.alias("df_b"),
        on=(
            (F.md5(F.col("df_b.inn")) == F.col("df_a.data_value")) |
            (F.sha2(F.col("df_b.inn"), 256) == F.col("df_a.data_value"))
           ),
        how="left"
      )
    # Match mart E on the MD5 of the "user_phone" value from raw_cookie.
    .join(
        data_mart_e.alias("df_e"),
        on = (
           F.md5(search_values_from_maptype("raw_cookie",
                                "user_phone")) == F.col("df_e.hash_phone_md5")),
        how ="left"
      )
    # Match mart F on the MD5 of the "user_mail" value from raw_cookie.
    .join(
        data_mart_f.alias('df_f'),
        on = (
            F.md5(search_values_from_maptype("raw_cookie",
                                 "user_mail")) == F.col("df_f.hash_email_md5")),
        how = "left"
      )
    # Match mart G on the "user_uid" value from raw_cookie.
    .join(
        data_mart_g.alias("df_g"),
        on = (
              search_values_from_maptype("raw_cookie",
                                         "user_uid") == F.col("df_g.user_uid")),
        how = "left"
      )
    # Match mart C on the "_sa_cookie_a" value from raw_cookie.
    .join(
        data_mart_c.alias('df_c'),
        on = (search_values_from_maptype("raw_cookie",
                                  "_sa_cookie_a") == F.col("df_c.sa_cookie_a")),
        how = "left"
      )
    # Final projection: identifiers, hashes, raw cookie values, each mart's id,
    # and an array of four cookie values. The alias "array_coockie" is spelled
    # as-is in the output column name.
    .select(F.col("df_g.user_uid"),
            F.col("df_d.inn").alias("inn"),
            F.col("df_a.data_value").alias("inn_hash"),
            F.col("df_e.hash_phone_md5"),
            F.col("df_f.hash_email_md5"),
            search_values_from_maptype("raw_cookie",
                                       "user_phone").alias("user_phone"),
            search_values_from_maptype("raw_cookie",
                                       "user_mail").alias("user_email"),
            search_values_from_maptype("raw_cookie",
                                       "org_uid").alias("org_uid"),
            F.col("df_d.id").alias("id_d"),
            F.col("df_c.id").alias("id_c"),
            F.col("df_b.id").alias("id_b"),
            F.col("df_e.id").alias("id_e"),
            F.col("df_f.id").alias("id_f"),
            F.col("df_g.id").alias("id_g"),
            F.array(
                search_values_from_maptype("raw_cookie", "_sa_cookie_a"),
                search_values_from_maptype("raw_cookie", "_fa_cookie_a"),
                search_values_from_maptype("raw_cookie", "_ym_cookie_c"),
                search_values_from_maptype("raw_cookie", "_fbp")
            ).alias("array_coockie")
     )
)
main_marts.show(10, truncate=False)