In [None]:
import os
import findspark

# Locate the local Spark installation and make pyspark importable.
findspark.init()
findspark.find()

# Point Spark at the cluster's Hadoop / YARN client configuration.
HADOOP_CONF = '/etc/hadoop/conf'
os.environ['HADOOP_CONF_DIR'] = HADOOP_CONF
os.environ['YARN_CONF_DIR'] = HADOOP_CONF

In [None]:
from datetime import date, timedelta, datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window 
from pyspark.sql import DataFrame
from pyspark.sql.types import TimestampType

In [None]:
from pyspark.sql import SparkSession

# Session settings for the YARN cluster.
SPARK_CONF = {
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
    # Keep idle executors alive for a long time between interactive cells.
    "spark.dynamicAllocation.executorIdleTimeout": "10000s",
}

builder = SparkSession.builder.master("yarn").appName("Projecte")
for conf_key, conf_value in SPARK_CONF.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

In [None]:
# Display the active SparkSession.
spark

# Функции

In [None]:
def select_dates(start_date: str, end_date: str,
                 base_path: str = '/user/master/data/geo/events') -> list:
    """Build the list of daily partition paths between two dates, inclusive.

    Dates are ISO strings such as "2022-01-01". The result looks like
    ['/user/master/data/geo/events/date=2022-01-01', ...].

    :param start_date: first day of the range (inclusive).
    :param end_date: last day of the range (inclusive).
    :param base_path: directory that holds the ``date=YYYY-MM-DD`` partitions.
    :returns: one path per day; an empty list (with a logged warning) when
        ``end_date`` is before ``start_date``.
    :raises ValueError: if either date cannot be parsed.
    """
    import logging
    logger = logging.getLogger(__name__)

    try:
        # Work on calendar days so a time component cannot shorten the range.
        start = datetime.fromisoformat(start_date).date()
        end = datetime.fromisoformat(end_date).date()
    except (TypeError, ValueError) as exc:
        logger.error('Неверный формат даты!')
        # Fail with the real cause instead of continuing with unparsed strings.
        raise ValueError(f'Неверный формат даты! ({start_date!r}, {end_date!r})') from exc

    n_days = (end - start).days
    if n_days < 0:
        logger.warning('Указан слишком маленький диапазон!')
        return []

    return [f'{base_path}/date={(start + timedelta(days=offset)).isoformat()}'
            for offset in range(n_days + 1)]

# Haversine great-circle distance between the point (lat, lon) and the point
# (ltt, lng). 6371 is taken as the Earth radius in km, so the result is in km.
_phi_a = F.radians(F.col("lat"))
_phi_b = F.radians(F.col("ltt"))
_half_dphi = (_phi_a - _phi_b) / 2
_half_dlambda = (F.radians(F.col("lon")) - F.radians(F.col("lng"))) / 2
_haversine = (
    F.pow(F.sin(_half_dphi), 2)
    + F.cos(_phi_a) * F.cos(_phi_b) * F.pow(F.sin(_half_dlambda), 2)
)
calculate_dist = F.lit(2 * 6371) * F.asin(F.sqrt(_haversine))


In [None]:
# Sanity check: partition paths for a three-day range.
select_dates('2022-01-01', '2022-01-03')

In [None]:
# NOTE(review): `t` is not used anywhere below, and this repeats the load done in
# the data-loading section (same dates, same 15% sample); consider deleting.
t = spark.read.parquet(*select_dates('2022-01-01', '2022-03-01')).sample(0.15)

In [None]:
# List the date partitions available under the raw events directory.
!hdfs dfs -ls /user/master/data/geo/events

## Загрузка данных

### Действия

In [None]:
# Load a sample of the event partitions for the analysed period.
# TODO: log the number of loaded records.
EVENTS_START_DATE = '2022-01-01'
EVENTS_END_DATE = '2022-03-01'
SAMPLE_FRACTION = 0.15
# Fixed seed: the DataFrame is not cached, so every action (show, write, ...)
# re-runs the sampling; without a seed each action sees different rows.
SAMPLE_SEED = 42
try:
    activities = spark.read \
        .parquet(*select_dates(EVENTS_START_DATE, EVENTS_END_DATE)) \
        .sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
except Exception:
    # Re-raise: without `activities` every later cell would fail with a
    # misleading NameError.
    print('Нет данных за указанный диапазон!')
    raise

In [None]:
# Preview a few raw events.
activities.show(3, truncate=False)

In [None]:
# Inspect the (nested) event schema.
activities.schema

### Города

In [None]:
# City reference table. Its `lat` column is renamed to `ltt` so it does not
# clash with the events' own `lat` after the cross join.
cities = spark.read \
    .option('sep', ';') \
    .option('header', True) \
    .csv('/user/flaviusoct/data/coord') \
    .withColumnRenamed('lat', 'ltt')

In [None]:
# Preview the city reference table.
cities.show(30,truncate=False)

## Обработка данных

In [None]:
# Drop events without coordinates: they cannot be matched to a city.
has_coords = F.col('lat').isNotNull() & F.col('lon').isNotNull()
activities_new = activities.where(has_coords)

In [None]:
# Unify the event time into one `datetime` column (events may carry either
# `event.datetime` or `event.message_ts`) and tag every event with a unique
# `activity_id`, which the nearest-city step uses as its per-event window key.
# The id only has to be unique, not time-ordered, so no global sort is done
# here (a full orderBy would shuffle and sort the whole dataset for nothing).
activities_new = activities_new \
    .withColumn('datetime', F.coalesce(F.col('event.datetime'), F.col('event.message_ts')).cast('timestamp')) \
    .withColumn('activity_id', F.monotonically_increasing_id())


In [None]:
# Preview events with the unified timestamp and per-event id.
activities_new.show(3, truncate=False)

In [None]:
# The CSV stores coordinates with a decimal comma; turn them into doubles.
cities_new = cities
for coord_col in ('ltt', 'lng'):
    cities_new = cities_new.withColumn(
        coord_col, F.regexp_replace(F.col(coord_col), ',', '.').cast('double')
    )
cities_new = cities_new.withColumn('id', F.col('id').cast('int'))

## Операции

In [None]:
# Pair each event with every city and keep only the closest one.
# NOTE(review): this cell reassigns its own input, so running it twice would
# cross-join the already-enriched frame again.
closest_first = Window.partitionBy("activity_id").orderBy("distance")
activities_new = (
    activities_new
    .crossJoin(cities_new)
    .withColumn('distance', calculate_dist)
    .withColumn("distance_rank", F.row_number().over(closest_first))
    .where(F.col("distance_rank") == 1)
)

In [None]:
# Preview events with their nearest city and the distance to it.
activities_new.show()

In [None]:
# Persist the enriched events; presumably the commented-out line below is for
# reloading them in a later session instead of recomputing the cross join.
activities_new.write.mode('overwrite').parquet('/user/flaviusoct/data/tmp/activities_new')
#activities_new = spark.read.parquet('/user/flaviusoct/data/tmp/activities_new')

## Создаём витрину 1

In [None]:
def _home_city(messages: DataFrame, home_min_days: int) -> DataFrame:
    """Return one (user_id, home_city) row per user that has a qualifying stay.

    A stay is a run of consecutive calendar days with at least one message from
    the same city. Runs are found with the gaps-and-islands trick: within a run,
    ``day - rank(day)`` is constant. The longest run strictly longer than
    ``home_min_days`` wins; ties go to the run that ended most recently, then to
    the alphabetically first city (so the result is deterministic).
    """
    days = messages \
        .select('user_id', 'city', F.to_date('datetime').alias('msg_date')) \
        .distinct()
    by_day = Window.partitionBy('user_id', 'city').orderBy('msg_date')
    stays = days \
        .withColumn('day_rank', F.row_number().over(by_day)) \
        .withColumn('stay_id', F.expr('date_sub(msg_date, day_rank)')) \
        .groupBy('user_id', 'city', 'stay_id') \
        .agg(F.count('*').alias('cnt_days'), F.max('msg_date').alias('stay_end')) \
        .where(F.col('cnt_days') > home_min_days)
    best_first = Window.partitionBy('user_id') \
        .orderBy(F.desc('cnt_days'), F.desc('stay_end'), F.asc('city'))
    return stays \
        .withColumn('rn', F.row_number().over(best_first)) \
        .where(F.col('rn') == 1) \
        .select('user_id', F.col('city').alias('home_city'))


def create_user_view(df: DataFrame, home_min_days: int = 1,
                     timezone: str = "Australia/Sydney") -> DataFrame:
    """Build the per-user view from message events.

    :param df: events with ``event_type``, ``event.message_from``, ``city``
        (nearest city of the event) and ``datetime`` columns.
    :param home_min_days: a run of consecutive message days in one city must be
        strictly longer than this to count as a home-city candidate.
    :param timezone: time zone used to convert the latest message time.
    :returns: one row per user with ``user_id``, ``act_city`` (city of the
        latest message), ``home_city`` (null when no run qualifies),
        ``travel_count`` (number of distinct messages), ``travel_array`` (cities
        of those messages in time order) and ``local_time`` (latest message time
        converted to ``timezone``).
    """
    messages = df.where("event_type='message' AND datetime IS NOT NULL") \
        .select(F.col('event.message_from').alias('user_id'),
                F.col('city'), F.col('datetime')) \
        .distinct()

    # The latest message per user gives both the current city and the local time.
    latest_first = Window.partitionBy('user_id').orderBy(F.desc('datetime'))
    last_message = messages \
        .withColumn('rn', F.row_number().over(latest_first)) \
        .where(F.col('rn') == 1) \
        .select('user_id',
                F.col('city').alias('act_city'),
                F.from_utc_timestamp(F.col('datetime'), F.lit(timezone)).alias('local_time'))

    home_city = _home_city(messages, home_min_days)

    # collect_list after groupBy has no ordering guarantee (a preceding orderBy
    # is not preserved), so collect (datetime, city) structs and sort explicitly.
    travel = messages \
        .groupBy('user_id') \
        .agg(F.array_sort(F.collect_list(F.struct('datetime', 'city'))).alias('visits')) \
        .select('user_id',
                F.size('visits').cast('long').alias('travel_count'),
                F.col('visits.city').alias('travel_array'))

    return last_message \
        .join(home_city, ['user_id'], 'left') \
        .join(travel, ['user_id']) \
        .select('user_id', 'act_city', 'home_city', 'travel_count', 'travel_array', 'local_time')

In [None]:
# Build the per-user view from the city-enriched events (it needs the `city`
# column added by the nearest-city join).
user_view = create_user_view(activities_new)

In [None]:
# Preview the per-user view.
user_view.show()

In [None]:
# Persist the per-user view, overwriting any previous run.
user_view.write.mode('overwrite').parquet('/user/flaviusoct/data/tmp/user_view')

## Создаём витрину 2

In [None]:
# Preview the enriched events used for the zone view.
activities_new.show(3)

In [None]:
def create_activity_by_zone_view(df: DataFrame, act_type: str, reg: bool = False) -> DataFrame:
    """Count events of one type per zone, by month and by week.

    :param df: events with ``event_type``, ``datetime`` and ``id`` (zone id).
    :param act_type: event type to count; also the suffix of the
        ``week_<act_type>`` / ``month_<act_type>`` output columns.
    :param reg: if True, each user's first message (by ``datetime``) is counted
        instead, and ``act_type`` only names the output columns.
    :returns: rows of (month, week, zone_id, week_<act_type>, month_<act_type>).
        ``week_*`` is the zone total for the whole week and ``month_*`` the zone
        total for the whole month, so a week spanning two months carries its full
        total on both rows.
    """
    if reg:
        first_message = Window.partitionBy('event.message_from').orderBy(F.asc('datetime'))
        events = df.where((F.col('event_type') == 'message') & F.col('datetime').isNotNull()) \
            .withColumn('msg_rank', F.row_number().over(first_message)) \
            .where(F.col('msg_rank') == 1)
    else:
        events = df.where((F.col('event_type') == act_type) & F.col('datetime').isNotNull())

    week_col, month_col = f'week_{act_type}', f'month_{act_type}'
    # NOTE(review): month/week carry no year, and weekofyear is the ISO week
    # (2022-01-01 falls in week 52), so input spanning a year boundary would mix
    # periods; confirm the data stays within one year.
    # Aggregate to (month, week, zone) first, then roll up with windows over the
    # small aggregated frame instead of windowing over every raw event.
    return events \
        .select(F.month('datetime').alias('month'),
                F.weekofyear('datetime').alias('week'),
                F.col('id').alias('zone_id')) \
        .groupBy('month', 'week', 'zone_id') \
        .agg(F.count('*').alias('cnt')) \
        .withColumn(week_col, F.sum('cnt').over(Window.partitionBy('week', 'zone_id'))) \
        .withColumn(month_col, F.sum('cnt').over(Window.partitionBy('month', 'zone_id'))) \
        .select('month', 'week', 'zone_id', week_col, month_col)

def create_activities_by_zone_view(df: DataFrame) -> DataFrame:
    """Combine per-zone message, subscription, reaction and registration counts.

    Each per-type view is full-outer-joined on (week, month, zone_id), so a
    period/zone appears if any of the activity types occurred there; the result
    is sorted by week, month and zone.
    """
    join_keys = ["week", "month", "zone_id"]
    per_type_views = [
        create_activity_by_zone_view(df, activity, activity == 'registration')
        for activity in ('message', 'subscription', 'reaction', 'registration')
    ]
    combined = per_type_views[0]
    for view in per_type_views[1:]:
        combined = combined.join(view, join_keys, 'full')
    return combined.orderBy(*join_keys)

In [None]:
# Per-zone weekly/monthly activity counts.
# NOTE(review): the generic name `test` is reused later for a different view;
# consider a descriptive name.
test = create_activities_by_zone_view(activities_new)

In [None]:
# Preview the per-zone activity view.
test.show(40)

## Создаём витрину 3

In [None]:
# Peek at subscription events among the city-enriched events.
activities_new.where('event.subscription_channel IS NOT NULL').show(10, truncate=False)

In [None]:
def create_user_recommend_view(df: DataFrame, start_date: str, end_date: str,
                               max_distance_km: float = None) -> DataFrame:
    """Recommend pairs of users who share a channel but have never messaged.

    :param df: events with ``event``, ``datetime``, ``lat``, ``lon`` and ``id``
        (zone id) columns.
    :param start_date: start (inclusive) of the window whose latest event gives
        each user's position.
    :param end_date: end (exclusive) of that window; subscriptions are taken
        only from before this moment.
    :param max_distance_km: if given, keep only pairs whose last known positions
        are at most this many km apart (pairs with an unknown position are then
        dropped). ``None`` (default) applies no distance filter.
    :returns: one row per unordered pair with ``user_left`` < ``user_right``,
        ``zone_id`` (smallest zone among the pair's subscription events),
        ``processed_dttm`` and ``local_time`` (processing time in Australia/Sydney).
    """
    start_ts = F.lit(start_date).cast('timestamp')
    end_ts = F.lit(end_date).cast('timestamp')

    subscriptions = df \
        .where((F.col('datetime') < end_ts)
               & F.col('datetime').isNotNull()
               & F.col('event.subscription_channel').isNotNull()) \
        .select(F.col('event.user').alias('user_id'),
                F.col('event.subscription_channel').alias('channel'),
                F.col('id').alias('zone_id')) \
        .distinct()

    # Pairs are keyed by (least, greatest) so (a, b) and (b, a) match exactly and
    # different pairs can never collide (unlike a hash of a + b).
    friends = df \
        .where(F.col('event.message_from').isNotNull() & F.col('event.message_to').isNotNull()) \
        .select(F.least('event.message_from', 'event.message_to').alias('user_left'),
                F.greatest('event.message_from', 'event.message_to').alias('user_right')) \
        .distinct()

    channel_members = subscriptions \
        .select(F.col('user_id').alias('other_id'), 'channel') \
        .distinct()
    candidates = subscriptions \
        .join(channel_members, ['channel'], 'inner') \
        .where(F.col('user_id') != F.col('other_id')) \
        .select(F.least('user_id', 'other_id').alias('user_left'),
                F.greatest('user_id', 'other_id').alias('user_right'),
                'zone_id') \
        .join(friends, ['user_left', 'user_right'], 'leftanti')

    # Latest position of every user inside [start_date, end_date).
    latest_first = Window.partitionBy('total_user_id').orderBy(F.desc('datetime'))
    last_position = df \
        .where((F.col('datetime') >= start_ts) & (F.col('datetime') < end_ts)) \
        .withColumn('total_user_id', F.coalesce(F.col('event.message_from'), F.col('event.user'))) \
        .withColumn('time_rank', F.row_number().over(latest_first)) \
        .where(F.col('time_rank') == 1) \
        .select('total_user_id', 'lat', 'lon')

    # calculate_dist reads one point from (lat, lon) and the other from (ltt, lng).
    pairs = candidates \
        .join(last_position.select(F.col('total_user_id').alias('user_left'), 'lat', 'lon'),
              ['user_left'], 'left') \
        .join(last_position.select(F.col('total_user_id').alias('user_right'),
                                   F.col('lat').alias('ltt'), F.col('lon').alias('lng')),
              ['user_right'], 'left') \
        .withColumn('distance', calculate_dist)
    if max_distance_km is not None:
        pairs = pairs.where(F.col('distance') <= max_distance_km)

    return pairs \
        .groupBy('user_left', 'user_right') \
        .agg(F.min('zone_id').alias('zone_id')) \
        .withColumn('processed_dttm', F.current_timestamp()) \
        .withColumn('local_time',
                    F.from_utc_timestamp(F.col('processed_dttm'), F.lit('Australia/Sydney'))) \
        .select('user_left', 'user_right', 'zone_id', 'processed_dttm', 'local_time')

In [None]:
# Build the recommendation view from the city-enriched events.
# NOTE(review): the window starts on 2021-01-07 although only 2022 partitions are
# loaded above; possibly '2022-02-07' was meant. Confirm the intended dates.
test = create_user_recommend_view(activities_new, '2021-01-07', '2022-02-08')