# Start session

In [None]:
import os
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Tell Spark where the cluster's Hadoop configuration lives.
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
# Optional environment overrides, left disabled:
# os.environ["SPARK_HOME"]="/usr/hdp/current/spark2-client"
# os.environ["JAVA_HOME"]="/usr/java/jdk1.8.0_191/jre"

# Session settings, applied to the builder in the order listed.
# Dynamic allocation is switched off, so one fixed executor is requested.
session_settings = (
    ("spark.executor.instances", "1"),
    ("spark.executor.memory", "1G"),
    ("spark.executor.cores", "2"),
    ("spark.dynamicAllocation.enabled", "false"),
    ("spark.dynamicAllocation.executorIdleTimeout", "300s"),
    ("spark.dynamicAllocation.maxExecutors", "1000"),
    ("spark.driver.memory", "1G"),
    ("spark.driver.maxResultSize", "1G"),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("spark.kryoserializer.buffer.max", "1024m"),
)

session_builder = SparkSession.builder.master("yarn").appName("task4_a.chapaev")
for setting_key, setting_value in session_settings:
    session_builder = session_builder.config(setting_key, setting_value)

# Reuse an already running session if there is one, otherwise start a new one.
spark = session_builder.getOrCreate()

spark_context = spark.sparkContext

In [None]:
# Links to the cluster web UIs for the running application.
urls_template = """
Urls:
Yarn       http://91.219.226.252:8088/cluster/scheduler
Spark      http://91.219.226.252:8088/proxy/{app_id}/stages/
App info   http://91.219.226.252:8088/cluster/app/{app_id}/
"""
print(urls_template.format(app_id=spark_context.applicationId))

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as sf

## Read source data from the rosstat and ok directories

### rosstat directory

In [None]:
# All Rosstat inputs live in one directory.
rosstat_dir = "/user/a.chapaev/data/data3/rosstat"
city_path = f"{rosstat_dir}/city.csv"
product_path = f"{rosstat_dir}/product.csv"
price_path = f"{rosstat_dir}/price"
products_for_stat_path = f"{rosstat_dir}/products_for_stat.csv"

# Output location for the result of the 1st task.
price_stat_path = "/user/a.chapaev/task3/price_stat"

In [None]:
# All OK inputs live under one directory.
ok_dir = "/user/a.chapaev/data/data3/ok"
core_demography_path = f"{ok_dir}/coreDemography"
rs_city_path = f"{ok_dir}/geography/rs_city.csv"

# Output location for the result of the 2nd task.
ok_dem_path = "/user/a.chapaev/task3/ok_dem"

In [None]:
# Output location for the result of the 3rd task.
product_stat_path = "/user/a.chapaev/task3/product_stat"

In [None]:
# Raw city data: ';'-separated CSV without a header row, so columns are read
# under Spark's positional names (_c0, _c1, ...).
sourceCityDF = spark.read.csv(city_path, sep=";", header=False)

In [None]:
# Inspect the column types and the first five rows of the raw city data.
sourceCityDF.printSchema()
sourceCityDF.show(5)

In [None]:
# Raw product data: ';'-separated CSV without a header row.
sourceProductDF = spark.read.csv(product_path, sep=";", header=False)

In [None]:
# Inspect the column types and the first five rows of the raw product data.
sourceProductDF.printSchema()
sourceProductDF.show(5)

In [None]:
# Raw price data: ';'-separated CSV without a header row.
sourcePriceDF = spark.read.csv(price_path, sep=";", header=False)

In [None]:
# Inspect the column types and the first five rows of the raw price data.
sourcePriceDF.printSchema()
sourcePriceDF.show(5)

In [None]:
# Raw list of products selected for statistics: ';'-separated CSV without a header row.
sourceProductsForStatDF = spark.read.csv(products_for_stat_path, sep=";", header=False)

In [None]:
# Inspect the column types and the first five rows of the raw product selection.
sourceProductsForStatDF.printSchema()
sourceProductsForStatDF.show(5)

### ok directory

In [None]:
# Raw OK demography data: tab-separated file(s) without a header row.
sourceCoreDemographyDF = spark.read.csv(core_demography_path, sep="\t", header=False)

In [None]:
# Inspect the column types and the first five rows of the raw demography data.
sourceCoreDemographyDF.printSchema()
sourceCoreDemographyDF.show(5)

In [None]:
# Raw mapping between OK and Rosstat city ids: tab-separated, no header row.
sourceCityIdMatchingDF = spark.read.csv(rs_city_path, sep="\t", header=False)

In [None]:
# Inspect the column types and the first five rows of the raw city-id mapping.
sourceCityIdMatchingDF.printSchema()
sourceCityIdMatchingDF.show(5)

## Convert data according to the types

### rosstat directory

In [None]:
# Typed city dictionary: name plus integer id.
cityDF = sourceCityDF.select(
    sf.col("_c0").alias("city"),
    sf.col("_c1").cast("int").alias("city_id"),
)

# Typed product dictionary: name plus integer id.
productDF = sourceProductDF.select(
    sf.col("_c0").alias("product"),
    sf.col("_c1").cast("int").alias("product_id"),
)

In [None]:
raw_price = sf.col("_c2")

# Typed prices: (city_id, product_id, price).
priceDF = (
    sourcePriceDF
    # drop rows whose price column ("_c2") is empty before converting
    .filter(raw_price.isNotNull())
    .select(
        sf.col("_c0").cast("int").alias("city_id"),
        sf.col("_c1").cast("int").alias("product_id"),
        # prices use a decimal comma; the cast to double needs a decimal point
        sf.regexp_replace(raw_price, ",", ".").cast("double").alias("price"),
    )
)

In [None]:
# How many raw price rows were dropped because their price was empty.
dropped_row_count = sourcePriceDF.count() - priceDF.count()
print(f"Deleted {dropped_row_count} rows")

In [None]:
# Ids of the products for which statistics must be computed.
productsForStatDF = sourceProductsForStatDF.select(
    sf.col("_c0").cast("int").alias("stat_product_id"),
)

In [None]:
# Inspect the schema and the first five rows of every converted Rosstat frame.
for converted_df in (cityDF, productDF, priceDF, productsForStatDF):
    converted_df.printSchema()
    converted_df.show(5)

### ok directory

In [None]:
# Keep only the demography columns needed later; the other raw columns
# (login_region, create_data, etc.) are dropped.
coreDemographyDF = sourceCoreDemographyDF.select(
    sf.col("_c0").cast("int").alias("user_id"),
    # NOTE(review): later code treats birth_date as a day count — confirm the unit
    sf.col("_c2").cast("bigint").alias("birth_date"),
    sf.col("_c3").cast("tinyint").alias("gender"),
    sf.col("_c5").cast("int").alias("city_id"),
)

# Mapping from OK city ids to Rosstat city ids.
cityIdMatchingDF = sourceCityIdMatchingDF.select(
    sf.col("_c0").cast("int").alias("ok_city_id"),
    sf.col("_c1").cast("int").alias("rs_city_id"),
)


In [None]:
# Inspect the schema and the first five rows of every converted OK frame.
for converted_df in (coreDemographyDF, cityIdMatchingDF):
    converted_df.printSchema()
    converted_df.show(5)

In [None]:
# Prices restricted to the products selected for statistics; this frame is
# reused by several later steps.
pricesForStatDF = priceDF.join(
    productsForStatDF,
    on=sf.col("product_id") == sf.col("stat_product_id"),
    how="inner",
)

In [None]:
# Row counts before and after restricting prices to the selected products.
price_rows_before = priceDF.count()
price_rows_after = pricesForStatDF.count()
print(f"Previous count of rows in price DF: {price_rows_before}, current: {price_rows_after}")

# Complete Task 1 (price_stat)

Для предложенных товаров необходимо вычислить минимальную, максимальную и среднюю цену по всем городам (будет получен датасет price_stat), результат сохранить в hdfs.

In [None]:
# Per-product minimum, maximum and mean price over all cities, rounded to 2 decimals.
priceStatDF = pricesForStatDF.groupBy("product_id").agg(
    sf.round(sf.min("price"), 2).alias("min_price"),
    sf.round(sf.max("price"), 2).alias("max_price"),
    sf.round(sf.avg("price"), 2).alias("price_avg"),
)

In [None]:
# Preview the first ten rows of the Task 1 result.
priceStatDF.show(10)

In [None]:
# Save the Task 1 result as a single ';'-separated CSV part without a header,
# sorted by descending average price; existing output is overwritten.
(
    priceStatDF
    .repartition(1)
    .sortWithinPartitions(sf.desc("price_avg"))
    .write
    .csv(price_stat_path, mode="overwrite", sep=";", header=False)
)

# Complete Task 2 (ok_dem)

Задание: из набора данных ОК следует для всех пользователей из городов, цена на товары в которых выше средней, собрать статистику, которая будет содержать: название города, число пользователей из этого города, средний возраст пользователей, число пользователей мужчин, число пользователей женщин, доля мужчин, доля женщин (датасет ok_dem).

#### First, find the cities with prices higher than the mean price


In [None]:
# Cities in which at least one selected product costs more than that
# product's (rounded) average price, with the matching OK city id attached.
cityForStatDF = (
    pricesForStatDF

    # Attach each product's average price. priceStatDF is derived from
    # pricesForStatDF, so both expose the very same product_id attribute;
    # joining by column name avoids comparing a column with itself and
    # relying on Spark's implicit self-join resolution.
    .join(priceStatDF, "product_id", how="inner")

    # keep only (city, product) pairs priced above the product's average
    .where(sf.col("price_avg") < sf.col("price"))

    # only the city matters from here on, one row per city
    .select("city_id")
    .dropDuplicates()

    # translate Rosstat city ids into OK city ids; the column names are unique
    # across both sides, so plain name references are unambiguous
    .join(cityIdMatchingDF, sf.col("city_id") == sf.col("rs_city_id"), how="inner")
    .select("ok_city_id", "rs_city_id")
)

In [None]:
# Preview the first five selected cities.
cityForStatDF.show(5)

In [None]:
# Number of (ok_city_id, rs_city_id) rows selected for the Task 2 statistics.
cityForStatDF.count()

#### find users from necessary cities and calculate their age

In [None]:
# Reference day the age is measured against, on the same scale as birth_date.
# NOTE(review): birth_date is presumably a day count from 1970-01-01, which
# would make 19417 correspond to 2023-03-01 — confirm.
REFERENCE_DAY = 19417
SECONDS_PER_DAY = 86400
EPOCH_YEAR = 1970

# The lifetime in days is turned into seconds, read as a timestamp, and the
# distance of its year from 1970 is taken as the age in whole years.
# NOTE(review): from_unixtime renders in the session time zone — confirm the
# result does not shift by a year for zones behind UTC.
lifetime_seconds = (REFERENCE_DAY - sf.col("birth_date")) * SECONDS_PER_DAY
age_years = sf.year(sf.from_unixtime(lifetime_seconds)) - EPOCH_YEAR

# Users living in the selected cities, with their age and Rosstat city id.
coreDemographyForStatDF = (
    coreDemographyDF
    .join(cityForStatDF, coreDemographyDF.city_id == cityForStatDF.ok_city_id)
    .select(
        "user_id",
        age_years.alias("age"),
        "gender",
        "rs_city_id",
    )
)

In [None]:
# Preview the first five users from the selected cities.
coreDemographyForStatDF.show(5)

#### calculate all necessary info 

In [None]:
# Gender code 1 is counted as men and code 2 as women.
is_man = sf.col("gender") == 1
is_woman = sf.col("gender") == 2
user_total = sf.col("user_cnt")

# Per-city demography: user counts, gender counts and shares, mean age.
okDemDF = (
    coreDemographyForStatDF
    .groupBy("rs_city_id")
    .agg(
        sf.sum(sf.when(is_man, 1).otherwise(0)).alias("men_cnt"),
        sf.sum(sf.when(is_woman, 1).otherwise(0)).alias("women_cnt"),
        sf.count("user_id").alias("user_cnt"),
        sf.round(sf.avg("age"), 2).alias("age_avg"),
    )

    # gender shares relative to all counted users of the city, 2 decimals
    .withColumn("women_share", sf.round(sf.col("women_cnt").cast("double") / user_total, 2))
    .withColumn("men_share", sf.round(sf.col("men_cnt").cast("double") / user_total, 2))

    # replace the Rosstat city id with the city's name
    .join(cityDF, sf.col("rs_city_id") == sf.col("city_id"), how="inner")
    .select(
        sf.col("city").alias("city_name"),
        "user_cnt",
        "women_cnt",
        "men_cnt",
        "women_share",
        "men_share",
        "age_avg",
    )
)

In [None]:
# Preview the first ten rows of the Task 2 result.
okDemDF.show(10)

In [None]:
# Save the Task 2 result as a single ';'-separated CSV part without a header,
# sorted by city name; existing output is overwritten.
(
    okDemDF
    .repartition(1)
    .sortWithinPartitions("city_name")
    .write
    .csv(ok_dem_path, mode="overwrite", sep=";", header=False)
)

# Complete Task 3 (product_stat)

Из полученного датасета (ok_dem) нужно будет выбрать города с максимальным и минимальным средним возрастом, максимальной долей мужчин и максимальной долей женщин. Для этих городов в данных Росстат нужно будет выбрать самый дешевый и самый дорогой товары, разницу в цене между ними (датасет product_stat)

#### find cities with max, min age_avg, max men and women shares

In [None]:
# Task 2 result extended with city_id and reduced to the columns needed here
# (user_cnt, women_cnt and men_cnt are dropped).
# NOTE(review): the id is recovered by matching on the city name, so cities
# sharing a name would produce extra rows — confirm names are unique.
largeOkDemDF = (
    okDemDF
    .join(cityDF, on=sf.col("city_name") == sf.col("city"), how="inner")
    .select(
        sf.col("city").alias("city_name"),
        "city_id",
        "age_avg",
        "women_share",
        "men_share",
    )
)

In [None]:
# Preview the first five rows of the extended Task 2 result.
largeOkDemDF.show(5)

In [None]:
# Single row holding the extreme values used to pick cities for Task 3.
okDemStatRow = largeOkDemDF.agg(
    sf.max("age_avg").alias("max_age_avg"),
    sf.min("age_avg").alias("min_age_avg"),
    sf.max("men_share").alias("max_men_share"),
    sf.max("women_share").alias("max_women_share"),
).first()

okDemStatRow

In [None]:
# For each criterion find the city_name and city_id of the matching city;
# they are used afterwards to look up that city's cheapest and most
# expensive products.

def _pick_city(criterion_column, target_value):
    """Return the (city_name, city_id) Row whose `criterion_column` equals `target_value`.

    Several cities can share the same rounded value; ordering by city_id makes
    the choice among them reproducible between runs.

    Raises:
        ValueError: if no city matches, e.g. when the statistics frame is
            empty and `target_value` is None.
    """
    picked_city = (
        largeOkDemDF
        .where(sf.col(criterion_column) == target_value)
        .orderBy("city_id")
        .select("city_name", "city_id")
        .first()
    )
    if picked_city is None:
        raise ValueError(
            f"No city found with {criterion_column} == {target_value!r}"
        )
    return picked_city


max_age_avg_city = _pick_city("age_avg", okDemStatRow.max_age_avg)
min_age_avg_city = _pick_city("age_avg", okDemStatRow.min_age_avg)
max_men_share_city = _pick_city("men_share", okDemStatRow.max_men_share)
max_women_share_city = _pick_city("women_share", okDemStatRow.max_women_share)

print(f"City with max average age ({okDemStatRow.max_age_avg}) is {max_age_avg_city.city_name}")
print(f"City with min average age ({okDemStatRow.min_age_avg}) is {min_age_avg_city.city_name}")
print(f"City with max women share ({okDemStatRow.max_women_share}) is {max_women_share_city.city_name}")
print(f"City with max men share ({okDemStatRow.max_men_share}) is {max_men_share_city.city_name}")

#### create DF with selected cities

In [None]:
# Schema of the small frame holding the picked cities: name and integer id.
pickedCityDFSchema = (
    StructType()
    .add("city_name", StringType())
    .add("city_id", IntegerType())
)

In [None]:
# One [city_name, city_id] record per picked city. A city that wins more than
# one criterion is recorded once, so the joins built on this list do not
# produce duplicated rows for it.
pickedCityData = []

for pickedCity in (max_age_avg_city, min_age_avg_city, max_women_share_city, max_men_share_city):
    city_record = [pickedCity.city_name, pickedCity.city_id]
    if city_record not in pickedCityData:
        pickedCityData.append(city_record)

In [None]:
# Display the collected [city_name, city_id] records.
pickedCityData

In [None]:
# Turn the picked-city records into a DataFrame so they can be joined with prices.
pickedCityDF = spark.createDataFrame(data=pickedCityData, schema=pickedCityDFSchema)
pickedCityDF.show()

#### find max and min price products for picked cities

In [None]:
# Price rows of the picked cities (max/min average age, max women share,
# max men share), labelled with the city name.
pickedPriceDF = (
    priceDF
    .join(pickedCityDF, on="city_id", how="inner")
    .select("product_id", "price", "city_name")
)

pickedPriceDF.show(10)

In [None]:
# Lowest price found in each picked city.
cheapestPriceByCityDF = (
    pickedPriceDF
    .groupBy("city_name")
    .agg(sf.min("price").alias("price"))
)

print("aggregation result:")
cheapestPriceByCityDF.show()

# Cheapest product per picked city: name and price.
# The minimum is matched back on BOTH city_name and price. Matching on price
# alone would also pick up products from other picked cities that happen to
# cost the same and report them under the wrong city.
# If several products tie for the minimum in a city, each one yields a row.
minPriceProductDF = (
    cheapestPriceByCityDF
    .join(pickedPriceDF, ["city_name", "price"], how="inner")  # obtain product ids
    .join(productDF, "product_id", how="inner")  # obtain product names
    .select(
        sf.col("city_name"),
        sf.col("price").alias("cheapest_product_price"),
        sf.col("product").alias("cheapest_product_name"),
    )
)

print("final result:")
minPriceProductDF.show()

In [None]:
# Highest price found in each picked city.
highestPriceByCityDF = (
    pickedPriceDF
    .groupBy("city_name")
    .agg(sf.max("price").alias("price"))
)

# Most expensive product per picked city: name and price.
# The maximum is matched back on BOTH city_name and price. Matching on price
# alone would also pick up products from other picked cities that happen to
# cost the same and report them under the wrong city.
# If several products tie for the maximum in a city, each one yields a row.
maxPriceProductDF = (
    highestPriceByCityDF
    .join(pickedPriceDF, ["city_name", "price"], how="inner")  # obtain product ids
    .join(productDF, "product_id", how="inner")  # obtain product names
    .select(
        sf.col("city_name"),
        sf.col("price").alias("most_expensive_product_price"),
        sf.col("product").alias("most_expensive_product_name"),
    )
)

maxPriceProductDF.show()

In [None]:
# Difference between the most expensive and the cheapest product price, 2 decimals.
price_gap = sf.round(
    sf.col("most_expensive_product_price") - sf.col("cheapest_product_price"), 2
)

# Task 3 result: per picked city, the cheapest and the most expensive product
# names together with the price difference between them.
productStatDF = (
    maxPriceProductDF
    .join(minPriceProductDF, on="city_name", how="inner")
    .select(
        "city_name",
        "cheapest_product_name",
        "most_expensive_product_name",
        price_gap.alias("price_difference"),
    )
)

In [None]:
# Preview the Task 3 result (show() prints at most its default number of rows).
productStatDF.show()

In [None]:
# Save the Task 3 result as a single ';'-separated CSV part without a header,
# sorted by descending city name; existing output is overwritten.
(
    productStatDF
    .repartition(1)
    .sortWithinPartitions(sf.desc("city_name"))
    .write
    .csv(product_stat_path, mode="overwrite", sep=";", header=False)
)

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()