<a id='index'></a>

# Index

## Configuration

[1. AWS Credentials](#aws_credentials)

[2. Clustering Functions](#clustering-funcs)



## Cluster models

[1. Behavioural](#behave)



In [None]:
%%local
# Install helper packages into the local notebook environment (this is a
# %%local cell) and load the ipython-autotime extension, which reports the
# execution time of each cell.
# NOTE(review): pyathena is not used anywhere in the visible notebook.
!pip install ipython-autotime
!pip install pyathena
%load_ext autotime


In [None]:
%%cleanup -f

In [None]:
#import pandas as pd
#pd.options.display.max_columns = None

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import lit


In [None]:
%%info

## Input Ophan AWS Credentials below

<a id='aws_credentials'></a>

[Back to index](#index)

In [None]:
# Paste your temporary Ophan AWS credentials below. load_df applies them as the
# S3A access key, secret key and session token before reading from S3.
# Do not commit real values to version control.

access_key = ''
secret_key = ''
token = ''

In [None]:
%%local

# List the S3 prefix that load_df reads its ORC datasets from, using the local
# AWS CLI profile named "ophan".
!aws s3 ls s3://ophan-temp-schema/data/dusvyat/project_heart/ --profile ophan

## Spark ML K Means clustering

<a id='clustering-funcs'></a>

[Back to index](#index)


### Functions for clustering
[Back to index](#index)

In [None]:
def load_df(access_key, secret_key, token, fname, date_range=None):
    """Configure S3A with temporary AWS credentials and load an ORC dataset.

    Reads ``s3a://ophan-temp-schema/data/dusvyat/project_heart/{fname}_{date_range}/``.

    Parameters
    ----------
    access_key, secret_key, token : str
        Temporary AWS credentials; set on the Hadoop configuration of the
        active Spark context together with the temporary-credentials provider.
    fname : str
        Dataset name prefix, e.g. ``'behav_train'``.
    date_range : str, optional
        Date suffix of the dataset folder (e.g. ``'20190701_20190731'``).
        When omitted, the notebook-level ``date`` variable is used, which is
        what this function always read implicitly before.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    if date_range is None:
        # Backward-compatible fallback to the notebook-global `date`.
        date_range = date

    hadoop_conf = spark._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)
    hadoop_conf.set("fs.s3a.session.token", token)
    hadoop_conf.set("fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

    path = "s3a://ophan-temp-schema/data/dusvyat/project_heart/{}_{}/".format(
        fname, date_range)
    # ORC carries its own schema; the CSV "delimiter" option previously passed
    # here had no effect on an ORC read, so it is omitted.
    return spark.read.orc(path)


def prep_data_spark(df, cols):
    """Keep multi-session rows, zero-fill nulls and project onto ``cols``.

    Only rows with ``total_sessions > 1`` are kept; single-session rows are
    handled separately (see ``joined_result``). Null string values are filled
    with ``'0'`` and null numeric values with ``0``. The resulting DataFrame is
    printed with ``show()`` before it is returned.
    """
    multi_session = df.filter(df.total_sessions > 1)
    string_filled = multi_session.na.fill('0')
    fully_filled = string_filled.fillna(0)
    projected = fully_filled.select(cols)

    projected.show()
    return projected

def filter_data(df, quantile=0.99):
    """Drop outlier rows from a pandas DataFrame.

    A row is removed when any of its inner columns is strictly greater than
    that column's ``quantile`` value. Inner columns are all columns except the
    first and the last; presumably an id and a label column, so confirm this
    against callers.

    Parameters
    ----------
    df : pandas.DataFrame
    quantile : float, default 0.99
        Per-column cut-off quantile (previously hard-coded to 0.99).

    Returns
    -------
    pandas.DataFrame
        The non-outlier rows of ``df``, with the original index preserved.
    """
    features = df.iloc[:, 1:-1]
    # Compare every value with its own column's quantile in one vectorised
    # step; the quantile Series is aligned on column labels.
    is_outlier = features.gt(features.quantile(quantile)).any(axis=1)
    return df.loc[~is_outlier]

def prep_scale_data(df, cols, scaler=None):
    """Build a scaled feature DataFrame ready for k-means.

    ``df`` is first passed through ``prep_data_spark`` (multi-session rows
    only, zero-filled, projected onto ``cols``). The first selected column
    becomes ``browser_id`` and the remaining columns are packed into a dense
    vector. ``cols`` must therefore start with the browser-id column, followed
    by numeric feature columns.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
    cols : list of str
    scaler : pyspark.ml estimator, optional
        Must read column ``features`` and write ``scaledFeatures``. Defaults to
        ``MinMaxScaler(inputCol="features", outputCol="scaledFeatures")``,
        created on each call. A default-argument instance would be built once
        at definition time and shared by every call.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``browser_id`` and ``features`` (the scaled vector).
    """
    if scaler is None:
        scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

    vectors = prep_data_spark(df, cols).rdd \
        .map(lambda row: (Vectors.dense(row[1:]), row[0])) \
        .toDF(["features", "browser_id"])

    # Replace the raw vector with the scaled one under the same column name.
    return scaler.fit(vectors) \
        .transform(vectors) \
        .drop('features') \
        .withColumnRenamed('scaledFeatures', 'features')

def find_optimum_nclusters(df, min_max_k):
    """Fit k-means for a range of k and record each model's WSSSE (elbow method).

    For every ``i`` in ``range(min_max_k[0], min_max_k[1])`` a model with
    ``k = i + 1`` is trained (seed 1). So k runs from ``min_max_k[0] + 1`` to
    ``min_max_k[1]`` inclusive, which is the same set of models as before.
    Results are now keyed by that actual k. Previously they were stored under
    ``k - 1``, which shifted any elbow plot of ``K`` against ``wssse`` by one
    cluster.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain the ``features`` vector column that KMeans reads by default.
    min_max_k : sequence of two ints

    Returns
    -------
    (range, dict)
        The k values that were trained, and a mapping ``k -> WSSSE``.
    """
    K = range(min_max_k[0] + 1, min_max_k[1] + 1)
    wssse = {}

    for k in K:
        model = KMeans().setK(k).setSeed(1).fit(df)

        # Within Set Sum of Squared Errors for this k.
        # NOTE: computeCost is deprecated since Spark 2.4 (removed in 3.0);
        # model.summary.trainingCost is the replacement there.
        wssse[k] = model.computeCost(df)
        print("k = " + str(k) + ", Within Set Sum of Squared Errors = " + str(wssse[k]))

        print("Cluster Centers: ")
        for center in model.clusterCenters():
            print(center)

    return K, wssse


def kmeans_train(df, n_clusters):
    """Fit a k-means model with ``n_clusters`` clusters and a fixed seed of 1.

    ``df`` must contain the ``features`` vector column that KMeans reads by
    default. Returns the fitted ``KMeansModel``.
    """
    estimator = KMeans(k=n_clusters, seed=1)
    return estimator.fit(df)

def extract_kmeans(row):
    """Flatten one k-means ``transform`` output row into a plain tuple.

    Returns ``(browser_id, feature_1, ..., feature_n, prediction)``, with the
    feature values taken from the ``features`` vector.
    """
    feature_values = row.features.toArray().tolist()
    return tuple([row.browser_id] + feature_values + [row.prediction])

def kmeans_pred(df, model, cols):
    """Assign clusters with a fitted k-means model and flatten the result.

    Each prediction row is turned into ``(browser_id, feature values...,
    prediction)`` by ``extract_kmeans``. The columns are then named
    ``cols + ['cluster']``, so ``cols`` should list the browser-id column
    followed by one name per feature. The number of rows per cluster is
    printed before the DataFrame is returned.
    """
    output_columns = list(cols) + ['cluster']
    flattened = model.transform(df).rdd.map(extract_kmeans).toDF(output_columns)

    print("counts per cluster: ")
    flattened.groupBy("cluster").count().show()

    return flattened

def joined_result(df, df1, df2, n_clusters, cols):
    """Attach cluster labels to rows and append single-session browsers.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Rows to label. Presumably ``prep_data_spark(raw, cols)``, i.e. the
        unscaled data whose columns are exactly ``cols`` (verify against caller).
    df1 : pyspark.sql.DataFrame
        Predictions with ``browser_id`` and ``cluster`` columns, as produced
        by ``kmeans_pred``.
    df2 : pyspark.sql.DataFrame
        Raw data. Its rows with ``total_sessions == 1`` were excluded from
        training and are assigned the extra cluster id ``n_clusters``.
    n_clusters : int
        Number of trained clusters (trained ids are ``0 .. n_clusters - 1``).
    cols : list of str
        Columns kept from the single-session rows.

    Returns
    -------
    (DataFrame, DataFrame)
        ``df`` joined with its cluster labels, and the single-session rows
        combined with that joined result.
    """
    clustered = df.join(df1, df1.browser_id == df.browser_id) \
        .select(df["*"], df1["cluster"])

    single_session = df2.filter(df2.total_sessions == 1) \
        .na.fill('0').fillna(0) \
        .select(cols) \
        .withColumn("cluster", lit(n_clusters))

    # Match columns by name rather than by position, so a different column
    # order in `df` cannot silently shift values between columns.
    combined = single_session.unionByName(clustered)

    return clustered, combined

### Behavioural characteristics (total_attention, stdev_attention, vdays & section coverage):

<a id='behave'></a>

[Back to index](#index)




In [None]:
# Date-range suffix of the dataset folders read by load_df ("{fname}_{date}").
# This cell sets it in the Spark cluster session; the following %%local cell
# sets the same value in the local kernel.

date='20190701_20190731'


In [None]:
%%local

# Same date range as the cluster-side `date`, defined in the local kernel.
date='20190701_20190731'

In [None]:
# Load the behavioural training set; load_df builds the S3 folder name from
# 'behav_train' and the notebook-level `date`.
df = load_df(access_key,secret_key,token,'behav_train')
# Only a cell's last expression is displayed, so a bare df.count() here would
# run a Spark job and discard its result; print the row count explicitly.
print(df.count())
df.show()

In [None]:
#produce scaled training set



In [None]:
#Train K Means with optimum k




In [None]:
#save k means


In [None]:
#Predict K Means




In [None]:
#join with single-session browsers (total_sessions == 1) and recover the unscaled feature values




In [None]:
#write output to s3