In [8]:
import platform 
from pyspark.sql import SparkSession
from delta import *
import pandas as pd
from pyspark.sql import types

try:
    spark.stop()
except:
    pass
    
if platform.system() == 'Windows':
    path = 'C:'
    spark = (SparkSession.builder.master("local[*]")
        .appName("TesteHive")
        .config("hive.metastore.uris", "thrift://192.168.15.4:9083")
        .config("spark.sql.warehouse.dir","hdfs://localhost:9000/users/hive/warehouse")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.adaptive.enabled", "true")
        .enableHiveSupport()
        .getOrCreate())
    spark
else:
    path = '/mnt/c'
    builder = (SparkSession.builder.appName("MyApp")
            .config("hive.metastore.uris", "thrift://192.168.15.4:9083")
            .config("spark.sql.warehouse.dir","hdfs://localhost:9000/users/hive/warehouse")
            .config("spark.jars.packages","io.delta:delta-core_2.12:2.4.0")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .enableHiveSupport())
    
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark

In [9]:
import sys
sys.path.insert(0, f"{path}/Users/Daniel/Desktop/Estudos/DataSus/scripts")
from ingestors import IngestaoBronze
import dbtools

In [10]:
table = "rd_sih"
path_full_load = f'/DataSus/rd/csv'
path_incremental = f'/DataSus/rd/csv'
file_format = 'csv'
table_name=table
database_name='bronze'
id_fields = ["N_AIH","DT_SAIDA","IDENT"]
timestamp_field= 'DT_SAIDA'
partition_fields=["ANO_CMPT","MES_CMPT"]
read_options = {'sep': ';','header': 'true'}

ingestao = IngestaoBronze(
            path_full_load=path_full_load,
            path_incremental=path_incremental,
            file_format=file_format,
            table_name=table_name,
            database_name=database_name,
            id_fields=id_fields,
            timestamp_field=timestamp_field,
            partition_fields=partition_fields,
            read_options=read_options,
            spark=spark)

Inferindo schema...
ok
Carregando query default
Ok.


In [11]:
if not spark._jsparkSession.catalog().tableExists(database_name, table):
    df_null = spark.createDataFrame(data=[], schema=ingestao.schema)
    ingestao.save_full(df_null)
    #dbutils.fs.rm(ingestao.checkpoint_path, True) -- Ver como fazer essa parte - acho q vou usar a lib do hdfs

In [12]:
ingestao.process_stream()

23/08/27 01:59:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7fd24ce90c70>

In [13]:
spark.sql('''

select 

substring(UF_zi,1,2) as UF,
count (distinct 0),
count (distinct mes_cmpt),
count (distinct N_AIH,DT_SAIDA,IDENT),
count(*)

from bronze.rd_sih

group by 1
order by 1
''').show(30)

[Stage 11:>                                                       (0 + 10) / 10]

+---+-----------------+------------------------+--------------------------------------+--------+
| UF|count(DISTINCT 0)|count(DISTINCT mes_cmpt)|count(DISTINCT N_AIH, DT_SAIDA, IDENT)|count(1)|
+---+-----------------+------------------------+--------------------------------------+--------+
| 12|                1|                      12|                                 83986|   83986|
| 16|                1|                      12|                                 71273|   71273|
+---+-----------------+------------------------+--------------------------------------+--------+



                                                                                

In [14]:
df = spark.read.csv('/DataSus/rd/csv',sep=";",header=True)
df.createOrReplaceTempView("test")

spark.sql('''

select 

substring(UF_zi,1,2) as UF,
count (distinct ano_cmpt),
count (distinct mes_cmpt),
count (distinct N_AIH,DT_SAIDA,IDENT),
count(*)

from test

group by 1
order by 1
''').show(30)



+---+------------------------+------------------------+--------------------------------------+--------+
| UF|count(DISTINCT ano_cmpt)|count(DISTINCT mes_cmpt)|count(DISTINCT N_AIH, DT_SAIDA, IDENT)|count(1)|
+---+------------------------+------------------------+--------------------------------------+--------+
| 12|                       3|                      12|                                 83986|  125396|
| 16|                       3|                      12|                                 71273|  105800|
+---+------------------------+------------------------+--------------------------------------+--------+



                                                                                