In [8]:
# Time imports
import time as t
from datetime import date, datetime, timedelta

In [2]:
from pyspark.sql.functions import countDistinct, approxCountDistinct

In [3]:
# Read the active-users Avro file into a Spark DataFrame
df = (
    spark.read
    .format("com.databricks.spark.avro")
    .load("active_users.avro")
)

In [4]:
###### DAU ######
def compute_DAU(dataframe, day_date):
    """
    Compute the exact number of distinct active users for one calendar day.

    The day is the half-open interval [00:00 of `day_date`, 00:00 of the
    following day), evaluated in the local timezone of the driver
    (`time.mktime` interprets the struct_time as local time).

    :param dataframe: Spark DataFrame exposing `timestamp` and `_id` columns.
        `timestamp` is compared against epoch milliseconds -- it is assumed
        to be stored in that unit (TODO confirm against the Avro schema).
    :param day_date: `date` or `datetime` identifying the day; any
        time-of-day component is ignored.
    :return: distinct count of `_id` among rows whose `timestamp` falls
        inside the day.
    """
    # Truncate to midnight so that a datetime carrying a time-of-day still
    # selects the whole calendar day.
    day_start = datetime(day_date.year, day_date.month, day_date.day)
    day_end = day_start + timedelta(days=1)

    # Convert both bounds to epoch milliseconds. `int` is used instead of
    # `long` so the code runs on Python 3 as well (on Python 2, `int`
    # promotes to `long` automatically when the value needs it).
    lower_bound_timestamp = int(t.mktime(day_start.timetuple()) * 1000)
    upper_bound_timestamp = int(t.mktime(day_end.timetuple()) * 1000)

    # Half-open filter: `Column.between` is inclusive on both ends, which
    # would attribute an event stamped exactly at the next midnight to two
    # consecutive days.
    in_day = (
        (dataframe.timestamp >= lower_bound_timestamp)
        & (dataframe.timestamp < upper_bound_timestamp)
    )
    filtered = dataframe.filter(in_day)

    return filtered.agg(countDistinct(filtered._id)).first()[0]
    

In [5]:
# Example: distinct active users on 15 January 2017
day = datetime(year=2017, month=1, day=15)
compute_DAU(df, day)

738574

In [9]:
###### MAU ######
def compute_MAU(dataframe, day_date=None, window_days=30):
    """
    Approximate the number of distinct active users over a trailing window.

    The window is the half-open interval
    [00:00 of `day_date` - `window_days`, 00:00 of `day_date`), evaluated in
    the local timezone of the driver. Activity on `day_date` itself is
    therefore not counted.

    :param dataframe: Spark DataFrame exposing `timestamp` and `_id` columns.
        `timestamp` is compared against epoch milliseconds -- it is assumed
        to be stored in that unit (TODO confirm against the Avro schema).
    :param day_date: `date` or `datetime` marking the end of the window; any
        time-of-day component is ignored. Defaults to today's date, which
        makes the result depend on when the cell is run -- pass an explicit
        date for a reproducible figure.
    :param window_days: length of the window in days (default 30).
    :return: approximate distinct count of `_id` inside the window.
    """
    # Resolve "today" once so both bounds derive from the same date even if
    # the call straddles midnight.
    if day_date is None:
        day_date = date.today()

    window_end = datetime(day_date.year, day_date.month, day_date.day)
    window_start = window_end - timedelta(days=window_days)

    # Convert both bounds to epoch milliseconds. `int` is used instead of
    # `long` so the code runs on Python 3 as well (on Python 2, `int`
    # promotes to `long` automatically when the value needs it).
    lower_bound_timestamp = int(t.mktime(window_start.timetuple()) * 1000)
    upper_bound_timestamp = int(t.mktime(window_end.timetuple()) * 1000)

    # Half-open filter: `Column.between` is inclusive on both ends, which
    # would pull in an event stamped exactly at the closing midnight.
    in_window = (
        (dataframe.timestamp >= lower_bound_timestamp)
        & (dataframe.timestamp < upper_bound_timestamp)
    )
    filtered = dataframe.filter(in_window)

    # NOTE(review): `approxCountDistinct` is marked deprecated in newer Spark
    # releases in favour of `approx_count_distinct`; it is kept here because
    # it is the name the notebook imports.
    return filtered.agg(approxCountDistinct(filtered._id)).first()[0]

In [10]:
# Example: approximate monthly active users for the 30-day window ending today
compute_MAU(df)

731769