In [None]:
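# Original cluster (YARN) session setup, kept commented out for reference;
# the next cell creates the local[*] session that this notebook actually uses.
# import os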
# import pyspark
# from pyspark import SparkContext, SparkConf
# from pyspark.sql import SparkSession, Window

# from pyspark.sql.types import *
# import pyspark.sql.functions as sf

# os.environ["HADOOP_CONF_DIR"]="/etc/hadoop/conf"
# os.environ["HIVE_HOME"]="/usr/lib/hive"
# os.environ["METASTORE_PORT"]="9083"
# # os.environ["SPARK_HOME"]="/usr/hdp/current/spark2-client"
# # os.environ["JAVA_HOME"]="/usr/java/jdk1.8.0_191/jre"

# spark = pyspark.sql.SparkSession.builder\
#     .master("yarn")\
#     .appName("d_arhipova_dz4_solution")\
#     .config("spark.executor.instances", "1")\
#     .config("spark.executor.memory", "1G")\
#     .config("spark.executor.cores", "2")\
#     .config("spark.dynamicAllocation.enabled", "false")\
#     .config("spark.dynamicAllocation.executorIdleTimeout", "300s")\
#     .config("spark.dynamicAllocation.maxExecutors", "1000")\
#     .config("spark.driver.memory", "1G")\
#     .config("spark.driver.maxResultSize", "1G")\
#     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
#     .config("spark.kryoserializer.buffer.max", "1024m")\
#     .enableHiveSupport()\
#     .getOrCreate()

# spark_context = spark.sparkContext

In [None]:
import os
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window

from pyspark.sql.types import *
import pyspark.sql.functions as sf

# Environment variables for Hadoop/Hive, set before the session is created.
os.environ.update({
    "HADOOP_CONF_DIR": "/etc/hadoop/conf",
    "HIVE_HOME": "/usr/lib/hive",
    "METASTORE_PORT": "9083",
})
# os.environ["SPARK_HOME"]="/usr/hdp/current/spark2-client"
# os.environ["JAVA_HOME"]="/usr/java/jdk1.8.0_191/jre"

# Session settings, applied to the builder in the order listed.
spark_settings = {
    "spark.executor.instances": "1",
    "spark.executor.memory": "1G",
    "spark.executor.cores": "2",
    "spark.dynamicAllocation.enabled": "false",
    "spark.dynamicAllocation.executorIdleTimeout": "300s",
    "spark.dynamicAllocation.maxExecutors": "1000",
    "spark.driver.memory": "1G",
    "spark.driver.maxResultSize": "1G",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "1024m",
}

# Local session using all available cores.
session_builder = SparkSession.builder.master("local[*]").appName("Local PySpark Session")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Hive support is enabled because later cells read tables via spark.read.table.
spark = session_builder.enableHiveSupport().getOrCreate()

spark_context = spark.sparkContext

In [None]:
# Print links to the monitoring UIs of the running application.
# The YARN ResourceManager links are built from the application id and only
# resolve when the session actually runs on YARN. For any other master
# (this notebook currently uses local[*]) print the driver's Spark UI address instead.
if spark_context.master.startswith("yarn"):
    print("Yarn       http://91.219.226.252:8088/cluster/scheduler")
    print("Spark      http://91.219.226.252:8088/proxy/{}/stages/".format(spark_context.applicationId))
    print("App info   http://91.219.226.252:8088/cluster/app/{}/".format(spark_context.applicationId))
else:
    print("Spark      {}".format(spark_context.uiWebUrl))

In [None]:
# Read the Rosstat price files: semicolon-separated, no header row.
# Without a header the columns are referenced positionally (_c0, _c1, _c2) in later cells.
price_reader = spark.read.options(header="false", sep=";")
sourcePriceDF = price_reader.csv("/home/d.arhipova/dz4/data3/rosstat/price/")
sourcePriceDF.printSchema()
sourcePriceDF.show()

In [None]:
# Swap the comma in column _c2 for a dot so the value can later be cast to a number.
c2_with_dot = sf.regexp_replace(sf.col("_c2"), ",", ".")
sourcePriceDF = sourcePriceDF.withColumn("_c2", c2_with_dot)
sourcePriceDF.show()

In [None]:
# Give the positional CSV columns typed, meaningful names.
# The price is cast to float and rounded to two decimals.
city_id_col = sf.col("_c0").cast(IntegerType()).alias("city_id")
product_id_col = sf.col("_c1").cast(IntegerType()).alias("product_id")
price_col = sf.round(sf.col("_c2").cast(FloatType()), 2).alias("price")

sourcePriceDF = sourcePriceDF.select(city_id_col, product_id_col, price_col)
sourcePriceDF.printSchema()
sourcePriceDF.show()

In [None]:
# Load the cities table and inspect its schema and first rows.
sourceCitiesDF = spark.table("user_d_arhipova.cities")
sourceCitiesDF.printSchema()
sourceCitiesDF.show()

In [None]:
# Load the products table and inspect its schema and first rows.
sourceProductsDF = spark.table("user_d_arhipova.products")
sourceProductsDF.printSchema()
sourceProductsDF.show()

In [None]:
# Load the list of products for which statistics must be computed.
sourceProductsForStatDF = spark.table("user_d_arhipova.products_for_stat")
sourceProductsForStatDF.printSchema()
sourceProductsForStatDF.show()

Задание 1. Для предложенных товаров необходимо вычислить минимальную, максимальную и среднюю цену по всем городам (будет получен датасет price_stat), результат сохранить в HDFS.

In [None]:
# Enrich every price row with its city and product attributes, then keep only
# the products listed in products_for_stat (inner joins drop unmatched rows).
joined_df = sourcePriceDF.join(sourceCitiesDF, on=["city_id"], how="inner")
joined_df = joined_df.join(sourceProductsDF, on=["product_id"], how="inner")
joined_df = joined_df.join(sourceProductsForStatDF, on=["product_id"], how="inner")

In [None]:
# Preview the joined price / city / product rows.
joined_df.show()

In [None]:
# Sanity check: the joined table must contain as many distinct products
# as there are rows in products_for_stat.
joined_df.select("product_id").distinct().count() == sourceProductsForStatDF.count()

In [None]:
# Per-product price statistics over all cities; the average is rounded to two decimals.
price_aggregates = [
    sf.min("price").alias("min_price"),
    sf.max("price").alias("max_price"),
    sf.round(sf.avg("price"), 2).alias("price_avg"),
]
price_stat = joined_df.groupBy("product_id").agg(*price_aggregates)

In [None]:
# Preview the per-product price statistics.
price_stat.show()

In [None]:
# Sort the statistics by product id (ascending) before saving.
price_stat = price_stat.sort(sf.col("product_id").asc())
price_stat.show()

In [None]:
# Save locally: semicolon-separated CSV with a header row, replacing any previous output.
# NOTE(review): the path has no scheme, so it is resolved against the default
# filesystem - confirm this really lands on local disk.
(
    price_stat.write
    .mode("overwrite")
    .options(sep=";", header="true")
    .csv("/home/d.arhipova/task3/price_stat")
)

In [None]:
# Save price_stat under the /user directory as semicolon-separated CSV with a header row,
# replacing any previous output (presumably the HDFS home directory - TODO confirm).
(
    price_stat.write
    .mode("overwrite")
    .options(sep=";", header="true")
    .csv("/user/d.arhipova/task3/price_stat")
)

Задание 2. Из набора данных ОК следует для всех пользователей из городов, цена на товары в которых выше средней, собрать статистику, которая будет содержать: название города, число пользователей из этого города, средний возраст пользователей, число пользователей мужчин, число пользователей женщин, доля мужчин, доля женщин (датасет ok_dem). 

In [None]:
# Load the rs_city table and inspect its schema and first rows.
sourceRSCityDF = spark.table("user_d_arhipova.rs_city")
sourceRSCityDF.printSchema()
sourceRSCityDF.show()

In [None]:
# Load the core_demography table and inspect its schema and first rows.
sourceCoreDemographyDF = spark.table("user_d_arhipova.core_demography")
sourceCoreDemographyDF.printSchema()
sourceCoreDemographyDF.show()

In [None]:
# Link demography rows to cities: core_demography -> rs_city on id_location,
# then -> cities on city_id. Both joins are inner.
ok_dem = (
    sourceCoreDemographyDF
    .join(sourceRSCityDF, on=["id_location"], how="inner")
    .join(sourceCitiesDF, on=["city_id"], how="inner")
)
ok_dem.show()

In [None]:
# Drop the columns that the statistics below do not use.
unused_columns = ["id_location", "create_date", "id_country", "login_region"]
ok_dem = ok_dem.drop(*unused_columns)
ok_dem.show()

In [None]:
# Average price of every product across all cities (not rounded).
average_price = sf.avg("price").alias("price_avg")
price_avg = sourcePriceDF.groupBy("product_id").agg(average_price)
price_avg.show()

In [None]:
# Cities where at least one product is priced above that product's average.
prices_with_avg = sourcePriceDF.join(
    price_avg.select("product_id", "price_avg"),
    on=["product_id"],
)
high_price_cities = (
    prices_with_avg
    .where(sf.col("price") > sf.col("price_avg"))
    .select("city_id")
    .distinct()
)
high_price_cities.show()

In [None]:
# Keep only the users whose city is in the high-price list.
ok_dem = ok_dem.join(high_price_cities, on=["city_id"], how="inner")
ok_dem.show()

In [None]:
# Per-city demography summary, ordered by user count (largest first).
# birth_date goes through timestamp(birth_date * 24*60*60), i.e. it is treated as a
# number of days (assumes days since the Unix epoch - TODO confirm against the table).
# Age is expressed in years of 365.25 days as of 2023-03-01.
# Gender codes follow the aliases below: 1 is counted as men, 2 as women.
is_man = sf.when(ok_dem.gender == 1, 1).otherwise(0)
is_woman = sf.when(ok_dem.gender == 2, 1).otherwise(0)
birth_day = sf.to_date(sf.expr("timestamp(birth_date * 24*60*60)"))
age_years = sf.datediff(sf.to_date(sf.lit("2023-03-01")), birth_day) / 365.25

ok_dem = (
    ok_dem
    .groupBy("city_name")
    .agg(
        sf.count("id").alias("user_cnt"),
        sf.round(sf.avg(age_years), 2).alias("avg_age"),
        sf.sum(is_man).alias("men_cnt"),
        sf.sum(is_woman).alias("women_cnt"),
        sf.round(sf.sum(is_man) / sf.count("id"), 2).alias("men_share"),
        sf.round(sf.sum(is_woman) / sf.count("id"), 2).alias("women_share"),
    )
    .orderBy("user_cnt", ascending=False)
)

In [None]:
# Preview the per-city demography summary.
ok_dem.show()

Из полученного датасета нужно будет выбрать города с максимальным и минимальным средним возрастом, максимальной долей мужчин и максимальной долей женщин.

In [None]:
# Extreme rows of the per-city summary; each variable holds the first Row after sorting.
max_avg_age_city = ok_dem.sort("avg_age", ascending=False).head()
min_avg_age_city = ok_dem.sort("avg_age", ascending=True).head()

max_men_share_city = ok_dem.sort("men_share", ascending=False).head()
max_women_share_city = ok_dem.sort("women_share", ascending=False).head()

In [None]:
# Report the selected cities together with the value that made each of them an extreme.
print(f"Город с максимальным средним возрастом: {max_avg_age_city.city_name} , максимальный средний возраст: {max_avg_age_city.avg_age}")
print(f"Город с минимальным средним возрастом: {min_avg_age_city.city_name} , минимальный средний возраст: {min_avg_age_city.avg_age}")
print(f"Город с максимальной долей мужчин: {max_men_share_city.city_name} , максимальная доля мужчин: {max_men_share_city.men_share}")
print(f"Город с максимальной долей женщин: {max_women_share_city.city_name} , максимальная доля женщин: {max_women_share_city.women_share}")

In [None]:
# Save locally: semicolon-separated CSV with a header row, replacing any previous output.
# The dataset written here is ok_dem; the cell previously wrote price_stat to the
# ok_dem path, so the task 2 result was never saved.
ok_dem.write.option("sep", ";").option("header", "true").mode("overwrite").format("csv").save("/home/d.arhipova/task3/ok_dem")

In [None]:
# Save the ok_dem dataset under the /user directory as semicolon-separated CSV
# with a header row, replacing any previous output.
# The cell previously wrote price_stat to this ok_dem path, so the task 2 result was never saved.
ok_dem.write.option("sep", ";").option("header", "true").mode("overwrite").format("csv").save("/user/d.arhipova/task3/ok_dem")

Задание 3. Для этих городов в данных Росстат нужно будет выбрать самый дешевый и самый дорогой товары, разницу в цене между ними (датасет product_stat).

In [None]:
# Cities chosen in the previous step: max / min average age, max share of men, max share of women.
# The names are taken from the computed rows instead of being typed in by hand, so the
# selection stays correct when the input data changes. A set removes duplicates in case
# one city is the extreme for several criteria; sorting keeps the order stable.
selected_city_names = sorted({
    max_avg_age_city.city_name,
    min_avg_age_city.city_name,
    max_men_share_city.city_name,
    max_women_share_city.city_name,
})
selected_cities = sourceCitiesDF.filter(sf.col("city_name").isin(selected_city_names))
selected_cities.show()

In [None]:
# Join the tables to get the full data: prices of the selected cities plus product attributes.
selected_city_prices = sourcePriceDF.join(selected_cities, on=["city_id"], how="inner")
all_cities_price = selected_city_prices.join(sourceProductsDF, on=["product_id"], how="inner")
all_cities_price.show()

In [None]:
# Keep only the columns needed for the per-city price comparison.
# all_cities_price is already restricted to the selected cities by an inner join, so
# joining selected_cities a second time is unnecessary; it would also duplicate rows
# if two selected cities happened to share the same name.
product_stat = all_cities_price.select(
    sf.col("product_name"),
    sf.col("price"),
    sf.col("city_name"),
)
product_stat.show()

In [None]:
# Highest and lowest product price in every selected city.
price_bounds = [
    sf.max("price").alias("max_price"),
    sf.min("price").alias("min_price"),
]
min_max_city_prices = product_stat.groupBy("city_name").agg(*price_bounds)

In [None]:
# Preview the per-city price bounds.
min_max_city_prices.show()

In [None]:
# For every city, the product(s) whose price equals that city's minimum price.
min_price_products = (
    product_stat
    .join(min_max_city_prices, on=["city_name"], how="inner")
    .where(sf.col("price") == sf.col("min_price"))
    .select(
        "city_name",
        "min_price",
        sf.col("product_name").alias("min_priced_product_name"),
    )
)
min_price_products.show()

In [None]:
# For every city, the product(s) whose price equals that city's maximum price.
max_price_products = (
    product_stat
    .join(min_max_city_prices, on=["city_name"], how="inner")
    .where(sf.col("price") == sf.col("max_price"))
    .select(
        "city_name",
        "max_price",
        sf.col("product_name").alias("max_priced_product_name"),
    )
)
max_price_products.show()

In [None]:
# Put the cheapest and the most expensive product of each city side by side
# and add the difference between their prices.
price_spread = sf.col("max_price") - sf.col("min_price")
product_stat = (
    min_price_products
    .join(max_price_products, on=["city_name"], how="inner")
    .withColumn("price_diff", price_spread)
)
product_stat.show()

In [None]:
# Save locally: semicolon-separated CSV with a header row, replacing any previous output.
# NOTE(review): the path has no scheme, so it is resolved against the default
# filesystem - confirm this really lands on local disk.
(
    product_stat.write
    .mode("overwrite")
    .options(sep=";", header="true")
    .csv("/home/d.arhipova/task3/product_stat")
)

In [None]:
# Save product_stat under the /user directory as semicolon-separated CSV with a header row,
# replacing any previous output (presumably the HDFS home directory - TODO confirm).
(
    product_stat.write
    .mode("overwrite")
    .options(sep=";", header="true")
    .csv("/user/d.arhipova/task3/product_stat")
)

In [None]:
# Always stop Spark when the work is done so that its resources are released!
spark.stop()