In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import col,when
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import LongType, ArrayType, DoubleType, BooleanType

import mlflow
import mlflow.pyspark.ml
from mlflow.models.signature import infer_signature
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from sklearn.metrics import classification_report, confusion_matrix,precision_recall_fscore_support
from sklearn.metrics import accuracy_score,average_precision_score,precision_score,confusion_matrix,recall_score,roc_curve,auc,f1_score

import datetime
import os
import re
import warnings
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import sklearn 
import seaborn as sns
# import backoff

import slack
from slack import WebClient

In [0]:
# Redshift connection settings for the "segment" database.
# Credentials are read from the environment instead of being written into the
# notebook source. The password that was previously committed here should be
# treated as leaked and rotated. On Databricks, a secret scope
# (dbutils.secrets.get(scope=..., key=...)) is a suitable source for
# REDSHIFT_PASSWORD.
jdbcUsername = os.environ.get("REDSHIFT_USER", "dsuser")
jdbcPassword = os.environ.get("REDSHIFT_PASSWORD", "")
jdbcPort = int(os.environ.get("REDSHIFT_PORT", "5439"))
jdbcDatabase = os.environ.get("REDSHIFT_DATABASE", "segment")
jdbcHostname = os.environ.get(
    "REDSHIFT_HOST",
    "dream11-segment-2.cd2mebwblp0z.us-east-1.redshift.amazonaws.com",
)

if not jdbcPassword:
    # Warn instead of raising so the rest of the notebook still loads; any
    # Redshift read will most likely be rejected at authentication time.
    warnings.warn("REDSHIFT_PASSWORD is not set; Redshift reads will likely fail to authenticate.")

jdbcUrl = f"jdbc:redshift://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?user={jdbcUsername}&password={jdbcPassword}"

# jdbcUrl is deliberately not echoed as the cell output: it embeds the password,
# which would then be persisted in the saved notebook.

In [0]:
# Only clusters with at least this many members are loaded for growth analysis.
MIN_CLUSTER_SIZE = 4

# One row per (cluster, user) pair: the user's registration date and the date
# their edge prediction was made (the "highlight" date).
# NOTE(review): if the ads table holds several rows per userid, a user appears
# more than once here -- confirm the join is 1:1.
query = f"""
  SELECT a.cluster_id,
         a.user_id,
         date(b.reg_time) as reg_date,
         date(b.edge_pred_timestamp) as highlight_date
  FROM DS.Dream11_FenceEdgeValidation_Clusters as a
  INNER JOIN sporta_sandbox.Dream11_fpv_multiid_all_pipeline_ads as b
  ON a.user_id = b.userid
  WHERE a.cluster_size >= {MIN_CLUSTER_SIZE:d}
"""

# Load through the Databricks Redshift connector using the SparkSession
# (`spark`), consistent with the rest of the notebook, rather than the legacy
# SQLContext. `tempdir` is the S3 staging location the connector uses.
df = (
    spark.read
    .format("com.databricks.spark.redshift")
    .option("url", jdbcUrl)
    .option("forward_spark_s3_credentials", True)
    .option("tempdir", "s3a://databricks-poc-bucket-datateam/temp/payment")
    .option("query", query)
    .load()
)

# cache() is lazy: the data is materialised on the first action. The "df" temp
# view is what the %sql cells below query.
df.cache()
df.createOrReplaceTempView('df')

In [0]:
# Preview the loaded cluster membership rows (cluster_id, user_id, reg_date, highlight_date).
# NOTE(review): the saved output contains raw user_ids -- consider clearing it before sharing.
display(df)

cluster_id,user_id,reg_date,highlight_date
acf82323-1373-4816-9d88-e39bc130f7cf,148364470,2021-12-07,2021-12-07
71439ae8-08e9-463c-b5a0-0ecff10ebebc,146506264,2021-11-04,2021-11-05
468f2583-fafc-43b6-8b58-13410d23da3e,146776314,2021-11-07,2021-11-08
3415b40d-184f-4eae-af6e-fff31e683ec0,150519783,2022-01-27,2022-01-28
d38264a5-9987-4de5-adfb-18cd2f81f820,149751707,2022-01-11,2022-01-12
468fd1de-ef7f-4390-870d-5ecef85d7d9a,150569578,2022-01-28,2022-01-29
c90bad90-c635-4b92-81dd-20be40394e47,150599470,2022-01-28,2022-01-29
008ccd93-9211-4cac-97b7-90e97c3cff71,150562207,2022-01-28,2022-01-29
468f2583-fafc-43b6-8b58-13410d23da3e,150583458,2022-01-28,2022-01-29
468f2583-fafc-43b6-8b58-13410d23da3e,147302890,2021-11-15,2021-11-16


In [0]:
%sql

-- Per cluster and registration date: number of users registering that day,
-- running total, and `day` = days since the cluster's first registration date.
CREATE OR REPLACE TEMP VIEW cluster_growth_regdate AS (
  SELECT cluster_id,
         reg_date,
         -- datediff counts whole calendar days. Differencing
         -- unix_timestamp(date)/(24*3600) could give fractional days across a
         -- DST change in the session time zone. MIN() also ignores NULL dates,
         -- whereas taking the first row by ORDER BY (NULLS FIRST) made `day`
         -- NULL for the whole cluster. The CAST keeps the original DOUBLE type.
         CAST(datediff(reg_date, MIN(reg_date) OVER (PARTITION BY cluster_id)) AS DOUBLE) AS day,
         SUM(reguser_cnt) OVER (
           PARTITION BY cluster_id
           ORDER BY reg_date
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
         ) AS cumsum_regdate,
         reguser_cnt
  FROM (
    -- NOTE(review): COUNT(user_id) counts rows, not distinct users; confirm df
    -- has one row per (cluster_id, user_id).
    SELECT cluster_id,
           reg_date,
           COUNT(user_id) AS reguser_cnt
    FROM df
    GROUP BY cluster_id, reg_date
  ) AS daily
  ORDER BY cluster_id, reg_date, day
)

In [0]:
%sql

-- Per cluster and highlight date: number of users highlighted that day,
-- running total, and `day` = days since the cluster's first highlight date.
CREATE OR REPLACE TEMP VIEW cluster_growth_highlightdate AS (
  SELECT cluster_id,
         highlight_date,
         -- datediff counts whole calendar days. Differencing
         -- unix_timestamp(date)/(24*3600) could give fractional days across a
         -- DST change in the session time zone. MIN() also ignores NULL dates,
         -- whereas taking the first row by ORDER BY (NULLS FIRST) made `day`
         -- NULL for the whole cluster. The CAST keeps the original DOUBLE type.
         CAST(datediff(highlight_date, MIN(highlight_date) OVER (PARTITION BY cluster_id)) AS DOUBLE) AS day,
         SUM(highlightuser_cnt) OVER (
           PARTITION BY cluster_id
           ORDER BY highlight_date
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
         ) AS cumsum_highlightdate,
         highlightuser_cnt
  FROM (
    -- NOTE(review): COUNT(user_id) counts rows, not distinct users; confirm df
    -- has one row per (cluster_id, user_id).
    SELECT cluster_id,
           highlight_date,
           COUNT(user_id) AS highlightuser_cnt
    FROM df
    GROUP BY cluster_id, highlight_date
  ) AS daily
  -- Largest cumulative counts first, so a preview surfaces the biggest clusters.
  ORDER BY cumsum_highlightdate DESC
)

In [0]:
# Expose the two growth temp views to Python as DataFrames of the same names.
cluster_growth_highlightdate = spark.table("cluster_growth_highlightdate")
cluster_growth_regdate = spark.table("cluster_growth_regdate")

In [0]:
# Preview the per-cluster daily highlight counts and running totals
# (cluster_id, highlight_date, day, cumsum_highlightdate, highlightuser_cnt).
display(cluster_growth_highlightdate)

cluster_id,highlight_date,day,cumsum_highlightdate,highlightuser_cnt
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-15,124.0,351979,4772
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-14,123.0,347207,5442
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-13,122.0,341765,4224
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-12,121.0,337541,3361
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-11,120.0,334180,3080
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-10,119.0,331100,2666
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-09,118.0,328434,1772
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-08,117.0,326662,1966
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-07,116.0,324696,1185
468f2583-fafc-43b6-8b58-13410d23da3e,2022-02-06,115.0,323511,1280


In [0]:
%sql

-- Cumulative growth curve (by highlight date) for one specific cluster,
-- listed in day order starting from day 0.
SELECT day,
       cumsum_highlightdate
FROM   cluster_growth_highlightdate
WHERE  cluster_id = '468f2583-fafc-43b6-8b58-13410d23da3e'
ORDER BY day ASC

day,cumsum_highlightdate
0.0,2948
1.0,6781
2.0,13104
3.0,18519
4.0,23168
5.0,29128
6.0,34307
7.0,46116
8.0,55663
9.0,64119


In [0]:
#  Rolling window growth rate