In [1]:
# HDFS user name; it is interpolated into every input and output path of this notebook.
user = 'itrenyov'

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
539,application_1678651132715_1554,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
from pyspark.sql.types import *
import pyspark.sql.functions as sf
from pyspark.sql.window import Window

# Reference date ("today") that ages are measured against.
current_dt = "2023-03-01"

# --- Inputs: Rosstat directory -------------------------------------------
city = "/user/{}/data/data3/rosstat/city.csv".format(user)
product = "/user/{}/data/data3/rosstat/product.csv".format(user)
products_for_stat = "/user/{}/data/data3/rosstat/products_for_stat.csv".format(user)
price_path = "/user/{}/data/data3/rosstat/price".format(user)

# --- Inputs: OK directory ------------------------------------------------
demography_path = "/user/{}/data/data3/ok/coreDemography".format(user)
countries = "/user/{}/data/data3/ok/geography/countries.csv".format(user)
rs_city = "/user/{}/data/data3/ok/geography/rs_city.csv".format(user)

# --- Outputs: one CSV directory per result -------------------------------
output_path_price_stat = "/user/{}/task4/price_stat".format(user)
output_path_ok_dem = "/user/{}/task4/ok_dem".format(user)
output_path_product_stat = "/user/{}/task4/product_stat".format(user)

In [4]:
# Load every source as a DataFrame with explicit, readable column names and types.
# The files are read without a header, so Spark names the raw columns _c0, _c1, ...
cityDF = spark.read.csv(city, sep=';').select(
    sf.col("_c0").cast(StringType()).alias("city"),
    sf.col("_c1").cast(IntegerType()).alias("city_id")
)

productDF = spark.read.csv(product, sep=';').select(
    sf.col("_c0").cast(StringType()).alias("product"),
    sf.col("_c1").cast(IntegerType()).alias("product_id")
)

# Ids of the products whose price statistics are requested.
productStatDF = spark.read.csv(products_for_stat, sep=';').select(
    sf.col("_c0").cast(IntegerType()).alias("product_id")
)

# NOTE(review): columns are named city_id/city_name although the file is
# countries.csv -- confirm they are not country ids/names.
countriesDF = spark.read.csv(countries, sep=',').select(
    sf.col("_c0").cast(IntegerType()).alias("city_id"),
    sf.col("_c1").cast(StringType()).alias("city_name")
)

# Mapping between OK city ids and Rosstat city ids.
rsCityDF = spark.read.csv(rs_city, sep='\t').select(
    sf.col("_c0").cast(LongType()).alias("ok_city_id"),
    sf.col("_c1").cast(IntegerType()).alias("rs_city_id")
)

# Prices use a decimal comma ("12,34"), which is swapped for a dot before casting.
# DoubleType rather than FloatType: a 32-bit float keeps only ~7 significant
# digits, so large prices would lose their last decimal places before the
# min/max/avg statistics are rounded to 2 decimals.
priceDF = spark.read.csv(price_path, sep=';').select(
    sf.col("_c0").cast(LongType()).alias("rs_city_id"),
    sf.col("_c1").cast(IntegerType()).alias("product_id"),
    sf.regexp_replace(sf.col("_c2"), ',', '.').cast(DoubleType()).alias("price"),
)

# OK user demography. birth_date is later multiplied by 24*60*60 and passed to
# from_unixtime, i.e. it is treated as a number of days since 1970-01-01.
demographyDF = spark.read.csv(demography_path, sep='\t').select(
    sf.col("_c0").cast(IntegerType()).alias("user_id"),
    sf.col("_c1").cast(LongType()).alias("create_date"),
    sf.col("_c2").cast(ShortType()).alias("birth_date"),
    sf.col("_c3").cast(IntegerType()).alias("gender"),
    sf.col("_c4").cast(LongType()).alias("id_country"),
    sf.col("_c5").cast(LongType()).alias("id_city"),
    sf.col("_c6").cast(IntegerType()).alias("login_region")
)

In [5]:
# Per-product price statistics (rounded to 2 decimals), restricted to the
# products listed in products_for_stat, ordered by product id.
price_stat = (
    priceDF
    .join(productStatDF, "product_id", "inner")
    .groupBy("product_id")
    .agg(
        sf.min("price").cast(DecimalType(20, 2)).alias("min_price"),
        sf.max("price").cast(DecimalType(20, 2)).alias("max_price"),
        sf.avg("price").cast(DecimalType(20, 2)).alias("avg_price"),
    )
    .orderBy("product_id")
)

In [7]:
# `price_stat` is the DataFrame built in the previous cell and is used directly.
# It is not re-read from `output_path_price_stat`: that path is only written by
# the save cell further down, so on a fresh kernel the read fails (or picks up a
# stale file from an earlier session); the header-only CSV read would also turn
# every column into a string; and the save cell would then overwrite the very
# path the DataFrame is being read from.

# UPD age calculation: https://towardsdatascience.com/how-to-accurately-calculate-age-in-bigquery-999a8417e973

# Rosstat city ids with at least one product priced above that product's average.
# The price is rounded to DECIMAL(20,2), the scale avg_price was rounded to, so
# the comparison does not depend on the binary float/double representation of
# `price` (assumes prices carry at most two decimal places -- confirm).
# Aliases keep the join unambiguous: price_stat is itself derived from priceDF.
rich_cities = (
    priceDF.alias("p")
    .join(
        price_stat.alias("s"),
        (sf.col("p.product_id") == sf.col("s.product_id"))
        & (sf.col("p.price").cast(DecimalType(20, 2)) > sf.col("s.avg_price")),
        "inner",
    )
    .select(sf.col("p.rs_city_id").alias("rs_city_id"))
    .distinct()
)

# Name-based (USING) joins: Rosstat city id -> city name and OK city id,
# then OK city id -> OK users living there.
rich_city_users = (
    rich_cities
    .join(cityDF.withColumnRenamed("city_id", "rs_city_id"), "rs_city_id", "inner")
    .join(rsCityDF, "rs_city_id", "inner")
    .join(demographyDF.withColumnRenamed("id_city", "ok_city_id"), "ok_city_id", "inner")
    .select("city", "birth_date", "gender")
)

ok_dem = (
    rich_city_users
    .groupBy("city")
    .agg(
        # NOTE(review): counts rows with a non-null gender, so users without a
        # gender are left out of user_cnt and of both shares -- confirm intended.
        sf.count(sf.col("gender")).alias("user_cnt"),
        sf.count(sf.when(sf.col("gender") == 1, True)).alias("men_cnt"),
        sf.count(sf.when(sf.col("gender") == 2, True)).alias("women_cnt"),
        # birth_date (days since 1970-01-01) is scaled to seconds for
        # from_unixtime; age in years = days until current_dt / 365.25,
        # averaged per city and truncated to an integer.
        sf.avg(
            sf.datediff(sf.lit(current_dt), sf.from_unixtime(sf.col("birth_date") * 24 * 60 * 60))
            / 365.25
        ).cast(IntegerType()).alias("age_avg"),
    )
    .select(
        "city",
        "user_cnt",
        "age_avg",
        "men_cnt",
        "women_cnt",
        (sf.col("men_cnt") / sf.col("user_cnt")).cast(DecimalType(20, 2)).alias("men_share"),
        (sf.col("women_cnt") / sf.col("user_cnt")).cast(DecimalType(20, 2)).alias("women_share"),
    )
    .orderBy(sf.col("user_cnt"), ascending=False)
)

In [56]:
# `ok_dem` is the DataFrame built in the previous cell and is used directly.
# It is not re-read from `output_path_ok_dem`: that path is only written by the
# save cell further down (fresh kernel -> missing or stale file), the save cell
# would then overwrite the path the DataFrame is read from, and the header-only
# CSV read makes every column a string, so max/min would compare ages and shares
# lexicographically (e.g. "9" > "35").

# Extremes across all cities.
stats = ok_dem.agg(
    sf.max(sf.col("age_avg")).alias("max_age_avg"),
    sf.min(sf.col("age_avg")).alias("min_age_avg"),
    sf.max(sf.col("men_share")).alias("max_men_share"),
    sf.max(sf.col("women_share")).alias("max_women_share"),
).first()

# Cities that reach any of the extremes (ties keep every matching city).
suitable_cities = ok_dem.where(
    (sf.col("age_avg") == stats["max_age_avg"])
    | (sf.col("age_avg") == stats["min_age_avg"])
    | (sf.col("men_share") == stats["max_men_share"])
    | (sf.col("women_share") == stats["max_women_share"])
).select("city")

# Every (city, product, price) observation for those cities. Name-based joins
# avoid ambiguous column references: ok_dem is itself derived from cityDF and priceDF.
suitable_city_prices = (
    suitable_cities
    .join(cityDF, "city", "inner")
    .join(priceDF.withColumnRenamed("rs_city_id", "city_id"), "city_id", "inner")
    .join(productDF, "product_id", "inner")
    .select("city", "product", "price")
)

# Per city: most expensive and cheapest product and the spread between them.
# Ties on an extreme price are broken alphabetically (min of the product name)
# so the result is deterministic rather than an arbitrary pick.
city_window = Window.partitionBy("city")
product_stat = (
    suitable_city_prices
    .withColumn("cheap", sf.min("price").over(city_window))
    .withColumn("expensive", sf.max("price").over(city_window))
    .where((sf.col("price") == sf.col("cheap")) | (sf.col("price") == sf.col("expensive")))
    .groupBy("city")
    .agg(
        sf.min(sf.when(sf.col("price") == sf.col("expensive"), sf.col("product")))
        .alias("most_expensive_product_name"),
        sf.min(sf.when(sf.col("price") == sf.col("cheap"), sf.col("product")))
        .alias("cheapest_product_name"),
        # Same value on every row of a city; rounded to 2 decimals like the other outputs.
        sf.max(sf.col("expensive") - sf.col("cheap"))
        .cast(DecimalType(20, 2)).alias("price_difference"),
    )
    .withColumnRenamed("city", "city_name")
)

In [10]:
# Save the results to HDFS.
def save_single_csv(df, path, sort_cols=None):
    """Write `df` to `path` as a single ';'-separated CSV file with a header row.

    Existing output at `path` is replaced (mode "overwrite").

    Args:
        df: DataFrame to save.
        path: Destination directory.
        sort_cols: Optional columns/expressions to sort the single output
            partition by. The round-robin shuffle of ``repartition(1)`` does not
            keep the order produced by an earlier ``orderBy``, so ordered
            outputs re-apply their ordering here, once all rows share one partition.
    """
    single = df.repartition(1)
    if sort_cols:
        single = single.sortWithinPartitions(*sort_cols)
    (single.write
     .mode("overwrite")
     .option("header", "true")
     .option("sep", ";")
     .csv(path))


# Orderings mirror the `orderBy` used where these DataFrames are built.
save_single_csv(price_stat, output_path_price_stat, [sf.col("product_id").asc()])
save_single_csv(ok_dem, output_path_ok_dem, [sf.col("user_cnt").desc()])
save_single_csv(product_stat, output_path_product_stat)

In [58]:
# Always shut the Spark session down once the results are saved, so the
# YARN application releases its cluster resources!
spark.stop()