In [None]:
%pip install --upgrade snowflake-snowpark-python

In [1]:
from __future__ import annotations
from typing import Any
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.window as W
import snowflake.snowpark.types as T
import pandas as pd
import numpy as np
import altair as alt
import json
import random
import math
import uuid
import os
pd.set_option('display.max_columns', 500)
# Credentials are read from the environment so they are never typed into (and saved with) the notebook.
# Set SNOWFLAKE_USER / SNOWFLAKE_PASSWORD (optionally SNOWFLAKE_ACCOUNT) before starting the kernel.
USER = os.environ.get("SNOWFLAKE_USER", "")  # for snowflake
PASS = os.environ.get("SNOWFLAKE_PASSWORD", "")  # for snowflake
snowflake_credentials = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "KGA92485"),
    "user": USER,
    "password": PASS,
    "role": "SYSADMIN",
    "database": "DATA_SCIENCE",
    "schema": "PUBLIC",
    "warehouse": "DATASCIENCE_XLARGE_WH",
}
session = Session.builder.configs(snowflake_credentials).create()



In [None]:
# Preprocessing

In [17]:
def BinEncoder(df: snowpark.DataFrame ,inputCol: str ,outputCol: str,valueOrder : list=None, how: str = 'inner'):
    """Ordinal-encode `inputCol` into integer ranks stored in `outputCol`.

    Ranks start at 1 in both modes:
      * valueOrder is None: the distinct values, sorted ascending with NULLs first, get ranks 1..n.
      * valueOrder given: valueOrder[i] gets rank i + 1 (list position defines the order).

    Ranks are attached with a join on `inputCol`. With the default how='inner', rows whose value
    has no rank are dropped: values missing from valueOrder, and NULLs (a NULL key never matches
    in a join). Pass how='left' to keep such rows with a NULL code instead.

    Returns (DataFrame, outputCol).
    """
    if valueOrder is None:
        ranks = df.groupBy(F.col(inputCol)).agg(
            F.row_number().over(W.Window.orderBy(F.col(inputCol).asc_nulls_first())).alias('r'))
    else:
        # start=1 keeps explicit-order ranks on the same 1-based scale as row_number() above.
        ranks = session.createDataFrame(list(enumerate(valueOrder, start=1)), ['r', inputCol])
    df = df.join(ranks, inputCol, how=how).withColumn(outputCol, F.col('r'))
    df = df.drop(F.col('r'))
    return df,outputCol

def OneHotEncoder(df: snowpark.DataFrame ,inputCol: str ,outputCol: str):
    """Add one 0/1 indicator column per distinct value of `inputCol`.

    Each indicator is named `<outputCol>_<value>`. Categories are sorted (missing values last)
    so the generated column order is reproducible; DISTINCT alone has no defined order.

    Returns (DataFrame, list of the new column names).
    """
    categories = df.select(F.col(inputCol)).distinct().toPandas().values[:,0].tolist()
    categories = sorted(categories, key=lambda v: (bool(pd.isna(v)), str(v)))
    outputColType = []
    for t in categories:
        name = outputCol + '_' + str(t)
        # `col = NULL` is never true in SQL, so the missing-value category needs an IS NULL test.
        is_cat = F.col(inputCol).is_null() if pd.isna(t) else (F.col(inputCol) == F.lit(t))
        df = df.withColumn(name, F.when(is_cat, 1).otherwise(0))
        outputColType.append(name)
    return df,outputColType

def MinMaxScaler(df: snowpark.DataFrame ,inputCol: str ,outputCol: str):
    """Min-max scale `inputCol` into [0, 1] and write it to `outputCol` (may equal inputCol).

    Min and max are window aggregates over the whole DataFrame. When max - min is not positive
    (constant column) every row gets 0. Numerator and denominator both use the FLOAT-cast
    column, so min/max/difference are computed in the same type.

    Returns (DataFrame, outputCol).
    """
    value = F.col(inputCol).cast('float')
    lo = F.min(value).over()
    span = F.max(value).over() - lo
    scaled = F.when(span > F.lit(0), (value - lo) / span).otherwise(F.lit(0))
    df = df.withColumn(outputCol, scaled)
    return df,outputCol

In [30]:
# Work on (at most) 100,000 rows of the consolidated attributes; LIMIT without ORDER BY is not a stable sample.
df = session.table('DATA_SCIENCE.EXPERIMENT.EXTENDED_ATTRIBUTES_CONSOLIDATED').limit(100_000).cache_result()

In [31]:
# Impute missing AGE / LENGTH_OF_RESIDENCE with the column mean over the whole frame.
df = df.with_columns(
    ['AGE', 'LENGTH_OF_RESIDENCE'],
    [F.coalesce(F.col(name), F.mean(F.col(name)).over()) for name in ('AGE', 'LENGTH_OF_RESIDENCE')],
)
df = df.cache_result()

In [32]:
# Categorical columns expanded into 0/1 indicator columns by OneHotEncoder (next cell).
columns_onehot_catagories = ['EDUCATION_LEVEL','HOUSEHOLD_COMPOSITION','OCCUPATION','DWELLING_TYPE','HOME_OWNERSHIP','MARITAL_STATUS']
# Columns ordinal-encoded by BinEncoder and then min-max scaled (next cell); every column whose
# name contains "_INTEREST" is appended automatically.
# NOTE: EDUCATION_LEVEL is in both lists, so it is ordinal-encoded AND one-hot encoded.
columns_encode_catagories = ['AGE','LENGTH_OF_RESIDENCE','ESTIMATED_HOUSEHOLD_INCOME','HOME_MKT_VAL','NUMBER_OF_CHILDREN','CHILD_PRESENT','CHILD_UNDER_6_PRESENT','CHILD_6_10_PRESENT','CHILD_11_15_PRESENT','CHILD_16_17_PRESENT','EDUCATION_LEVEL','HOME_POOL','SENIOR_ADULT_IN_HH','SINGLE_PARENT','SPANISH_SPEAKING','USES_CREDIT_CARD','HOME_OFFICE']+list(filter(lambda c: "_INTEREST" in c,df.columns))
# Explicit rank order passed to BinEncoder as valueOrder (list position = rank); encoded columns
# not listed here are ranked by sorting their distinct values.
# NOTE(review): BinEncoder attaches ranks with a join on the value, so a row whose value is
# missing from its list (or is NULL) is dropped. Verify each list covers every value in the table.
# NOTE(review): ESTIMATED_HOUSEHOLD_INCOME has no band between <$20,000 and $50,000-$99,999, and
# '$100,000-249,999' lacks a second '$'. Confirm these strings against the source data.
encode_dict = {
    'ESTIMATED_HOUSEHOLD_INCOME' : ['<$20,000','$50,000-$99,999','UNKNOWN','$100,000-249,999','$250,000+'],
    'HOME_MKT_VAL' : ['$1K - $24,999','$25K - $49,999','$50K - $74,999','$75K - $99,999','$100K - $124,999','$125K - $149,999','$150K - $174,999','$175K - $199,999','$200K - $224,999','UNKNOWN','$225K - $249,999','$250K - $274,999','$275K - $299,999','$300K - $349,999','$350K - $399,999','$400K - $449,999','$450K - $499,999','$500K - $749,999','$750K - $999,999','$1M+'],
    'NUMBER_OF_CHILDREN' : ['0','1-3','UNKNOWN','3-5','6+'],
    'CHILD_PRESENT' : ['NO','UNKNOWN','YES'],
    'CHILD_UNDER_6_PRESENT' : ['UNKNOWN','YES'],
    'CHILD_6_10_PRESENT' : ['UNKNOWN','YES'],
    'CHILD_11_15_PRESENT' : ['UNKNOWN','YES'],
    'CHILD_16_17_PRESENT' : ['UNKNOWN','YES'],
    'EDUCATION_LEVEL' : ['HIGH SCHOOL','VOCATIONAL/TECHNICAL SCHOOL','UNKNOWN','COLLEGE','GRADUATE SCHOOL'], # maybe onehot
    'HOME_POOL' : ['UNKNOWN','YES'],
    'SENIOR_ADULT_IN_HH' : ['UNKNOWN','YES'],
    'SINGLE_PARENT' : ['NO','UNKNOWN','YES'],
    'SPANISH_SPEAKING' : ['NO','UNKNOWN','YES'],
    'USES_CREDIT_CARD' : ['UNKNOWN','YES'],
    'HOME_OFFICE' : ['UNKNOWN','YES'],
}
# Numeric columns to zero-fill and min-max scale (none at the moment).
columns_to_scale = []

In [33]:
# Build the model feature table: start with the id column and collect the name of every derived
# feature column; only these columns are kept at the end.
# (`col` below is a plain variable holding a column name, not F.col.)
cols = ['UNIVERSAL_ID']
# Numeric columns: NULL -> 0, then min-max scale into '<col>_scale'.
for c in columns_to_scale:
    print(c,':scale begin')
    #df2 = df2.withColumn(c,F.when(F.col(c).is_not_null(),F.col(c)).otherwise(F.mean(F.col(c)).over()))
    df = df.withColumn(c,F.when(F.col(c).is_not_null(),F.col(c)).otherwise(0))
    df,col = MinMaxScaler(df,c,c+'_scale')
    # Materialise the intermediate result before processing the next column.
    df = df.cache_result()
    cols.append(col)
    print(c,':scale end')
# Ordinal columns: rank with BinEncoder into '<col>_encode', then min-max scale that column in place.
# NOTE(review): BinEncoder inner-joins on the value by default, so unmatched/NULL rows are dropped here.
for c in columns_encode_catagories:
    print(c,':encode begin')
    df,col = BinEncoder(df,c,c+'_encode',valueOrder=encode_dict.get(c))
    df,col = MinMaxScaler(df,col,col)
    df = df.cache_result()
    cols.append(col)
    print(c,':encode end')
# Nominal columns: one 0/1 indicator column per distinct value ('<col>_onehot_<value>').
for c in columns_onehot_catagories:
    print(c,':cat begin')
    df,col = OneHotEncoder(df,c,c+'_onehot')
    df = df.cache_result()
    cols.extend(col)
    print(c,':cat end')
# Keep only the id plus the derived feature columns.
df = df[cols].cache_result()

AGE :encode begin
AGE :encode end
LENGTH_OF_RESIDENCE :encode begin
LENGTH_OF_RESIDENCE :encode end
ESTIMATED_HOUSEHOLD_INCOME :encode begin
ESTIMATED_HOUSEHOLD_INCOME :encode end
HOME_MKT_VAL :encode begin
HOME_MKT_VAL :encode end
NUMBER_OF_CHILDREN :encode begin
NUMBER_OF_CHILDREN :encode end
CHILD_PRESENT :encode begin
CHILD_PRESENT :encode end
CHILD_UNDER_6_PRESENT :encode begin
CHILD_UNDER_6_PRESENT :encode end
CHILD_6_10_PRESENT :encode begin
CHILD_6_10_PRESENT :encode end
CHILD_11_15_PRESENT :encode begin
CHILD_11_15_PRESENT :encode end
CHILD_16_17_PRESENT :encode begin
CHILD_16_17_PRESENT :encode end
EDUCATION_LEVEL :encode begin
EDUCATION_LEVEL :encode end
HOME_POOL :encode begin
HOME_POOL :encode end
SENIOR_ADULT_IN_HH :encode begin
SENIOR_ADULT_IN_HH :encode end
SINGLE_PARENT :encode begin
SINGLE_PARENT :encode end
SPANISH_SPEAKING :encode begin
SPANISH_SPEAKING :encode end
USES_CREDIT_CARD :encode begin
USES_CREDIT_CARD :encode end
HOME_OFFICE :encode begin
HOME_OFFICE :enc

In [34]:
# Persist the encoded feature table, replacing any previous version.
df.write.save_as_table('EXTENDED_ATTRIBUTES_CONSOLIDATED_SAMPLE', mode="overwrite")

Bad pipe message: %s [b'\x94\x8b|\xca\x15\xf0\x99\x98s\x8cw\xf8p_l\xdd\x0f\x10\x00\x01p\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\x07\x00\x08\x00\t\x00\n\x00\x0b\x00\x0c\x00\r\x00\x0e\x00\x0f\x00\x10\x00\x11\x00\x12\x00\x13\x00\x14\x00\x15\x00\x16\x00\x17\x00\x18\x00\x19\x00\x1a\x00\x1b\x00/\x000\x001\x002\x003\x004\x005\x006\x007\x008\x009\x00:\x00;\x00<\x00=\x00>\x00?\x00@\x00g\x00h\x00i\x00j\x00k\x00l\x00m\x00A\x00B\x00C\x00D\x00E\x00F\x00\x84\x00\x85\x00\x86\x00\x87\x00\x88\x00\x89\x00\xba\x00\xbb\x00\xbc\x00\xbd\x00\xbe\x00\xbf\x00\xc0\x00\xc1\x00\xc2\x00\xc3\x00\xc4\x00\xc5\x00\x9c\x00\x9d\x00\x9e\x00\x9f\x00\xa0\x00\xa1\x00\xa2\x00\xa3\x00\xa4\x00\xa5\x00\xa6\x00\xa7\xc0\x01\xc0\x02\xc0\x03\xc0\x04\xc0\x05\xc0\x06\xc0\x07\xc0\x08\xc0\t\xc0\n\xc0\x0b\xc0\x0c\xc0\r\xc0\x0e\xc0\x0f\xc0\x10\xc0\x11\xc0\x12']
Bad pipe message: %s [b'W\xdf\xe6\xb2c>*\xac4\x0b\xa3\x00\x1e\x01\xbb\xdf-\xe0\x00\x01p\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\x07\x00\x08\x00\t\x00\n\x00

In [None]:
# models

In [40]:
def join(s1: str, s2: str):
    """Append suffix `s2` to column name `s1`, keeping a closing double quote last.

    join('A', '_L') -> 'A_L'; join('"a"', '_L') -> '"a_L"'. An empty `s1` yields `s2`
    (indexing s1[-1] would raise IndexError on it).
    """
    if s1.endswith('"'):
        # Quoted identifier: the suffix goes inside the quotes.
        return s1[:-1] + s2 + '"'
    return s1 + s2
    
class Kmeans(object):
    """K-means clustering executed inside Snowflake through Snowpark.

    Centroids live in ``self.cluster`` as a DataFrame with a ``CLUSTER_ID`` column plus one column
    per feature. ``self.prediction`` holds the latest (id, CLUSTER_ID) assignment made by ``fit``.
    """
    def __init__(self, k: int,*args):
        super(Kmeans, self).__init__(*args)
        self.k = k
        self.cluster_id = 'CLUSTER_ID'
        # Placeholder centroid table holding only the ids 0..k-1; fit() replaces it.
        self.cluster = session.createDataFrame(data=[ v for v in range(self.k)],schema=T.StructType([T.StructField(self.cluster_id, T.IntegerType())]))
        self.prediction = None
        self.run_id = None

    def initializeCluster(self,df: snowpark.DataFrame,inputCols: list[str]) -> snowpark.DataFrame:
        """Random centroids: each feature drawn uniformly between that feature's min and max in df."""
        return df.crossJoin(self.cluster).groupBy(F.col(self.cluster_id)) \
            .agg([F.min(F.col(c)).alias(c+'_min') for c in inputCols]+[F.max(F.col(c)).alias(c+'_max') for c in inputCols]) \
            .select([self.cluster_id]+[(F.uniform(F.lit(0.0),F.lit(1.0), F.random())*(F.col(c+'_max')-F.col(c+'_min'))+F.col(c+'_min')).alias(c) for c in inputCols])

    def initializeCluster2(self,df: snowpark.DataFrame, inputCols: list[str]):
        """Use k randomly sampled rows of df as the initial centroids (CLUSTER_ID 1..k, emitted last)."""
        return df.sample(n=self.k).select([F.col(c) for c in inputCols]+[F.row_number().over(W.Window.orderBy(F.lit(None))).alias(self.cluster_id)])

    def distance(self,df: snowpark.DataFrame,id: str,inputCols: list[str]) -> snowpark.DataFrame:
        """Euclidean distance from every row of df to every centroid.

        Returns columns (id, CLUSTER_ID, distance, r), where r is the rank of the centroid by
        distance for that id (r == 1 is the nearest). Overlapping column names from the cross
        join carry _L (df) / _R (centroid) suffixes.
        """
        same_name = id == self.cluster_id
        id_col = F.col(join(id,'_L')) if same_name else F.col(id)
        cluster_col = F.col(join(self.cluster_id,'_R')) if same_name else F.col(self.cluster_id)
        euclid = F.sqrt(sum([F.pow(F.col(join(c,'_L'))-F.col(join(c,'_R')),F.lit(2)) for c in inputCols]))
        rank = F.row_number().over(W.Window.partitionBy(id_col).orderBy(F.col('distance').asc()))
        return df.crossJoin(self.cluster,lsuffix='_L',rsuffix='_R') \
            .select(id_col, cluster_col, euclid.alias('distance'), rank.alias('r')) \
            .select(id_col, cluster_col, F.col('distance'), F.col('r'))

    def fit(self,df: snowpark.DataFrame,id: str,inputCols: list[str],maxIter=None,initialize=True) -> Kmeans:
        """Lloyd iterations until assignments stop changing or maxIter is reached (None = no cap).

        Every iteration appends a snapshot of the centroids to the 'kmeans_t2' table.
        """
        if initialize:
            self.cluster = self.initializeCluster2(df,inputCols).cache_result()
            # Forget assignments from an earlier fit; otherwise convergence would be judged against them.
            self.prediction = None
        prediction = self.transform(df,id,inputCols).cache_result()
        iteration = 0
        self.run_id = str(uuid.uuid4())
        while (self.prediction is None or self.prediction.except_(prediction).count() > 0) and (maxIter is None or iteration < maxIter):
            print('run_id:',self.run_id,',iteration:',iteration)
            self.prediction = prediction
            print(self.prediction.groupBy(F.col(self.cluster_id)).agg(F.count('*')).orderBy(F.col(self.cluster_id)).toPandas())
            # New centroid = mean of the rows currently assigned to it.
            updated = df.join(self.prediction,[id]).groupBy(F.col(self.cluster_id)).agg([ F.mean(c).alias(c) for c in inputCols])
            # Centroids that received no rows keep their previous position.
            unassigned = self.cluster.where(F.col(self.cluster_id).isin(self.prediction.select(F.col(self.cluster_id))) == F.lit(False))
            # Match columns by name: union() is positional, and initializeCluster2 emits CLUSTER_ID
            # last while the aggregate above emits it first.
            self.cluster = updated.union_by_name(unassigned).cache_result()
            prediction = self.transform(df,id,inputCols).cache_result()
            iteration += 1
            self.cluster.select(F.col('*'),F.lit(iteration).alias('t'),F.lit(self.run_id).alias('run_id')).write.mode('append').save_as_table('kmeans_t2')
        return self

    def transform(self,df: snowpark.DataFrame,id: str,inputCols: list[str]) ->snowpark.DataFrame:
        """Assign each row to its nearest centroid; returns (id, CLUSTER_ID)."""
        return self.distance(df,id,inputCols).where(F.col('r') == F.lit(1)).select(F.col(id),F.col(self.cluster_id))
    
    
class SOM(object):
    """Self-organising map trained inside Snowflake through Snowpark.

    Neurons form an h x w grid laid out row-major: neuron id i (0-based) sits at column i % w
    and row i // w. Weights live in ``self.cluster`` (CLUSTER_ID + one column per feature).
    Reference: https://www.osti.gov/servlets/purl/1566795
    """
    def __init__(self,k : int=None,h : int=None,w : int=None,sigma : float=1.0 ,lr : float=1.0 ,*args):
        super(SOM, self).__init__(*args)
        if w is None and h is None:
            # Square k x k grid.
            self.h = k
            self.w = k
        else:
            # Explicit grid; a missing side defaults to the other one (square grid).
            self.h = h if h is not None else w
            self.w = w if w is not None else h
        # Number of neurons. It must be set on both paths: createDataFrame below needs it.
        self.k = self.h * self.w
        self.lr = lr
        self.sigma = sigma
        self.cluster_id = 'CLUSTER_ID'
        # Placeholder table holding only the neuron ids 0..k-1; fit() replaces it with weights.
        self.cluster = session.createDataFrame(data=[ v for v in range(self.k)],schema=T.StructType([T.StructField(self.cluster_id, T.IntegerType())]))
        # Squared grid distance between every pair of neurons: BMI_ID -> CLUSTER_ID.
        left = F.col(join(self.cluster_id,'_L'))
        right = F.col(join(self.cluster_id,'_R'))
        width = F.lit(self.w)
        dx = left % width - right % width
        # Row index is id // w for a row-major grid (dividing by h is wrong whenever h != w).
        dy = F.floor(left / width) - F.floor(right / width)
        self.neighbor = self.cluster.crossJoin(self.cluster,lsuffix='_L',rsuffix='_R') \
            .select(left.alias('BMI_ID'), right.alias(self.cluster_id), (F.pow(dx,F.lit(2)) + F.pow(dy,F.lit(2))).alias('distance')).cache_result()
        self.run_id = None

    def initializeCluster(self,df: snowpark.DataFrame, inputCols: list[str]) -> snowpark.DataFrame:
        """Random weights: each feature drawn uniformly between that feature's min and max in df."""
        return df.crossJoin(self.cluster).groupBy(F.col(self.cluster_id)) \
            .agg([F.min(F.col(c)).alias(c+'_min') for c in inputCols]+[F.max(F.col(c)).alias(c+'_max') for c in inputCols]) \
            .select([self.cluster_id]+[(F.uniform(F.lit(0.0),F.lit(1.0), F.random())*(F.col(c+'_max')-F.col(c+'_min'))+F.col(c+'_min')).alias(c) for c in inputCols])

    def initializeCluster2(self,df: snowpark.DataFrame, inputCols: list[str]):
        """Use k sampled rows of df as initial weights, numbered CLUSTER_ID 0..k-1 to match grid positions."""
        return df.sample(n=self.k).select([F.col(c) for c in inputCols]+[(F.row_number().over(W.Window.orderBy(F.lit(None)))-F.lit(1)).alias(self.cluster_id)])

    def distance(self,df: snowpark.DataFrame,id: str,inputCols: list[str]) -> snowpark.DataFrame:
        """Euclidean distance from every row of df to every neuron.

        Returns (id, CLUSTER_ID, distance, r), where r ranks neurons by distance per id (r == 1 is
        the best-matching unit). Overlapping names carry _L (df) / _R (neuron) suffixes.
        """
        same_name = id == self.cluster_id
        id_col = F.col(join(id,'_L')) if same_name else F.col(id)
        cluster_col = F.col(join(self.cluster_id,'_R')) if same_name else F.col(self.cluster_id)
        euclid = F.sqrt(sum([F.pow(F.col(join(c,'_L'))-F.col(join(c,'_R')),F.lit(2)) for c in inputCols]))
        rank = F.row_number().over(W.Window.partitionBy(id_col).orderBy(F.col('distance').asc()))
        return df.crossJoin(self.cluster,lsuffix='_L',rsuffix='_R') \
            .select(id_col, cluster_col, euclid.alias('distance'), rank.alias('r')) \
            .select(id_col, cluster_col, F.col('distance'), F.col('r'))

    def getNeighborHood(self,inputCols: list[str], sigma: float) -> snowpark.DataFrame:
        """Alternative neighbourhood based on distance between neuron weights (feature space), not grid position."""
        return self.cluster.crossJoin(self.cluster,lsuffix='_L',rsuffix='_R')\
            .select([F.col(join(self.cluster_id,'_L')).alias('BMI_ID')]+[F.col(join(c,'_R')).alias(c) for c in self.cluster.columns]+[F.sqrt(sum([F.pow(F.col(join(c,'_L'))-F.col(join(c,'_R')),F.lit(2)) for c in inputCols])).alias('distance'),F.when(F.lit(sigma)>F.lit(0.0),F.exp(-F.pow(F.col('distance'),2)/(F.lit(2)*F.pow(sigma,2)))).otherwise(F.when(F.col('distance')==F.lit(0.0),F.lit(1.0)).otherwise(F.lit(0.0))).alias('influence_rate')])

    def getNeighborHood2(self,inputCols: list[str], sigma: float) -> snowpark.DataFrame:
        """Neuron weights paired with their influence from each BMU, using grid distance.

        Rows: BMI_ID, CLUSTER_ID, the neuron's feature columns and influence_rate =
        exp(-d / (2 * sigma^2)), with d the squared grid distance from self.neighbor.
        For sigma <= 0 only the BMU itself gets influence 1.
        """
        grid_d2 = F.col('distance')
        # sigma is a Python number, so the branch is chosen here; the SQL never contains a /0 term.
        if sigma > 0.0:
            influence = F.exp(-grid_d2 / (F.lit(2) * F.pow(F.lit(sigma), F.lit(2))))
        else:
            influence = F.when(grid_d2 == F.lit(0.0), F.lit(1.0)).otherwise(F.lit(0.0))
        return self.cluster.join(self.neighbor,[self.cluster_id],lsuffix='_L',rsuffix='_R')\
            .select([F.col('BMI_ID'), F.col(self.cluster_id).alias(self.cluster_id)]+[F.col(c) for c in self.cluster.columns if c != self.cluster_id]+[influence.alias('influence_rate')])

    def convertToCoord(self,df: snowpark.DataFrame):
        """Join df (on CLUSTER_ID) with each neuron's grid column x and row y."""
        return self.cluster.select(F.col(self.cluster_id),(F.col(self.cluster_id)%F.lit(self.w)).alias('x'),F.floor(F.col(self.cluster_id)/F.lit(self.w)).alias('y')).join(df,self.cluster_id)

    def fit(self,df: snowpark.DataFrame,id: str,inputCols: list[str],maxIter=5, batchSize=None,initialize=True) -> SOM:
        """Train for maxIter epochs (or the equivalent number of mini-batches when batchSize is set).

        lr decays exponentially and sigma linearly with the iteration count. Every iteration
        appends a snapshot of the weights to the 'som_t3' table.
        """
        if initialize:
            self.cluster = self.initializeCluster2(df,inputCols).cache_result()
        if maxIter is None:
            # Callers forward p.get('maxIter'); fall back to the signature default rather than
            # failing on `iteration < None`.
            maxIter = 5
        iteration = 0
        lr=self.lr
        sigma=self.sigma
        maxIter = maxIter if batchSize is None else maxIter*(df.count()/batchSize)
        time_constant = maxIter
        self.run_id = str(uuid.uuid4())
        while iteration < maxIter:
            print('run_id:',self.run_id,',iteration:',iteration,',lr:',lr,',sigma:',sigma)
            sample = df if batchSize is None else df.sample(n=batchSize).cache_result()
            # Best-matching unit of every sample row, renamed to BMI_ID.
            bmu = self.distance(sample,id,inputCols).where(F.col('r') == F.lit(1)).drop(F.col('r')).rename(self.cluster_id,'BMI_ID').cache_result()
            neighborhood = self.getNeighborHood2(inputCols,sigma).cache_result()
            # w' = mean over sample rows of (w + lr * influence * (x - w)). Neurons outside the
            # neighbourhood get influence 0 (w' = w), so no union with the old weights is needed.
            self.cluster = sample.join(bmu,id).join(neighborhood,on='BMI_ID',lsuffix='_L',rsuffix='_R').select([F.col(self.cluster_id)]+[(F.col(join(c,'_R'))+F.lit(lr)*(F.col('influence_rate')*(F.col(join(c,'_L'))-F.col(join(c,'_R'))))).alias(c) for c in inputCols]) \
                .groupBy(F.col(self.cluster_id)).agg([F.mean(c).alias(c) for c in inputCols]).cache_result()
            iteration += 1
            lr=self.lr*math.exp(-iteration/time_constant)
            coefficient = 1.0 - (float(iteration)/time_constant)
            sigma=self.sigma*coefficient
            self.cluster.select(F.col('*'),F.lit(iteration).alias('t'),F.lit(self.run_id).alias('run_id')).write.mode('append').save_as_table('som_t3')
        return self

    def transform(self,df: snowpark.DataFrame,id: str,inputCols: list[str]):
        """Assign each row to its best-matching unit; returns (id, CLUSTER_ID)."""
        return self.distance(df,id,inputCols).where(F.col('r') == F.lit(1)).select(F.col(id),F.col(self.cluster_id))

In [36]:
def silhouette(model,df:snowpark.DataFrame,id: str,inputCols: list[str])-> tuple[snowpark.DataFrame, snowpark.DataFrame, float]:
    '''Simplified silhouette score of a fitted centroid model: O(N*K) instead of O(N^2).

    Per row: a = distance to the nearest centroid (r == 1), b = distance to the second-nearest
    (r == 2), score = (b - a) / max(a, b); a row with a == b == 0 scores 0. Returns the mean.
    https://arrow.tudublin.ie/cgi/viewcontent.cgi?article=1214&context=scschcomcon

    Returns (a_rows, b_rows, score), where a_rows / b_rows are the r == 1 / r == 2 rows of
    model.distance(...). Raises ValueError when no row has a second-nearest centroid.
    '''
    data = model.distance(df,id,inputCols).cache_result()
    a = data.where(F.col('r') == 1)
    b = data.where(F.col('r') == 2)
    # After the join below, *_L columns come from b and *_R columns from a.
    b_dist = F.col('distance'+'_L')
    a_dist = F.col('distance'+'_R')
    larger = F.when(b_dist >= a_dist, b_dist).otherwise(a_dist)
    per_row = F.when(larger > F.lit(0), (b_dist - a_dist) / larger).otherwise(F.lit(0.0))
    rows = b.join(a,[id],lsuffix='_L',rsuffix='_R').select(per_row.alias('distance')).agg(F.mean(F.col('distance')).alias('distance')).collect()
    mean_score = rows[0][0] if rows else None
    if mean_score is None:
        raise ValueError('silhouette needs at least two centroids: no row has a second-nearest centroid')
    return a,b,float(mean_score)
# Example:
# a,b,v = silhouette(model,df,'UNIVERSAL_ID',[c for c in df.columns if c != 'UNIVERSAL_ID'])
# print(v)

In [6]:
#df = session.read.table('pa_norm').cache_result()

In [37]:
# Start modelling from the persisted, already-encoded feature table.
df = session.table('EXTENDED_ATTRIBUTES_CONSOLIDATED_SAMPLE').cache_result()



In [None]:
# Scratch check: store a JSON parameter string in a column, then read one field back in SQL.
ss = session.sql('select 1').select(
    F.lit(json.dumps({'model_name': 'kmeans'})).alias('param')
)
print(ss.toPandas())
ss.select(
    F.json_extract_path_text(F.col('param'), F.lit('model_name'))
).toPandas()

                      PARAM
0  {"model_name": "kmeans"}


Unnamed: 0,"JSON_EXTRACT_PATH_TEXT(""PARAM"", 'MODEL_NAME')"
0,kmeans


In [42]:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def generate_random_configuration(param):
    """Draw one concrete configuration from a random-search specification.

    `param` maps each parameter name to a dict {sampler: spec}:
      'uniform': (lo, hi)              -> random.uniform(lo, hi)
      'uniform_int': (lo, hi)          -> random.randint(lo, hi), bounds inclusive
      'choice': seq                    -> random.choice(seq)
      'choice_uniform': [(lo, hi), ..] -> [random.randint(lo, hi) for each pair]
      'choice_map': [spec, ...]        -> pick one nested spec, merge its sampled values in
      'constant' (legacy spelling 'contant' also accepted): value -> value as is
    Unknown sampler names are reported and skipped.
    """
    p = {}
    for k,v in param.items():
        for k2,v2 in v.items():
            if k2 == 'uniform':
                p[k] = random.uniform(v2[0],v2[1])
            elif k2 == 'uniform_int':
                p[k] = random.randint(v2[0],v2[1])
            elif k2 == 'choice':
                p[k] = random.choice(v2)
            elif k2 == 'choice_uniform':
                p[k] = [random.randint(v3[0],v3[1]) for v3 in v2]
            elif k2 == 'choice_map':
                p.update(generate_random_configuration(random.choice(v2)))
            elif k2 in ('constant', 'contant'):
                p[k] = v2
            else:
                print("key: %s is not supported" % k2)
    return p
#async def train(k,func_param,name):
def train(k,func_param,name):
    """Fit one randomly configured model of size `k`, score it and log it to Snowflake.

    The configuration is sampled from func_param[name]['param'] with
    generate_random_configuration and tagged with model_name=name and k. The model is fitted on
    the global `sample` (id column id_, features inputCols) and scored with silhouette. Its
    centroids are appended to 'cluster' and a metadata row to 'cluster_meta'. Any exception is
    printed and the function returns None; otherwise it returns the silhouette score.
    """
    t0 = time.perf_counter()
    spec = func_param[name]
    _, params = spec['algo'], generate_random_configuration(spec['param'])
    params['model_name'] = name
    params['k'] = k
    print('parameters:', json.dumps(params))
    score = None
    try:
        if params['model_name'] == 'kmeans':
            model = Kmeans(k=params['k'])
        else:
            model = SOM(k=params['k'], sigma=params['sigma'], lr=params['lr'])
        model = model.fit(sample, id_, inputCols, maxIter=params.get('maxIter'))
        _, _, score = silhouette(model, sample, id_, inputCols)
        model.cluster.select(F.lit(model.run_id).alias('run_id'), F.col('*')).write.mode("append").save_as_table('cluster')
        elapsed = time.perf_counter() - t0
        meta_row = session.sql('select 4').select(
            F.lit(model.run_id).alias('run_id'),
            F.lit(score).alias('score'),
            F.lit(json.dumps(params)).alias('param'),
            F.lit(elapsed).alias('time'),
            F.col('*'),
        )
        meta_row.write.mode("append").save_as_table('cluster_meta')
        print('parameters:', json.dumps(params), ', score:', score)
    except Exception as e:
        print(e)
    return score
sample = df  # .sample(n=5).cache_result()
id_ = 'UNIVERSAL_ID'
inputCols = [c for c in sample.columns if c != id_]
# Random-search space per model family. train() only reads it, so every task can share one dict.
func_param = {
    # 'kmeans': {'algo': Kmeans, 'param': {}},
    'som': {'algo': SOM, 'param': {'maxIter': {'uniform_int': (10, 200)}, 'sigma': {'uniform': (0, 0.75)}, 'lr': {'uniform': (1, 1)}}},
}
# 16 random configurations for every grid size k = 3..6.
task_list = [(k, func_param, name) for k in range(3, 7) for _ in range(16) for name in func_param]
# Up to 8 trainings run concurrently; train() writes its results to Snowflake tables.
with ThreadPoolExecutor(8) as executor:
    list(executor.map(lambda args: train(*args), task_list))

parameters:parameters: {"maxIter": 20, "sigma": 0.5362774121128991, "lr": 1.0, "model_name": "som", "k": 3}
 {"maxIter": 35, "sigma": 0.3687990383542052, "lr": 1.0, "model_name": "som", "k": 3}
parameters: {"maxIter": 11, "sigma": 0.7378873131980508, "lr": 1.0, "model_name": "som", "k": 3}
parameters:parameters: {"maxIter": 88, "sigma": 0.6069037637886936, "lr": 1.0, "model_name": "som", "k": 3}
parameters: {"maxIter": 194, "sigma": 0.7468035718565713, "lr": 1.0, "model_name": "som", "k": 3}
 {"maxIter": 94, "sigma": 0.34503086459205123, "lr": 1.0, "model_name": "som", "k": 3}
parameters: {"maxIter": 133, "sigma": 0.47222566973910984, "lr": 1.0, "model_name": "som", "k": 3}
parameters: {"maxIter": 159, "sigma": 0.11153315277532339, "lr": 1.0, "model_name": "som", "k": 3}




run_id: 9c6d2c42-b904-4ee0-bc9f-59bfb75116e5 ,iteration: 0 ,lr: 1.0 ,sigma: 0.34503086459205123
run_id: f6948c80-6c36-4024-af7f-79284620be33 ,iteration: 0 ,lr: 1.0 ,sigma: 0.5362774121128991
run_id: ea4b337f-46b5-4bb8-a0d9-d4d839f6e8ca ,iteration: 0 ,lr: 1.0 ,sigma: 0.3687990383542052
run_id: 01adc8df-985e-4529-a170-c2cb6adce833 ,iteration: 0 ,lr: 1.0 ,sigma: 0.7378873131980508
run_id: 0154b04c-5b8e-4bb9-b824-a8443d1f2cc4 ,iteration: 0 ,lr: 1.0 ,sigma: 0.11153315277532339
run_id: dc27cc9f-596c-4f90-934c-74c69fa63468 ,iteration: 0 ,lr: 1.0 ,sigma: 0.6069037637886936
run_id: 81569afe-99b0-4da6-9920-32ac3ab3a3e1 ,iteration: 0 ,lr: 1.0 ,sigma: 0.47222566973910984
run_id: 1a5229da-2c65-472b-8664-421f3db91d27 ,iteration: 0 ,lr: 1.0 ,sigma: 0.7468035718565713
run_id: 9c6d2c42-b904-4ee0-bc9f-59bfb75116e5 ,iteration: 1 ,lr: 0.9894180886889878 ,sigma: 0.34136032347936984
run_id: 01adc8df-985e-4529-a170-c2cb6adce833 ,iteration: 1 ,lr: 0.9131007162822623 ,sigma: 0.6708066483618643
run_id: f6948c80

In [None]:
from hyperopt import fmin, tpe, STATUS_OK,STATUS_FAIL
from hyperopt import hp
# hyperopt search space consumed by objective/train2. Each branch carries 'model_name' and 'algo'
# next to its 'param' dict (model_name is NOT inside 'param').
# NOTE(review): hyperopt passes the chosen branch of hp.choice itself to the objective (not a
# {'model': branch} wrapper) — confirm against the hyperopt docs.
# NOTE(review): the commented-out kmeans branch reuses the label 'w' (hyperopt labels are expected to
# be unique) and names SOM as its algo. Fix both before enabling it.
func_param = hp.choice('model',[
        #{'model_name':'kmeans','algo':SOM,'param':{'k':hp.uniformint('w',2, 6)}},
        {'model_name':'som','algo':SOM,'param':{'w':hp.uniformint('w',2, 6),'h':hp.uniformint('h',2, 6),'maxIter':hp.uniformint('maxIter',10, 200), 'sigma':hp.uniform('sigma',0, 0.75), 'lr':hp.uniform('lr',0.5, 1)}}
])
def objective(x):
    """hyperopt objective: maximise the silhouette score by minimising its negation.

    Delegates to train2, which accepts one hyperopt sample; train() takes (k, func_param, name)
    and cannot be called with a single argument. A failed run (train2 returns None) is reported
    as STATUS_FAIL instead of raising TypeError on -1 * None.
    """
    score = train2(x)
    if score is None:
        return {'status': STATUS_FAIL}
    return {'loss': -1*score, 'status': STATUS_OK }
def train2(func_param):
    """Train, score and log one model for a hyperopt sample; return its silhouette score or None.

    `func_param` is the branch chosen from the hp.choice space ({'model_name', 'algo', 'param'});
    a {'model': branch} wrapper is accepted too. The model is fitted on the globals sample / id_ /
    inputCols; centroids go to the 'cluster' table and a metadata row to 'cluster_meta'.
    Exceptions during training are printed and None is returned.
    """
    start = time.perf_counter()
    func = func_param['model'] if 'model' in func_param else func_param
    # 'model_name' sits beside 'param' in the search space. Copy it in (without mutating the
    # hyperopt sample) so the dispatch below and the logged JSON both see it.
    p = dict(func['param'])
    p.setdefault('model_name', func.get('model_name'))
    print('parameters:',json.dumps(p))
    score = None
    try:
        m = Kmeans(k=p['k']) if p['model_name'] == 'kmeans' else SOM(k=p.get('k'),w=p.get('w'),h=p.get('h'),sigma=p['sigma'],lr=p['lr'])
        m=m.fit(sample,id_,inputCols,maxIter=p.get('maxIter'))
        _,_,score = silhouette(m,sample,id_,inputCols)
        m.cluster.select(F.lit(m.run_id).alias('run_id'),F.col('*')).write.mode("append").save_as_table('cluster')
        end = time.perf_counter() - start
        session.sql('select 5').select(F.lit(m.run_id).alias('run_id'),F.lit(score).alias('score'),F.lit(json.dumps(p)).alias('param'),F.lit(end).alias('time'),F.col('*')).write.mode("append").save_as_table('cluster_meta')
        print('parameters:',json.dumps(p),', score:',score)
    except Exception as e:
        print(e)
    return score
# Run 50 TPE trials over the search space; the cell displays the best parameters found.
fmin(
    fn=objective,
    space=func_param,
    algo=tpe.suggest,
    max_evals=50,
)



In [None]:
# Reload everything the training runs logged: centroid rows and per-run metadata.
data_cluster = session.table('cluster').cache_result()
data_meta = session.table('cluster_meta').cache_result()

In [31]:
def getModel(meta,df,run_id):
    """Rebuild a trained model from the logged tables.

    meta: the 'cluster_meta' table (RUN_ID, PARAM JSON, ...).
    df:   the 'cluster' table (RUN_ID plus the centroid rows written after training).
    Raises ValueError when run_id has no metadata row.
    """
    rows = meta.where(F.col('run_id') == F.lit(run_id)).select(F.col('PARAM')).collect()
    if not rows:
        raise ValueError('no cluster_meta row for run_id %s' % run_id)
    p = json.loads(rows[0][0])
    if p['model_name'] == 'kmeans':
        m = Kmeans(k=p['k'])
    else:
        # Runs logged by train2 describe the grid with w/h instead of k.
        m = SOM(k=p.get('k'),w=p.get('w'),h=p.get('h'),sigma=p['sigma'],lr=p['lr'])
    # Exact name comparison; `c not in ('RUN_ID')` would be a substring test on a plain string.
    m.cluster = df.where(F.col('run_id') == F.lit(run_id)).select([F.col(c) for c in df.columns if c != 'RUN_ID']).cache_result()
    return m
model = getModel(data_meta,data_cluster,'07090b28-3b47-4910-ac86-613c9137ff4f')

In [9]:
id_ = 'UNIVERSAL_ID'
# Every column except the id is a model feature.
inputCols = list(filter(lambda name: name != id_, df.columns))

In [24]:
#temp = kmeans.cluster
temp_model = model
# Centroid table with every feature cast to FLOAT (CLUSTER_ID untouched), for plotting.
temp = temp_model.cluster.select(
    ['CLUSTER_ID'] + [F.col(name).cast('float').alias(name) for name in temp_model.cluster.columns if name != 'CLUSTER_ID']
)
# 1000 random rows to visualise, joined with the cluster each one is assigned to.
sample = df.sample(n=1000).cache_result()
prediction = temp_model.transform(df, id_, inputCols).cache_result()
t = prediction.join(sample, [id_]).cache_result()


In [33]:
# One line per centroid across all features: a quick profile of what distinguishes each cluster.
line = (
    alt.Chart(
        temp.unpivot(
            value_column='value',
            name_column='key',
            column_list=[name for name in temp.columns if name != 'CLUSTER_ID'],
        ).toPandas()
    )
    .mark_line()
    .encode(x='KEY:N', y='VALUE:Q', color='CLUSTER_ID:N', opacity=alt.value(0.5))
)
line

In [69]:
def plot_2d(df:snowpark.DataFrame,id:str,fit=None,title=''):
    """Scatter-plot the rows of `df` in 2-D PCA space, coloured by column `id`.

    df:  Snowpark DataFrame with column `id` plus numeric feature columns.
    fit: optional Snowpark DataFrame (features only) to fit the PCA on. When None the PCA is
         fitted on df's own features (the `id` column excluded).
    Returns an altair point chart with axes pa_x / pa_y.
    """
    from sklearn.decomposition import PCA
    frame = df.toPandas().sort_values(by=id, ascending=True)
    ids = frame[id].values
    features = frame.drop(id, axis=1)
    if fit is None:
        # Fit on exactly the columns that are transformed below (the label column is excluded).
        fit_frame = features
    else:
        fit_frame = fit.toPandas()
        # Present columns to transform() in the order the PCA was fitted on.
        features = features[list(fit_frame.columns)]
    projected = PCA(2).fit(X=fit_frame).transform(X=features)
    points = pd.DataFrame({'pa_x':projected[:,0],'pa_y':projected[:,1],id:ids})
    return alt.Chart(points,title=title).mark_point().encode(
        x='pa_x:Q',
        y='pa_y:Q',
        color=id+':N'
    )
plot_2d(temp.select(['CLUSTER_ID']+[c for c in temp.columns if c != 'CLUSTER_ID']),'CLUSTER_ID',fit=sample.drop(id_),title='title')


In [68]:
# Sampled rows coloured by their assigned cluster, projected with a PCA fitted on the sample.
plot_2d(
    df=t.select(['CLUSTER_ID'] + [name for name in temp.columns if name != 'CLUSTER_ID']),
    id='CLUSTER_ID',
    fit=sample.drop(id_),
    title='title',
)



In [65]:
# Shows `score` and `p` as last assigned by the per-run plotting loop (score,p = ... for each RUN_ID).
# NOTE(review): relies on kernel state from another cell — fails on a fresh kernel; consider removing.
score,p

(0.25624808309886526, '{"model_name": "som", "sigma": 0.5, "lr": 1, "k": 3}')

In [74]:
# One PCA scatter per logged run: 1000 rows coloured by the cluster that run assigns them to.
# Run metadata (RUN_ID, SCORE, PARAM) lives in data_meta ('cluster_meta'); centroids in data_cluster.
run_ids = data_meta.select(F.col('RUN_ID')).distinct().toPandas().values[:,0]
sample = df.limit(1000).cache_result()
plots = []
for run_id in run_ids:
    score,p = data_meta.where(F.col('RUN_ID') == F.lit(run_id)).select(F.col('SCORE'),F.col('PARAM')).toPandas().values[0]
    title = '''{0} {1} {2}'''.format(run_id,p,score)
    # getModel needs both the metadata table and the centroid table.
    temp_model = getModel(data_meta,data_cluster,run_id)
    prediction = temp_model.transform(df,id_,inputCols).join(sample,[id_]).cache_result()
    plots.append(plot_2d(prediction.select(['CLUSTER_ID']+[c for c in temp_model.cluster.columns if c != 'CLUSTER_ID']),'CLUSTER_ID',fit=sample.drop(id_),title=title))



In [8]:
from functools import reduce
# Lay all per-run charts side by side (same result as chaining `|`), and also valid when `plots` is empty.
alt.hconcat(*plots)

NameError: name 'plots' is not defined