In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .appName("create tables") \
    .master("spark://spark-master:7077") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/28 10:24:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [17]:
spark.sql("CREATE DATABASE IF NOT EXISTS mtsfix_dm")
spark.sql("USE mtsfix_dm")

DataFrame[]

In [24]:
spark.sql("CREATE DATABASE IF NOT EXISTS mtsfix_dm_sec")
spark.sql("USE mtsfix_dm_sec")

DataFrame[]

In [34]:
spark.sql("CREATE DATABASE IF NOT EXISTS raw")
spark.sql("USE raw")

DataFrame[]

In [4]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [8]:
schema_red_conv = T.StructType([
    T.StructField("fix_billing_id", T.IntegerType(), False),
    T.StructField("fix_acc_id", T.LongType(), True),
    T.StructField("fix_acc_num", T.LongType(), True),
    
    T.StructField("foris_id", T.IntegerType(), False),
    T.StructField("foris_acc_id", T.LongType(), True),
    T.StructField("personal_account_number", T.LongType(), True),

    T.StructField("foris_msisdn", T.LongType(), False),
    T.StructField("dt_from", T.TimestampType(), True),
    T.StructField("foris_tariff_plan", T.StringType(), True)
])

In [31]:
n_rows = 2000000

In [32]:
df_red_conv = spark.range(n_rows).select(
    (F.rand() * 1000 % 100).cast("int").alias("fix_billing_id"),
    (F.rand() * 1000000 % 1000000).cast("int").alias("fix_acc_id"),
    (F.rand() * 10000000 % 10000000).cast("int").alias("fix_acc_num"),

    (F.rand() * 1000 % 100).cast("int").alias("foris_id"),
    (F.rand() * 1000000 % 1000000).cast("int").alias("foris_acc_id"),
    (F.rand() * 10000000 % 10000000).cast("int").alias("personal_account_number"),

    (F.rand() * 100000000 % 1000000000).cast("int").alias("foris_msisdn"),

    F.date_add(F.lit("2020-01-01"), (F.rand() * 2000).cast("int")).alias("dt_from"),
    F.lit("9999-12-31").alias("dt_to"),

    (F.rand() * 1000000000 % 1000000000).cast("int").alias("foris_tariff_plan")
)

In [33]:
(
    df_red_conv
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm.agg_mtsfix_red_convergent_fixa_a')
)

                                                                                

In [39]:
n_rows = 510247

In [40]:
df_mgts_conv = spark.range(n_rows).select(
    (F.rand() * 1000 % 100).cast("int").alias("fix_billing_id"),
    (F.rand() * 1000000 % 1000000).cast("int").alias("fix_acc_id"),
    (F.rand() * 10000000 % 10000000).cast("int").alias("fix_acc_num"),

    (F.rand() * 1000 % 100).cast("int").alias("foris_id"),
    (F.rand() * 1000000 % 1000000).cast("int").alias("foris_acc_id"),
    (F.rand() * 10000000 % 10000000).cast("int").alias("personal_account_number"),

    (F.rand() * 100000000 % 1000000000).cast("int").alias("foris_msisdn"),

    F.date_add(F.lit("2020-01-01"), (F.rand() * 2000).cast("int")).alias("dt_from"),
    F.lit("9999-12-31").alias("dt_to"),

    (F.rand() * 1000000000 % 1000000000).cast("int").alias("foris_tariff_plan")
)

In [41]:
(
    df_mgts_conv
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm.agg_mtsfix_red_convergent_mgts_fixa_a')
)

                                                                                

In [6]:
df_acc_con = (
    spark.table('mtsfix_dm.agg_mtsfix_red_convergent_fixa_a')
    .union(spark.table('mtsfix_dm.agg_mtsfix_red_convergent_mgts_fixa_a'))
    .withColumnRenamed('fix_billing_id', 'billing_id')
    .withColumnRenamed('fix_acc_id', 'acc_id')
    .withColumnRenamed('fix_acc_num', 'acc_num')

    .select('billing_id', 'acc_id', 'acc_num', 'dt_from', 'dt_to')

    .withColumn('bopos_id', (F.rand() * 1000 % 100).cast("int"))
    .withColumn('contract_id', (F.rand() * 1000000 % 1000000).cast("int"))
    .withColumn('contract_num', (F.rand() * 10000000 % 10000000).cast("int"))
    .withColumn('siebel_contract_id', (F.rand() * 1000000 % 1000000).cast("int"))
    .withColumn('customer_id', (F.rand() * 1000000 % 1000000).cast("int"))
    .withColumn('city_id', (F.rand() * 1000 % 100).cast("int"))
    .withColumn('reg_id', (F.rand() * 1000 % 100).cast("int"))
    .withColumn('mr_id', (F.rand() * 1000 % 10).cast("int"))
    .withColumn('customer_type_id', (F.rand() * 1000 % 2).cast("int"))
    .withColumn('calculation_method_id', (F.rand() * 1000 % 10).cast("int"))
    .withColumn('is_noncommercial', F.lit(0))
    .withColumn('is_closed', F.lit(0))
    .withColumn('parent_contract_id', F.lit(None).cast("long"))
    .withColumn('marketing_category_id', (F.rand() * 1000 % 10).cast("int"))
    .withColumn('siebel_status_id', F.lit(1))
    .withColumn('service_provider_id', (F.rand() * 1000 % 100).cast("int"))
    .withColumn('is_b2b', F.lit(0))
    .withColumn('orders_credit', F.lit(0))
    .withColumn('credit', F.lit(0))
    .withColumn('is_archive', F.lit(0))
    .withColumn('customer_category_id', (F.rand() * 1000 % 10).cast("int"))
)

In [7]:
df_acc_con.count()

                                                                                

2510247

In [8]:
(
    df_acc_con
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm.agg_mtsfix_acc_con_fixa_a')
)

26/02/28 10:24:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
26/02/28 10:24:55 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [9]:
df_acc_att = (
    spark.table('mtsfix_dm.agg_mtsfix_acc_con_fixa_a')

    .select('billing_id', 'acc_id', 'dt_from', 'dt_to')

    .withColumnRenamed('dt_from', 'dt_open')
    .withColumnRenamed('dt_to', 'dt_close')

    .withColumn('dt_open_spd', F.col('dt_open'))
    .withColumn('dt_close_spd', F.col('dt_close'))

    .withColumn('dt_open_ctv', F.col('dt_open'))
    .withColumn('dt_close_ctv', F.col('dt_close'))

    .withColumn('dt_open_atv', F.col('dt_open'))
    .withColumn('dt_close_atv', F.col('dt_close'))

    .withColumn('dt_open_tlf', F.col('dt_open'))
    .withColumn('dt_close_tlf', F.col('dt_close'))

    .withColumn('dt_open_ictv', F.col('dt_open'))
    .withColumn('dt_close_ictv', F.col('dt_close'))
)

In [10]:
(
    df_acc_att
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm.agg_mtsfix_acc_att_fixa_a')
)

                                                                                

In [11]:
spark.table('mtsfix_dm.agg_mtsfix_acc_att_fixa_a').count()

2510247

In [None]:
n_rows = 6000000

In [14]:
df_ids = spark.table("mtsfix_dm.agg_mtsfix_acc_con_fixa_a").select("billing_id", "acc_id").distinct()

months = [0, 1, 2, 3]
df_months = spark.createDataFrame([(i,) for i in months], ["month_offset"])

current_date = F.current_date()
df_months = df_months.withColumn(
    "dt",
    F.add_months(current_date, -F.col("month_offset"))
).select(
    F.trunc("dt", "month").alias("dt")
)

df_payments = df_ids.crossJoin(df_months)

df_payments = (
    df_payments 
    .withColumn("amount", (F.rand() * 500 + 500).cast("decimal(10,2)")) 
    .withColumn('pay_class', (F.rand() * 1000 % 10).cast("int"))
)


In [17]:
df_payments.count()

                                                                                

9914824

In [18]:
(
    df_payments
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm.agg_mtsfix_pay_fixa_a')
)

                                                                                

In [21]:
df_ids = spark.table("mtsfix_dm.agg_mtsfix_acc_con_fixa_a").select("billing_id", "acc_id").distinct()

months = [0, 1, 2, 3]  # текущий + 3 предыдущих
df_months = spark.createDataFrame([(i,) for i in months], ["month_offset"])

df_months = df_months.withColumn(
    "dt",
    F.trunc(F.add_months(F.current_date(), -F.col("month_offset")), "month")
).select("dt")

df_traffic = df_ids.crossJoin(df_months)

df_traffic = (
    df_traffic
    .withColumn("number_of_units", (F.rand() * 500 + F.rand() * 200).cast("decimal(10,2)"))
    .withColumn('metric_unit_number', F.lit(5))
)

In [22]:
df_traffic

DataFrame[billing_id: int, acc_id: int, dt: date, number_of_units: decimal(10,2), metric_unit_number: int]

In [25]:
(
    df_traffic
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm_sec.agg_mtsfix_clc_fixa_a')
)

                                                                                

In [27]:
df_equipment = (
    spark.table('mtsfix_dm.agg_mtsfix_acc_con_fixa_a')

    .select('billing_id', 'acc_id')
    .limit(510423)

    .withColumn('dt_from', F.date_add(F.lit("2019-01-01"), (F.rand() * 2000).cast("int")))
    .withColumn('dt_to', F.lit("9999-12-31"))

    .withColumn('equipment_type', (F.rand() * 100000 % 20).cast("int"))
    .withColumn('provide_types', (F.rand() * 100000 % 5).cast("int"))
    
)

In [29]:
df_equipment.count()

510423

In [30]:
(
    df_equipment
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm.agg_mtsfix_equipment_fixa_a')
)

                                                                                

In [33]:
df_ids = spark.table("mtsfix_dm.agg_mtsfix_acc_con_fixa_a").select("contract_num").distinct().limit(123104)

months = [0, 1, 2, 3]  # текущий + 3 предыдущих
df_months = spark.createDataFrame([(i,) for i in months], ["month_offset"])

df_months = df_months.withColumn(
    "createdate_dt_orig",
    F.trunc(F.add_months(F.current_date(), -F.col("month_offset")), "month")
).select("createdate_dt_orig")

df_inc = df_ids.crossJoin(df_months)

df_inc = (
    df_inc
    .withColumn('request_id', (F.rand() * 10000000 % 14000).cast("int"))
    .withColumn('row_dt', F.col('createdate_dt_orig'))
    .withColumn('closedate_dt_orig', F.date_add(F.col('createdate_dt_orig'), (F.rand() * 30).cast("int")))
    .withColumn('dogtype', F.lit('Фиксированный'))
    .withColumn('priority', F.lit('Средний'))
    .withColumn('status', F.lit('Закрыт'))
)

In [35]:
(
    df_inc
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('raw.mtsru_remedytb_msk__aradmin_stb_singlinc_dashb_all')
)

                                                                                

In [36]:
spark.table('raw.mtsru_remedytb_msk__aradmin_stb_singlinc_dashb_all').count()

492416

In [39]:
df_ids = spark.table("mtsfix_dm.agg_mtsfix_acc_con_fixa_a").select("billing_id", "acc_id").distinct()

max_service_id = 20
max_services_per_client = 7

df_with_services = (
    df_ids
    .withColumn("num_services", (F.rand() * max_services_per_client + 1).cast("int")) 
    .withColumn("service_id_array",
        F.expr(f"transform(sequence(1, num_services), x -> cast(rand()*{max_service_id} + 1 as int))")
    )
    .select("billing_id", "acc_id", F.explode("service_id_array").alias("app_service_id"))
    .withColumn('is_closed', (F.rand() * 10000000 % 2).cast("int"))
    .withColumn('is_b2c_basic_service', (F.rand() * 10000000 % 2).cast("int"))
    .withColumn('dt_from', F.date_add(F.lit("2019-01-01"), (F.rand() * 2000).cast("int")))
    .withColumn('dt_to', F.lit("9999-12-31"))
)

In [40]:
df_with_services

DataFrame[billing_id: int, acc_id: int, app_service_id: int, is_closed: int, dt_from: date, dt_to: string]

In [41]:
(
    df_with_services
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm.agg_mtsfix_app_service_fixa_a')
)

                                                                                

In [42]:
spark.table('mtsfix_dm.agg_mtsfix_app_service_fixa_a').count()

9908610

In [43]:
df_disc = (
    spark.table('mtsfix_dm.agg_mtsfix_app_service_fixa_a')
    .select('billing_id', 'acc_id', 'app_service_id', 'dt_from', 'dt_to')
    .distinct()
    .limit(124365)
    .withColumn('discount_price', (F.rand() * 10000000 % 150).cast("int")) 
)    

In [45]:
(
    df_disc
    .repartition(1)
    .write
    .format('orc')
    .mode('overwrite')
    .saveAsTable('mtsfix_dm_sec.agg_mtsfix_discount_fixa_a')
)

                                                                                

In [46]:
spark.table('mtsfix_dm_sec.agg_mtsfix_discount_fixa_a').count()

124365

In [50]:
spark.table('mtsfix_dm.agg_mtsfix_acc_con_fixa_a').count()

2000000

In [28]:
(
    spark.table('mtsfix_dm.agg_mtsfix_red_convergent_fixa_a')
).show(10, truncate = False)

+--------------+----------+-----------+--------+------------+-----------------------+------------+----------+----------+-----------------+
|fix_billing_id|fix_acc_id|fix_acc_num|foris_id|foris_acc_id|personal_account_number|foris_msisdn|dt_from   |dt_to     |foris_tariff_plan|
+--------------+----------+-----------+--------+------------+-----------------------+------------+----------+----------+-----------------+
|76            |499472    |7505138    |49      |142383      |3713887                |77212704    |2024-07-11|9999-12-31|198333998        |
|41            |644935    |1112451    |47      |880207      |2699947                |82595739    |2022-03-11|9999-12-31|979256432        |
|79            |807096    |2168186    |96      |92197       |1273831                |63282551    |2021-04-01|9999-12-31|470108970        |
|57            |508004    |6023971    |30      |295567      |1025891                |75180348    |2020-09-09|9999-12-31|404794644        |
|78            |386690    |

In [37]:
spark.table('mtsfix_dm.agg_mtsfix_red_convergent_fixa_a').count()

2000000

In [42]:
spark.table('mtsfix_dm.agg_mtsfix_red_convergent_mgts_fixa_a').count()

510247