In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

In [150]:
def load_data_csv(spark, file):
    try: 
        return spark.read.format("csv")\
      .option("sep", ";")\
      .option("inferSchema", "true")\
      .option("header", "true")\
      .load(file)
    except Exception as e:
        print(f"Error while reading csv file {e}")

In [75]:
logins = load_data_csv(spark, "logins.csv")
logins.createOrReplaceTempView("logins")
logins.head()

Row(user_id=1, login_datetime=datetime.datetime(2020, 3, 12, 18, 26, 24, 239000))

In [71]:
logouts = load_data_csv(spark, "logouts.csv") 
logouts.createOrReplaceTempView("logouts")
logouts.head()

Row(user_id=1, logout_datetime=datetime.datetime(2020, 3, 12, 18, 30, 26, 235000))

In [73]:
payments = load_data_csv(spark, "payments.csv")
payments.createOrReplaceTempView("payments")
payments.head()

Row(user_id=1, payment_datetime=datetime.datetime(2020, 3, 12, 18, 32, 57, 315000), payment_sum=10)

In [123]:
"""
SQL
Нужно посчитать DAU (day active users - уникальные пользователи которые входили в игру за день) 
по дням за последние 30 дней. 
Без дополнительных условий;
Ограничиваем только тех пользователей у которых был login за последние 24 часа;
Учитываем только тех пользователей которые суммарно заплатили за последние 30 дней более 100$
"""
df = spark.sql("""
 select to_date(login_datetime,'yyyy-mm-dd') as login_date, 
    l.user_id
    from (select user_id, login_datetime 
            from logins 
            where login_datetime >= date_add(current_timestamp(), -1) ) l
    left join (select 
                user_id, sum(payment_sum) p_sum
                from payments 
                where payment_datetime >= date_add(current_timestamp(), -30)
                group by user_id
                having sum(payment_sum) > 100
    )
    p on p.user_id=l.user_id
    where login_datetime >= date_add(current_timestamp(), -30) and p_sum is not null
    group by to_date(login_datetime,'yyyy-mm-dd'), l.user_id
    order by 1 desc, 2
""")
df.show()

+----------+-------+
|login_date|user_id|
+----------+-------+
|2020-03-14|      2|
|2020-03-14|     10|
|2020-03-13|      2|
|2020-03-13|     10|
+----------+-------+



In [119]:
# logins check
spark.sql("""
select user_id, to_date(login_datetime,'yyyy-mm-dd') login_d, count(1)
from logins
where user_id in (10)
group by user_id, to_date(login_datetime,'yyyy-mm-dd')
order by 1, 2 desc
""").show()

+-------+----------+--------+
|user_id|   login_d|count(1)|
+-------+----------+--------+
|     10|2020-03-14|       2|
|     10|2020-03-13|       2|
|     10|2020-03-12|       2|
|     10|2020-03-11|       2|
|     10|2020-03-10|       2|
|     10|2020-03-09|       2|
|     10|2020-03-08|       2|
|     10|2020-03-07|       2|
|     10|2020-03-06|       2|
|     10|2020-03-05|       2|
|     10|2020-03-04|       2|
|     10|2020-03-03|       2|
|     10|2020-03-02|       2|
|     10|2020-03-01|       2|
|     10|2020-02-29|       2|
|     10|2020-02-28|       2|
|     10|2020-02-27|       2|
|     10|2020-02-26|       2|
|     10|2020-02-25|       2|
|     10|2020-02-24|       2|
+-------+----------+--------+
only showing top 20 rows



In [149]:
'''
Нужно узнать, какой “основной” класс персонажа у каждого юзера. 
То есть, за какой класс он играл больше всего времени за весь период. 
На выходе должны получить таблицу user_id; character
'''
from pyspark.sql.functions import max, sum, rank
from pyspark.sql import Window
df = load_data_csv(spark,"user_classes.csv")
df_agg = df.groupBy("user_id","character").\
            agg(
            sum("session_time").alias("ch_ses"))
            
windowSpec  = Window.orderBy("ch_ses").partitionBy("user_id")
df_favorite_character = df_agg.withColumn("character_rank",
                                          rank().over(windowSpec))\
                        .filter("character_rank = 1")\
                        .drop("ch_ses","character_rank").orderBy("user_id")

df_favorite_character.show()

                       

+-------+-----------+
|user_id|  character|
+-------+-----------+
|      1|    warrior|
|      2|     wizard|
|      3|  barbarian|
|      4|      rogue|
|      5|  barbarian|
|      6|      rogue|
|      7|  barbarian|
|      8|necromancer|
|      9|necromancer|
|     10|  barbarian|
+-------+-----------+



In [192]:
"""
Имеем таблицу с платежами: user_id, pay_dt, pay_sum и таблицу с регистрациями: 
user_id; reg_dt. Нужно посчитать ARPU (Average Revenue Per User) 
для каждого дня жизни этих пользователей. 
День жизни пользователей -- количество дней между датой платежа и датой регистрации пользователя. 
На выходе нужно получить следующую таблицу:
diff; arpu, где diff - количество дней между датой платежа и датой регистрации, 
оно же день жизни пользователей.
Таким образом эта таблица будет показывать, 
сколько в среднем каждый пользователь приносит денег на 0 день жизни, на 1 день жизни и так далее.
Важный момент: стоит обратить внимание, что далеко не все пользователи должны попадать в выборку для расчета. 
Например, считая ARPU за 90й день жизни будет некорректно брать тех юзеров, 
которые зарегистрировали вчера, т.к. они просто еще попросту не прожили 90 дней и никак не могли заплатить.
"""
from pyspark.sql.functions import max, sum, rank, count, lit, datediff, round
from pyspark.sql import Window

df_user_reg = load_data_csv(spark,"user_reg.csv")
df_payments = load_data_csv(spark,"payments.csv")

df_payments_agg = df_payments.withColumn("pay_dt",expr("to_date(payment_datetime)"))\
    .groupBy("pay_dt", "user_id").agg(sum("payment_sum").alias("rev"))


df_user_reg_agg = df_user_reg.withColumn("reg_dt",expr("to_date(reg_dt)"))\
                    .join(df_payments_agg,df_payments_agg.user_id == df_user_reg.user_id )\
                    .withColumn("diff",datediff(df_payments_agg.pay_dt, col("reg_dt")))\
                    .groupBy("diff").agg((round(sum("rev")/col("diff"), 2)).alias("arpu"))
    
df_user_reg_agg.show()

+----+-----+
|diff| arpu|
+----+-----+
|  12| 2.58|
|   9| 3.22|
|  39|24.97|
|  14| 1.79|
|  36| 8.83|
+----+-----+



In [193]:
df_check = df_user_reg.join(df_payments,df_payments.user_id == df_user_reg.user_id)
df_check.show()

+-------+--------------------+-------+--------------------+-----------+
|user_id|              reg_dt|user_id|    payment_datetime|payment_sum|
+-------+--------------------+-------+--------------------+-----------+
|      1|2020-02-03 16:07:...|      1|2020-03-13 14:24:...|          1|
|      2|2020-02-02 16:07:...|      2|2020-03-12 14:24:...|         40|
|      3|2020-02-01 16:07:...|      3|2020-03-11 14:24:...|          3|
|      4|2020-01-31 16:07:...|      4|2020-03-10 14:24:...|          4|
|      5|2020-01-30 16:07:...|      5|2020-03-09 14:24:...|          0|
|      6|2020-01-29 16:07:...|      6|2020-03-08 14:24:...|          4|
|      7|2020-01-28 16:07:...|      7|2020-03-07 14:24:...|          5|
|      8|2020-01-27 16:07:...|      8|2020-03-06 14:24:...|          4|
|      9|2020-01-26 16:07:...|      9|2020-03-05 14:24:...|          3|
|     10|2020-01-25 16:07:...|     10|2020-03-04 14:24:...|         12|
|      1|2020-02-03 16:07:...|      1|2020-03-13 14:24:...|     