In [22]:
from collections import defaultdict, OrderedDict
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, TimestampType, DoubleType
import csv
import re
from collections import defaultdict
import pandas as pd

In [23]:


def clean_structure(structure):
   
    cleaned_structure = re.sub(r'(?<=[\w\d\'"])((?:\s*,\s*){2,})', ',', structure)
    cleaned_structure = cleaned_structure.replace("',,", "',")

    return cleaned_structure

def generate_schemas(file_name):
    schemas = defaultdict(OrderedDict)

    with open(file_name, mode='r', encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=';', quotechar='"')

        for row in csv_reader:
            
            table_name = row.get('Tabela', '').lower()
            column_structure = row.get('estrutura', '').strip()

          
            column_structure = column_structure.replace(
                "StructField('", "StructField('").replace(",","").replace(
                    ",, ",",").replace("' ","',").replace("() True)","(), True)")

            if column_structure:
                if table_name not in schemas:
                    schemas[table_name] = OrderedDict()
                schemas[table_name][column_structure] = column_structure


    for table_name, fields in schemas.items():
        column_count = len(fields)
        print(f"Nome da Tabela: {table_name}\n")
        formatted_fields = ', \\\n'.join(fields.values())
        formatted_schema = f"StructType([\n{formatted_fields}])"
        print(f"Critério de Agrupamento: {table_name}, Contagem de Colunas: {column_count}")
        print(f"Colunas:\n{formatted_schema}\n")

file_name = r"C:\mlflowjobs\tabelas_teste.csv"
generate_schemas(file_name)

Nome da Tabela: ft_alerta_operacao_horario_celular_diferente

Critério de Agrupamento: ft_alerta_operacao_horario_celular_diferente, Contagem de Colunas: 9
Colunas:
StructType([
StructField('id_ft_alerta_operacao',LongType(), True), \
StructField('id_url',StringType(), True), \
StructField('id_dim_ambiente',LongType(), True), \
StructField('id_dim_colaborador',LongType(), True), \
StructField('status',IntegerType(), True), \
StructField('detalhe_alerta_operacao',StringType(), True), \
StructField('modelo_celular',StringType(), True), \
StructField('horario_celular',TimestampType(), True), \
StructField('horario_atomico',TimestampType(), True)])

Nome da Tabela: ft_alerta_operacao_nivel_bateria

Critério de Agrupamento: ft_alerta_operacao_nivel_bateria, Contagem de Colunas: 7
Colunas:
StructType([
StructField('id_ft_alerta_operacao',LongType(), True), \
StructField('id_url',StringType(), True), \
StructField('id_dim_ambiente',LongType(), True), \
StructField('id_dim_colaborador',LongTyp

In [24]:


def clean_structure(structure):
    cleaned_structure = re.sub(r'(?<=[\w\d\'"])((?:\s*,\s*){2,})', ',', structure)
    cleaned_structure = cleaned_structure.replace("',,", "',")
    return cleaned_structure

def generate_sql_queries(file_name):
    with open(file_name, mode='r', encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=';', quotechar='"')

        queries = defaultdict(list)

        for row in csv_reader:
            table_name = row.get('Tabela', '').lower()
            column_structure = row.get('estrutura', '').strip().replace("StructField('","").split("',")[0]

            if column_structure:

                column_structure = clean_structure(column_structure)

                queries[table_name].append(column_structure)

        for table_name, columns in queries.items():
  
            column_names = ',\n'.join(columns)

            query = f"""
    spark.sql('''SELECT 
                silver.involves_pnt.dim_colaborador(
                {column_names},
                insertion_at  NOT NULL,
                update_at  NOT NULL
                )
                FROM silver.{providerName}.{table_name}
                WHERE insertion_at = (SELECT MAX(insertion_at) FROM silver.{providerName}.{table_name})''')
            """
            print(f"Consulta para a tabela {table_name}:\n{query}\n")

file_name = r"C:\mlflowjobs\tabelas_teste.csv"
providerName = "seu_provider"
generate_sql_queries(file_name)


Consulta para a tabela ft_alerta_operacao_horario_celular_diferente:

    spark.sql('''SELECT 
                silver.involves_pnt.dim_colaborador(
                id_ft_alerta_operacao,
id_url,
id_dim_ambiente,
id_dim_colaborador,
status,
detalhe_alerta_operacao,
modelo_celular,
horario_celular,
horario_atomico,
                insertion_at  NOT NULL,
                update_at  NOT NULL
                )
                FROM silver.seu_provider.ft_alerta_operacao_horario_celular_diferente
                WHERE insertion_at = (SELECT MAX(insertion_at) FROM silver.seu_provider.ft_alerta_operacao_horario_celular_diferente)''')
            

Consulta para a tabela ft_alerta_operacao_nivel_bateria:

    spark.sql('''SELECT 
                silver.involves_pnt.dim_colaborador(
                id_ft_alerta_operacao,
id_url,
id_dim_ambiente,
id_dim_colaborador,
status,
detalhe_alerta_operacao,
nivel,
                insertion_at  NOT NULL,
                update_at  NOT NULL
                )

In [25]:


def clean_structure(structure):
    cleaned_structure = re.sub(r'(?<=[\w\d\'"])((?:\s*,\s*){2,})', ',', structure)
    cleaned_structure = cleaned_structure.replace("',,", "',")
    return cleaned_structure

def generate_sql_create_table(file_name):
    with open(file_name, mode='r', encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=';', quotechar='"')

        table_columns = defaultdict(list)

        for row in csv_reader:
            table_name = row.get('Tabela', '').lower()
            column_structure = row.get('estrutura', '').strip().replace("StructField('","").split("',")[0]

            if column_structure:
                column_structure = clean_structure(column_structure)
                table_columns[table_name].append(column_structure)

        sql_statements = []

        for table_name, columns in table_columns.items():
            columns.append("insertion_at TIMESTAMP NOT NULL")
            column_names = ',\n'.join(columns)
            sql_statement = f"%sql\nCREATE TABLE IF NOT EXISTS silver.involves_pnt.{table_name} (\n{column_names}\n)"
            sql_statements.append(sql_statement)

        return sql_statements


file_name = r"C:\mlflowjobs\tabelas_teste.csv"
providerName = "seu_provider"
sql_create_table_statements = generate_sql_create_table(file_name)

for statement in sql_create_table_statements:
    print(statement)


%sql
CREATE TABLE IF NOT EXISTS silver.involves_pnt.ft_alerta_operacao_horario_celular_diferente (
id_ft_alerta_operacao,
id_url,
id_dim_ambiente,
id_dim_colaborador,
status,
detalhe_alerta_operacao,
modelo_celular,
horario_celular,
horario_atomico,
insertion_at TIMESTAMP NOT NULL
)
%sql
CREATE TABLE IF NOT EXISTS silver.involves_pnt.ft_alerta_operacao_nivel_bateria (
id_ft_alerta_operacao,
id_url,
id_dim_ambiente,
id_dim_colaborador,
status,
detalhe_alerta_operacao,
nivel,
insertion_at TIMESTAMP NOT NULL
)
%sql
CREATE TABLE IF NOT EXISTS silver.involves_pnt.ft_alerta_operacao_tarefas_nao_realizadas (
id_ft_alerta_operacao,
id_url,
id_dim_ambiente,
id_dim_colaborador,
id_dim_pdv,
id_ft_visita,
status,
detalhe_alerta_operacao,
data_roteiro,
data_entrada,
data_saida,
justificativa_tarefa,
insertion_at TIMESTAMP NOT NULL
)
%sql
CREATE TABLE IF NOT EXISTS silver.involves_pnt.ft_alerta_pesquisa (
id_ft_alerta_pesquisa,
id_url,
id_dim_ambiente,
id_dim_pdv,
id_dim_colaborador,
id_dim_produto,

In [56]:
from collections import defaultdict
import csv

def clean_structure(structure):
    cleaned_structure = re.sub(r'(?<=[\w\d\'"])((?:\s*,\s*){2,})', ',', structure)
    cleaned_structure = cleaned_structure.replace("',,", "',")
    return cleaned_structure

def generate_conversion_columns(file_name):
    with open(file_name, mode='r', encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=';', quotechar='"')

        queries = defaultdict(list)

        for row in csv_reader:
            table_name = row.get('Tabela', '').lower()
            column_structure = row.get('estrutura', '').strip().replace("StructField('","").split("',")[0]

            if column_structure:
                column_structure = clean_structure(column_structure)
                queries[table_name].append(column_structure)

        for table_name, columns in queries.items():
         
            float_columns = [col for col in columns if 'DoubleType()' in col or 'FloatType()' in col]

            column_names = ', '.join(float_columns)

            code = f"""
            from pyspark.sql.functions import col, regexp_replace
            from pyspark.sql.types import DoubleType, FloatType
            from pyspark.sql import DataFrame

            # Substitua 'df' pelo DataFrame real
            df = ...

            columns_to_process = [
                {column_names}
            ]

            def replace_comma_with_period_and_cast(column: str) -> col:
                return col(column).cast(DoubleType() if 'DoubleType()' in column else FloatType()).alias(column)

            for column_name in columns_to_process:
                df = df.withColumn(
                    column_name,
                    regexp_replace(col(column_name), ",", ".").cast(DoubleType() if 'DoubleType()' in column_name else FloatType())
                )
            """

            print(f"Código de conversão para a tabela {table_name}:\n{code}\n")

file_name = r"C:\mlflowjobs\tabelas_teste.csv"
providerName = "seu_provider"
generate_conversion_columns(file_name)



Código de conversão para a tabela ft_alerta_operacao_horario_celular_diferente:

            from pyspark.sql.functions import col, regexp_replace
            from pyspark.sql.types import DoubleType, FloatType
            from pyspark.sql import DataFrame

            # Substitua 'df' pelo DataFrame real
            df = ...

            columns_to_process = [
                
            ]

            def replace_comma_with_period_and_cast(column: str) -> col:
                return col(column).cast(DoubleType() if 'DoubleType()' in column else FloatType()).alias(column)

            for column_name in columns_to_process:
                df = df.withColumn(
                    column_name,
                    regexp_replace(col(column_name), ",", ".").cast(DoubleType() if 'DoubleType()' in column_name else FloatType())
                )
            

Código de conversão para a tabela ft_alerta_operacao_nivel_bateria:

            from pyspark.sql.functions import col, regexp_replace