In [1]:
# @hidden_cell
# The project token is an authorization token that is used to access project resources like data sources, connections, and used by platform APIs.
# NOTE(security): the project id and the project token are hardcoded below, so
# anyone who can read this notebook can read them. Consider rotating the token
# and loading it from the environment before sharing the file.
# `spark` is not defined anywhere in this notebook; presumably the kernel
# injects it at start-up (see the "Spark Initialization Done!" output) - confirm.
from project_lib import Project
project = Project(spark.sparkContext, 'be63ac50-4f9d-43f3-8f26-ad6572f65430', 'p-7bda336230f76ada26590bc6c4038045546d3287')
pc = project.project_context

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20210531015657-0019
KERNEL_ID = 33a9c3f3-c474-4035-902f-70619ee5f2c5


In [2]:
import pyspark

from scipy.spatial.distance import cdist
from pyspark.sql.window import Window 
import numpy as np
import pandas as pd
pd.options.display.max_columns = 999

# NOTE: this star import shadows Python built-ins such as sum, min, max, abs
# and round with Spark column functions. Imports placed after it (for example
# functools.reduce, which newer PySpark releases also export under the same
# name) must stay after it so they win.
from pyspark.sql.functions import *
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame


from sklearn.cluster import KMeans
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

import matplotlib.pyplot as plt
import seaborn as sns

from functools import reduce

from botocore.client import Config
import ibm_boto3

from datetime import datetime
from datetime import datetime, timedelta

import os
import sys
import shutil
import types

In [3]:
def normalize(x):
    """Min-max scale ``x`` into the [0, 1] range.

    Parameters
    ----------
    x : sequence, numpy array or pandas Series of numbers
        Elements are read by position, so a Series whose index is not
        0..n-1 is handled as well.

    Returns
    -------
    list
        One scaled value per input element, in input order.
        * Empty input returns ``[]``.
        * Missing values (NaN) are ignored when the range is computed and
          stay NaN in the result.
        * When every present value is identical the range is zero; those
          values map to ``0.0`` instead of producing a 0/0 division.
    """
    values = np.asarray(x, dtype=float)
    if values.size == 0:
        return []

    present = ~np.isnan(values)
    if not present.any():
        # Nothing to scale against: hand the all-NaN input back unchanged.
        return list(values)

    # Compute the bounds once instead of once per element.
    lowest = np.nanmin(values)
    span = np.nanmax(values) - lowest
    if span == 0:
        # Constant input: pin every present value to 0.0, keep NaN as NaN.
        return list(np.where(present, 0.0, np.nan))

    return list((values - lowest) / span)

In [4]:
# Connection settings for the Cloud Object Storage bucket plus the files to fetch.
# NOTE(security): the API key is hardcoded here; anyone with access to this
# notebook can read it. Prefer loading it from the environment / a secret store
# and rotating the key that has been committed.
# Ordering matters: download_file_cos treats every entry after the fifth one as
# a file name to download, so connection settings must stay first and file
# entries ('VEN', 'CAR', ...) must stay last.
credentials_1 = {
    'IAM_SERVICE_ID': 'iam-ServiceId-8c64e377-89e3-424a-b706-72ea9ab5fd59',
    'IBM_API_KEY_ID': 'vNrUHeW9-R1HdOM2ld5PDpR96H0VdqryNzcmyVlNFne_',
    'ENDPOINT': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'IBM_AUTH_ENDPOINT': 'https://iam.cloud.ibm.com/oidc/token',
    'BUCKET': 'projetoviagrupo1-donotdelete-pr-q8ymlbqhihqlvv',
    'VEN' : 'compilado_vendas.csv',
    'CAR' : 'carrinho.csv'
}

# Build the client from the dictionary above so the endpoints are defined in a
# single place (previously the auth endpoint and the service endpoint were
# repeated here as literals, and the auth URL differed from the one in the dict).
cgsClient = ibm_boto3.client(service_name='s3',
    ibm_api_key_id=credentials_1['IBM_API_KEY_ID'],
    ibm_auth_endpoint=credentials_1['IBM_AUTH_ENDPOINT'],
    config=Config(signature_version='oauth'),
    endpoint_url=credentials_1['ENDPOINT'])

In [5]:
# Local working directory: files are downloaded into it and uploaded from it.
path = '/home/spark/shared/tmp/'

# Start every run from an empty directory. The existence check is explicit so
# that an unrelated OSError (e.g. missing permissions) surfaces as-is instead of
# being mistaken for "directory already exists".
if os.path.isdir(path):
    shutil.rmtree(path)
    print ("Deletado diretório: %s" % path)

# Report success only after the directory has actually been created.
os.makedirs(path)
print ("Diretório criado com sucesso: %s" % path)

Deletado diretório: /home/spark/shared/tmp/
Diretório criado com sucesso: /home/spark/shared/tmp/


In [6]:
def upload_file_cos(cos, local_file_name, credentials, key):
    """Upload one local file to the Cloud Object Storage bucket.

    Parameters
    ----------
    cos : object
        Client exposing ``upload_file(Filename=..., Bucket=..., Key=...)``.
    local_file_name : str
        Prefix that is concatenated with ``key`` to build the local file path
        (callers in this notebook pass a directory path ending in ``/``).
    credentials : dict
        Only ``credentials['BUCKET']`` is read.
    key : str
        Object key in the bucket; also the local file name.

    Returns
    -------
    None
        Failures are reported on stdout and not re-raised (best effort).
    """
    try:
        cos.upload_file(Filename=local_file_name + key, Bucket=credentials['BUCKET'], Key=key)
    except Exception as e:
        # Say which file failed and why (the old message printed the Exception
        # class object itself, which carries no information).
        print(key + ' - Exception', e)
    else:
        # The timestamp is the kernel clock shifted by -3 hours
        # (presumably UTC -> Brasilia time; confirm the kernel's timezone).
        print(key+' - File Uploaded -'+ (datetime.now() - timedelta(hours=3)).strftime("%d/%m/%Y %H:%M:%S"))

In [7]:
def download_file_cos(cos, local_file_name, credential, first_file_index=5):
    """Download the files listed in ``credential`` from Cloud Object Storage to disk.

    ``credential`` mixes connection settings and file names: entries are taken
    in insertion order and every value from position ``first_file_index``
    onwards is treated as an object key to download.

    Parameters
    ----------
    cos : object
        Client exposing ``download_file(Bucket=..., Key=..., Filename=...)``.
    local_file_name : str
        Prefix that is concatenated with each file name to build the local
        target path (callers in this notebook pass a directory ending in ``/``).
    credential : dict
        Must contain ``'BUCKET'``; values after the connection settings are
        the file names.
    first_file_index : int, optional
        Position of the first file entry. The default of 5 skips the five
        connection settings that come first in the dictionary.

    Returns
    -------
    None
        Each download is best effort: a failure is printed and the loop
        continues with the next file.
    """
    # Slice once instead of looking up every key's position inside the loop.
    file_names = list(credential.values())[first_file_index:]

    for file_name in file_names:
        try:
            cos.download_file(Bucket=credential['BUCKET'], Key=file_name, Filename=local_file_name + file_name)
        except Exception as e:
            print(file_name + ' - Exception', e)
        else:
            print(file_name + ' - File Downloaded')

    # The timestamp is the kernel clock shifted by -3 hours
    # (presumably UTC -> Brasilia time; confirm the kernel's timezone).
    print("Done. -"+(datetime.now() - timedelta(hours=3)).strftime("%d/%m/%Y %H:%M:%S"))

In [8]:
# Fetch the files listed in credentials_1 ('VEN' and 'CAR' entries) from the bucket into `path`.
download_file_cos(cgsClient, path, credentials_1)

compilado_vendas.csv - File Downloaded
carrinho.csv - File Downloaded
Done. -30/05/2021 22:57:09


In [9]:
# Register each downloaded CSV as a Spark SQL temp view. The file locations are
# derived from `path` and `credentials_1` (the same values the download step
# used) instead of being repeated as absolute literals, so both steps cannot
# drift apart.
# No schema is supplied and schema inference is not enabled, so the columns are
# read as strings; the SQL below relies on Spark casting them where needed.
for view_name, file_key in (("compilado_vendas", 'VEN'), ("carrinho", 'CAR')):
    spark.read.option("header", 'true').csv(path + credentials_1[file_key]).createOrReplaceTempView(view_name)

In [10]:
def unionAll(*dfs):
    """Stack any number of Spark DataFrames into one.

    The frames are combined left to right with ``DataFrame.unionAll``, which
    matches columns by position, so all inputs are expected to share the same
    column order. At least one DataFrame must be given.
    """
    return reduce(lambda stacked, following: DataFrame.unionAll(stacked, following), dfs)

In [11]:
# Aggregate compilado_vendas to one row per (ano, mes, idlojista, idbandeira)
# and expose the result as the temp view 'sellers'.
# `group by 1,2,3,4` refers to the first four select columns by position.
# When sum(atrasos) is 0 the media_atraso division yields null (shown as NaN
# once the data is converted to pandas).
# NOTE(review): media_atraso is computed as sum(media_atraso)/sum(atrasos).
# That is only a correct average if the source column holds summed delay per
# row rather than an already-averaged value - confirm against the CSV.
spark.sql(''' 
select 
ano,
mes,
idlojista,
idbandeira,
sum(valor) as valor,
sum(frete) as frete,
sum(valor_total) as valor_total,
sum(itens) as itens,
sum(ENVIOS) as ENVIOS,
sum(atrasos) as atrasos,
sum(media_atraso)/sum(atrasos) as media_atraso,
sum(itens_entregues) as itens_entregues,
sum(itens_cancelados) as itens_cancelados,
sum(itens_fluxo) as itens_fluxo

from compilado_vendas 
group by 1,2,3,4
''').createOrReplaceTempView('sellers')

#### Clusterização IDBANDEIRA 7

In [12]:
def cluster_sellers_by_gmv(idbandeira, n_clusters=3, random_state=0):
    """Cluster the rows of one banner from the ``sellers`` view with K-Means.

    The same pipeline used to be copy-pasted once per banner, re-using the
    global names ``df``, ``df_principal``, ``df_kmeans`` and ``kmeans``; it now
    lives in one place and keeps its intermediates local.

    Steps:
      1. select the ``sellers`` rows with the given ``idbandeira`` and a
         non-null ``valor`` and bring them to pandas;
      2. build two features: min-max normalised ``itens`` and ``log1p`` of
         ``valor`` (both coerced to numeric, invalid values become NaN);
      3. fit ``KMeans`` and attach its label as column ``CLUSTER_GMV``.

    Parameters
    ----------
    idbandeira : int
        Banner id to filter on.
    n_clusters : int, optional
        Number of K-Means clusters (default 3, as in the original cells).
    random_state : int, optional
        Seed passed to K-Means so that reruns give the same labels (default 0).

    Returns
    -------
    pyspark.sql.DataFrame
        The selected rows plus ``CLUSTER_GMV`` holding the raw K-Means label.
    """
    # %d rejects anything that is not a number, so no arbitrary text can end
    # up inside the SQL statement.
    sellers_pd = spark.sql(
        ''' select * from sellers where idbandeira = %d and valor is not null ''' % idbandeira
    ).toPandas()

    features = pd.DataFrame(data={
        'itens': normalize(pd.to_numeric(sellers_pd.itens, errors='coerce')),
        'valor': np.log1p(pd.to_numeric(sellers_pd.valor, errors='coerce')),
    })

    model = KMeans(n_clusters=n_clusters, random_state=random_state).fit(features)
    sellers_pd['CLUSTER_GMV'] = model.labels_
    return spark.createDataFrame(sellers_pd)


df_7 = cluster_sellers_by_gmv(7)

#### Clusterização IDBANDEIRA 49

df_49 = cluster_sellers_by_gmv(49)

#### Clusterização IDBANDEIRA 343

df_343 = cluster_sellers_by_gmv(343)

#### Juntando DF's

In [27]:
# Stack the three per-banner results and expose them to Spark SQL as the
# temp view 'Clusters'.
df_principal = unionAll(df_7, df_49, df_343)
df_principal.createOrReplaceTempView('Clusters')

In [28]:
# Relabel the K-Means clusters so that the label reflects GMV order.
#   * Innermost query: max(Valor) per (CLUSTER_GMV, IDBANDEIRA).
#   * T1: row_number() within each IDBANDEIRA ordered by that maximum,
#     ascending, so rank 1 is the cluster with the smallest maximum valor.
#   * The rank is joined back onto every row of Clusters and then replaces the
#     original CLUSTER_GMV column (the temporary NEW_CLUSTER_GMV is dropped).
# NOTE(review): the final `where c.idbandeira = 7 and c.CLUSTER_GMV = 0` keeps
# only one original cluster of one banner, although banners 49 and 343 were
# clustered and unioned above. This looks like a leftover exploratory filter;
# confirm whether the exported file is meant to contain all banners/clusters.
cluster = spark.sql('''select 
c.*,
T1.GMV as NEW_CLUSTER_GMV

from Clusters c
inner join
(select
row_number() OVER(
  PARTITION BY IDBANDEIRA
  order by max_valor ASC
) AS GMV,
*
from
(select
CLUSTER_GMV,
max(Valor) as max_valor,
IDBANDEIRA
from Clusters
group by 1,3)) T1
on c.CLUSTER_GMV = T1.CLUSTER_GMV and c.IDBANDEIRA = T1.IDBANDEIRA
where c.idbandeira = 7 and c.CLUSTER_GMV = 0''')\
.withColumn("CLUSTER_GMV",col("NEW_CLUSTER_GMV"))\
.drop("NEW_CLUSTER_GMV")

In [29]:
# Preview: pull only the first 10 rows to pandas for display.
cluster.limit(10).toPandas()

Unnamed: 0,ano,mes,idlojista,idbandeira,valor,frete,valor_total,itens,ENVIOS,atrasos,media_atraso,itens_entregues,itens_cancelados,itens_fluxo,CLUSTER_GMV
0,2021,3,17396,7,9402.8,33.9,9436.7,7.0,7.0,0.0,,1.0,6.0,0.0,3
1,2020,5,13860,7,10399.2,215.1,10614.3,8.0,8.0,0.0,,1.0,7.0,0.0,3
2,2020,5,31312,7,29130.54,83.41,29213.95,39.0,39.0,1.0,10.0,27.0,12.0,0.0,3
3,2020,5,17071,7,30013.21,3133.56,33146.77,69.0,69.0,10.0,13.05,49.0,19.0,1.0,3
4,2020,6,16528,7,97142.0,12019.0,109161.0,62.0,62.0,5.0,21.4,38.0,24.0,0.0,3
5,2021,4,14551,7,186900.56,10582.98,197483.54,447.0,447.0,16.0,2.375,313.0,126.0,8.0,3
6,2020,1,14612,7,62477.45,15223.83,77701.28,188.0,188.0,15.0,6.6,121.0,66.0,1.0,3
7,2019,7,16189,7,8185.0,527.68,8712.68,5.0,5.0,2.0,9.0,2.0,3.0,0.0,3
8,2020,6,10150,7,27781.82,5224.44,33006.26,63.0,63.0,13.0,17.923077,42.0,21.0,0.0,3
9,2020,2,14551,7,97732.7,7965.72,105698.42,215.0,215.0,8.0,41.375,108.0,57.0,50.0,3


In [30]:
# Write the final result into the working directory and upload that same file
# to the bucket. The target is built from `path` and a single file name so the
# location that is written and the location that is uploaded cannot diverge.
cluster_file = 'cluster.csv'
cluster.toPandas().to_csv(path + cluster_file, index=False)
upload_file_cos(cgsClient, path, credentials_1, cluster_file)

cluster.csv - File Uploaded -30/05/2021 22:59:26
