In [1]:
# task4
# Enter your login here, e.g. "vpetrov". It is substituted into the HDFS paths below.
user = "alubyanov"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
209,application_1573843665329_0153,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Do not change the parameters in this cell!
from pyspark.sql.types import *
import pyspark.sql.functions as sf

# Reporting date (YYYY-MM-DD); user ages are computed relative to this date.
current_dt = "2019-11-12"

# Input locations, namespaced by the login stored in `user`.
demography_path = "/user/{}/data/data3/ok/coreDemography".format(user)
country_path = "/user/{}/data/data3/ok/geography/countries.csv".format(user)

# Path where the result is written
output_path = "/user/{}/task4".format(user)

In [3]:
# Parse the reporting date once and expose its calendar components
# (year / month / day) as module-level values used by the age calculation.
import datetime

cur_dt = datetime.datetime.strptime(current_dt, '%Y-%m-%d')
current_year, current_month, current_day = cur_dt.year, cur_dt.month, cur_dt.day

In [5]:
# Load both inputs as header-less delimited text: the demography dump is
# tab-separated, the country dictionary is comma-separated.
data_demography = spark.read.options(header="false", sep="\t").csv(demography_path)
data_country = spark.read.options(header="false", sep=",").csv(country_path)

# Cast the positional columns (_c0, _c1, ...) to their types and name them.
demography = data_demography.select(
    data_demography["_c0"].cast("int").alias("userId"),
    data_demography["_c1"].cast("bigint").alias("create_date"),
    data_demography["_c2"].cast("bigint").alias("birth_date"),
    data_demography["_c3"].cast("smallint").alias("gender"),
    data_demography["_c4"].cast("bigint").alias("ID_country"),
    data_demography["_c5"].cast("int").alias("ID_Location"),
    data_demography["_c6"].cast("int").alias("loginRegion"),
)

# Country dictionary: numeric id plus the country name left as a string.
country = data_country.select(
    data_country["_c0"].cast("bigint").alias("ID_country"),
    data_country["_c1"].alias("name_country"),
)

In [6]:
# Age calculation used as a Spark UDF.
def find_year_diffdate(year_birth, month_birth, day_birth):
    """Return the age in full years on the reporting date.

    The reporting date is taken from the module-level ``current_year``,
    ``current_month`` and ``current_day``.

    Args:
        year_birth: Birth year, or None/0 when unknown.
        month_birth: Birth month (1-12).
        day_birth: Birth day of month.

    Returns:
        The number of completed years as an int, or None when the birth
        date is missing.
    """
    if not year_birth or month_birth is None or day_birth is None:
        return None

    # Compare (month, day) pairs lexicographically. The previous expression
    # `month_birth == current_month & day_birth <= current_day` was parsed as
    # a chained comparison on `current_month & day_birth`, because `&` binds
    # tighter than `==` and `<=`, so birthdays earlier in the reporting month
    # were not recognised as already passed.
    birthday_passed = (month_birth, day_birth) <= (current_month, current_day)
    return current_year - year_birth - (0 if birthday_passed else 1)
    
# Register the age function as a Spark UDF that yields an integer column.
udf_datediff = sf.udf(find_year_diffdate, IntegerType())

In [10]:
# Per-country user count and average age.
# birth_date is multiplied by 24*3600 and passed to from_unixtime, i.e. it is
# treated here as a day count converted to seconds.
birth_dates = demography.select(
    demography["ID_country"].alias("country_id"),
    sf.from_unixtime(demography["birth_date"] * 24 * 3600).alias("date"),
)

# Age of each user on the reporting date, derived from the date components.
user_age = udf_datediff(sf.year("date"), sf.month("date"), sf.dayofmonth("date"))

age_info = (
    birth_dates
    .groupBy("country_id")
    .agg(
        sf.count("country_id").alias("user_cnt"),
        sf.avg(user_age).alias("age_avg"),
    )
    # format_number rounds to 2 decimals and turns the column into a string.
    .withColumn("age_avg", sf.format_number("age_avg", 2))
)

In [11]:
# Per-country counts of men and women.
def _count_users_by_gender(users, gender_code, count_alias):
    """Count users of one gender per country.

    Args:
        users: DataFrame with ``ID_country`` and ``gender`` columns.
        gender_code: Value of ``gender`` to keep.
        count_alias: Name of the resulting count column.

    Returns:
        DataFrame with columns ``country_id`` and ``count_alias``; countries
        with no users of that gender are absent from it.
    """
    return (
        users
        .select(
            sf.col("ID_country").alias("country_id"),
            sf.col("gender"),
        )
        .where(sf.col("gender") == gender_code)
        .groupBy("country_id")
        .agg(sf.count(sf.col("gender")).alias(count_alias))
    )


# Gender code 1 is counted as men, code 2 as women.
men_info = _count_users_by_gender(demography, 1, "men_cnt")
women_info = _count_users_by_gender(demography, 2, "women_cnt")

In [None]:
# Join all intermediate tables into the final data mart.
# Left joins keep countries that have no users of one gender; the inner join
# with the dictionary keeps only countries that have a known name.
result_table = (
    age_info
    .join(women_info, age_info.country_id == women_info.country_id, how='left')
    .join(men_info, age_info.country_id == men_info.country_id, how='left')
    .join(country, age_info.country_id == country.ID_country, how='inner')
    .select(
        sf.col("name_country"),
        sf.col("user_cnt"),
        sf.col("age_avg"),
        sf.col("men_cnt"),
        sf.col("women_cnt"),
        sf.format_number(sf.col("men_cnt") / sf.col("user_cnt"), 2).alias("men_share"),
        sf.format_number(sf.col("women_cnt") / sf.col("user_cnt"), 2).alias("women_share")
    )
    # NOTE(review): men_share / women_share are strings produced by
    # format_number; confirm the numeric 0.0 fill gives the desired formatting.
    .fillna({'men_cnt': 0, 'women_cnt': 0, 'men_share': 0.0, 'women_share': 0.0})
)

# show() returns None, so it must not be part of the assignment above;
# otherwise result_table would be None and the save step would fail.
result_table.show()

In [10]:
# Save the result to HDFS: one partition, rows ordered by user count
# (descending), written as tab-separated text with a header row.
single_partition = result_table.repartition(1)
ordered_result = single_partition.sortWithinPartitions(sf.col("user_cnt").desc())
ordered_result.write.mode("overwrite").options(header="true", sep="\t").csv(output_path)

In [11]:
# Always stop Spark when the work is done, to release the cluster resources!
spark.stop()