In [2]:
from dataeng_services.session import Session

session_provider = Session().client("session-provider")
aws_auth_dict = session_provider.get_temporary_credentials().json()

In [3]:
# Módulo necessário para "encontrar" o Spark no ambiente
import findspark
findspark.init()

# Módulo do PySpark
import pyspark

# Classes necessárias para configurar e iniciar o Spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [4]:
# Definindo o nome que daremos ao seu job Spark
name = "sohtec_database_query" 

# Criando o objeto com as configurações do Spark e atribuindo o nome ao job
conf = SparkConf().setAppName(name)

# Adicionando as configurações do Spark

# Aqui definiremos até quantos cores (cpus) cada executor terá a sua disposição para executar sua query
conf.set("spark.kubernetes.executor.limit.cores", 2 )

# Aqui definiremos até quanto de memória cada executor terá a sua disposição para executar sua query
conf.set("spark.kubernetes.executor.limit.memory", "4Gi")

# Aqui é definido o request cores, valor mínimo que o Kubernetes garante de ciclos de máquina para cada CPU.
conf.set("spark.kubernetes.executor.request.cores", 1 )

# Aqui é definido a memória que cada executor do Spark possuirá para executar o job
conf.set("spark.kubernetes.executor.request.memory", "2G" )

# Aqui é definido o número de executores (workers) que o Spark possuirá para executar o job
conf.set("spark.executor.instances", 2)

# Aqui é definido o número de cores (cpus) para cada executor que o Spark possuirá para executar o job
conf.set("spark.executor.cores", 1)

# Aqui é definido a memória que cada executor do Spark possuirá para executar o job
conf.set("spark.executor.memory", "2g")

# Aqui nós passamos as credenciais retornadas do SessionProvider para o Spark
# Isso dará ao Spark as mesmas permissões do nosso usuário de acesso aos Data-Lake.
conf.set("spark.hadoop.fs.s3a.access.key", aws_auth_dict['AccessKeyId'])
conf.set("spark.hadoop.fs.s3a.secret.key", aws_auth_dict['SecretAccessKey'])
conf.set("spark.hadoop.fs.s3a.session.token", aws_auth_dict['SessionToken'])

<pyspark.conf.SparkConf at 0x7faa265513d0>

In [5]:
#CONFIG DO SPARK PARA DRIVER MICROSOFT
conf.set('spark.jars.packages', 'com.microsoft.azure:spark-mssql-connector_2.12:1.1.0')

<pyspark.conf.SparkConf at 0x7faa265513d0>

In [7]:
# Aqui nós efetivamente criaremos os executores do Spark e teremos um Sessão disponível para utilizá-lo
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
com.microsoft.azure#spark-mssql-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-31df146a-ffac-4a83-864c-dd74108b66fa;1.0
	confs: [default]
	found com.microsoft.azure#spark-mssql-connector_2.12;1.1.0 in central
downloading https://repo1.maven.org/maven2/com/microsoft/azure/spark-mssql-connector_2.12/1.1.0/spark-mssql-connector_2.12-1.1.0.jar ...
	[SUCCESSFUL ] com.microsoft.azure#spark-mssql-connector_2.12;1.1.0!spark-mssql-connector_2.12.jar (13ms)
:: resolution report :: resolve 622ms :: artifacts dl 15ms
	:: modules in use:
	com.microsoft.azure#spark-mssql-connector_2.12;1.1.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	-------

In [17]:
#NICOLLE: ali embaixo temos a query feita no banco de dados de sohtec; A unica coisa que mudaremos nessa 
#query ;futuramente é a data de execução.Por hora, não precisa mexer, talvez apenas colocar como ultima 
#data 2023-10-01,já que fechamos o mês 10. 

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:sqlserver://sohtec-db-server.database.windows.net") \
    .option("databaseName", "sohtec") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("user", "sohtecleitura") \
    .option("password", "Pqrxum99") \
    .option("numPartitions", 1) \
    .option("query", '''
select E.Id as EMPRESAID, E.Nome as NOME_EMPRESA, CE.Modulo as PRETENSAO, Year(CE.DataCriacao) as Ano, Month(CE.DataCriacao) as Mes, Isnull( Origem, 'SITE') as Origem, Count(distinct CE.ClienteEmail) as TotalLeads
from ClienteEmpresa CE
join Empresas E on E.Id = CE.EmpresaId
where CE.Removido = 0
and CE.EmpresaID in (select ID from Empresas where Ativo = 1) -- imobs ZAP+
and CE.DataCriacao between '2023-07-01' and '2023-09-01'
and CE.EmpresaId not in (1)
group by E.Id , E.Nome, Year(CE.DataCriacao) , CE.Modulo, Month(CE.DataCriacao), Origem
''') \
    .load()

df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+---------+--------------------+---------+----+---+--------------+----------+
|EMPRESAID|        NOME_EMPRESA|PRETENSAO| Ano|Mes|        Origem|TotalLeads|
+---------+--------------------+---------+----+---+--------------+----------+
|        4|Barcellos Assesso...|  LOCACAO|2023|  7|      VivaReal|        76|
|        4|Barcellos Assesso...|  LOCACAO|2023|  7|  WHATSAPP_API|        15|
|        4|Barcellos Assesso...|   VENDAS|2023|  8|          SITE|         2|
|        4|Barcellos Assesso...|   VENDAS|2023|  8|           ZAP|         9|
|       11|IMOBILIARIA COMER...|  LOCACAO|2023|  8|          SITE|        15|
|       11|IMOBILIARIA COMER...|  LOCACAO|2023|  8|           ZAP|       141|
|       11|IMOBILIARIA COMER...|   VENDAS|2023|  7|           ZAP|         1|
|       12|     Cancian Imóveis|  LOCACAO|2023|  7|           OLX|        21|
|       16|   Banco Imobiliário|  LOCACAO|2023|  7|          SITE|        14|
|       16|   Banco Imobiliário|  LOCACAO|2023|  7|           ZA

                                                                                

In [1]:
#NICOLLE: Aqui convertemos o dataframe pro formato PANDAS, que é o adequado pra gente
#esse df_p é o produto final desse código inteiro, a tabela de dados sohtec. Esse df_p será o input
#para a parte do outro código, em que jogamos os dados pro datalake
df_p=df.toPandas()

NameError: name 'df' is not defined

In [22]:
print(df_p)

      EMPRESAID                      NOME_EMPRESA PRETENSAO   Ano  Mes  \
0             4  Barcellos Assessoria Imobiliária   LOCACAO  2023    7   
1             4  Barcellos Assessoria Imobiliária   LOCACAO  2023    7   
2             4  Barcellos Assessoria Imobiliária    VENDAS  2023    8   
3             4  Barcellos Assessoria Imobiliária    VENDAS  2023    8   
4            11             IMOBILIARIA COMERLATO   LOCACAO  2023    8   
...         ...                               ...       ...   ...  ...   
6272       1037         Accioly Group Imobiliária    VENDAS  2023    8   
6273       1043                      Remax Mediar   LOCACAO  2023    8   
6274       1043                      Remax Mediar    VENDAS  2023    8   
6275       1044                         Lopes LRT    VENDAS  2023    8   
6276       1044                         Lopes LRT    VENDAS  2023    8   

            Origem  TotalLeads  
0         VivaReal          76  
1     WHATSAPP_API          15  
2           

In [24]:
#NICOLLE: Esse comando encerra a sessão e remove os executores
# Sempre lembre de executar isso ao terminar de utilizar o Spark para evitar desperdício de recursos
spark.stop()