In [4]:
import pyspark

# Explicit import instead of `from delta import *` so the origin of every name
# is visible. DeltaTable is not used in this cell; it is imported so any later
# cell that relied on the star import still resolves it.
from delta import DeltaTable, configure_spark_with_delta_pip  # noqa: F401

# Standalone cluster master (presumably the container hostname of the local
# Docker setup — adjust if the cluster lives elsewhere).
SPARK_MASTER_URL = "spark://spark-master:7077"

# Notebook session: small fixed resource footprint, Delta Lake SQL extension
# and catalog, and Hive support so tables are registered in the metastore.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("Jupyter")
    .master(SPARK_MASTER_URL)  # equivalent to .config("spark.master", ...)
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.cores", "2")
    .config("spark.driver.memory", "2g")
    .config("spark.cores.max", "4")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
)

# configure_spark_with_delta_pip adds the Delta Lake package (io.delta:delta-core)
# as a dependency of the session before it starts.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark

:: loading settings :: url = jar:file:/usr/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a1870ca9-875f-4874-88dd-2fe5c07b4ce5;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in local-m2-cache
	found io.delta#delta-storage;2.4.0 in local-m2-cache
	found org.antlr#antlr4-runtime;4.9.3 in local-m2-cache
:: resolution report :: resolve 109ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from local-m2-cache in [default]
	io.delta#delta-storage;2.4.0 from local-m2-cache in [default]
	org.antlr#antlr4-runtime;4.9.3 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      d

## Testando Conexão HDFS

In [2]:
#Criando diretorio
!hdfs dfs -mkdir /users/Teste

In [3]:
#movendo arquivos do local para o HDFS
!hdfs dfs -put /usr/notebooks/curva.csv /users/Teste

In [4]:
# List the contents of the HDFS test directory (files/directories).
!hdfs dfs -ls /users/Teste/

Found 1 items
-rw-r--r--   1 root supergroup         46 2024-08-18 02:37 /users/Teste/curva.csv


## Testando Hive

In [5]:
# Create the `teste` database in the metastore (no-op if it already exists).
spark.sql("CREATE DATABASE IF NOT EXISTS teste")

DataFrame[]

In [6]:
# List the databases visible through the Hive-enabled catalog.
databases_df = spark.sql("show databases")
databases_df.show()

+---------+
|namespace|
+---------+
|  default|
|    teste|
+---------+



## Testando Spark

In [7]:
# Smoke test: run a trivial distributed job (ids 0..9) on the cluster.
spark.range(0, 10).show()

[Stage 0:>                                                          (0 + 4) / 4]

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



                                                                                

In [8]:
# Read the semicolon-separated CSV from HDFS, using the first row as header.
# inferSchema is not set, so Spark reads every column as a string.
df = spark.read.options(header=True, sep=";").csv("/users/Teste/curva.csv")

df.show()

                                                                                

+-----+-------+
|curva|funding|
+-----+-------+
|SELIC|   0.75|
|SELIC|   0.65|
|SELIC|   0.70|
+-----+-------+



                                                                                

In [9]:
# Persist the CSV data as a Delta table `teste.curva` in the metastore,
# replacing any previous contents.
df.write.saveAsTable("teste.curva", format="delta", mode="overwrite")

24/08/18 02:37:54 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/08/18 02:37:59 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`teste`.`curva` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In [10]:
# Read the table back through the metastore to verify the write.
curva_check = spark.sql("select * from teste.curva")
curva_check.show()

                                                                                

+-----+-------+
|curva|funding|
+-----+-------+
|SELIC|   0.75|
|SELIC|   0.65|
|SELIC|   0.70|
+-----+-------+



## Testando SQL Server

In [14]:
import os

# JDBC connection properties for SQL Server.
# Credentials are read from the environment so they do not have to live in the
# notebook. The fallbacks reproduce the previously hard-coded values so the
# existing local setup keeps working.
# TODO(review): export SQLSERVER_USER / SQLSERVER_PASSWORD in the Jupyter
# environment and drop the fallback password from the notebook.
connection_properties = {
    "user": os.environ.get("SQLSERVER_USER", "SA"),
    "password": os.environ.get("SQLSERVER_PASSWORD", "YourStrong!Passw0rd"),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

### Carga completa (full load) de uma tabela

In [7]:
# Full load: read the entire `daniBoy` table from the testeDani database.
jdbc_url = "jdbc:sqlserver://sqlserver:1433;databaseName=testeDani"
df = spark.read.jdbc(jdbc_url, "daniBoy", properties=connection_properties)
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-----+---+
| Nome| ID|
+-----+---+
| Dani|  1|
| Jess|  2|
|Jamal|  3|
+-----+---+



                                                                                

### Extração via consulta SQL customizada (subquery no JDBC)

In [21]:
# Customer extraction query over SANKHYA.TGFPAR, wrapped as a derived table
# "(...) as cliente" so it can be passed as `table=` to spark.read.jdbc.
# Column aliases are plain ASCII identifiers without spaces, the same convention
# as query_produto. Delta rejects spaces in column names unless column mapping is
# enabled, so aliases like 'Cód Parceiro' could not be saved with
# saveAsTable(format="delta").
# NOTE(review): END1 and BAI are joined but none of their columns are used; as
# inner joins they only drop partners without a matching CODEND/CODBAI row.
# Confirm that filtering is intended.
# NOTE(review): PAR.FAX is exposed as "Celular" — confirm this mapping.
query_cliente = '''(
SELECT

PAR.CODPARC AS Cod_Parceiro,
PAR.RAZAOSOCIAL AS Razao_Social,
PAR.CGC_CPF AS CNPJ,
ISNULL(PAR.TELEFONE, '') AS Telefone,
ISNULL(PAR.FAX, '') AS Celular,
ISNULL(PAR.EMAIL, '') AS Email,
ISNULL(UFS.UF, '') AS UF,
ISNULL(CID.NOMECID, '') AS Cidade

FROM SANKHYA.TGFPAR PAR
JOIN SANKHYA.TSIEND END1 ON PAR.CODEND = END1.CODEND
JOIN SANKHYA.TSIBAI BAI ON PAR.CODBAI = BAI.CODBAI
JOIN SANKHYA.TSICID CID ON CID.CODCID = PAR.CODCID
JOIN SANKHYA.TSIUFS UFS ON UFS.CODUF = CID.UF

WHERE PAR.ATIVO = 'S'
AND PAR.CLIENTE = 'S'
AND PAR.CGC_CPF IS NOT NULL
AND PAR.CODPARC > 3
AND PAR.TIPPESSOA = 'J'
) as cliente
'''

In [23]:
# Product extraction query over SANKHYA.TGFPRO, wrapped as a derived table
# "(...) as prod" so it can be passed as `table=` to spark.read.jdbc.
# Group hierarchy:
#   - GRU3 is the product's own group (joined on PRO.CODGRUPOPROD).
#   - GRU, GRU1 and GRU2 are ancestor groups. Their codes are derived from the
#     product's group code by keeping a prefix and zero-padding the rest:
#       * when LEN(code) = 8: first 2 / 4 / 6 characters + '000000' / '0000' / '00'
#       * otherwise: first 1 / 3 / 5 characters of RIGHT(RTRIM(code), 8) plus the
#         same padding (presumably meant for 7-digit codes — TODO confirm the
#         code format stored in TGFGRU).
# All joins are inner joins, so a product is dropped when any derived ancestor
# code has no matching TGFGRU row.
# NOTE(review): each correlated subquery reads TGFGRU only to get back the row
# whose CODGRUPOPROD equals PRO.CODGRUPOPROD. The CASE could be applied to
# PRO.CODGRUPOPROD directly.
query_produto = '''
(SELECT
PRO.CODPROD AS 'Cod_Prod',
PRO.DESCRPROD AS 'Descricao',
RTRIM (PRO.REFFORN) AS 'Ref_Forn',
PRO.FABRICANTE AS 'Ref_Futura',
RTRIM (PRO.COMPLDESC) AS Complemento,
PRO.Ativo,
PRO.AD_DHINCLUSAO AS 'Dt_Cad',
PRO.CODLOCALPADRAO AS 'Cod_local_padrao',
GRU.DESCRGRUPOPROD AS Grupo,
GRU1.DESCRGRUPOPROD AS 'Sub_Grupo_1',
GRU2.DESCRGRUPOPROD AS 'Sub_Grupo_2',
GRU3.DESCRGRUPOPROd AS 'Sub_Grupo_3'

FROM SANKHYA.TGFPRO PRO
JOIN SANKHYA.TGFGRU GRU ON GRU.CODGRUPOPROD = (SELECT (CASE WHEN LEN(CODGRUPOPROD) = 8 THEN CONCAT(SUBSTRING(CONVERT(VARCHAR(10),CODGRUPOPROD),1,2), '000000')
	 ELSE CONCAT(SUBSTRING(right(rtrim(CODGRUPOPROD),8),1,1), '000000') END)
	 FROM SANKHYA.TGFGRU WHERE CODGRUPOPROD = PRO.CODGRUPOPROD)
JOIN SANKHYA.TGFGRU GRU1 ON GRU1.CODGRUPOPROD = (SELECT (CASE WHEN LEN(CODGRUPOPROD) = 8 THEN CONCAT(SUBSTRING(CONVERT(VARCHAR(10),CODGRUPOPROD),1,4), '0000')
	 ELSE CONCAT(SUBSTRING(right(rtrim(CODGRUPOPROD),8),1,3), '0000') END)
	 FROM SANKHYA.TGFGRU WHERE CODGRUPOPROD = PRO.CODGRUPOPROD)
JOIN SANKHYA.TGFGRU GRU2 ON GRU2.CODGRUPOPROD = (SELECT (CASE WHEN LEN(CODGRUPOPROD) = 8 THEN CONCAT(SUBSTRING(CONVERT(VARCHAR(10),CODGRUPOPROD),1,6), '00')
	 ELSE CONCAT(SUBSTRING(right(rtrim(CODGRUPOPROD),8),1,5), '00') END)
	 FROM SANKHYA.TGFGRU WHERE CODGRUPOPROD = PRO.CODGRUPOPROD)
JOIN SANKHYA.TGFGRU GRU3 ON GRU3.CODGRUPOPROD = PRO.CODGRUPOPROD

WHERE PRO.CODPROD > 12) as prod

'''

In [24]:
# Run query_produto against the SANKHYA_PARI database. The parenthesized
# query string is passed where a table name is expected.
jdbc_url = "jdbc:sqlserver://sqlserver:1433;databaseName=SANKHYA_PARI"
df = spark.read.jdbc(jdbc_url, query_produto, properties=connection_properties)
df.show()

+--------+--------------------+--------+----------+-----------+-----+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+
|Cod_Prod|           Descricao|Ref_Forn|Ref_Futura|Complemento|Ativo|             Dt_Cad|Cod_local_padrao|               Grupo|         Sub_Grupo_1|         Sub_Grupo_2|         Sub_Grupo_3|
+--------+--------------------+--------+----------+-----------+-----+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+
|     774|Cubo Mágico 3x3 6...| AKT2635|      null|       null|    S|               null|         1001000|Brinquedos       ...|Jogos            ...|Cubo Magico      ...|Cubo Magico      ...|
|   71108|ALMOFADA DE PELÚC...| TGS1259|     71108|       null|    S|2024-02-15 00:00:00|         1001000|Decoracao e Prese...|Capas e Almofadas...|Almofada         ...|Almofada         ...|
|   71109|URSO DE PELÚCIA C...| TGS1260|     