In [1]:
import pyspark
import glob
from datetime import datetime
import sys
from delta import *
from re import search, sub
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, BooleanType, FloatType, TimestampType, StructType, StructField, \
    DateType

In [2]:
sys.path.append('/opt/script')

In [3]:
path = "/home/cristianosilveira/projetos/cnes-kafka/data/BASE_DE_DADOS_CNES_202201/"
path_output = "/home/cristianosilveira/projetos/cnes-kafka/data/cnes/"

In [4]:
def get_files_csv_in_directory() -> list:
    return glob.glob(f"{path}*.csv")

In [5]:
APP_NAME = "teste"

In [6]:
builder = pyspark.sql.SparkSession.builder.appName(APP_NAME).master("local") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.21,io.delta:delta-core_2.12:1.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS delta")


22/03/04 17:04:50 WARN Utils: Your hostname, iliaDigital004 resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlp2s0)
22/03/04 17:04:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark-3.1.1-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/cristianosilveira/.ivy2/cache
The jars for the packages stored in: /home/cristianosilveira/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e58e6352-daa9-44d7-aa42-e91c2af0a0e1;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 248ms :: artifacts dl 10ms
	:: modules in use:
	io.delta#delta-core_2.12;1.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	--------------------------------------------------------

DataFrame[]

In [7]:
def get_date(x): 
    try:
        if x is not None:
            return datetime.strptime(x, '%d/%m/%Y')
    except:
        return None
    
udf_get_date = F.udf(lambda x: get_date(x), DateType())


def get_boolean_value(x): 
    if x and x == "S":
        return True
    return False
udf_get_boolean_value = F.udf(lambda x: get_boolean_value(x), BooleanType())


def get_string_value(x): 
    if x: 
        return x.replace("'", "").replace(",", ";")
    return ""
udf_get_string_value = F.udf(lambda x: get_string_value(x), StringType())

In [8]:
params = {
    "CNES_URL":"jdbc:postgresql://localhost:5432/saude?zeroDateTimeBehavior=convertToNull&useTimezone=true&serverTimezone=UTC",
    "CNES_USER": "saude",
    "CNES_PASSWORD": "saude"
}

In [9]:
TO_CHAR_REGEX = "(?<=({}))(.*)(?=({}))"
list_csv = get_files_csv_in_directory()
list_schemas = []
for path in list_csv:
    table_name = path.split('/')[-1]
    table_name = table_name.split('.')[0]
    table_name = table_name.replace('202201','')    
    table_name = sub( '(?<!^)(?=[A-Z])', '_',table_name).lower()    
    df = spark.read \
        .option("delimiter", ";") \
        .option("header", "true") \
        .csv(path)    
    for column in df.columns:        
        if "TO_CHAR" in column:
            init_regex = 'TO_CHAR'
            end_regex = ','
            regex = search(
                TO_CHAR_REGEX.format(init_regex, end_regex), column
            )            
            if regex:
                new_column = regex[0][1:].lower()
        else:
            new_column = column.lower()
        new_column = new_column.replace("'", "")
        df = df.withColumnRenamed(column, new_column)        
        if new_column[0:3] in ['co_', 'nu_']:
            df = df.withColumn(new_column, F.col(new_column).cast(IntegerType()))
        if new_column[0:3]  == 'dt_':
            df = df.withColumn(new_column, udf_get_date(F.col(new_column)))    
        if new_column[0:3]  == 'st_':
            df = df.withColumn(new_column, udf_get_boolean_value(F.col(new_column)))    
        if new_column[0:3]  == 'ds_':
            df = df.withColumn(new_column, udf_get_string_value(F.col(new_column)))    
    # df \
    #     .repartition(1) \
    #     .write.format("csv") \
    #     .mode("overwrite") \
    #     .option("header", "true") \
    #     .save(f"{path_output}/{table_name}.csv")
    list_schemas.append({"table": table_name, "schema": df.schema})
            
#             .option("truncate", "true") \
#             .option("url", params["CNES_URL"]) \
#             .option("user", params["CNES_USER"]) \
#             .option("password", params["CNES_PASSWORD"]) \
#             .option("driver", "org.postgresql.Driver") \
#             .option("dbtable", table_name) \
#             .mode("ignore") \
#             .save()

In [10]:
def get_field(field_name: str, field_type: str) -> str:    
    if field_type == "IntegerType":
        return f"{field_name} int"
    if field_type == "StringType":
        return f"{field_name} varchar(255)"
    if field_type == "BooleanType":
        return f"{field_name} boolean"
    if field_type == "DateType":
        return f"{field_name} timestamp"

scripts = []
scripts_copy = []

for item in list_schemas:    
    fields = []
    fields_copy = []
    for field in item['schema']:
        field_name = str(field).split('(')[1].split(',')[0]
        field_type = str(field).split('(')[1].split(',')[1]
        fields.append(get_field(field_name, field_type))
        fields_copy.append(field_name)
    sql = f"""
            create table if not exists cnes.{item['table']} (
                id serial not null,
                {','.join(fields)}
            )
            ;            
            ALTER TABLE cnes.{item['table']} ADD 
                CONSTRAINT PK_{item['table']} PRIMARY KEY 
                (
                    id
                )
            ;            
            """
    scripts.append(sql)        
    sql = f"""insert into cnes.{item['table']} ({','.join(fields_copy)}) values (field_values);"""
    scripts_copy.append({"table": item['table'], "sql": sql})

In [11]:
# textfile = open("/home/cristianosilveira/projetos/cnes-kafka/postgres/cnes_schema.sql", "w")
# for element in scripts:
#     textfile.write(element + "\n")
# textfile.close()

In [12]:
len(scripts_copy)

107

In [13]:
path_data = "/home/cristianosilveira/projetos/cnes-kafka/data/cnes/"

for file_name in glob.glob(f"{path_data}*.csv"):
    table_name = file_name.split('/')[-1].split('.')[0]        
    table = [d for d in scripts_copy if d['table'] == table_name][0]        
    df = spark.read \
        .option("delimiter", ";") \
        .option("header", "true") \
        .csv(file_name)
    records = df.collect()
    with open(f"/home/cristianosilveira/projetos/cnes-kafka/postgres/data/{table['table']}.sql", "w") as f:
        for record in records:            
            field_values = str(record[0]).replace('""', 'null').split(',')
            field_values = "'" + "','".join(field_values).replace("'null'", 'null') + "'"         
            element = table['sql'].replace("field_values", field_values).replace("D' ", 'D').replace("'null'", 'null')
            f.write(element + "\n")    

22/03/04 17:08:06 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ContextCleaner$$Lambda$858/0x00000008406d4040.get$Lambda(Unknown Source)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:186)
	at org.apache.spark.ContextCleaner$$Lambda$790/0x000000084069ac40.apply$mcV$sp(Unknown Source)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:180)
	at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:77)
22/03/04 17:08:06 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: Java heap space
	at org.apa

Py4JJavaError: An error occurred while calling o11355.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:373)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:369)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:369)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1(SparkPlan.scala:391)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1$adapted(SparkPlan.scala:390)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$2688/0x000000084118e840.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
	at org.apache.spark.sql.Dataset$$Lambda$2642/0x0000000841167840.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.Dataset$$Lambda$1374/0x0000000840bacc40.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1382/0x0000000840bb9040.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1375/0x0000000840bad840.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
	at jdk.internal.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)


In [None]:
# [d for d in scripts_copy if d['table'] == 'rl_estab_equipe_prof']

In [None]:
# textfile = open("/home/cristianosilveira/projetos/cnes-kafka/postgres/cnes_script.sql", "w")
# for element in inserts:
#     textfile.write(element + "\n")
# textfile.close()