In [0]:
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import *

In [0]:
#Lista o diretório dos arquivos e salva em uma variável
arquivo = dbutils.fs.ls('FileStore/tables/csv_sp/')
print(arquivo)

[FileInfo(path='dbfs:/FileStore/tables/csv_sp/dados_A701_D_2024_01_01_2024_07_31.csv', name='dados_A701_D_2024_01_01_2024_07_31.csv', size=12401, modificationTime=1725316480000), FileInfo(path='dbfs:/FileStore/tables/csv_sp/dados_A705_D_2024_01_01_2024_07_31.csv', name='dados_A705_D_2024_01_01_2024_07_31.csv', size=12373, modificationTime=1725316480000), FileInfo(path='dbfs:/FileStore/tables/csv_sp/dados_A706_D_2024_01_01_2024_07_31.csv', name='dados_A706_D_2024_01_01_2024_07_31.csv', size=12267, modificationTime=1725316481000), FileInfo(path='dbfs:/FileStore/tables/csv_sp/dados_A707_D_2024_01_01_2024_07_31.csv', name='dados_A707_D_2024_01_01_2024_07_31.csv', size=12878, modificationTime=1725316481000), FileInfo(path='dbfs:/FileStore/tables/csv_sp/dados_A708_D_2024_01_01_2024_07_31.csv', name='dados_A708_D_2024_01_01_2024_07_31.csv', size=12658, modificationTime=1725316481000), FileInfo(path='dbfs:/FileStore/tables/csv_sp/dados_A711_D_2024_01_01_2024_07_31.csv', name='dados_A711_D_2024

In [0]:
#Looping para percorrer a lista
for i in arquivo:
     
    csv = f"/FileStore/tables/csv_sp/{i.name}"
    
    #Condição para ter certeza que apenas arquivos .csv serão lidos
    if csv.endswith(".csv"):
        
        #Abre o arquivo csv
        df_csv = spark.read\
        .format("csv")\
        .option("sep", ";")\
        .option("skipRows", 10)\
        .option("header", True)\
        .option("inferSchema", True)\
        .load(csv)

        #Realiza nova leitura do arquivo
        df = spark.read\
        .format("csv")\
        .option("sep", ";")\
        .load(csv)

        #Filtra pelas primeiras 5 linhas
        df = df.limit(5)
        local = df.collect()

        #Realiza a extração do nome da cidade, localização, altura e código da estação
        j = []
        for locais in local:
                nome_completo = locais._c0.split(':')[1]
                j.append(nome_completo)
       

        #Convertendo todas as colunas para double
        colunas = df_csv.columns
        
        for coluna in colunas:

                if coluna != 'Data Medicao':
                        df_csv = df_csv.withColumn(coluna, regexp_replace(df_csv[coluna], ',', '.').cast("double"))

        #Passa os valores das primeiras linhas do arquivo
        df_csv = df_csv.withColumn("Cidade", lit(j[0]))
        df_csv = df_csv.withColumn("Codigo_Estacao", lit(j[1]))
        df_csv = df_csv.withColumn("Latitude", lit(j[2]))
        df_csv = df_csv.withColumn("Longitude", lit(j[3]))
        df_csv = df_csv.withColumn("Altitude", lit(j[4]))

        df_csv = df_csv.na.drop() #Realiza a remoção de linhas com dados nulos
        df_csv = df_csv.dropDuplicates() #Remove linhas duplicadas
        
        #Cria view temporária
        df_csv.createOrReplaceTempView('tb_csv')

        #Cria tabela
        spark.sql("""CREATE TABLE IF NOT EXISTS raw_csv (
        Data_Medicao date,
        PRECIPITACAO_TOTAL_DIARIO_AUT_mm double,
        PRESSAO_ATMOSFERICA_MEDIA_DIARIA_AUT_mB double,
        TEMPERATURA_DO_PONTO_DE_ORVALHO_MEDIA_DIARIA_AUT_C double,
        TEMPERATURA_MAXIMA_DIARIA_AUT_C double,
        TEMPERATURA_MEDIA_DIARIA_AUT_C double,
        TEMPERATURA_MINIMA_DIARIA_AUT_C double,
        UMIDADE_RELATIVA_DO_AR_MEDIA_DIARIA_AUT_percen double,
        UMIDADE_RELATIVA_DO_AR_MINIMA_DIARIA_AUT_percen double,
        VENTO_RAJADA_MAXIMA_DIARIA__AUT_ms double,
        VENTO_VELOCIDADE_MEDIA_DIARIA_AUT_ms double,
        Cidade string,
        Codigo_Estacao string,
        Latitude string,
        Longitude string,
        Altitude string
        )
        """)
        
        #Insere os dados na tabela criada
        spark.sql("""INSERT INTO raw_csv
                SELECT * FROM tb_csv
                """)

In [0]:
query = spark.sql("""SELECT * FROM raw_csv""")

query.write\
    .format("csv")\
    .option("header", True)\
    .option("sep", ";")\
    .mode("overwrite")\
    .save("/FileStore/tables/csv_final/estacoes_sp_null.csv")

#Limpa a tabela
display(query)

Data_Medicao,PRECIPITACAO_TOTAL_DIARIO_AUT_mm,PRESSAO_ATMOSFERICA_MEDIA_DIARIA_AUT_mB,TEMPERATURA_DO_PONTO_DE_ORVALHO_MEDIA_DIARIA_AUT_C,TEMPERATURA_MAXIMA_DIARIA_AUT_C,TEMPERATURA_MEDIA_DIARIA_AUT_C,TEMPERATURA_MINIMA_DIARIA_AUT_C,UMIDADE_RELATIVA_DO_AR_MEDIA_DIARIA_AUT_percen,UMIDADE_RELATIVA_DO_AR_MINIMA_DIARIA_AUT_percen,VENTO_RAJADA_MAXIMA_DIARIA__AUT_ms,VENTO_VELOCIDADE_MEDIA_DIARIA_AUT_ms,Cidade,Codigo_Estacao,Latitude,Longitude,Altitude
2024-01-01,20.0,941.0,18.2,27.0,20.3,17.0,88.7,63.0,7.0,0.9,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-02,0.0,939.1,17.6,27.5,20.2,14.8,86.3,61.0,5.6,0.9,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-03,0.0,938.0,18.8,27.3,20.7,15.8,89.8,66.0,10.0,1.1,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-04,3.6,939.3,18.7,23.7,19.8,17.2,93.2,75.0,6.4,1.0,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-05,1.4,939.2,18.9,27.0,20.8,17.3,89.9,64.0,7.3,0.9,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-06,2.6,939.6,19.6,27.2,20.9,18.9,92.8,65.0,6.0,0.6,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-07,2.6,938.6,19.6,31.9,23.4,17.1,82.0,41.0,5.6,0.7,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-08,0.0,937.1,20.1,36.1,26.1,20.1,74.3,29.0,7.9,1.1,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-09,8.2,939.0,20.9,33.3,24.7,19.3,81.8,46.0,8.4,1.1,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89
2024-01-10,7.6,939.0,20.8,32.9,24.7,18.7,81.8,42.0,9.2,1.0,BARRA DO TURVO,A746,-24.96277777,-48.41638888,659.89


In [0]:
spark.sql("""TRUNCATE TABLE raw_csv""")

DataFrame[]