#Скрипт для расчёта основных недельных и месячных метрик (как для автоматизации так и для ручных расчётов)

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import datetime
import warnings
warnings.simplefilter('ignore')

In [None]:
!pip install clickhouse_driver

Collecting clickhouse_driver
[?25l  Downloading https://files.pythonhosted.org/packages/d9/ea/265008686349490763d450bceeb1ad072352d311e11dd9e5f68993a20ebb/clickhouse_driver-0.2.0-cp36-cp36m-manylinux2010_x86_64.whl (478kB)
[K     |████████████████████████████████| 481kB 5.8MB/s 
Installing collected packages: clickhouse-driver
Successfully installed clickhouse-driver-0.2.0


In [None]:
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.exc import IntegrityError

In [None]:
from clickhouse_driver import Client
client = Client(host='', port='', user='', password='', database='')

In [None]:
from sqlalchemy import create_engine
engine = create_engine('')

#Основные функции

###Функция для выгрузки данных

In [None]:
def upload_data(platform, start_date, finish_date, games_type='free2play'):
  """Позволяет выгружать данные за период [start_date, finish_date].
  
  Parameters:
  ----------
  platform: str, in ['android', 'ios']
    Название платформы (операционной системы), для которой делаеся выгрузка.
  start_date: datetime.date
  finish_date: datetime.date
    Выгрузка делается за период [start_date, finish_date] 
    (обе границы включаются в период). Поэтому, для, например,
    выгрузки недельных данных необходимо указать дату понедельника и воскресенья.
  games_type: str, in ['free2play', 'premium']
   Тип игр, для которых делается выгрузка.
  """

  # переведём start_date, finish_date в строковый тип, чтобы использовать их в запросе
  start_date = str(start_date)
  finish_date = str(finish_date)
  if games_type == 'free2play':
    sql_where_app_add = f"like '%free2play%'"
  elif games_type == 'premium':
    sql_where_app_add = f"not like '%free2play%'"
  
  sql_query = f"""
  select  JSONExtractString(device, 'advertising_id') as advertising_id,
          app_info__id as game,  event_value_in_usd,
          event_timestamp, user_first_touch_timestamp, event_name
  from {platform}1
  where event_date >= '{start_date}' and event_date <= '{finish_date}'
        and event_name in ('session_start', 'in_app_purchase')
        and app_info__id {sql_where_app_add}
  union all
  select  JSONExtractString(device, 'advertising_id') as advertising_id,
          app_info__id as game,  event_value_in_usd,
          event_timestamp, user_first_touch_timestamp, event_name
  from {platform}2
  where event_date >= '{start_date}' and event_date <= '{finish_date}'
        and event_name in ('session_start', 'in_app_purchase')
        and app_info__id {sql_where_app_add}
  """
  if platform == 'android':
    sql_query_add = f"""union all
    select  JSONExtractString(device, 'advertising_id') as advertising_id,
            app_info__id as game,  event_value_in_usd,
            event_timestamp, user_first_touch_timestamp, event_name
    from android3
    where event_date >= '{start_date}' and event_date <= '{finish_date}'
            and event_name in ('session_start', 'in_app_purchase')
            and app_info__id {sql_where_app_add}"""
    sql_query += sql_query_add

  result, columns = client.execute(sql_query, with_column_types=True)
  data = pd.DataFrame(result, columns=[tuple[0] for tuple in columns])

  return data

##Функции корректировки данных и вычислений

In [None]:
def transform_data_expand(data):

  """На вход получает сырые данные, выгруженные из БД clickhouse запросом выше. 
    Проводит предобработку и преобразование данных (какие именно лучше посмотреть в коде). 
    Возвращает  транформированный датафрейм.
    
    Parameters:
    ----------
    data: pd.DataFrame
      Сырые данные, выгруженные из базы данных.
    
    Returns:
    data_copy: pd.DataFrame
      Скорректированные и преобразованные данные."""
    
  data_copy = data.copy(deep=True)
  # корректируем тип данных в event_timestamp (на всякий случай)
  data_copy.event_timestamp = pd.to_datetime(data_copy.event_timestamp)
  #создадим поле event_date, используя event_timestamp (т.к. иногда даты в event_date и event_timestamp могут не совпадать)
  data_copy['event_date'] = data_copy.event_timestamp.dt.date
  data_copy.user_first_touch_timestamp = pd.to_datetime(data_copy.user_first_touch_timestamp)
  
  #поменяем тип данных в поле event_value_in_usd (с decimal на float)
  data_copy.event_value_in_usd = data_copy.event_value_in_usd.astype('float')

  # создадим даты для когорт
  data_copy['cohort'] = data_copy.user_first_touch_timestamp.dt.date
  
  #оставим только данные по играм, которые были установлены позже 01.09.2019
  data_copy = data_copy[data_copy.user_first_touch_timestamp > '2019-09-01']

  #удалим данные по пользователям, у которых advertising_id null или пустая строка или другая невалидная строка
  data_copy = data_copy[data_copy.advertising_id.map(lambda x: len(x) > 10)]

  # оставим только строки, в которых количество NaN не больше 1 (т.е. только в поле event_value_in_usd может быть NaN)
  data_copy = data_copy[data_copy.isnull().sum(axis=1) < 2]
  #сортируем данные по полям cohort_date, event_date
  data_copy = data_copy.sort_values(by=['cohort', 'event_date'], ascending=[True, True])
  data_copy = data_copy.reset_index(drop=True)
  return data_copy

In [None]:
def append_add_features(transform_data):
  """Добавляет преобразованным функцией transform_data_expand данным дополнительные поля: 
    total_games - полное число игр у игрока, paying_games - количество игр у игрока, в 
    которых он платил.
    
    Parameters:
    -----------
    transform_data: pd.DataFrame
      Данные, преобразованные при помощи функции
      transform_data_expand.
    
    Returns:
    transform_data: pd.DataFrame
      Те же данные, что и по на входе, 
      только с несколькими дополнительными полями.
    """
  # добавим поле с общим количеством игр у игрока
  users_total_games = transform_data.groupby('advertising_id', as_index=False)\
                                   .agg({'game': 'nunique'})\
                                   .rename(columns={'game': 'total_games'})
  transform_data = transform_data.merge(users_total_games, on='advertising_id')

  # добавим поле с количеством игр, в которых игрок платил
  users_paying_games = transform_data.query("event_value_in_usd > 0").groupby('advertising_id', as_index=False)\
                                   .agg({'game': 'nunique'})\
                                   .rename(columns={'game': 'paying_games'})
  transform_data = transform_data.merge(users_paying_games, on='advertising_id', how='left')
  
  return transform_data

In [None]:
def simple_calc_metrics(data):
  """Вычисление метрик arpu, arppu, paying_share простым способом (без взвешивания).
  
  Parameters:
  -----------
  data: pd.DataFrame
    Сырые данные, выгруженные из БД clichouse запросом выше.
  
  Returns:
  -------
  result: tuple
    Кортеж из метрик arpu, arppu, paying_share """

  data = append_add_features(transform_data_expand(data))

  arpu = round(data.event_value_in_usd.sum() / data.advertising_id.nunique(), 2)
  arppu = round(data.event_value_in_usd.sum() / data.query("event_value_in_usd > 0").advertising_id.nunique(), 2)
  paying_share = round(data.query("event_value_in_usd > 0").advertising_id.nunique() /data.advertising_id.nunique() * 100, 2)

  result = (arpu, arppu, paying_share)
  return result

In [None]:
def hard_calc_metrics_free(data):

  """Вычисляет метрики более сложным и более точным способом (взвешенные метрики).
    В качестве весов используется доля дохода каждой категории игроков.
    
    Parameters:
    ----------
    data: pd.DataFrame
      Данные, вышедшие из фукнции append_add_features.
    
    Returns:
    result: tuple
      Кортеж, содержащий датафрейм total_games_usd со значениям различных
      метрик для каждой категории игроков; взвешенное значения
      метрики arppu"""
  
  data = append_add_features(transform_data_expand(data))
  
  #получим метрики по всем группам игроков (т.е. игроков с 1-ой игрой, с 2-мя играми и т.д.).
  #Метрики (все они вычисляются для каждой группы отдельно): period_revenue - суммарный доход игроков
  #за период рассмотрения (это период, за который мы выгрузили данные), revenue_rate - доля от общего доход,
  # приносимая данной группой, total_users_rate - доля, которую состаляют игроки данной группы от общего числа
  #игроков; arpu - arpu для данной группы за весь период (по всей выгрузке).
  total_games_usd = data.groupby("total_games", as_index=False)\
                              .agg({'event_value_in_usd': 'sum', 'advertising_id': 'nunique'})\
                              .rename(columns={'event_value_in_usd': 'period_revenue', 'advertising_id': 'unique_total_users'})
  total_games_usd['revenue_rate'] = round(total_games_usd.period_revenue / total_games_usd.period_revenue.sum(), 4)
  total_games_usd['total_users_rate'] = round(total_games_usd.unique_total_users / total_games_usd.unique_total_users.sum() * 100, 1)
  total_games_usd['arpu'] = round(total_games_usd.period_revenue / total_games_usd.unique_total_users, 2)

  # добавим в датафрейм total_games_usd поле unique_paying_users - число уникальных платящих игроков.
  #Данное поле будет использоваться для вычисления метрик для платящих игроков в каждой группе.
  total_games_usd = total_games_usd.merge(
                                            data.query("event_value_in_usd > 0").groupby('total_games', as_index=False)\
                                            .agg({'advertising_id': 'nunique'})\
                                            .rename(columns={'advertising_id': 'unique_paying_users'}),
                                    on='total_games')
  
  # посчитаем метрики для платящих игроков
  total_games_usd['arppu'] = round(total_games_usd.period_revenue / total_games_usd.unique_paying_users, 1)
  total_games_usd['paying_share'] = round(total_games_usd.unique_paying_users / total_games_usd.unique_total_users * 100, 1)
  
  # рассчитаем взвешенные (веса - revenue_rate) значения метрик по получившимся данным
  arpu = round((total_games_usd.arpu * total_games_usd.revenue_rate).sum(), 2)
  arppu = round((total_games_usd.arppu * total_games_usd.revenue_rate).sum(), 2)
  paying_share = round((total_games_usd.paying_share * total_games_usd.revenue_rate).sum(), 2)
  
  result = (total_games_usd, arpu, arppu, paying_share)
  return result

In [None]:
def hard_calc_metrics_premium(data):

  """Делает то же самое, что и фукнция hard_calc_metrics_free, только 
  для premium игр.
  
  Parameters:
  ----------
  data: pd.DataFrame
    Данные, вышедшие из фукнции append_add_features.
    
  Returns:
  -------
  result: tuple
    Кортеж, содержащий датафрейм paying_games_usd со значениям 
    метрик для каждой категории игроков; взвешенные значения
    метрик arpu, arppu, paying_share"""

  data_copy = data.copy(deep=True)
  data_copy = append_add_features(transform_data_expand(data_copy))
  #получим информацию чисто по платящим игрокам (т.е. берутся только платящие игроки)
  paying_games_usd = data_copy.groupby("paying_games", as_index=False)\
                              .agg({'event_value_in_usd': 'sum', 'advertising_id': 'nunique'})\
                              .rename(columns={'event_value_in_usd': 'period_revenue', 'advertising_id': 'unique_paying_users'})
  paying_games_usd['revenue_rate'] = round(paying_games_usd.period_revenue / paying_games_usd.period_revenue.sum(), 4)
  paying_games_usd['paying_users_rate'] = round(paying_games_usd.unique_paying_users / paying_games_usd.unique_paying_users.sum() * 100, 1)
  paying_games_usd['arppu'] = round(paying_games_usd.period_revenue / paying_games_usd.unique_paying_users, 2)

  # рассчитаем метрики по получившимся данным

  arppu = round((paying_games_usd.arppu * paying_games_usd.revenue_rate).sum(), 2)
  
  result = (paying_games_usd, arppu)
  return result

##Функции для автоматизации расчёта недельных и месячных метрик

In [None]:
def design_data(data, start_date, platform, games_type, type_of_period):
  """Переименовывает некоторые поля в датафреймах для free и premium игр
  и добавляет дополнительные поля. Т.е. преобразует датафрейм к виду, 
  необходимому для добавления в БД для дальнейшего построения дэшбордов.
  
  Parameters:
    data: dataframe
      Датафрейм, вышедший из одной из функций 
      hard_calc_metrics_free или hard_calc_metrics_premium.
    start_date: str или datetime.date
      Дата первого дня периода. Здесь она нужно, чтобы
      просто добавить новое поле period_start_date.
    platform: str, in ['android', 'ios']
      Название платформы.
    games_type: str, in ['free', 'premium']
    type_of_period: str, in ['week', 'month']
      Для того, чтобы добавить новое поле type_of_period
      с указанием того, какие метрики вычисляются 
      (недельные или месячные).
      
    Returns:
    data: dataframe
      Датафрейм нужного вида для загрузки в таблицу 
      idfa_weeks_months_metrics БД на сервере."""

  data['platform'] = platform
  data['games_type'] = games_type
  data['period_start_date'] = start_date
  data['type_of_period'] = type_of_period  
  if games_type == 'premium':
    data = data.rename(columns={'paying_games': 'games'})
    data['unique_total_users'] = np.nan
  elif games_type == 'free2play':
    data = data.rename(columns={'total_games': 'games'})
  data = data.loc[:, ['games', 'period_revenue', 'unique_paying_users', 'unique_total_users',
                      'games_type','platform', 'period_start_date', 'type_of_period', 'revenue_rate']]
  return data

In [None]:
def for_db_data_create(start_date, finish_date, type_of_period):

  """Выгружет и преобразует данные, рассчитывает основные недельные (месячные) метрики,
  используя advertising_id, а также приводит результаты к такому виду,
  чтобы можно было сразу загружать в таблицу idfa_weeks_months_metrics.
  Отличается от функции design_data тем, что позволяет получить данные для всех 
  платформ и типов игр.
  Parameters:
  ----------
  start_date: str or datetime.date
  finish_date: str or datetime.date
    Метрики будут рассчитаны по данным за период
    [start_date, finish_date].
  type_of_period: str, in ['week', 'month']
    Для того, чтобы добавить новое поле type_of_period
    с указанием того, какие метрики вычисляются 
    (недельные или месячные)
  
  Returns:
  -------
  result_data: dataframe
    Датафрейм с нужными полями для загрузки в БД на сервере
    в таблицу idfa_weeks_months_metrics."""

  # получим данные по android
  android_free = hard_calc_metrics_free(upload_data('android', start_date, finish_date,  games_type='free2play'))[0]
  android_premium= hard_calc_metrics_premium(upload_data('android', start_date, finish_date,  games_type='premium'))[0]
  # получим данные по ios
  ios_free = hard_calc_metrics_free(upload_data('ios', start_date, finish_date,  games_type='free2play'))[0]
  ios_premium= hard_calc_metrics_premium(upload_data('ios', start_date, finish_date,  games_type='premium'))[0]

  # перобразуем данные к виду, удобному для загрузки в базу данных
  android_free = design_data(android_free, start_date, 'android', 'free2play', type_of_period)
  android_premium = design_data(android_premium, start_date, 'android', 'premium', type_of_period)
  ios_free = design_data(ios_free, start_date, 'ios', 'free2play', type_of_period)
  ios_premium = design_data(ios_premium, start_date, 'ios', 'premium', type_of_period)
  
  result_data = pd.concat([android_free, android_premium, ios_free, ios_premium])
  #приведём данные в поле games к типу int
  result_data.games = result_data.games.astype(int)
  result_data = result_data.reset_index(drop=True)
  return result_data

In [None]:
def main_metrics(start_date, finish_date, period):
  """Основная фукнция для расчёта недельных метрик.
    Необходимо её запускать раз в неделю и передать ей нужные даты.
    Она автоматически вычислит
    нужные данные и добавит в таблицу idfa_weeks_months_metrics.
    
    Parameters:
    ----------
    start_date: str or datetime.date
    finish_date: str or datetime.date
      Данные рассчитываются за период [start_date, finish_date], 
      т.е. необходимо передать дату понедельника и воскресенья
      той недели, для которой необходимо вычислять данные.
    period: str, in ['week', 'month']
      Название периода, для которого производятся вычисления, 
      т.е. 'week' or 'month'."""

  assert period in ['week', 'month'], """Аргумент period должен быть равен 'week' или 'month'"""
  data = for_db_data_create(start_date, finish_date, period)
  data.unique_paying_users = data.unique_paying_users.astype(np.float32)
  %%time
  try:
    data.to_sql('idfa_weeks_months_metrics', engine, if_exists='append', index=False)
  except IntegrityError as error:
    print('Вы пытаетесь добавить в таблицу данные, нарушающие ограничения первичного ключа')
    print(error)


In [None]:
%%time
main_week_metrics('2021-01-25', '2021-02-01')