In [799]:
#%load_ext lab_black

In [800]:
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas_gbq
import pandas as pd
import numpy as np
import os
import warnings
import datetime

# Notebook-wide display configuration: never truncate columns when a frame is shown.
pd.set_option('display.max_columns', None)
# NOTE(review): this silences every warning, including the pandas chained-assignment
# warnings that would otherwise flag fragile writes further down the notebook.
warnings.filterwarnings('ignore')

# Location of the service-account key file. It can be overridden without editing the
# notebook by setting PYPAYMENT_SERVICE_ACCOUNT_FILE; the previous hard-coded path is
# kept as the fallback so existing runs behave exactly as before.
credentials_path = os.environ.get(
    "PYPAYMENT_SERVICE_ACCOUNT_FILE",
    "/Users/miguelcouto/Downloads/zattoo-dataeng-e5f45785174f.json",
)
credentials = service_account.Credentials.from_service_account_file(credentials_path)

project_id = "zattoo-dataeng"
# NOTE(review): `client` is not referenced by any cell visible in this notebook, and the
# pandas_gbq calls below are not given `credentials` - confirm which identity they use.
client = bigquery.Client(credentials=credentials, project=project_id)

In [801]:
## prepare dataframe: load the raw Amazon sales report for the month being processed
sales_report_path = '/Users/miguelcouto/PycharmProjects/pypayment_v2/raw/amazon/sales-2022-03-00000.csv'
df = pd.read_csv(sales_report_path)

In [802]:
## The SKU zattoo_amazon_firetv_hiq_german_1mo_90day_freetrial is manually renamed to
## zattoo_amazon_firetv_hiq_german_90days_freetrial.
## TODO: confirm whether this manual rename is future proof.
# Assign the result back instead of calling replace(..., inplace=True) on the selected
# column: the in-place form mutates a temporary under pandas Copy-on-Write and would
# leave `df` unchanged.
df['Vendor SKU'] = df['Vendor SKU'].replace(
    {'zattoo_amazon_firetv_hiq_german_1mo_90day_freetrial': 'zattoo_amazon_firetv_hiq_german_90days_freetrial'})

In [803]:
# Mapping from the Amazon report column names to the names used in the rest of the
# notebook. The dict order also fixes the column order of `reporting_df`.
source_to_reporting_columns = {
    'Transaction ID': 'vendor_trx_id',
    'Transaction Time': 'transaction_time',
    'Transaction Type': 'transaction_type',
    'Country/Region Code': 'country_code',
    'Vendor SKU': 'vendor_sku',
    'In-App Subscription Term': 'subscription_term',
    'In-App Subscription Status (Trial / Paid)': 'subscription_status',
    'Sales Price (Marketplace Currency)': 'sales_price',
    'Estimated Earnings (Marketplace Currency)': 'earnings',
    'Units': 'units',
}

# rename() without inplace returns a new frame, so the assignments below write to an
# independent object rather than to a slice of `df`.
reporting_df = df[list(source_to_reporting_columns)].rename(columns=source_to_reporting_columns)

# Normalise the two categorical text columns to lower case; later cells compare
# transaction_type against the literal 'charge'.
reporting_df['subscription_status'] = reporting_df['subscription_status'].str.lower()
reporting_df['transaction_type'] = reporting_df['transaction_type'].str.lower()

In [804]:
# Keep only rows that carry a subscription status. The explicit copy makes the result an
# independent frame, so the column assignments in the following cells do not write to a
# slice of the previous frame.
reporting_df = reporting_df[reporting_df['subscription_status'].notna()].copy()

In [805]:
# Translate the country codes to names, then fold Switzerland into Germany.
# The second step is assigned back rather than done with replace(..., inplace=True) on
# the selected column, which would not modify `reporting_df` under pandas Copy-on-Write.
# NOTE(review): after this cell 'Switzerland' never appears in country_name - confirm
# that reporting CH sales as Germany is intended.
reporting_df['country_name'] = (
    reporting_df['country_code']
    .replace({'DE': 'Germany', 'CH': 'Switzerland', 'AT': 'Austria'})
    .replace('Switzerland', 'Germany')
)

In [806]:
# Drop the last four characters of the raw timestamp text before parsing
# (presumably a trailing time-zone suffix - TODO confirm against the report format).
transaction_time_text = reporting_df['transaction_time'].str[:-4]
reporting_df['transaction_date'] = pd.to_datetime(transaction_time_text, format='%Y-%m-%d %H:%M:%S')
# The raw text column is no longer needed once the parsed column exists.
reporting_df = reporting_df.drop(columns='transaction_time')

In [807]:
# Month ('YYYY-MM') of every transaction; the most frequent one is treated as the month
# this report belongs to.
transaction_months = reporting_df['transaction_date'].dt.to_period('M').dt.strftime('%Y-%m')
month_modes = transaction_months.mode()
if month_modes.empty:
    raise ValueError("Cannot derive reporting_month: no transaction dates available")
# mode() returns a Series (several entries on ties); take the first entry as a scalar
# instead of broadcasting the whole Series into every row.
dominant_month = month_modes.iloc[0]

## for amazon we report with 1month delay, therefore reporting month should be month + 1
reporting_month = (pd.to_datetime(dominant_month) + pd.offsets.MonthBegin(1)).strftime('%Y-%m')
reporting_df['reporting_month'] = reporting_month

In [808]:
# Every row of this report is an Amazon payment.
reporting_df = reporting_df.assign(payment_method='amazon')

In [809]:
# CHF for rows whose country_name is 'Switzerland', EUR for everything else.
# NOTE(review): an earlier cell replaces 'Switzerland' with 'Germany' in country_name,
# so this presumably always yields 'EUR' - confirm that is intended.
reporting_df['currency'] = np.where(reporting_df['country_name'] == 'Switzerland', 'CHF', 'EUR')

In [810]:
# Lookup of Amazon term SKUs: distinct (termsku, term, productid) rows from the
# payment_amazontransaction_view. The result is joined onto the report by vendor SKU in
# a later cell.
# NOTE(review): `distinct` is over all three columns, so one termsku may appear on
# several rows if it maps to more than one term/productid - verify against the view.
payment_amazon = """
select distinct
       pav.termsku,
       pav.term,
       pav.productid
from b2c_middleware_import.payment_amazontransaction_view pav
"""

# Runs the query in BigQuery and returns the result as a DataFrame.
df_skus = pandas_gbq.read_gbq(payment_amazon, project_id=project_id, progress_bar_type=None)

In [811]:
# Columns carried forward from the report; everything else is dropped here.
report_columns = [
    'vendor_trx_id', 'country_name', 'country_code', 'vendor_sku', 'subscription_status', 'transaction_type',
    'sales_price', 'earnings', 'units', 'transaction_date', 'reporting_month', 'payment_method', 'currency',
]

# Left join so that report rows without a matching termsku are kept.
reporting_df = reporting_df[report_columns].merge(
    df_skus[['termsku', 'productid', 'term']],
    how='left', left_on='vendor_sku', right_on='termsku')

reporting_df = reporting_df.rename(columns={'productid': 'sku'})

# Where the lookup found no productid, fall back to the vendor SKU. fillna on the column
# replaces the former chained-indexing assignment, which writes to a temporary under
# pandas Copy-on-Write.
reporting_df['sku'] = reporting_df['sku'].fillna(reporting_df['vendor_sku'])

# Only the join key from the lookup is dropped; vendor_sku stays because a later cell
# derives product_length from it.
reporting_df = reporting_df.drop(columns=['termsku'])

In [812]:
## Some SKUs do not have 'term' data, so a default of '1 Month' is written for them.
# Assigned back instead of fillna(..., inplace=True) on the attribute-selected column,
# which would not modify `reporting_df` under pandas Copy-on-Write.
reporting_df['term'] = reporting_df['term'].fillna('1 Month')

In [813]:
# Distinct, non-null SKUs present in the report, in a deterministic order.
skus_list = sorted(reporting_df['sku'].dropna().astype(str).unique())

# The SKU list is passed as a named ARRAY<STRING> query parameter instead of being
# pasted into the SQL text: that avoids quoting/escaping problems and keeps the query
# valid when the list is empty.
skus_expand = """select distinct rlv.SKU as sku,
                rlv.product_class,
                rlv.detailed_product_class,
                rlv.product_length
from b2c_middleware.reporting_layer_view rlv
where true
  and rlv.SKU in unnest(@skus)
  and rlv.product_class is not null
  and rlv.detailed_product_class is not null
  ;"""

skus_query_config = {
    "query": {
        "parameterMode": "NAMED",
        "queryParameters": [
            {
                "name": "skus",
                "parameterType": {"type": "ARRAY", "arrayType": {"type": "STRING"}},
                "parameterValue": {"arrayValues": [{"value": sku} for sku in skus_list]},
            }
        ],
    }
}

df_skus_expand = pandas_gbq.read_gbq(
    skus_expand,
    project_id=project_id,
    configuration=skus_query_config,
    progress_bar_type=None,
)

# NOTE(review): the query is distinct over four columns, so a SKU with more than one
# class/length combination would duplicate report rows in this join - verify.
reporting_df = reporting_df.merge(df_skus_expand, how='left', on='sku')

In [814]:
# Sales price per unit; rows with zero units produce +/-inf here, which a later cell
# replaces with 0.
reporting_df['avg_price_sales_per_sub'] = reporting_df['sales_price'].div(reporting_df['units'])

In [815]:
# Translate the textual term into a number of months; values not listed here are left
# unchanged by replace().
term_to_months = {
    '1 Month': 1,
    '2 Months': 2,
    '3 Months': 3,
    '6 Months': 6,
    '12 Months': 12,
    '1 Year': 12,
}
reporting_df['product_length_months'] = reporting_df['term'].replace(term_to_months)

In [816]:
# Minimal per-transaction frame uploaded to BigQuery so the VAT / exchange-rate query
# can join against it. The explicit copy keeps the new column from being written to a
# slice of `reporting_df`.
temp_vat_df = reporting_df[
    ['vendor_trx_id', 'country_code', 'sales_price', 'transaction_date', 'currency']
].copy()
# The schema declares sales_price_cents as INTEGER, so round away float noise
# (e.g. 9.99 * 100 -> 998.9999...) and store real integers; missing prices stay <NA>.
temp_vat_df['sales_price_cents'] = (temp_vat_df['sales_price'] * 100).round().astype('Int64')

## define BQ table schema
bq_schema_vat = [
    {"name": "vendor_trx_id", "type": "INTEGER"},
    {"name": "country_code", "type": "STRING"},
    {"name": "sales_price_cents", "type": "INTEGER"},
    {"name": "transaction_date", "type": "TIMESTAMP"},
    {"name": "currency", "type": "STRING"},
]

## export to BQ table (replaces the table on every run)
pandas_gbq.to_gbq(
    dataframe=temp_vat_df,
    destination_table="temp.vat_amazon_pypayment_v2",
    # Same value as before; uses the notebook-level setting like the read_gbq calls do.
    project_id=project_id,
    if_exists="replace",
    progress_bar=None,
    table_schema=bq_schema_vat,
)

In [817]:
# Query that enriches the uploaded temp table (temp.vat_amazon_pypayment_v2) with a VAT
# percentage and an EUR exchange rate per transaction. The vat_CHF / vat_EUR UDF calls
# are commented out inside the SQL, so only vat_percentage and exchange_rate_eur_to_chf
# are returned alongside the two join keys.
# NOTE(review): the WHERE clause filters on columns of the LEFT JOINed tables
# (pe.day, pe_eur.day, vat.created_at_date); rows without a match have NULL there and
# are removed, so the joins effectively behave like inner joins - confirm intended.
# NOTE(review): both exchange-rate joins match on month only; if payment_exchangerate
# holds more than one row per currency and month, each transaction is returned several
# times - verify against the table.
vat_expand = """
select vendor_trx_id,
       transaction_date,
--        udf.vat_chf(pe.from_currency,
--                    vat.rate,
--                    pe.exchange_rate,
--                    pe.from_currency_quantity,
--                    py2.sales_price_cents
--            )
--                             AS vat_CHF,
--        udf.vat_eur(pe.from_currency,
--                    vat.rate,
--                    pe.exchange_rate,
--                    pe_eur.from_currency_quantity,
--                    pe_eur.exchange_rate,
--                    py2.sales_price_cents
--            )
--                             AS vat_EUR,
       vat.rate / 100       AS vat_percentage,
       pe_eur.exchange_rate AS exchange_rate_eur_to_chf
FROM temp.vat_amazon_pypayment_v2 py2
         LEFT JOIN b2c_middleware_import.payment_exchangerate pe
                   ON pe.from_currency = py2.currency
                       AND DATE_TRUNC(pe.day, MONTH) =
                           DATE_TRUNC(DATE(py2.transaction_date), MONTH)
         LEFT JOIN b2c_middleware_import.payment_exchangerate pe_eur
                   ON pe_eur.from_currency = 'EUR'
                       AND DATE_TRUNC(pe_eur.day, MONTH) =
                           DATE_TRUNC(DATE(py2.transaction_date), MONTH)
         LEFT JOIN b2c_middleware_import.payment_vat_view vat
                   ON vat.country = py2.country_code
                       AND vat.created_at_date = DATE(py2.transaction_date)
where true
  AND pe.day >= DATE('2008-08-01')
  AND pe_eur.day >= DATE('2008-08-01')
  AND vat.created_at_date >= DATE('2008-08-01')
;
"""

# Runs the query in BigQuery and returns the result as a DataFrame.
df_vat_expand = pandas_gbq.read_gbq(vat_expand, project_id=project_id, progress_bar_type=None)

In [818]:
# Left-join the VAT / exchange-rate result onto the report. The join keys are
# vendor_trx_id plus the transaction timestamp, with both timestamp sides converted to
# UTC-aware datetimes on the fly and passed as arrays rather than column names.
# Because both frames also carry a 'transaction_date' column, the result holds
# transaction_date_x / transaction_date_y; the right-hand copy and the 'key_1' column
# (presumably the name pandas gives the array key - TODO confirm) are dropped again.
# NOTE(review): if df_vat_expand holds several rows for one transaction, this join
# duplicates the corresponding report rows.
reporting_df = reporting_df.merge(
    df_vat_expand[['transaction_date', 'vendor_trx_id', 'vat_percentage', 'exchange_rate_eur_to_chf']], how='left',
    left_on=['vendor_trx_id', pd.to_datetime(reporting_df['transaction_date'], utc=True)],
    right_on=['vendor_trx_id',
              pd.to_datetime(df_vat_expand['transaction_date'], utc=True)]).drop(
    ['transaction_date_y', 'key_1'], axis=1)

In [819]:
# Give the left-hand timestamp column produced by the merge its original name back.
reporting_df = reporting_df.rename(columns={'transaction_date_x': 'transaction_date'})

In [820]:
## We don't book VAT for Amazon because they do it themselves; including VAT could lead
## to issues. The columns are kept, but every VAT value is set to zero
## (vat_percentage, vat_eur, vat_chf).
for vat_column in ('vat_percentage', 'vat_eur', 'vat_chf'):
    reporting_df[vat_column] = 0

In [821]:
## Amazon's "Sales price" is OUR "Charge"
charge = reporting_df['sales_price']
reporting_df['charge_eur'] = charge

# Constant columns applied to every row.
reporting_df['store_fees'] = 15
reporting_df['domestic_abroad'] = 'domestic'

# Sales price = charge plus VAT on the charge, rounded to cents.
vat_fraction = reporting_df['vat_percentage'] / 100
reporting_df['sales_price_eur'] = (reporting_df['charge_eur'] + (reporting_df['charge_eur'] * vat_fraction)).round(2)

# Fee = sales price minus the absolute value of the earnings.
reporting_df['fee_eur'] = reporting_df['sales_price'] - reporting_df['earnings'].abs()

# Net booking is taken directly from the earnings column.
reporting_df['net_booking_eur'] = reporting_df['earnings']

# Payout is derived from the sales price minus the fee (not from net_booking_eur).
reporting_df['payout_eur'] = reporting_df['sales_price_eur'] - reporting_df['fee_eur']

In [822]:
# Use detailed_product_class where available, otherwise product_class, and map the two
# 'base_*' values to their reporting names. The whole expression is assigned back in one
# step instead of calling replace(..., inplace=True) on the selected column, which would
# not modify `reporting_df` under pandas Copy-on-Write.
reporting_df['detailed_product_class'] = (
    reporting_df['detailed_product_class']
    .fillna(reporting_df['product_class'])
    .replace({'base_hiq': 'premium', 'base_ultimate': 'ultimate'})
)

# The combined column takes over the name 'product_class'.
reporting_df = (
    reporting_df
    .drop(columns='product_class')
    .rename(columns={'detailed_product_class': 'product_class'})
)

In [823]:
# Divisions by zero earlier in the notebook leave +/-inf behind; turn both into 0.
reporting_df = reporting_df.replace([np.inf, -np.inf], 0)

In [824]:
# Split the report into charges and everything else (refunds, chargebacks, ...).
# Both halves are explicit copies: the non-charge half is modified in the next cell and
# must not be a slice of `reporting_df`.
is_charge = reporting_df['transaction_type'] == 'charge'
reporting_df_negs = reporting_df[~is_charge].copy()
reporting_df = reporting_df[is_charge].copy()

In [825]:
# Amount columns whose sign is flipped for every non-charge transaction.
cols = [
    'sales_price_eur',
    'sales_price',
    'units',
    'avg_price_sales_per_sub',
    'charge_eur',
    'fee_eur',
    'vat_eur',
    'payout_eur',
]
reporting_df_negs[cols] = reporting_df_negs[cols].mul(-1)

In [826]:
# Stack the charges first and the sign-flipped non-charges after them.
frames_in_order = (reporting_df, reporting_df_negs)
reporting_df = pd.concat(frames_in_order)

In [827]:
# Derive the product length in days from the vendor SKU. Rules are evaluated top to
# bottom and the first match wins; rows matching no rule keep the product_length that
# came from the SKU lookup. `pd.np` no longer exists in current pandas, so numpy is used
# directly, and np.select replaces the six-level nested where().
vendor_sku = reporting_df['vendor_sku']


def _sku_has(fragment):
    """Boolean mask of rows whose vendor SKU contains `fragment` (missing SKU -> False)."""
    return vendor_sku.str.contains(fragment, regex=False, na=False)


product_length_rules = [
    (_sku_has("1mo"), 31),
    (vendor_sku == "zattoo_amazon_firetv_hiq_german_freetrial_2mo", 31),
    (_sku_has("_2mo"), 62),
    (_sku_has("3mo"), 90),
    (_sku_has("12mo"), 365),
    (vendor_sku == "zattoo_amazon_firetv_hiq_german_90days_freetrial", 31),
]

reporting_df['product_length'] = np.select(
    [matches.to_numpy() for matches, _ in product_length_rules],
    [days for _, days in product_length_rules],
    default=reporting_df['product_length'].to_numpy(),
)

In [828]:
## adding artificially created term_end_date based on initial transaction_date
# astype('timedelta64[D]') is rejected by pandas 2 (day resolution is unsupported), so
# the day counts are converted explicitly; missing or non-numeric lengths become NaT.
product_length_days = pd.to_timedelta(
    pd.to_numeric(reporting_df['product_length'], errors='coerce'), unit='D')
reporting_df['term_end'] = reporting_df['transaction_date'] + product_length_days

In [829]:
## calculate product_term_length_months: number of calendar months touched by the term,
## counting both the month of the transaction and the month of the term end
term_start = reporting_df["transaction_date"].dt
term_stop = reporting_df["term_end"].dt
whole_years_in_months = (term_stop.year - term_start.year) * 12
month_offset = term_stop.month - term_start.month
reporting_df["product_term_length_months"] = whole_years_in_months + month_offset + 1

In [830]:
# Negative month counts are clamped to 0. A single .loc assignment replaces the chained
# indexing form, which writes to a temporary under pandas Copy-on-Write.
reporting_df.loc[reporting_df["product_term_length_months"] < 0, "product_term_length_months"] = 0

In [831]:
## replacing product_term_length_months for exceptions where value is 2 instead of 1:
## terms of at most 30 days that end on the first day of a month
shorter_subs = (reporting_df['term_end'] - reporting_df['transaction_date']).dt.days <= 30
# All three conditions are combined into one mask on the full frame, instead of applying
# a full-length mask to an already filtered frame and assigning through chained .loc.
needs_single_month = (
    shorter_subs
    & (reporting_df["product_term_length_months"] == 2)
    & (reporting_df['term_end'].dt.day == 1)
)
shorter_subs_replacer = reporting_df.index[needs_single_month].to_list()
reporting_df.loc[needs_single_month, "product_term_length_months"] = 1

In [832]:
# Repeat every transaction once per month of its term, then renumber the rows.
months_per_row = reporting_df['product_term_length_months']
repeated_labels = reporting_df.index.repeat(months_per_row)
reporting_df = reporting_df.loc[repeated_labels].reset_index(drop=True)

# Running 1-based month counter within each (transaction id, transaction type) group.
reporting_df['revenue_month_number'] = 1
ones_per_group = reporting_df.groupby(["vendor_trx_id", 'transaction_type'])['revenue_month_number']
reporting_df['revenue_month_number'] = ones_per_group.cumsum()

In [833]:
## adding max_month_date to tackle the specifications of active_sub_month_end:
## the latest term_end within the same calendar month and transaction type,
## moved to 23:59:59 of that day
term_end_month = reporting_df['term_end'].dt.to_period('M')
latest_term_end = (
    reporting_df
    .groupby([term_end_month, 'transaction_type'])['term_end']
    .transform('max')
)
reporting_df['max_month_date'] = latest_term_end.dt.normalize() + pd.Timedelta('23:59:59')

In [834]:
## get last indices of each transaction_id group (charges only)
# Positions refer to rows of the full frame, so they are valid for .iloc no matter where
# the charge rows sit (the previous computation counted positions inside the
# charge-only subset and relied on all charges preceding every other row).
charge_positions = np.flatnonzero((reporting_df['transaction_type'] == 'charge').to_numpy())
charge_trx_ids = reporting_df['vendor_trx_id'].iloc[charge_positions]
last_idxs_charges = charge_positions[~charge_trx_ids.duplicated(keep='last').to_numpy()]

## add revenue_month_date: start from the first day of the transaction month ...
reporting_df["revenue_month_date"] = reporting_df["transaction_date"].to_numpy().astype("datetime64[M]")

# ... and move it forward by (revenue_month_number - 1) months: MonthEnd(n) lands on the
# end of the target month, MonthBegin(-1) rolls back to that month's first day.
reporting_df["revenue_month_date"] = reporting_df.apply(
    lambda x: x["revenue_month_date"]
              + pd.offsets.MonthEnd(x["revenue_month_number"])
              + pd.offsets.MonthBegin(-1),
    axis=1,
)

# Default term length of a row: number of days in its revenue month.
reporting_df["product_term_length"] = reporting_df["revenue_month_date"].dt.days_in_month

# Rows whose transaction happened after the revenue month started only count the days
# from the transaction day to the end of that month (inclusive).
starts_mid_month = reporting_df["transaction_date"] > reporting_df["revenue_month_date"]
days_left_in_transaction_month = (
    reporting_df["transaction_date"].dt.daysinmonth - reporting_df["transaction_date"].dt.day
) + 1
reporting_df["product_term_length"] = np.where(
    starts_mid_month, days_left_in_transaction_month, reporting_df["product_term_length"]
)

## fix last position of product_term_length per transaction_id for charges:
## the last month only runs from the month start until term_end
term_length_col = reporting_df.columns.get_loc("product_term_length")
days_until_term_end = (reporting_df["term_end"] - reporting_df["revenue_month_date"]).dt.days
reporting_df.iloc[last_idxs_charges, term_length_col] = (
    days_until_term_end.iloc[last_idxs_charges].to_numpy()
)

# Drop rows that contribute no days; copy so the writes below hit an independent frame.
reporting_df = reporting_df[reporting_df['product_term_length'] > 0].copy()

# Position of the last remaining row of every vendor_trx_id, across all transaction
# types. `last_idxs` is reused by the following cell.
last_idxs = np.flatnonzero(~reporting_df['vendor_trx_id'].duplicated(keep='last').to_numpy())

days_until_term_end = (reporting_df["term_end"] - reporting_df["revenue_month_date"]).dt.days
reporting_df.iloc[last_idxs, term_length_col] = days_until_term_end.iloc[last_idxs].to_numpy()

In [835]:
## set active_sub_month_end = 1 by default
reporting_df["active_sub_month_end"] = 1

# All writes below use one-step .iloc / .loc assignments on the frame; the chained form
# (frame[col][mask] = value) writes to a temporary under pandas Copy-on-Write.

# The last row of every transaction id is not active at month end.
active_col = reporting_df.columns.get_loc("active_sub_month_end")
reporting_df.iloc[last_idxs, active_col] = 0

# Final month of a term that ends before the latest term end of that month.
ends_before_month_cutoff = (
    (reporting_df["revenue_month_number"] == reporting_df['product_term_length_months'])
    & (reporting_df["term_end"] < reporting_df['max_month_date'])
)
reporting_df.loc[ends_before_month_cutoff, "active_sub_month_end"] = 0

## mark all one transaction subscriptions as 1
single_month_term = (
    (reporting_df["revenue_month_number"] <= 1)
    & (reporting_df["product_term_length_months"] <= 1)
)
reporting_df.loc[single_month_term, "active_sub_month_end"] = 1

## mark every non-charge transaction (refunds, chargebacks, ...) as -1
reporting_df.loc[reporting_df["transaction_type"] != "charge", "active_sub_month_end"] = -1

In [836]:
## resetting index so row labels are consecutive again after the filtering above
reporting_df = reporting_df.reset_index(drop=True)

In [837]:
## active_sub_content follows the same logic as active_sub_month_end except it doesn't count the last month
transaction_keys = ["vendor_trx_id", 'transaction_type']
reporting_df["active_sub_content"] = reporting_df["active_sub_month_end"]

# Last row of every (transaction id, transaction type) group gets 0.
closing_rows = reporting_df.groupby(transaction_keys)['active_sub_content'].tail(1).index
reporting_df.loc[closing_rows, 'active_sub_content'] = 0

## total_days of product_term_length per transaction_id (and transaction type)
days_by_transaction = reporting_df.groupby(transaction_keys)["product_term_length"]
reporting_df["total_days"] = days_by_transaction.transform("sum")

In [838]:
# Spot check: show the per-month rows of one transaction that has both a charge and a
# chargeback.
reporting_df.loc[
    reporting_df['vendor_trx_id'] == 84052165761142,
    ['vendor_trx_id', 'transaction_date', 'transaction_type', 'term_end', 'active_sub_month_end',
     'active_sub_content'],
]

Unnamed: 0,vendor_trx_id,transaction_date,transaction_type,term_end,active_sub_month_end,active_sub_content
17505,84052165761142,2022-03-19 21:10:23,charge,2022-06-17 21:10:23,1,1
17506,84052165761142,2022-03-19 21:10:23,charge,2022-06-17 21:10:23,1,1
17507,84052165761142,2022-03-19 21:10:23,charge,2022-06-17 21:10:23,1,1
17508,84052165761142,2022-03-19 21:10:23,charge,2022-06-17 21:10:23,0,0
45720,84052165761142,2022-03-25 02:23:36,chargeback,2022-06-23 02:23:36,-1,-1
45721,84052165761142,2022-03-25 02:23:36,chargeback,2022-06-23 02:23:36,-1,-1
45722,84052165761142,2022-03-25 02:23:36,chargeback,2022-06-23 02:23:36,-1,-1
45723,84052165761142,2022-03-25 02:23:36,chargeback,2022-06-23 02:23:36,-1,0


In [839]:
# After rows were dropped, the term length in months is the highest remaining month
# number within each (transaction id, transaction type) group.
month_numbers = reporting_df.groupby(["vendor_trx_id", 'transaction_type'])['revenue_month_number']
reporting_df['product_term_length_months'] = month_numbers.transform('max')

In [840]:
## calculate total_revenue_net fields: spread the net booking over the rows of a
## transaction in proportion to the days each row covers
net_per_day_eur = reporting_df["net_booking_eur"] / reporting_df["total_days"]
reporting_df["total_revenue_net_eur"] = net_per_day_eur * reporting_df["product_term_length"]

In [841]:
## Columns that are reported once per transaction: keep the value on the first month row
## and set it to 0 on every following row of the group.
one_line_cols = [
    "vat_eur",
    'payout_eur',
    'sales_price_eur',
    'store_fees',
    'charge_eur',
    'vat_percentage',
    'avg_price_sales_per_sub',
    'sales_price',
    'units',
]

after_first_month = reporting_df["revenue_month_number"] > 1
for col in one_line_cols:
    reporting_df.loc[after_first_month, [col]] = 0.0

In [842]:
## calculating CHF columns: every EUR amount times the EUR->CHF exchange rate
## (vat_chf is not derived here; it was set to zero together with the other VAT columns)
eur_to_chf_rate = reporting_df['exchange_rate_eur_to_chf']
for amount_name in ('charge', 'sales_price', 'fee', 'net_booking', 'payout'):
    reporting_df[f'{amount_name}_chf'] = reporting_df[f'{amount_name}_eur'] * eur_to_chf_rate

# Same day-proportional split as for the EUR revenue.
net_per_day_chf = reporting_df["net_booking_chf"] / reporting_df["total_days"]
reporting_df["total_revenue_net_chf"] = net_per_day_chf * reporting_df["product_term_length"]

In [843]:
## define BQ table schema
# The schema is the single source of truth for the exported columns and their order.
# The duplicate "product_term_length" entry was removed: field names in a schema must
# be unique.
# NOTE(review): vendor_trx_id is declared STRING here but INTEGER in the VAT temp table
# schema - confirm which type is wanted.
bq_schema = [
    {"name": "vendor_trx_id", "type": "STRING"},
    {"name": "transaction_date", "type": "TIMESTAMP"},
    {"name": "term_end", "type": "TIMESTAMP"},
    {"name": "reporting_month", "type": "STRING"},
    {"name": "country_name", "type": "STRING"},
    {"name": "currency", "type": "STRING"},
    {"name": "sku", "type": "STRING"},
    {"name": "subscription_status", "type": "STRING"},
    {"name": "transaction_type", "type": "STRING"},
    {"name": "payment_method", "type": "STRING"},
    {"name": "product_class", "type": "STRING"},
    {"name": "product_length", "type": "INTEGER"},
    {"name": "product_length_months", "type": "INTEGER"},
    {"name": "product_term_length", "type": "INTEGER"},
    {"name": "domestic_abroad", "type": "STRING"},
    {"name": "vat_percentage", "type": "FLOAT"},
    {"name": "exchange_rate_eur_to_chf", "type": "FLOAT"},
    {"name": "store_fees", "type": "INTEGER"},
    {"name": "units", "type": "INTEGER"},
    {"name": "charge_eur", "type": "FLOAT"},
    {"name": "sales_price_eur", "type": "FLOAT"},
    {"name": "fee_eur", "type": "FLOAT"},
    {"name": "vat_eur", "type": "FLOAT"},
    {"name": "net_booking_eur", "type": "FLOAT"},
    {"name": "payout_eur", "type": "FLOAT"},
    {"name": "total_revenue_net_eur", "type": "FLOAT"},
    {"name": "charge_chf", "type": "FLOAT"},
    {"name": "sales_price_chf", "type": "FLOAT"},
    {"name": "fee_chf", "type": "FLOAT"},
    {"name": "vat_chf", "type": "FLOAT"},
    {"name": "net_booking_chf", "type": "FLOAT"},
    {"name": "payout_chf", "type": "FLOAT"},
    {"name": "total_revenue_net_chf", "type": "FLOAT"},
    {"name": "revenue_month_number", "type": "INTEGER"},
    {"name": "revenue_month_date", "type": "TIMESTAMP"},
    {"name": "active_sub_month_end", "type": "INTEGER"},
    {"name": "active_sub_content", "type": "INTEGER"}
]

## reorder dataframe
# Keep exactly the schema columns, in schema order. Helper columns that are deliberately
# not exported: country_code, vendor_sku, term, product_term_length_months, sales_price,
# earnings, avg_price_sales_per_sub, total_days.
reporting_df = reporting_df[[field["name"] for field in bq_schema]]

In [860]:
## export to csv: one file per reporting month, e.g. subs_reporting_amazon2022_04.csv
path = r"/Users/miguelcouto/Desktop/"
csv_file_name = fr'subs_reporting_amazon{reporting_month.replace("-", "_")}.csv'

reporting_df.to_csv(os.path.join(path, csv_file_name))

In [857]:
## export to BQ table: one table per reporting month; an existing table is replaced
reporting_table = f"temp.subs_reporting_amazon{reporting_month.replace('-', '')}"

pandas_gbq.to_gbq(
    dataframe=reporting_df,
    destination_table=reporting_table,
    project_id="zattoo-dataeng",
    if_exists="replace",
    progress_bar=None,
    table_schema=bq_schema,
)