# PostgreSQL

In [1]:
# Instalação
!apt-get install -y postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libcommon-sense-perl libjson-perl libjson-xs-perl libtypes-serialiser-perl logrotate netbase
  postgresql-14 postgresql-client-14 postgresql-client-common postgresql-common ssl-cert sysstat
Suggested packages:
  bsd-mailx | mailx postgresql-doc postgresql-doc-14 isag
The following NEW packages will be installed:
  libcommon-sense-perl libjson-perl libjson-xs-perl libtypes-serialiser-perl logrotate netbase
  postgresql postgresql-14 postgresql-client-14 postgresql-client-common postgresql-common ssl-cert
  sysstat
0 upgraded, 13 newly installed, 0 to remove and 49 not upgraded.
Need to get 18.4 MB of archives.
After this operation, 51.6 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 logrotate amd64 3.19.0-1ubuntu1.1 [54.3 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd6

In [2]:
# Iniciando o serviço
!service postgresql start

 * Starting PostgreSQL 14 database server
   ...done.


In [3]:
# Configurando o usuário postgres
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';"

ALTER ROLE


In [4]:
# Criando o banco de dados
!sudo -u postgres psql -c "CREATE DATABASE anp;"

CREATE DATABASE


# Bibliotecas

In [5]:
import os
import glob
import shutil
import tarfile
import psycopg2
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col, regexp_replace, to_date

# Caminho para os arquivos do projeto

In [6]:
# Área de trabalho do projeto
drive.mount('/content/drive')
workspace_path = '/content/drive/MyDrive/Workspace'

Mounted at /content/drive


In [7]:
# Caminho para os arquivos dos dados
data_path = f'{workspace_path}/fiap/pos-tech/data-analytics/fase-4/dados'

# Sessão Spark

In [8]:
spark = SparkSession.builder \
                    .appName('etl') \
                    .config('spark.driver.extraClassPath',
                            'postgresql-42.6.0.jar') \
                    .config('spark.executor.extraClassPath',
                            'postgresql-42.6.0.jar') \
                    .getOrCreate()

# Carga dos dados

## Carga dos dados em um dataframe Spark

In [9]:
# Pasta temporária
temp_path = 'tmp'
os.makedirs(temp_path, exist_ok=True)

In [10]:
# Extraindo os CSVs dos arquivos .tar.gz
for filename in os.listdir(data_path):
    if filename.endswith(".tar.gz"):
        tar_path = os.path.join(data_path, filename)
        with tarfile.open(tar_path, "r:gz") as tar:
            tar.extractall(path=temp_path)

In [11]:
# Carregando os CSVs no dataframe Spark
df = spark.read \
          .option('header', True) \
          .option('sep', ';') \
          .option('decimal', ',') \
          .option('inferSchema', True) \
          .csv(f'{temp_path}/*.csv')

In [12]:
# Função para ajustar o nome das colunas do dataframe
def to_snake_case(column_name):
    return column_name.strip().replace('-', '').replace(' ', '_') \
                      .replace('__', '_').lower()

In [13]:
# Ajustando o nome das colunas
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, to_snake_case(col_name))

In [14]:
# Ajustando a coluna data_da_coleta
df = df.withColumn('data_da_coleta',
                   to_date(col('data_da_coleta'), 'dd/MM/yyyy'))

In [15]:
# Ajustando as colunas valor_de_venda e valor_de_compra
df = df.withColumn(
    'valor_de_venda',
    regexp_replace(col('valor_de_venda'), ',', '.').cast('double')
).withColumn(
    'valor_de_compra',
    regexp_replace(col('valor_de_compra'), ',', '.').cast('double')
)

In [16]:
# Informações do dataframe Spark
print('Número de linhas:', df.count())
df.printSchema()
df.show(1, truncate=False, vertical=True)

Número de linhas: 4401370
root
 |-- regiao_sigla: string (nullable = true)
 |-- estado_sigla: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- revenda: string (nullable = true)
 |-- cnpj_da_revenda: string (nullable = true)
 |-- nome_da_rua: string (nullable = true)
 |-- numero_rua: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- produto: string (nullable = true)
 |-- data_da_coleta: date (nullable = true)
 |-- valor_de_venda: double (nullable = true)
 |-- valor_de_compra: double (nullable = true)
 |-- unidade_de_medida: string (nullable = true)
 |-- bandeira: string (nullable = true)

-RECORD 0--------------------------------------
 regiao_sigla      | N                         
 estado_sigla      | AC                        
 municipio         | RIO BRANCO                
 revenda           | AUTO POSTO AMAPA - EIRELI 
 cnpj_da_revenda   |  00.529.581/0001-53       

## Carga dos dados no PostgreSQL

In [17]:
# Gravando os dados no PostgreSQL
df.write \
  .format('jdbc') \
  .option('url', 'jdbc:postgresql://localhost:5432/anp') \
  .option('dbtable', 'preco_combustivel') \
  .option('user', 'postgres') \
  .option('password', 'postgres') \
  .option('driver', 'org.postgresql.Driver') \
  .mode('overwrite') \
  .save()

## Teste de leitura dos dados à partir do PostgreSQL

In [18]:
# Conexão com o banco de dados
conn = psycopg2.connect(
    dbname='anp',
    user='postgres',
    password='postgres',
    host='localhost',
    port='5432'
)

# Cursor para execução da consulta
cursor = conn.cursor()

# Consulta a tabela
cursor.execute('SELECT COUNT(1) FROM preco_combustivel;')
record_count = cursor.fetchone()[0]
print('Número de linhas:', record_count)

cursor.execute('SELECT * FROM preco_combustivel LIMIT 3;')
rows = cursor.fetchall()
for row in rows:
    print(row)

Número de linhas: 4401370
('S', 'RS', 'CANOAS', 'METROPOLITANO COMERCIO DE COMBUSTIVEIS LTDA', ' 88.587.589/0001-17', 'AVENIDA GUILHERME SCHELL', '6340', None, 'CENTRO', '92310-000', 'GASOLINA', datetime.date(2019, 7, 1), 4.259, None, 'R$ / litro', 'BRANCA')
('S', 'RS', 'CANOAS', 'METROPOLITANO COMERCIO DE COMBUSTIVEIS LTDA', ' 88.587.589/0001-17', 'AVENIDA GUILHERME SCHELL', '6340', None, 'CENTRO', '92310-000', 'ETANOL', datetime.date(2019, 7, 1), 4.099, None, 'R$ / litro', 'BRANCA')
('S', 'RS', 'CANOAS', 'METROPOLITANO COMERCIO DE COMBUSTIVEIS LTDA', ' 88.587.589/0001-17', 'AVENIDA GUILHERME SCHELL', '6340', None, 'CENTRO', '92310-000', 'GNV', datetime.date(2019, 7, 1), 3.449, None, 'R$ / m³', 'BRANCA')


## Backup dos dados do PostgreSQL

In [19]:
# Criando o arquivo de backup do PostgreSQL
!PGPASSWORD=postgres pg_dump -U postgres -d anp -h localhost -F c -f tmp/anp_dump.backup

# Criando um arquivo compactado e dividido em partes de 9 MB do backup
!tar -czvf - tmp/anp_dump.backup | split -b 9M - tmp/anp_dump.tar.gz.part-

# Copiando os arquivos para o caminho persistente dos dados
for part_file in glob.glob('tmp/anp_dump.tar.gz.part-*'):
    shutil.copy(part_file, data_path)

tmp/anp_dump.backup
