In [1]:
# Account name substituted into every "/user/{}/..." input and output path below.
user = "eremzin"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
534,application_1667827781736_0990,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
from pyspark.sql.types import *
import pyspark.sql.functions as sf

# Reference date against which user ages are computed.
current_dt = "2022-10-26"

# Rosstat inputs: price files plus the city / product dictionaries.
_rosstat_dir = "/user/{}/data/data3/rosstat".format(user)
price_path = _rosstat_dir + "/price/*"
city_path = _rosstat_dir + "/city.csv"
product_path = _rosstat_dir + "/product.csv"
products_for_stat_path = _rosstat_dir + "/products_for_stat.csv"
# NOTE(review): resolves to the same file as products_for_stat_path and is not
# read anywhere in the visible notebook -- looks like a copy-paste leftover; confirm.
product_join_price_path = _rosstat_dir + "/products_for_stat.csv"

# OK inputs: user demography plus the geography dictionaries.
_ok_dir = "/user/{}/data/data3/ok".format(user)
demography_path = _ok_dir + "/coreDemography"
country_path = _ok_dir + "/geography/countries.csv"
rs_city_path = _ok_dir + "/geography/rs_city.csv"

# Directory that receives the result tables
output_path = "/user/{}/task4".format(user)

In [3]:
# When True, cells print diagnostic output (schemas and a few sample rows) for the DataFrames they build.
debugMode = True

## Загрузка таблиц

#### Данные по ценам на товары в разных городах

In [4]:
# Layout of the ';'-separated price files. The price is read as text because
# commas in it are replaced by dots before it is cast to a float.
priceSchema = StructType([
    StructField("city_id", StringType()),
    StructField("product_id", IntegerType()),
    StructField("price", StringType()),
])

# Read the raw rows, drop rows containing nulls, then normalise the price column.
priceDF = (
    spark.read
    .options(header="false", sep=";")
    .schema(priceSchema)
    .csv(price_path)
    .dropna()
    .select(
        "city_id",
        "product_id",
        sf.regexp_replace(sf.col("price"), ",", ".").cast(FloatType()).alias("price"),
    )
)

if debugMode:
    priceDF.printSchema()
    priceDF.show(n=3, truncate=False)

root
 |-- city_id: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- price: float (nullable = true)

+-------+----------+------+
|city_id|product_id|price |
+-------+----------+------+
|193    |437       |529.38|
|126    |34        |684.46|
|41     |338       |12.81 |
+-------+----------+------+
only showing top 3 rows

#### Справочник городов

In [5]:
# City dictionary: city name and its integer id, ';'-separated, no header row.
citySchema = StructType([
    StructField("city", StringType()),
    StructField("city_id", IntegerType()),
])

cityDF = (
    spark.read
    .options(header="false", sep=";")
    .schema(citySchema)
    .csv(city_path)
)

if debugMode:
    cityDF.printSchema()
    cityDF.show(n=3, truncate=False)

root
 |-- city: string (nullable = true)
 |-- city_id: integer (nullable = true)

+------------+-------+
|city        |city_id|
+------------+-------+
|Белгород    |1      |
|Губкин      |2      |
|Старый Оскол|3      |
+------------+-------+
only showing top 3 rows

#### Справочник товаров и услуг

In [6]:
# Product/service dictionary: product name and its integer id, ';'-separated, no header row.
productSchema = StructType([
    StructField("product", StringType()),
    StructField("product_id", IntegerType()),
])

productDF = (
    spark.read
    .options(header="false", sep=";")
    .schema(productSchema)
    .csv(product_path)
)

if debugMode:
    productDF.printSchema()
    productDF.show(n=3, truncate=False)

root
 |-- product: string (nullable = true)
 |-- product_id: integer (nullable = true)

+----------------------------+----------+
|product                     |product_id|
+----------------------------+----------+
|Фарш мясной, кг             |1         |
|Пельмени, манты, равиоли, кг|2         |
|Печень говяжья, кг          |3         |
+----------------------------+----------+
only showing top 3 rows

#### Идентификаторы товаров, для которых нужно собрать статистику

In [7]:
# Single-column list of the product ids for which statistics are collected.
productForStatSchema = StructType([
    StructField("product_id", IntegerType()),
])

productForStatDF = (
    spark.read
    .options(header="false")
    .schema(productForStatSchema)
    .csv(products_for_stat_path)
)

if debugMode:
    productForStatDF.printSchema()
    productForStatDF.show(n=3, truncate=False)

root
 |-- product_id: integer (nullable = true)

+----------+
|product_id|
+----------+
|355       |
|446       |
|114       |
+----------+
only showing top 3 rows

#### Сырые данные демографии пользователей

In [8]:
# Column layout of the tab-separated demography files (no header row).
# id_country is kept as a string; create_date is read as text as well.
rawCoreDemographySchema = StructType([
    StructField("user_id", IntegerType()),
    StructField("create_date", StringType()),
    StructField("birth_date", IntegerType()),
    StructField("gender", IntegerType()),
    StructField("id_country", StringType()),
    StructField("id_city", IntegerType()),
    StructField("login_region", IntegerType()),
])

rawCoreDemographyDF = (
    spark.read
    .options(header="false", sep="\t")
    .schema(rawCoreDemographySchema)
    .csv(demography_path)
)

if debugMode:
    rawCoreDemographyDF.printSchema()
    rawCoreDemographyDF.show(n=3, truncate=False)

root
 |-- user_id: integer (nullable = true)
 |-- create_date: string (nullable = true)
 |-- birth_date: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- id_country: string (nullable = true)
 |-- id_city: integer (nullable = true)
 |-- login_region: integer (nullable = true)

+--------+-------------+----------+------+-----------+-------+------------+
|user_id |create_date  |birth_date|gender|id_country |id_city|login_region|
+--------+-------------+----------+------+-----------+-------+------------+
|16460783|1182885174073|486       |2     |10414533690|1078547|85          |
|16467391|1176953226317|4669      |2     |10414533690|1384327|85          |
|16467889|1169816093060|6861      |2     |10414533690|33438  |null        |
+--------+-------------+----------+------+-----------+-------+------------+
only showing top 3 rows

#### Идентификатор РФ

In [9]:
# Country dictionary: string id and country name, ','-separated, no header row.
countrySchema = (
    StructType()
    .add("id", StringType())
    .add("name", StringType())
)

countryDF = (
    spark.read
    .option("header", "false")
    .option("sep", ",")
    .schema(countrySchema)
    .csv(country_path)
)

# Filter on the name *before* projecting it away, and use first() so that a
# missing entry yields None instead of an opaque IndexError from collect()[0].
russiaRow = (
    countryDF
    .where(sf.col("name") == "Россия")
    .select("id")
    .first()
)

if russiaRow is None:
    raise ValueError("Country 'Россия' was not found in {}".format(country_path))

# Id of Russia, used to restrict the demography data to Russian users.
RussiaID = russiaRow[0]

if debugMode:
    print("Russia ID: {}".format(RussiaID))

Russia ID: 10414533690

#### Данные демографии пользователей из РФ

In [10]:
# Demography rows whose country id equals RussiaID, reduced to the columns used later.
coreDemographyDF = (
    rawCoreDemographyDF
    .where(sf.col("id_country") == RussiaID)
    # convert birth_date from days to seconds (unix time)
    .withColumn("birth_date", sf.col("birth_date") * (24 * 60 * 60))
    .select("user_id", "birth_date", "gender", "id_city", "login_region")
)

if debugMode:
    coreDemographyDF.printSchema()
    coreDemographyDF.show(n=3, truncate=False)

root
 |-- user_id: integer (nullable = true)
 |-- birth_date: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- id_city: integer (nullable = true)
 |-- login_region: integer (nullable = true)

+--------+----------+------+-------+------------+
|user_id |birth_date|gender|id_city|login_region|
+--------+----------+------+-------+------------+
|16460783|41990400  |2     |1078547|85          |
|16467391|403401600 |2     |1384327|85          |
|16467889|592790400 |2     |33438  |null        |
+--------+----------+------+-------+------------+
only showing top 3 rows

#### Справочник соответствия городов OK и Росстата

In [11]:
# Mapping between the city ids used in the demography data (ok_city_id)
# and the city ids of the Rosstat dictionary (rs_city_id); tab-separated, no header row.
sourceRsCitySchema = StructType([
    StructField("ok_city_id", IntegerType()),
    StructField("rs_city_id", IntegerType()),
])

rsCityDF = (
    spark.read
    .options(header="false", sep="\t")
    .schema(sourceRsCitySchema)
    .csv(rs_city_path)
)

if debugMode:
    rsCityDF.printSchema()
    rsCityDF.show(n=3, truncate=False)

root
 |-- ok_city_id: integer (nullable = true)
 |-- rs_city_id: integer (nullable = true)

+----------+----------+
|ok_city_id|rs_city_id|
+----------+----------+
|1000792   |1         |
|6190119   |2         |
|613013    |3         |
+----------+----------+
only showing top 3 rows

## Обработка данных

### Минимальная, максимальная и средняя цена по всем городам

In [12]:
# For every product id listed in productForStatDF: minimum, maximum and
# average price (rounded to 2 decimals) over all rows of priceDF.
priceStatDF = (
    productForStatDF
    .join(priceDF, on="product_id", how="inner")
    .groupBy("product_id")
    .agg(
        sf.min("price").alias("min_price"),
        sf.max("price").alias("max_price"),
        sf.round(sf.avg("price"), 2).alias("price_avg"),
    )
)

if debugMode:
    priceStatDF.show(n=5, truncate=False)

+----------+---------+---------+---------+
|product_id|min_price|max_price|price_avg|
+----------+---------+---------+---------+
|496       |50.0     |881.0    |281.19   |
|471       |6000.0   |65901.11 |27692.07 |
|463       |111.0    |589.0    |306.4    |
|392       |432.81   |3847.08  |1000.81  |
|243       |1411.76  |11936.51 |3557.27  |
+----------+---------+---------+---------+
only showing top 5 rows

In [13]:
# Save the price statistics as ';'-separated CSV with a header row; the data
# is collapsed into one partition first and any previous result is overwritten.
(
    priceStatDF
    .repartition(1)
    .write
    .options(header="true", sep=";")
    .mode("overwrite")
    .csv(output_path + "/price_stat")
)

In [14]:
# Schema of the price statistics CSV written to <output_path>/price_stat.
priceStatSchema = StructType([
    StructField("product_id", IntegerType()),
    StructField("min_price", FloatType()),
    StructField("max_price", FloatType()),
    StructField("price_avg", FloatType()),
])

# Re-read the saved statistics; from here on priceStatDF refers to the stored table.
priceStatDF = (
    spark.read
    .options(header="true", sep=";")
    .schema(priceStatSchema)
    .csv(output_path + "/price_stat")
)

if debugMode:
    priceStatDF.printSchema()
    priceStatDF.show(n=3, truncate=False)

root
 |-- product_id: integer (nullable = true)
 |-- min_price: float (nullable = true)
 |-- max_price: float (nullable = true)
 |-- price_avg: float (nullable = true)

+----------+---------+---------+---------+
|product_id|min_price|max_price|price_avg|
+----------+---------+---------+---------+
|243       |1411.76  |11936.51 |3557.27  |
|392       |432.81   |3847.08  |1000.81  |
|31        |275.8    |659.32   |386.71   |
+----------+---------+---------+---------+
only showing top 3 rows

#### Города, в которых хотя бы один товар из списка продаётся по цене выше средней
 (Вспомогательная таблица)

In [15]:
# Distinct ids of cities where the local average price of at least one product
# from priceStatDF is strictly higher than that product's overall average.
expensiveCityDF = (
    priceDF
    .groupBy("city_id", "product_id")
    .agg(sf.avg("price").alias("local_price_avg"))
    .join(priceStatDF, on="product_id", how="inner")
    .where(sf.col("local_price_avg") > sf.col("price_avg"))
    .select("city_id")
    .distinct()
)

if debugMode:
    expensiveCityDF.show(n=5, truncate=False)

+-------+
|city_id|
+-------+
|125    |
|7      |
|124    |
|51     |
|205    |
+-------+
only showing top 5 rows

### Статистика для всех пользователей из городов, цена на товары в которых выше средней

In [16]:
# Per-city demography statistics (user count, average age, gender counts and
# shares) for users living in cities listed in expensiveCityDF.
okDemDF = (
    coreDemographyDF
    .join(rsCityDF, coreDemographyDF.id_city == rsCityDF.ok_city_id, how="inner")
    .join(cityDF, rsCityDF.rs_city_id == cityDF.city_id)
    # Restrict to the "expensive" cities. Previously expensiveCityDF was built
    # but never applied, so the statistics covered every mapped city.
    # expensiveCityDF.city_id is a string column (it comes from priceDF),
    # hence the explicit cast to match cityDF.city_id.
    .join(
        expensiveCityDF,
        cityDF.city_id == expensiveCityDF.city_id.cast(IntegerType()),
        how="left_semi",
    )
    .select(
        sf.col("city"),
        # Age in whole years at current_dt: months between the two dates / 12, truncated.
        (sf.months_between(sf.to_date(sf.lit(current_dt)), sf.from_unixtime("birth_date")) / 12)
        .cast(IntegerType()).alias("age"),
        # gender == 1 is counted as a man, gender == 2 as a woman.
        (sf.col("gender") == 1).alias("man"),
        (sf.col("gender") == 2).alias("woman"),
    )
    .groupBy("city")
    .agg(
        # count() skips nulls, so users with a null gender are not counted.
        sf.count("man").alias("user_cnt"),
        sf.round(sf.avg("age"), 2).alias("age_avg"),
        sf.sum(sf.col("man").cast(IntegerType())).alias("men_cnt"),
        sf.sum(sf.col("woman").cast(IntegerType())).alias("women_cnt"),
    )
    .withColumn("men_share", sf.round((sf.col("men_cnt") / sf.col("user_cnt")), 2))
    # Derived from men_share so that the two rounded shares always add up to 1.
    .withColumn("women_share", sf.round((1 - sf.col("men_share")), 2))
    .orderBy(sf.col("user_cnt").desc())
)

if debugMode:
    okDemDF.show(5, False)

+-------+--------+-------+-------+---------+---------+-----------+
|city   |user_cnt|age_avg|men_cnt|women_cnt|men_share|women_share|
+-------+--------+-------+-------+---------+---------+-----------+
|Тара   |34777   |39.47  |8155   |26622    |0.23     |0.77       |
|Городец|4498    |39.6   |1402   |3096     |0.31     |0.69       |
|Юрга   |2120    |45.26  |594    |1526     |0.28     |0.72       |
|Волхов |1961    |45.9   |477    |1484     |0.24     |0.76       |
|Чита   |1751    |42.65  |217    |1534     |0.12     |0.88       |
+-------+--------+-------+-------+---------+---------+-----------+
only showing top 5 rows

In [17]:
# Save the demography statistics as ';'-separated CSV with a header row.
# The data is collapsed into one partition and sorted by user_cnt (descending)
# inside it; any previous result is overwritten.
(
    okDemDF
    .repartition(1)
    .sortWithinPartitions("user_cnt", ascending=False)
    .write
    .options(header="true", sep=";")
    .mode("overwrite")
    .csv(output_path + "/ok_dem")
)

In [18]:
# Schema of the demography statistics CSV written to <output_path>/ok_dem.
okDemSchema = StructType([
    StructField("city", StringType()),
    StructField("user_cnt", IntegerType()),
    StructField("age_avg", FloatType()),
    StructField("men_cnt", IntegerType()),
    StructField("women_cnt", IntegerType()),
    StructField("men_share", FloatType()),
    StructField("women_share", FloatType()),
])

# Re-read the saved statistics; from here on okDemDF refers to the stored table.
okDemDF = (
    spark.read
    .options(header="true", sep=";")
    .schema(okDemSchema)
    .csv(output_path + "/ok_dem")
)

if debugMode:
    okDemDF.printSchema()
    okDemDF.show(n=3, truncate=False)

root
 |-- city: string (nullable = true)
 |-- user_cnt: integer (nullable = true)
 |-- age_avg: float (nullable = true)
 |-- men_cnt: integer (nullable = true)
 |-- women_cnt: integer (nullable = true)
 |-- men_share: float (nullable = true)
 |-- women_share: float (nullable = true)

+-------+--------+-------+-------+---------+---------+-----------+
|city   |user_cnt|age_avg|men_cnt|women_cnt|men_share|women_share|
+-------+--------+-------+-------+---------+---------+-----------+
|Тара   |34777   |39.47  |8155   |26622    |0.23     |0.77       |
|Городец|4498    |39.6   |1402   |3096     |0.31     |0.69       |
|Юрга   |2120    |45.26  |594    |1526     |0.28     |0.72       |
+-------+--------+-------+-------+---------+---------+-----------+
only showing top 3 rows

#### Города с максимальным и минимальным средним возрастом, максимальной долей мужчин и максимальной долей женщин
 (Вспомогательная таблица)

In [19]:
# Extreme values over all cities: highest / lowest average age and the highest
# share of men / women. Both age extremes are rounded the same way.
requirements = (
    okDemDF
    .agg(
        sf.round(sf.max("age_avg"), 2).alias("max_avg_age"),
        sf.round(sf.min("age_avg"), 2).alias("min_avg_age"),
        sf.max("men_share").alias("max_men_share"),
        sf.max("women_share").alias("max_women_share")
    )
).collect()

maxAvgAge, minAvgAge, maxMenShare, maxWomenShare = requirements[0]
# Backward-compatible alias for the original, misspelled variable name.
mixAvgAge = minAvgAge

eps = 0.001 # tolerance used when comparing floating-point values

# Cities that reach at least one of the extremes, together with their ids from cityDF.
# The filter runs before the projection because it needs columns the projection drops.
selectedCitiesDF = (
    okDemDF
    .filter((sf.abs(sf.round(sf.col("age_avg"), 2) - maxAvgAge) < eps)
            | (sf.abs(sf.round(sf.col("age_avg"), 2) - minAvgAge) < eps)
            | (sf.abs(sf.round(sf.col("men_share"), 2) - maxMenShare) < eps)
            | (sf.abs(sf.round(sf.col("women_share"), 2) - maxWomenShare) < eps)
            )
    .select("city")
    .join(cityDF, okDemDF.city == cityDF.city, how = "inner")
    .select (
        cityDF.city,
        "city_id"
    )
)

if debugMode:
    selectedCitiesDF.show()

+-------------+-------+
|         city|city_id|
+-------------+-------+
|        Канаш|    160|
|  Симферополь|    105|
|       Тихвин|     88|
|Сергиев Посад|     38|
|       Казань|    153|
+-------------+-------+

### Самый дешевый и самый дорогой товары для выбранных городов, а также разница в цене между ними

In [20]:
# Every priced product in each selected city: city name, city id, product name, price.
# The broadcast hint belongs on the small side of a join. selectedCitiesDF holds
# only the handful of selected cities, so it is broadcast instead of the price
# table; the inner join returns the same rows either way.
cityPriceDF = (
    priceDF
    .join(sf.broadcast(selectedCitiesDF), priceDF.city_id == selectedCitiesDF.city_id, how = "inner")
    .join(sf.broadcast(productDF), priceDF.product_id == productDF.product_id, how = "inner")
    .select(sf.col("city").alias("city_name"),
            selectedCitiesDF.city_id,
            "product",
            "price"
    )
)

# Per selected city: lowest price, highest price and the gap between the two.
productPriceStatDF = (
    cityPriceDF
    .groupBy("city_id")
    .agg(
        sf.min("price").alias("cheapest_product_price"),
        sf.max("price").alias("most_expensive_product_price"),
        (sf.max("price") - sf.min("price")).alias("price_difference"),
    )
)

# Per selected city: name of the cheapest product, name of the most expensive
# product and the price difference between them.
# A row contributes its product name only when its price equals the city's
# extreme price (when() without otherwise() yields null for the other rows);
# max() then keeps the non-null name, or the greatest one if several rows tie.
productStatDF = (
    cityPriceDF
    .join(productPriceStatDF, on="city_id")
    .groupBy("city_name")
    .agg(
        sf.max(
            sf.when(sf.col("price") == sf.col("cheapest_product_price"), sf.col("product"))
        ).alias("cheapest_product_name"),
        sf.max(
            sf.when(sf.col("price") == sf.col("most_expensive_product_price"), sf.col("product"))
        ).alias("most_expensive_product_name"),
        sf.max("price_difference").alias("price_difference"),
    )
)

if debugMode:
    productStatDF.show(n=10)

+-------------+---------------------+---------------------------+----------------+
|    city_name|cheapest_product_name|most_expensive_product_name|price_difference|
+-------------+---------------------+---------------------------+----------------+
|  Симферополь|      Спички, коробок|       Легковой автомоби...|        790489.3|
|       Казань| Предоставление ме...|       Легковой автомоби...|        909921.8|
|Сергиев Посад| Предоставление ме...|       Поездка в Китай, ...|        70000.65|
|       Тихвин| Предоставление ме...|       Легковой автомоби...|       619285.94|
|        Канаш| Предоставление ме...|       Годовая стоимость...|        43383.27|
+-------------+---------------------+---------------------------+----------------+

### Сохраняем результаты на HDFS

In [21]:
# Save the per-city product statistics as ';'-separated CSV with a header row;
# the data is collapsed into one partition first and any previous result is overwritten.
(
    productStatDF
    .repartition(1)
    .write
    .options(header="true", sep=";")
    .mode("overwrite")
    .csv(output_path + "/product_stat")
)

In [22]:
# Stop the Spark session; no Spark work can be run in this session after this cell.
spark.stop()