In [197]:
# task4
user = 'a.eliseev'

In [198]:
from pyspark.sql.types import *
import pyspark.sql.functions as sf

current_dt = "2023-03-01"

demography_path = "/user/{}/data/data3/ok/coreDemography".format(user)
country_path = "/user/{}/data/data3/ok/geography/countries.csv".format(user)
cities_path = "/user/{}/data/data3/ok/geography/rs_city.csv".format(user)
rosstat_path = "/user/{}/data/data3/rosstat".format(user)

# Путь до результата
output_path = "/user/{}/task4".format(user)

In [364]:
import os
import pyspark
os.environ["HADOOP_CONF_DIR"]="/etc/hadoop/conf"
os.environ["HIVE_HOME"]="/usr/lib/hive"
os.environ["METASTORE_PORT"]="9083"

spark = pyspark.sql.SparkSession.builder\
    .master("yarn")\
    .appName("task4_aeliseev")\
    .config("spark.executor.instances", "1")\
    .config("spark.executor.memory", "1G")\
    .config("spark.executor.cores", "2")\
    .config("spark.dynamicAllocation.enabled", "false")\
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")\
    .config("spark.dynamicAllocation.maxExecutors", "1000")\
    .config("spark.driver.memory", "1G")\
    .config("spark.driver.maxResultSize", "1G")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryoserializer.buffer.max", "1024m")\
    .enableHiveSupport()\
    .getOrCreate()

spark_context = spark.sparkContext

24/04/21 11:10:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/21 11:10:14 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


# Минимальная, максимальная и средняя цена по всем городам 

In [365]:
# Список продуктов, которые учитываются в статистике

products_for_stat_path = "{}/products_for_stat.csv".format(rosstat_path)

source_products_for_stat_df = (
    spark.read
        .option("header", "false")
        .csv(products_for_stat_path) #.limit(20)
)

products_for_stat_df = (
    source_products_for_stat_df
    .select(
        sf.col("_c0").cast(IntegerType()).alias("product_id"),
    )
)

                                                                                

In [366]:
# Набор данных по всем товарам и ценам

prices_path = "{}/price".format(rosstat_path)
source_prices_df = (
    spark.read
        .option("header", "false")
        .option("sep", ";")
        .csv(prices_path)  #.limit(20)
)

source_prices_df = source_prices_df.replace(',', '.', '_c2')

prices_df = (
    source_prices_df
    .select(
        sf.col("_c0").cast(IntegerType()).alias("city_id"),
        sf.col("_c1").cast(IntegerType()).alias("product_id"),
        sf.regexp_replace('_c2', ',', '.').cast(FloatType()).alias("price")
    )
)

In [367]:
# Статистика по необходимым товарам

price_stat_df = (
    prices_df
    .join(products_for_stat_df, on='product_id', how='left')
    .groupBy("product_id")
    .agg(sf.min("price").alias("min_price"),
        sf.max("price").alias("max_price"),
        sf.round(sf.avg("price"), 2).alias("price_avg"))
)

In [368]:
price_stat_df.show()



+----------+---------+---------+---------+
|product_id|min_price|max_price|price_avg|
+----------+---------+---------+---------+
|       148|   853.33|  4633.04|  2264.83|
|       496|     50.0|    881.0|   281.19|
|       463|    111.0|    589.0|    306.4|
|       471|   6000.0| 65901.11| 27692.07|
|       243|  1411.76| 11936.51|  3557.27|
|       392|   432.81|  3847.08|  1000.81|
|        31|    275.8|   659.32|   386.71|
|       451|     61.0|  3911.52|   634.37|
|       137|  3328.29|  14937.7|  8117.33|
|       251|   890.58|  7155.84|  3403.97|
|        85|     90.2|   334.37|   148.68|
|       458|    57.99|   183.77|    105.2|
|        65|   597.38|  1319.07|   851.27|
|        53|    97.62|   406.35|   196.31|
|       255|    643.6|  3603.95|  1424.22|
|       481|  8468.47|  92600.0| 27064.82|
|       296|   4709.8| 26666.99| 11591.51|
|       133|    39.93|    245.0|    88.32|
|       472|    106.0|   935.25|   275.54|
|       322|468359.28| 750167.4|556312.04|
+----------

                                                                                

In [369]:
price_stat_result_path = "{}/price_stat".format(output_path)

(price_stat_df
 .repartition(1)
 .write
 .mode("overwrite")
 .option("header", "true")
 .option("sep", ";")
 .csv(price_stat_result_path)
)

                                                                                

# Статистика по ОК

In [205]:
# # Средняя цена на товары

# avg_price = (
#     prices_df
#     .select(
#         sf.avg('price').alias("price_avg")
#     )
# )

# avg_price.show()

# # Id городов, в которых цена выше средней на товары
# cities_df = (
#     prices_df
#     .groupBy("city_id")
#     .agg(sf.avg('price').alias("price_avg"))
#     .filter(sf.col("price_avg") > avg_price.first()['price_avg'])
# )

In [370]:
# Города, где хотя бы на один товар цена выше среднего

cities_df = (
    prices_df
    .groupBy(["city_id", "product_id"])
    .agg(sf.avg('price').alias("price_avg"))
    #.filter(sf.col("price_avg") > avg_price.first()['price_avg'])
    .join(price_stat_df.select(sf.col('price_avg').alias('price_avg_by_all_cities'), sf.col('product_id')), on='product_id', how='left')
    .select('city_id')
    .where(sf.col('price_avg') > sf.col('price_avg_by_all_cities'))
    .distinct()
)

cities_df.show()

[Stage 19:>                                                         (0 + 1) / 1]

+-------+
|city_id|
+-------+
|    148|
|    243|
|     31|
|     85|
|    251|
|    137|
|     65|
|    255|
|     53|
|    133|
|     78|
|    108|
|    155|
|     34|
|    193|
|    211|
|    126|
|    115|
|    101|
|     81|
+-------+
only showing top 20 rows



                                                                                

In [371]:
cities_df.count() #Количество городов

                                                                                

284

In [372]:
# Страны
source_countries_df = (
    spark.read
        .option("header", "false")
        .option("sep", ",")
        .csv(country_path)
)

countries_df = (
    source_countries_df
    .select(
        sf.col("_c0").cast(StringType()).alias("country_id"),
        sf.col("_c1").cast(StringType()).alias("name"),
    )
)

russia_id = countries_df.where(countries_df.name == 'Россия').select("country_id")

In [373]:
russia_id.show()

+-----------+
| country_id|
+-----------+
|10414533690|
+-----------+



In [374]:
# Сопоставление id росстата и ok
source_ok_rs_cities_id = (
    spark.read
        .option("header", "false")
        .option("sep", "\t")
        .csv(cities_path)
)

ok_rs_cities_id = (
    source_ok_rs_cities_id
    .select(
        sf.col("_c0").cast(IntegerType()).alias("city_ok_id"),
        sf.col("_c1").cast(IntegerType()).alias("city_rs_id"),
    )
)

In [375]:
# Имена городов по данным росстата 

name_cities_path = "{}/city.csv".format(rosstat_path)

source_ok_rs_cities_id = (
    spark.read
        .option("header", "false")
        .option("sep", ";")
        .csv(name_cities_path)
)

rs_cities_df = (
    source_ok_rs_cities_id
    .select(
        sf.col("_c0").cast(StringType()).alias("city_name"),
        sf.col("_c1").cast(IntegerType()).alias("city_id"),
    )
)


In [376]:
# Имена городов c id ok, которые есть в cities_df

names_ok_cities_df = (
    rs_cities_df
    .join(cities_df, rs_cities_df.city_id==cities_df.city_id, how='left')
    .join(ok_rs_cities_id, rs_cities_df.city_id==ok_rs_cities_id.city_rs_id, how='left')
    .select(sf.col('city_name'),
            sf.col('city_ok_id')
           )
)

In [377]:
names_ok_cities_df.show()

+----------------+----------+
|       city_name|city_ok_id|
+----------------+----------+
|        Белгород|   1000792|
|          Губкин|   6190119|
|    Старый Оскол|    613013|
|          Брянск|   3100570|
|           Навля|   4170694|
|          Клинцы|   1270354|
|      Новозыбков|   3260868|
|        Владимир|   2270388|
|Гусь-Хрустальный|  12101942|
|          Ковров|   3380088|
|           Муром|   2390694|
|         Воронеж|   4530796|
|    Борисоглебск|   1600307|
|         Россошь|   3630145|
|         Иваново|   3610558|
|         Кинешма|   2700384|
|          Калуга|   3850033|
|        Людиново|   1870640|
|         Обнинск|   1071212|
|        Кострома|   4181225|
+----------------+----------+
only showing top 20 rows



In [378]:
# Таблтица ОК жителей России с полом, возрастом и городом
source_ok_df = (
    spark.read
        .option("header", "false")
        .option("sep", "\t")
        .csv(demography_path) # .limit(20)
)

source_ok_df = source_ok_df.where(sf.col("_c4") == russia_id.first()['country_id'])

ok_df = (
    source_ok_df
    .select( 
        sf.floor(sf.datediff(sf.to_date(sf.lit(current_dt)),
                 sf.from_unixtime(sf.col("_c2").cast(IntegerType()) * (24*60*60))) / 365.25).alias("age"),
        sf.col("_c3").cast(IntegerType()).alias("sex"),
        sf.col("_c5").cast(IntegerType()).alias("city_ok_id"),
    )
)

In [379]:
men_id = 1
women_id = 2

ok_dem_df = (
    ok_df
    .groupBy("city_ok_id")
    .agg(sf.count("*").alias("user_cnt"),
         sf.round(sf.avg("age"), 2).alias("age_avg"),
         sf.count(sf.when(sf.col('sex') == men_id, 1)).alias("men_cnt"),
         sf.count(sf.when(sf.col('sex') == women_id, 1)).alias("women_cnt"))
    .withColumn("men_share", sf.round(sf.col('men_cnt') / sf.col('user_cnt'), 2))
    .withColumn("women_share", sf.round(sf.col('women_cnt') / sf.col('user_cnt'), 2))
    .join(names_ok_cities_df, on='city_ok_id', how='inner')
    .select(sf.col('city_name'),
            sf.col('user_cnt'),
            sf.col('age_avg'),
            sf.col('men_cnt'),
            sf.col('women_cnt'),
            sf.col('men_share'),
            sf.col('women_share'),
           )
)

In [380]:
ok_dem_df.show()

[Stage 46:>                                                         (0 + 2) / 2]

+----------------+--------+-------+-------+---------+---------+-----------+
|       city_name|user_cnt|age_avg|men_cnt|women_cnt|men_share|women_share|
+----------------+--------+-------+-------+---------+---------+-----------+
|        Белгород|      29|  45.14|      1|       28|     0.03|       0.97|
|          Губкин|     149|  45.35|     53|       96|     0.36|       0.64|
|    Старый Оскол|      25|  43.04|      4|       21|     0.16|       0.84|
|          Брянск|      30|   42.2|      5|       25|     0.17|       0.83|
|           Навля|      84|  45.92|     13|       71|     0.15|       0.85|
|          Клинцы|     409|  40.84|    120|      289|     0.29|       0.71|
|      Новозыбков|      44|  41.05|     17|       27|     0.39|       0.61|
|        Владимир|      84|  52.52|     20|       64|     0.24|       0.76|
|Гусь-Хрустальный|      32|  34.44|      8|       24|     0.25|       0.75|
|          Ковров|      21|  40.95|     10|       11|     0.48|       0.52|
|           

                                                                                

In [381]:
ok_dem_result_path = "{}/ok_dem".format(output_path)

(ok_dem_df
 .repartition(1)
 .sortWithinPartitions(sf.col("user_cnt").desc())
 .write
 .mode("overwrite")
 .option("header", "true")
 .option("sep", ";")
 .csv(ok_dem_result_path)
)

                                                                                

# Cамый дешевый и самый дорогой товары

In [382]:
min_avg_age_city = ok_dem_df.agg(sf.min_by('city_name', 'age_avg').alias('city_name')).first()['city_name']
max_avg_age_city = ok_dem_df.agg(sf.max_by('city_name', 'age_avg').alias('city_name')).first()['city_name']
max_men_share_city = ok_dem_df.agg(sf.max_by('city_name', 'men_share').alias('city_name')).first()['city_name']
max_women_share_city = ok_dem_df.agg(sf.max_by('city_name', 'women_share').alias('city_name')).first()['city_name']

                                                                                

In [383]:
selected_city_id = rs_cities_df.where(rs_cities_df.city_name.isin([min_avg_age_city, max_avg_age_city, max_men_share_city, max_women_share_city]))
selected_city_id.show()

+-----------+-------+
|  city_name|city_id|
+-----------+-------+
|     Тихвин|     88|
|Симферополь|    105|
|     Казань|    153|
|      Канаш|    160|
+-----------+-------+



In [353]:
products_path = "{}/product.csv".format(rosstat_path)

source_products_name_df = (
    spark.read
        .option("header", "false")
        .option("sep", ";")
        .csv(products_path) #.limit(20)
)

products_name_df = (
    source_products_name_df
    .select(
        sf.col("_c0").cast(StringType()).alias("product_name"),
        sf.col("_c1").cast(IntegerType()).alias("product_id"),
    )
)

In [354]:
product_stat_df = (
    prices_df
    .join(selected_city_id, on='city_id', how='inner')
    .groupBy("city_name")
    .agg(sf.min_by("product_id", "price").alias("cheapest_product_id"),
         sf.max_by("product_id", "price").alias("most_expensive_product_id"),
         (sf.max("price") -  sf.min("price").alias("cheapest_product_name")).alias("price_difference")
        )
    .join(products_name_df.select(sf.col('product_name').alias('cheapest_product_name'),
                                  sf.col('product_id').alias('cheapest_product_id')),
          on="cheapest_product_id", how='left')
    .join(products_name_df.select(sf.col('product_name').alias('most_expensive_product_name'),
                                  sf.col('product_id').alias('most_expensive_product_id')),
          on="most_expensive_product_id", how='left')
    .select(sf.col("city_name"),
            sf.col("cheapest_product_name"),
            sf.col("most_expensive_product_name"),
            sf.col("price_difference"),
           )
)

In [355]:
product_stat_df.show()



+-----------+---------------------+---------------------------+----------------+
|  city_name|cheapest_product_name|most_expensive_product_name|price_difference|
+-----------+---------------------+---------------------------+----------------+
|Симферополь|      Спички, коробок|       Легковой автомоби...|        790489.3|
|     Казань| Предоставление ме...|       Легковой автомоби...|        909921.8|
|     Тихвин| Предоставление ме...|       Легковой автомоби...|       619285.94|
|      Канаш| Предоставление ме...|       Годовая стоимость...|        43383.27|
+-----------+---------------------+---------------------------+----------------+



                                                                                

In [356]:
product_stat_path = "{}/product_stat".format(output_path)

(product_stat_df
 .repartition(1)
 .write
 .mode("overwrite")
 .option("header", "true")
 .option("sep", ";")
 .csv(product_stat_path)
)

                                                                                

In [363]:
# После работы обязательно отключаем спарк и отдаем ресурсы!
spark.stop()