In [172]:
# Notebook parameters.
# Input: user demography data, read below as a headerless tab-separated CSV.
user_data_path = "/user/k.haritonov/data/trainDemography"
# Input: country reference data, read below as a headerless comma-separated CSV.
country_data_path = "/user/k.haritonov/data/geography/countries.csv"
# Reference date in YYYY-MM-DD format; ages are computed relative to this date.
current_dt = "2019-05-01"
# Output location of the resulting report (overwritten on every run).
# NOTE(review): uses "k_haritonov" while the input paths use "k.haritonov" -- confirm this is intentional.
output_path = "/user/k_haritonov/hometask_1"

In [173]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

from datetime import datetime

In [174]:
# Read both inputs as headerless CSV files and let Spark infer the column types.
# Without a header the columns get the default names _c0, _c1, ...

# user demography: tab-separated
df_demog = spark.read.csv(
    user_data_path,
    sep="\t",
    header=False,
    inferSchema=True,
)

# country reference: comma-separated
df_countries = spark.read.csv(
    country_data_path,
    sep=",",
    header=False,
    inferSchema=True,
)

In [175]:
# User-defined function (UDF) that turns the date of birth stored in column _c2 into an age in years.
# NOTE: despite its name, `udaf_age` is a plain row-wise UDF, not an aggregate function (UDAF).

# Parse the reference date into a datetime object
curr_datetime = datetime.strptime(current_dt, "%Y-%m-%d")
# Number of days between the reference date and January 1st, 1970
diff_days = (curr_datetime - datetime(1970, 1, 1)).days


def _age_in_years(birth_day):
    """Return the whole number of years between `birth_day` and `current_dt`.

    `birth_day` is treated as a day count on the same scale as `diff_days`
    (days since 1970-01-01) -- assumption derived from the arithmetic; verify against the data.
    A year is approximated as 365 days, so leap days are ignored.
    Returns None when `birth_day` is None.
    """
    if birth_day is None:
        return None
    # Floor division (plus int() for float inputs) guarantees a Python int:
    # a float returned from a UDF declared as IntegerType is turned into null by Spark.
    return int((diff_days - birth_day) // 365)


# The return type is referenced through the `T` alias, the only way pyspark.sql.types is imported here.
udaf_age = F.udf(_age_in_years, T.IntegerType())

In [176]:
# Per-country demography aggregates, aliased as 'dmg' for use in the join that follows.
#
# Steps:
#   1. derive 'age' from the birth-date column '_c2' with udaf_age
#   2. group by country ID ('_c4')
#   3. per group: count user IDs ('_c0') as user_cnt, average 'age' as age_avg,
#      count rows where '_c3' is 1 (male) as men_cnt and 2 (female) as wmn_cnt
#   4. men_share / wmn_share: men_cnt and wmn_cnt divided by user_cnt
#
# format_number(..., 2) rounds to two decimal places and produces a *string* column,
# so age_avg, men_share and wmn_share are formatted text rather than numbers.
#
# No drop('_c0') is needed: after groupby/agg only '_c4' and the aggregates remain,
# so the user-ID column is already gone.

dmg = (
    df_demog
    .withColumn('age', udaf_age(F.col('_c2')))
    .groupby('_c4')
    .agg(
        F.count(F.col('_c0')).alias('user_cnt'),
        F.format_number(F.avg(F.col('age')), 2).alias('age_avg'),
        F.count(F.when(F.col('_c3') == 1, True)).alias('men_cnt'),
        F.count(F.when(F.col('_c3') == 2, True)).alias('wmn_cnt'),
    )
    .withColumn('men_share', F.format_number(F.col('men_cnt') / F.col('user_cnt'), 2))
    .withColumn('wmn_share', F.format_number(F.col('wmn_cnt') / F.col('user_cnt'), 2))
    .alias('dmg')
)


# Attach country names to the per-country aggregates.
#
# - inner join on country ID: dmg._c4 against df_countries._c0
# - keep the country name (df_countries._c1, exposed as 'country_name')
#   followed by every column of dmg
# - finally discard the country ID '_c4', which was only needed as the join key
df_homework_1 = (
    dmg
    .join(df_countries, on=(dmg['_c4'] == df_countries['_c0']))
    .select(
        df_countries['_c1'].alias('country_name'),
        dmg['*'],
    )
    .drop('_c4')
)

In [177]:
# Write the report to HDFS as tab-separated CSV, replacing any previous output.
# Rows are ordered by user_cnt (descending) inside each partition before writing.
# NOTE(review): sortWithinPartitions does not produce a global ordering across
# partitions -- confirm that per-partition order is what is required.
(
    df_homework_1
    .sortWithinPartitions(F.col('user_cnt').desc())
    .write
    .csv(output_path, sep='\t', mode='overwrite')
)