In [None]:
pip install requests

In [None]:
from pyspark.sql.types import*
from pyspark.sql.functions import*
from pyspark.sql.functions import col
from pyspark.sql.functions import sum, avg, max
from pyspark.sql.functions import round, col
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
#Função para criar um ponto de montagem com o blob - necessário somente o nome do blob.
def create_mount_point(container):  #Nome do container que iremos utilizar
    
    conta = 'nome_da_conta' #Nome do Azure Storage Account que iremos retirar os dados
    chave = dbutils.secrets.get(scope="scope", key="BlobKeyVault") #Escopos de segredos utilizado com o Azure Key Vault
    #Para acessar o Azure Storage Account usamos um ponto de montagem (mount_point) no Databricks, informamos o source, mount_point e extra_configs.
    #Para evitar que a senha fique exposta do nosso blob usamos a chave juntamente com o Azure Key Vault.
    try:
        dbutils.fs.mount(source = f"wasbs://{container}@{conta}.blob.core.windows.net",
                         mount_point = f"/mnt/{container}",
                         extra_configs = {f"fs.azure.account.key.{conta}.blob.core.windows.net": chave})
    except:
        print('Mount Point já existe ou nome incorreto do container')

    
#Função para desmontar um ponto de montagem com o blob - necessário somente o nome do blob.    
def dismount_point(container): #Nome do container que iremos desmontar
    try:
        dbutils.fs.unmount(f'/mnt/{container}')
    except:
        print('Mount Point não existe ou nome incorreto do container')

In [None]:
# para desmontar => dbutils.fs.unmount("/mnt/<mount-name>")
#create_mount_point('conteudo-databricks')

In [None]:
# Função para ler uma tabela do banco
def read_database(tabela):
    
    # Key Vault - Secrets para acessar o banco de dados 
    username = dbutils.secrets.get(scope = "scope", key = "BdLogin") # Nome de usuario do banco de dados
    password = dbutils.secrets.get(scope = "scope", key = "BdSecret") # Senha do banco de dados
    
    #Informações de conexão com o banco de dados do BluetCamp
    jdbcHostname  = 'svr-host.database.windows.net'
    jdbcDatabase  = 'db-database'
    jdbcPort      = 1433
    jdbcUrl       = f'jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}'
    
    # Variavel que será utilizad para leitura
    df_table = (spark.read.format("jdbc").option("url", jdbcUrl).option("dbtable", tabela).option("user", username).option("password", password).load())
    
    return df_table

# Função de escrever nas tabelas no banco de dados
def write_database(df, tabela):
    
    # Key Vault - Secrets para acessar o banco de dados
    username = dbutils.secrets.get(scope = "scope", key = "BdLogin") # Nome de usuario do banco de dados
    password = dbutils.secrets.get(scope = "scope", key = "BdSecret") # Senha do banco de dados
    
    #Informações de conexão com o banco de dados do BluetCamp
    jdbcHostname  = 'svr-bluecamp.database.windows.net'
    jdbcDatabase  = 'db-BlueCamp'
    jdbcPort      = 1433
    jdbcUrl       = f'jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}'
    
    # Variavel que será utilizada para escrever as informações no banco de dados
    df.write.format('jdbc').mode('overwrite').option('url', jdbcUrl).option('dbtable', tabela).option("user", username).option("password", password).save() 

In [None]:
# função para ler a tabela do banco 
df_matricula = read_database('[dbo].[matricula]')
df_matricula.show()

In [None]:
df_matricula1 = df_matricula.select(col("usuario_id"),
                                    col('bootcamp_id'),
                                    col("mat_data").alias("data_matricula_usuario"),
                                    col('mat_data_conclusao').alias('data_conclusao_usuario')
                                   
                                   )
df_matricula.show()

In [None]:
write_database(df_matricula1, 'dw.matricula_usuario')

In [None]:
df_modulo_usu_acomp = read_database('[dbo].[modulo_usu_acomp]')

#visualizando o dataframe
df_modulo_usu_acomp.display()

#informações de cada coluna
df_modulo_usu_acomp.printSchema()

#contagem das linhas da tabela
df_modulo_usu_acomp.count()

In [None]:
#diminuindo a coluna de data de finalização com a data de início
diferenca_horas = col('moa_data_fim').cast('long') - col('moa_data_ini').cast('long')

#Transformando a diferença de datas em horas
df_modulo_usu_acomp1 = df_modulo_usu_acomp.withColumn('tempo_horas_conclusao_modulo', diferenca_horas / 3600)

#visualizando o dataframe
df_modulo_usu_acomp1.show()


In [None]:
#Arredondando para duas casas decimais o tempo de conclusão por módulo
df_modulo_usu_acomp2 = df_modulo_usu_acomp1.select('*', round(col('tempo_horas_conclusao_modulo'),2))

#visualização do dataframe
df_modulo_usu_acomp2.show()

In [None]:
#selecionando a coluna para exclusão
df_modulo_usu_acomp3 = df_modulo_usu_acomp2.drop('tempo_horas_conclusao_modulo')

#visualizando o dataframe
df_modulo_usu_acomp3.show()

In [None]:
df_modulo_usu_acomp4 = df_modulo_usu_acomp3.select(col('modulo_id'),
                                                   col('usuario_id'),
                                                   col('moa_data_ini').alias('data_inicio_modulo'),
                                                   col('moa_data_fim').alias('data_termino_modulo'),
                                                   col('round(tempo_horas_conclusao_modulo, 2)').alias('tempo_conclusao_modulo_horas'))

#visualizando o dataframe
df_modulo_usu_acomp4.show()


In [None]:
#salvar tabela
write_database(df_modulo_usu_acomp4, 'dw.acompanhamento_modulo_usuario')

In [None]:
df_tempo_modulo_usu_acomp = df_modulo_usu_acomp4.groupBy('modulo_id') \
                                                .agg(sum('tempo_conclusao_modulo_horas').alias('soma_tempo_horas_conclusao_modulo'), \
                                                     avg('tempo_conclusao_modulo_horas').alias('media_tempo_horas_conclusao_modulo'), \
                                                     min('tempo_conclusao_modulo_horas').alias('min_tempo_horas_conclusao_modulo'), \
                                                     max('tempo_conclusao_modulo_horas').alias('max_tempo_horas_conclusao_modulo'))

#df_tempo_modulo_usu_acomp.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_tempo_modulo_usu_acomp1 = df_tempo_modulo_usu_acomp.select('*', round(col('soma_tempo_horas_conclusao_modulo'),2))

#excluindo a coluna não arredondada
df_tempo_modulo_usu_acomp2 = df_tempo_modulo_usu_acomp1.drop('soma_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_tempo_modulo_usu_acomp2.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_tempo_modulo_usu_acomp3 = df_tempo_modulo_usu_acomp2.select('*', round(col('media_tempo_horas_conclusao_modulo'), 2))

#excluindo a coluna não arredondada
df_tempo_modulo_usu_acomp4 = df_tempo_modulo_usu_acomp3.drop('media_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_tempo_modulo_usu_acomp4.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_tempo_modulo_usu_acomp5 = df_tempo_modulo_usu_acomp4.select('*', round(col('min_tempo_horas_conclusao_modulo'), 2))

#excluindo a coluna não arredondada
df_tempo_modulo_usu_acomp6 = df_tempo_modulo_usu_acomp5.drop('min_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_tempo_modulo_usu_acomp6.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_tempo_modulo_usu_acomp7 = df_tempo_modulo_usu_acomp6.select('*', round(col('max_tempo_horas_conclusao_modulo'), 2))

#excluindo a coluna não arredondada
df_tempo_modulo_usu_acomp8 = df_tempo_modulo_usu_acomp7.drop('max_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_tempo_modulo_usu_acomp8.display()

In [None]:
df_tempo_modulo_usu_acomp9 = df_tempo_modulo_usu_acomp8.select(col('modulo_id'),
                                                               col('round(soma_tempo_horas_conclusao_modulo, 2)').alias('soma_tempo_horas_conclusao_modulo'),
                                                               col('round(media_tempo_horas_conclusao_modulo, 2)').alias('media_tempo_horas_conclusao_modulo'),
                                                               col('round(min_tempo_horas_conclusao_modulo, 2)').alias('min_tempo_horas_conclusao_modulo'),
                                                               col('round(max_tempo_horas_conclusao_modulo, 2)').alias('max_tempo_horas_conclusao_modulo')
                                                              
                                                              )
#df_tempo_modulo_usu_acomp9.display()

In [None]:
#salvando tabela
write_database(df_tempo_modulo_usu_acomp9, 'dw.acompanhamento_modulo_usuario_tempo')

In [None]:
df_modulo_tempo_usu_acomp = df_modulo_usu_acomp1.groupBy('usuario_id') \
                                                .agg(sum('tempo_horas_conclusao_modulo').alias('soma_tempo_horas_conclusao_modulo'), \
                                                     avg('tempo_horas_conclusao_modulo').alias('media_tempo_horas_conclusao_modulo'), \
                                                     min('tempo_horas_conclusao_modulo').alias('min_tempo_horas_conclusao_modulo'), \
                                                     max('tempo_horas_conclusao_modulo').alias('max_tempo_horas_conclusao_modulo'))

#df_modulo_tempo_usu_acomp.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_modulo_tempo_usu_acomp1 = df_modulo_tempo_usu_acomp.select('*', round(col('soma_tempo_horas_conclusao_modulo'), 2))

#excluindo a coluna não arredondada
df_modulo_tempo_usu_acomp2 = df_modulo_tempo_usu_acomp1.drop('soma_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_modulo_tempo_usu_acomp2.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_modulo_tempo_usu_acomp3 = df_modulo_tempo_usu_acomp2.select('*', round(col('media_tempo_horas_conclusao_modulo'), 2))

#excluindo a coluna não arredondada
df_modulo_tempo_usu_acomp4 = df_modulo_tempo_usu_acomp3.drop('media_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_modulo_tempo_usu_acomp4.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_modulo_tempo_usu_acomp5 = df_modulo_tempo_usu_acomp4.select('*', round(col('min_tempo_horas_conclusao_modulo'), 2))

#excluindo a coluna não arredondada
df_modulo_tempo_usu_acomp6 = df_modulo_tempo_usu_acomp5.drop('min_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_modulo_tempo_usu_acomp6.display()

In [None]:
#arredondando o dataframe com duas casas decimais
df_modulo_tempo_usu_acomp7 = df_modulo_tempo_usu_acomp6.select('*', round(col('max_tempo_horas_conclusao_modulo'), 2))

#excluindo a coluna não arredondada
df_modulo_tempo_usu_acomp8 = df_modulo_tempo_usu_acomp7.drop('max_tempo_horas_conclusao_modulo')

#visualização do dataframe
#df_modulo_tempo_usu_acomp8.display()

In [None]:
df_modulo_tempo_usu_acomp9 = df_modulo_tempo_usu_acomp8.select(col('usuario_id'),
                                                               col('round(soma_tempo_horas_conclusao_modulo, 2)').alias('soma_tempo_horas_conclusao_modulo'),
                                                               col('round(media_tempo_horas_conclusao_modulo, 2)').alias('media_tempo_horas_conclusao_modulo'),
                                                               col('round(min_tempo_horas_conclusao_modulo, 2)').alias('min_tempo_horas_conclusao_modulo'),
                                                               col('round(max_tempo_horas_conclusao_modulo, 2)').alias('max_tempo_horas_conclusao_modulo')
                                                              
                                                              )

#df_modulo_tempo_usu_acomp9.display()

In [None]:
#salvar tabela
write_database(df_modulo_tempo_usu_acomp9, 'dw.modulo_tempo_usu_acomp')

In [None]:
usuario_interesse = read_database('[dbo].[usuario_interesse]')

usuario_interesse.display()

In [None]:
usuario_interesse1 = usuario_interesse.select(col('Email'),
                                              col('nome'),
                                              col('sobrenome'),
                                              col('data_nascimento'),
                                              col('idade'),
                                              col('genero'),
                                              col('telefone'),
                                              col('estado'),
                                              col('aprovado'),
                                              col('comentario').alias('Comentário'),
                                              col('nota').alias('nota_teste_form_interesse')
                                             )

usuario_interesse1.display()

In [None]:
usuario_interesse2 = usuario_interesse1.filter(usuario_interesse1.aprovado == 'True')
usuario_interesse2.show()

In [None]:
usuario_interesse2 = usuario_interesse2.withColumn("aprovado", when(usuario_interesse2.aprovado == "True", "Aprovado")
                                                   .otherwise(usuario_interesse2.aprovado))

In [None]:
usuario_interesse2 = usuario_interesse2.withColumn('id_faixa_etaria',
            when((col('idade') >= 60) , 1)
            .when((col('idade') >= 45) , 2)
            .when((col('idade') >= 30) , 3)
            .when((col('idade') >= 18) , 4)
            .otherwise(5))
usuario_interesse2.show()

In [None]:
usuario_interesse2 = (usuario_interesse2.withColumn("data_nascimento",to_date("data_nascimento", "yyyy-MM-dd")))

In [None]:
usuario_interesse3 = usuario_interesse2.withColumn('nota_teste_form_interesse', (10/12)*F.col('nota_teste_form_interesse')). withColumnRenamed('nota_teste_form_interesse', 'nota_teste_form_interesse')

usuario_interesse3.show()

In [None]:
#arredondando o dataframe com duas casas decimais
usuario_interesse4 = usuario_interesse3.select('*', round(col('nota_teste_form_interesse'), 2))

#excluindo a coluna não arredondada
usuario_interesse5 = usuario_interesse4.drop('nota_teste_form_interesse')

#Renomeando a coluna
usuario_interesse6 = usuario_interesse5.withColumnRenamed('round(nota_teste_form_interesse, 2)', 'nota_teste_form_interesse')

#visualização do dataframe
usuario_interesse6.show()

In [None]:
usuario = read_database('[dbo].[usuario]')
usuario.show()

In [None]:
# Selecionando e renomeando
usuario1 = usuario.select(col('date_joined').alias('data_inicio_cadastro'),
                          col('id').alias('id_usuario'),
                          col('email').alias('E_mail'),                          
                         )

usuario1.show()

In [None]:
#união das colunas de interesse
info_usuarios = usuario_interesse6.join(usuario1, usuario_interesse6.Email == usuario1.E_mail, 'inner')

#excluir colunas iguais
info_usuarios1 = info_usuarios.drop('Email')

#visualização do dataframe
info_usuarios1.show()

In [None]:
#criando faixas etárias
dados_faixa_etaria = ([1, 'Maior de 60 anos'],
                [2, 'De 45 a 59 anos'],
                [3, 'De 30 a 44 anos'],
                [4, 'De 18 a 29 anos'],
                [5, 'De 0 a 17 anos']
               )

#mudança dos tipos de dados
schema_faixa_etaria = (StructType([
    StructField('id_faixa_etaria', IntegerType(), True),
    StructField('faixa_etaria', StringType(), True)
]))

#criando novo dataframe da faixa etária
df_faixa_etaria = spark.createDataFrame(data = dados_faixa_etaria, schema = schema_faixa_etaria)

df_faixa_etaria.show()


In [None]:
info_usuarios1.show()

In [None]:
#salvar tabela
write_database(info_usuarios1, 'dw.info_usuarios')

In [None]:
avaliacao_usu_acomp = read_database('[dbo].[avaliacao_usu_acomp]')



In [None]:
avaliacao_usu_acomp1 = avaliacao_usu_acomp.select(col('usuario_id').alias('user_id'),
                                                  col('aprovado').alias('aprovado_avaliacao'),
                                                  col('nota').alias('nota_avaliacao'),
                                                  col('avaliacao_id'),
                                                  col('data').alias('data_inicio_avaliacao'),
                                                  col('data_conclusao').alias('data_conclusao_avaliacao'),
                                                  col('qnt_acertos'),
                                                  col('opiniao_avaliacao'),
                                                  col('opiniao_conteudo')
                                                 
                                                 )
avaliacao_usu_acomp1.show()

In [None]:
avaliacao_usu_acomp2 = info_usuarios1.join(avaliacao_usu_acomp1, info_usuarios1.id_usuario == avaliacao_usu_acomp1.user_id, 'inner')
avaliacao_usu_acomp2.show()

In [None]:
avaliacao_usu_acomp3 = avaliacao_usu_acomp2.select(col('nome'),
                                                   col('sobrenome'),
                                                   col('telefone'),
                                                   col('E_mail').alias('e-mail'),
                                                   col('nota_teste_form_interesse'),
                                                   col('id_usuario'),
                                                   col('nota_avaliacao'),
                                                   col('aprovado_avaliacao'),
                                                   col('avaliacao_id'),
                                                   col('data_inicio_avaliacao'),
                                                   col('data_conclusao_avaliacao'),
                                                   col('qnt_acertos'),
                                                   col('opiniao_avaliacao'),
                                                   col('opiniao_conteudo'))         

avaliacao_usu_acomp3.show()

In [None]:
#cálculo tempo de realização das atividades
realizacao_atividade = col('data_conclusao_avaliacao').cast('long') - col('data_inicio_avaliacao').cast('long')

#passando o tempo para minutos
avaliacao_usu_acomp4 = avaliacao_usu_acomp3.withColumn('tempo_realizado_atividade', realizacao_atividade / 60)

#visualização do dataframe
avaliacao_usu_acomp4.show()

In [None]:
usuario_interesse6 = usuario_interesse5.withColumnRenamed('round(nota_teste_form_interesse, 2)', 'nota_teste_form_interesse')

In [None]:
#arredondando a coluna em dois decimais
avaliacao_usu_acomp5 = avaliacao_usu_acomp4.select('*', round(col('tempo_realizado_atividade'), 2))

#excluindo a coluna não arredondada
avaliacao_usu_acomp6 = avaliacao_usu_acomp5.drop('tempo_realizado_atividade')

#renomeando a coluna arredondada
avaliacao_usu_acomp7 = avaliacao_usu_acomp6.withColumnRenamed('round(tempo_realizado_atividade, 2)', 'tempo_realizado_atividade(min)')

avaliacao_usu_acomp7.show()


In [None]:
avaliacao_usu_acomp8 = avaliacao_usu_acomp7.na.drop(how="any")
avaliacao_usu_acomp8.show()

In [None]:
write_database(avaliacao_usu_acomp8, 'dw.avaliacao_usu_acomp4')

In [None]:
infos_gerais_avaliacao = avaliacao_usu_acomp4.groupBy('avaliacao_id') \
                                             .agg(avg('nota_avaliacao').alias('média_nota_avaliacao'), \
                                                  min('nota_avaliacao').alias('nota_minima_avaliacao'), \
                                                  max('nota_avaliacao').alias('nota_maxima_avaliacao'), \
                                                  count('nota_avaliacao').alias('qntde_notas_avaliacao'), \
                                                                                                  
infos_gerais_avaliacao.show()

avaliacao_id,média_nota_avaliacao,nota_minima_avaliacao,nota_maxima_avaliacao,qntde_notas_avaliacao
26,6.665,3.33,10.0,2
27,6.67,6.67,6.67,1
12,4.332,0.0,10.0,10
22,7.0,4.0,10.0,4
13,7.085,5.0,10.0,4
16,10.0,10.0,10.0,3
20,6.5,4.0,8.0,4
19,4.073333,0.0,10.0,9
15,7.2,4.0,10.0,5
9,5.8325,3.33,10.0,8


In [None]:
#arredondando a coluna em dois decimais
infos_gerais_avaliacao1 = infos_gerais_avaliacao.select('*', round(col('média_nota_avaliacao'), 2))

#excluindo coluna não arredondada
infos_gerais_avaliacao2 = infos_gerais_avaliacao1.drop('média_nota_avaliacao')

#renomeando coluna arredondada
infos_gerais_avaliacao3 = infos_gerais_avaliacao2.withColumnRenamed('round(média_nota_avaliacao, 2)', 'media_nota_avaliacoes')

#visualização do dataframe
infos_gerais_avaliacao3.show()

In [None]:
write_database(infos_gerais_avaliacao3, 'dw.infos_gerais_avaliacao4')

In [None]:
infos_gerais_aluno_avaliacao = avaliacao_usu_acomp4.groupBy('id_usuario') \
                                             .agg(avg('nota_avaliacao').alias('média_nota_avaliacao'), \
                                                  min('nota_avaliacao').alias('nota_minima_avaliacao'), \
                                                  max('nota_avaliacao').alias('nota_maxima_avaliacao'), \
                                                  count('nota_avaliacao').alias('qntde_notas_avaliacao'), \
                                                                                                  
                                                 )
infos_gerais_aluno_avaliacao.show()

In [None]:
write_database(infos_gerais_aluno_avaliacao, 'dw.infos_gerais_aluno_avaliacao')

In [None]:
#mensagem_usu_acomp = spark.read.format('jdbc').option('url', jdbcUrl).option('dbtable', '[dbo].[mensagem_usu_acomp]').option("user", username).option("password", password).load()

#mensagem_usu_acomp.display()

In [None]:
#write_database(mensagem_usu_acomp, 'mensagem_usu_acomp')

In [None]:
#dbutils.fs.unmount("/mnt/conteudo-databricks")