In [11]:
import os
import sys
import pandas as pd

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.sql.dataframe import DataFrame


@F.udf(returnType=FloatType())
def norm_gb_mb(col):
    import re

    col = str(col).lower().strip()
    gb_ind = "gb" in col
    mb_ind = "mb" in col
    col = re.sub(r"[a-zA-Zа-яА-Я]", "", col).replace(",", ".").strip()
    if "/" in col:
        value = col.split("/")[0]
    elif " " in col:
        value = col.split(" ")[0]
    else:
        value = col.strip()
    if gb_ind:
        return float(value)
    elif mb_ind:
        return float(value) / 1024
    else:
        return 0


def get_devices_dataset(spark: SparkSession, reporting_date: pd.Timestamp) -> DataFrame:
    dim_telinfo = spark.read.parquet("../data/dim_telinfo/*.parquet")
    telinfo_device_type = spark.read.csv(
        "../data/dim_device_type.csv", sep=";", header=True
    )
    cp = spark.read.parquet(
        "../data/customer_profile/"
        + "TIME_KEY={}".format(reporting_date.strftime("%Y-%m-%d"))
    )
    cnt_all = cp.count()
    cnt_active = cp.filter(
        F.col("DW_STATUS_KEY").isin(["A", "S"]) & (F.col("ACTIVE_IND") > 0)
    ).count()

    dim_telinfo = dim_telinfo.select(
        "TAC",
        "OS",
        "TOUCHSCREEN_IND",
        "CAMERA_IND",
        "CAMERA_RESOLUTION",
        "FIRST_DISP_TYPE",
        "FIRST_DISP_DIAG",
        "LAUNCH",
        "FUNCTIONALITY_CLASS",
        "WEIGHT",
        "TWO_SIM_IND",
        "RAM_CAPACITY",
        "ROM_CAPACITY",
        "MEMORY_CARD_IND",
        "BLUETOOTH_IND",
        "WLAN_IND",
        "GPS_IND",
        "GLONASS_IND",
        "A_GPS_IND",
        "GPRS_IND",
        "EDGE_IND",
        "LTE_IND",
        "IND_3G",
        "BRAND"
    )
    cp = cp.filter(F.col("PRIORITY_TAC").isNotNull())

    cp_agg = (
        cp.groupby("PRIORITY_TAC")
        .agg(
            F.count("*").alias("CNT_USERS_OVERALL"),
            F.sum(
                F.when(
                    F.col("DW_STATUS_KEY").isin(["A", "S"]) & (F.col("ACTIVE_IND") > 0),
                    1,
                ).otherwise(0)
            ).alias("CNT_ACTIVE_USERS"),
            F.sum("COUNT_DEVICE_CHANGES").alias("SUM_COUNT_DEVICE_CHANGES"),
            F.avg("COUNT_DEVICE_CHANGES").alias("AVG_COUNT_DEVICE_CHANGES"),
            F.avg("AVG_ARPU_3M").alias("AVG_ARPU_3M"),
            F.avg("COUNT_PERIODS_POSIT_REV_12M").alias(
                "AVG_COUNT_PERIODS_POSIT_REV_12M"
            ),
            F.avg("TOTAL_ROAM_MIN").alias("AVG_ROAM_MIN"),
            F.avg("CNT_CALL_OUT").alias("AVG_CNT_CALL_OUT"),
            F.avg("CNT_CALL_IN").alias("AVG_CNT_CALL_IN"),
            F.avg("VOICE_OUT_DURATION").alias("AVG_VOICE_OUT_DURATION"),
            F.avg("VOICE_IN_DURATION").alias("AVG_VOICE_IN_DURATION"),
            F.avg("UNIC_CTN_CPA").alias("AVG_UNIC_CTN_CPA"),
            F.avg("CNT_SMS_OUT").alias("AVG_CNT_SMS_OUT"),
            F.avg("CNT_SMS_IN").alias("AVG_CNT_SMS_IN"),
            F.avg("SMS_SHORT_NUM").alias("AVG_SMS_SHORT_NUM"),
            F.avg("TOTAL_DATA_VOLUME").alias("AVG_TOTAL_DATA_VOLUME"),
            F.avg("MIN_BALANCE").alias("AVG_MIN_BALANCE"),
            F.avg("AVG_BALANCE").alias("AVG_BALANCE"),
            F.avg("MAX_BALANCE").alias("AVG_MAX_BALANCE"),
            F.avg("STD_BALANCE").alias("AVG_STD_BALANCE"),
            F.avg("BLOCK_DAYS_COUNT").alias("AVG_BLOCK_DAYS_COUNT"),
        )
        .withColumn("PERC_USERS_OVERALL", F.col("CNT_USERS_OVERALL") / cnt_all)
        .withColumn("PERC_ACTIVE_USERS", F.col("CNT_ACTIVE_USERS") / cnt_active)
        .withColumn(
            "RATIO_ACTIVE_ALL", F.col("CNT_ACTIVE_USERS") / F.col("CNT_USERS_OVERALL")
        )
    )

    df = (
        cp_agg.withColumnRenamed("PRIORITY_TAC", "TAC")
        .join(telinfo_device_type, on=["TAC"], how="left")
        .join(dim_telinfo.filter("ROWNUMBER_CONTROL=1"), on=["TAC"], how="left")
    )
    df = (
        df.withColumn(
            "DEVICE_TYPE",
            F.when(
                F.trim(F.col("DEVICE_TYPE")).isin(
                    ["Feature_plus", "Smartphone", "Voice_centered", "Tablets"]
                ),
                F.trim(F.col("DEVICE_TYPE")),
            ).otherwise(F.lit("Others")),
        )
        .withColumn(
            "OS",
            F.when(
                F.trim(F.col("OS")).isin(
                    ["BlackBerry OS", "iOS", "Android", "Proprietary", "Symbian"]
                ),
                F.trim(F.col("OS")),
            ).otherwise(
                F.when(
                    F.trim(F.col("OS")).isin(
                        ["Windows Phone", "Windows Mobile", "Windows"]
                    ),
                    F.lit("Windows"),
                ).otherwise(F.lit("Others"))
            ),
        )
        .withColumn("TOUCHSCREEN_IND", F.col("TOUCHSCREEN_IND") == "Да")
        .withColumn("CAMERA_IND", F.col("CAMERA_IND") == "Да")
        .fillna(False, subset=["CAMERA_IND", "TOUCHSCREEN_IND"])
        .withColumn(
            "CAMERA_RESOLUTION",
            F.when(
                F.trim(F.col("CAMERA_RESOLUTION")).isin(["-99", "N/A", "Нет данных"])
                | F.col("CAMERA_RESOLUTION").isNull(),
                0,
            ).otherwise(
                F.regexp_replace(
                    F.split(F.trim(F.col("CAMERA_RESOLUTION")), " ").getItem(0),
                    ",",
                    ".",
                ).cast("float")
            ),
        )
        .withColumn(
            "FIRST_DISP_TYPE",
            F.when(
                F.trim(F.col("FIRST_DISP_TYPE")).isin(["-99", "N/A", "Нет данных"])
                | F.col("FIRST_DISP_TYPE").isNull(),
                F.lit("Nonname"),
            ).otherwise(
                F.when(
                    F.trim(F.col("FIRST_DISP_TYPE")).isin(
                        [
                            "Super AMOLED",
                            "AMOLED",
                            "Dynamic AMOLED",
                            "Super AMOLED Plus",
                            "Optic AMOLED",
                        ]
                    ),
                    F.lit("AMOLED"),
                ).otherwise(
                    F.when(
                        F.trim(F.col("FIRST_DISP_TYPE")).isin(
                            ["TFT", "PLS-TFT", "TFT IPS (SFT)", "PLS"]
                        ),
                        F.lit("TFT"),
                    ).otherwise(
                        F.when(
                            F.trim(F.col("FIRST_DISP_TYPE")).isin(
                                ["IPS", "Super IPS+", "True HD IPS Plus"]
                            ),
                            F.lit("IPS"),
                        ).otherwise(
                            F.when(
                                F.trim(F.col("FIRST_DISP_TYPE")).isin(
                                    [
                                        "LCD",
                                        "UFB-LCD",
                                        "Super LCD",
                                        "Super LCD 5",
                                        "Super LCD 6",
                                        "3D LCD",
                                    ]
                                ),
                                F.lit("LCD"),
                            ).otherwise(
                                F.when(
                                    F.trim(F.col("FIRST_DISP_TYPE")).isin(
                                        ["CSTN", "STN"]
                                    ),
                                    F.lit("STN"),
                                ).otherwise(
                                    F.when(
                                        F.trim(F.col("FIRST_DISP_TYPE")).isin(
                                            ["OLED", "TN", "LTPS"]
                                        ),
                                        F.trim(F.col("FIRST_DISP_TYPE")),
                                    ).otherwise(F.lit("Others"))
                                )
                            )
                        )
                    )
                )
            ),
        )
        .withColumn(
            "FIRST_DISP_DIAG",
            F.when(
                F.trim(F.col("FIRST_DISP_DIAG")).isin(["-99", "N/A", "Нет данных"])
                | F.col("FIRST_DISP_DIAG").isNull(),
                0,
            ).otherwise(
                F.regexp_replace(
                    F.split(F.trim(F.col("FIRST_DISP_DIAG")), " ").getItem(0), ",", "."
                ).cast("float")
            ),
        )
        .withColumn(
            "LT_DEVICE",
            F.when(
                F.length(F.col("LAUNCH")) == 10,
                F.datediff(
                    F.lit(reporting_date.strftime("%Y-%m-%d")),
                    F.col("LAUNCH").cast("date"),
                ),
            ).otherwise(None),
        )
        .withColumn(
            "FUNCTIONALITY_CLASS",
            F.when(
                F.trim(F.col("FUNCTIONALITY_CLASS")).isin(["-99", "N/A"])
                | F.col("FUNCTIONALITY_CLASS").isNull(),
                "Unknown",
            ).otherwise(F.trim(F.col("FUNCTIONALITY_CLASS"))),
        )
        .withColumn(
            "WEIGHT",
            F.when(
                F.trim(F.col("WEIGHT")).isin(["-99", "N/A", "Нет данных"])
                | F.col("WEIGHT").isNull(),
                0,
            ).otherwise(F.trim(F.col("WEIGHT")).cast("float")),
        )
        .withColumn(
            "MULTISIM_IND",
            F.when(
                F.trim(F.col("TWO_SIM_IND")).isin(
                    [
                        "2 UICC",
                        "3 UICC",
                        "4 UICC",
                        "2 SIM-карты",
                        "Да",
                        "В зависимости от оконечного устройства",
                    ]
                ),
                True,
            ).otherwise(False),
        )
        .withColumn(
            "RAM_CAPACITY",
            F.when(
                F.trim(F.col("RAM_CAPACITY")).isin(["-99", "N/A", "Нет данных"])
                | F.col("RAM_CAPACITY").isNull(),
                0,
            ).otherwise(norm_gb_mb(F.col("RAM_CAPACITY"))),
        )
        .withColumn(
            "ROM_CAPACITY",
            F.when(
                F.trim(F.col("ROM_CAPACITY")).isin(["-99", "N/A", "Нет данных"])
                | F.col("ROM_CAPACITY").isNull(),
                0,
            ).otherwise(norm_gb_mb(F.col("ROM_CAPACITY"))),
        )
        .withColumn(
            "MEMORY_CARD_IND",
            F.when(F.trim(F.col("MEMORY_CARD_IND")).isin(["Да"]), True).otherwise(
                False
            ),
        )
        .withColumn(
            "BLUETOOTH_IND",
            F.when(F.trim(F.col("BLUETOOTH_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "WLAN_IND",
            F.when(F.trim(F.col("WLAN_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "GPS_IND",
            F.when(F.trim(F.col("GPS_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "GLONASS_IND",
            F.when(F.trim(F.col("GLONASS_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "A_GPS_IND",
            F.when(F.trim(F.col("A_GPS_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "GPRS_IND",
            F.when(F.trim(F.col("GPRS_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "EDGE_IND",
            F.when(F.trim(F.col("EDGE_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "LTE_IND",
            F.when(F.trim(F.col("LTE_IND")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn(
            "IND_3G",
            F.when(F.trim(F.col("IND_3G")).isin(["Да"]), True).otherwise(False),
        )
        .withColumn("IS_NOKIA", F.col("BRAND") == "Nokia")
        .withColumn("IS_SAMSUNG", F.col("BRAND") == "Samsung")
        .withColumn("IS_APPLE", F.col("BRAND") == "Apple")
        .withColumn("IS_XIAOMI", F.col("BRAND") == "Xiaomi")
        .withColumn(
            "IS_HUAWEI", F.col("BRAND").isin(["Huawei", "Huawei Honor", "Honor"])
        )
        .withColumn("IS_GOOGLE", F.col("BRAND") == "Google")
        .withColumn("IS_MEIZU", F.col("BRAND") == "Meizu")
        .withColumn("IS_EXPENSIVE", F.col("BRAND").isin(["BlackBerry", "Vertu"]))
    )

    avito_min_date = pd.to_datetime("2019-12-01")
    avito_date_1m = (
        avito_min_date if reporting_date < avito_min_date else reporting_date
    )
    avito_date_2m = avito_date_1m + pd.DateOffset(months=-1)
    avito_date_3m = avito_date_1m + pd.DateOffset(months=-2)

    avito_date_2m = avito_date_2m if avito_date_2m > avito_min_date else avito_min_date
    avito_date_3m = avito_date_3m if avito_date_3m > avito_min_date else avito_min_date

    avito_phones_hdfs = "../data/datamart_avito_phones/"
    avito_phones_1m = spark.read.parquet(
        avito_phones_hdfs + "MONTH_KEY={}".format(avito_date_1m.strftime("%Y-%m-%d"))
    )
    avito_phones_2m = spark.read.parquet(
        avito_phones_hdfs + "MONTH_KEY={}".format(avito_date_2m.strftime("%Y-%m-%d"))
    )
    avito_phones_3m = spark.read.parquet(
        avito_phones_hdfs + "MONTH_KEY={}".format(avito_date_3m.strftime("%Y-%m-%d"))
    )

    avito_phones_1m.createOrReplaceTempView("avito_phones_1m")
    avito_phones_2m.createOrReplaceTempView("avito_phones_2m")
    avito_phones_3m.createOrReplaceTempView("avito_phones_3m")

    avito_phones_1m = spark.sql(
        """
        SELECT 
            TAC,
            CASE WHEN price_rub_to_kzt IS NULL THEN price WHEN price IS NULL THEN price_rub_to_kzt ELSE (price_rub_to_kzt + price) / 2 END avg_avito_price_1m
        FROM avito_phones_1m
    """
    )
    avito_phones_2m = spark.sql(
        """
        SELECT 
            TAC,
            CASE WHEN price_rub_to_kzt IS NULL THEN price WHEN price IS NULL THEN price_rub_to_kzt ELSE (price_rub_to_kzt + price) / 2 END avg_avito_price_2m
        FROM avito_phones_2m
    """
    )
    avito_phones_3m = spark.sql(
        """
        SELECT 
            TAC,
            CASE WHEN price_rub_to_kzt IS NULL THEN price WHEN price IS NULL THEN price_rub_to_kzt ELSE (price_rub_to_kzt + price) / 2 END avg_avito_price_3m
        FROM avito_phones_3m
    """
    )
    df = (
        df.join(avito_phones_1m, on=["TAC"], how="left")
        .join(avito_phones_2m, on=["TAC"], how="left")
        .join(avito_phones_3m, on=["TAC"], how="left")
        .withColumn(
            "NOT_NULL_PRICE_AMT_AVITO",
            sum(
                [
                    F.col("avg_avito_price_1m").isNotNull().cast("integer"),
                    F.col("avg_avito_price_2m").isNotNull().cast("integer"),
                    F.col("avg_avito_price_3m").isNotNull().cast("integer"),
                ]
            ),
        )
        .withColumn(
            "AVG_PRICE_3M_AVITO",
            sum(
                [
                    F.col("avg_avito_price_1m"),
                    F.col("avg_avito_price_1m"),
                    F.col("avg_avito_price_1m"),
                ]
            )
            / F.col("NOT_NULL_PRICE_AMT_AVITO"),
        )
    )

    telprice_old = spark.read.csv(
        "../data/vendor_telprice.csv", sep=";", header=True
    )
    df = (
        df.join(
            telprice_old.select(
                F.col("PRIORITY_TAC").alias("TAC"),
                F.col("PRICE").alias("PRICE_VENDOR_OLD").cast("float"),
            ),
            on="TAC",
            how="left",
        )
        .withColumn(
            "AVG_PRICE",
            F.when(
                F.col("avg_avito_price_1m").isNotNull()
                & F.col("PRICE_VENDOR_OLD").isNotNull(),
                (F.col("avg_avito_price_1m") + F.col("PRICE_VENDOR_OLD")) / 2,
            )
            .otherwise(
                F.when(
                    F.col("avg_avito_price_1m").isNotNull(), F.col("avg_avito_price_1m")
                ).otherwise(F.col("PRICE_VENDOR_OLD"))
            )
            .cast("float"),
        )
        .withColumn(
            "AVG_PRICE_3M",
            F.when(
                F.col("AVG_PRICE_3M_AVITO").isNotNull()
                & F.col("PRICE_VENDOR_OLD").isNotNull(),
                (F.col("AVG_PRICE_3M_AVITO") + F.col("PRICE_VENDOR_OLD")) / 2,
            )
            .otherwise(
                F.when(
                    F.col("AVG_PRICE_3M_AVITO").isNotNull(), F.col("AVG_PRICE_3M_AVITO")
                ).otherwise(F.col("PRICE_VENDOR_OLD"))
            )
            .cast("float"),
        )
        .withColumn(
            "TARGET_PRICE",
            F.when(
                F.col("avg_avito_price_1m").isNotNull(), F.col("avg_avito_price_1m")
            ).otherwise(F.col("PRICE_VENDOR_OLD")),
        )
    )
    df = (
        df.withColumn("avg_avito_price_1m", F.col("avg_avito_price_1m").cast("float"))
        .withColumn("avg_avito_price_2m", F.col("avg_avito_price_2m").cast("float"))
        .withColumn("avg_avito_price_3m", F.col("avg_avito_price_3m").cast("float"))
        .withColumn("TARGET_PRICE", F.col("TARGET_PRICE").cast("float"))
    )
    return df.drop("TWO_SIM_IND", "LAUNCH").withColumn(
        "MONTH_KEY", F.lit(reporting_date.strftime("%Y-%m-%d")).cast("date")
    )


def test_cases(spark_df):
    return 0

In [12]:
from datetime import datetime
argdate = datetime(2018, 11, 1)

In [13]:
sc = SparkContext.getOrCreate()
spark = SQLContext(sc)



In [14]:
dataset = get_devices_dataset(spark, argdate)
assert 0 == test_cases(dataset)

In [15]:
dataset.explain('formatted')

== Physical Plan ==
AdaptiveSparkPlan (45)
+- Project (44)
   +- BroadcastHashJoin LeftOuter BuildRight (43)
      :- Project (38)
      :  +- Project (37)
      :     +- BroadcastHashJoin LeftOuter BuildRight (36)
      :        :- Project (31)
      :        :  +- BroadcastHashJoin LeftOuter BuildRight (30)
      :        :     :- Project (25)
      :        :     :  +- BroadcastHashJoin LeftOuter BuildRight (24)
      :        :     :     :- Project (19)
      :        :     :     :  +- BatchEvalPython (18)
      :        :     :     :     +- Project (17)
      :        :     :     :        +- BroadcastHashJoin LeftOuter BuildRight (16)
      :        :     :     :           :- Project (11)
      :        :     :     :           :  +- BroadcastHashJoin LeftOuter BuildRight (10)
      :        :     :     :           :     :- Project (6)
      :        :     :     :           :     :  +- HashAggregate (5)
      :        :     :     :           :     :     +- Exchange (4)
      :     