# <font color="red"> MBA em IA e Big Data</font>
## <span style="color:red">Linguagens e Ferramentas para Inteligência Artificial e Big Data (Python e SQL)</span>

### <span style="color:darkred">Importando e processando dados do Oracle via Python</span>

*Jose Fernando Rodrigues Junior*<br>
*ICMC/USP São Carlos*

# Instalação
* Oracle over Python: https://www.oracle.com/database/technologies/appdev/python/quickstartpythononprem.html#linux-tab
* Download **Instant Client Basic Light**:
    - Linux: https://download.oracle.com/otn_software/linux/instantclient/oracle-instantclient-basiclite-linuxx64.rpm
    - Windows: https://www.oracle.com/database/technologies/instant-client/winx64-64-downloads.html 
    - MacOS: https://www.oracle.com/database/technologies/instant-client/macos-intel-x86-downloads.html
* Linux:
    * sudo alien -i oracle-instantclient-basiclite-linuxx64.rpm
    * pip install cx_Oracle

In [1]:
#Instalar o Oracle Instant Client (basic ou light) - sudo alien -i oracle-instantclient-basiclite-linuxx64.rpm
#Depois instalar o cx_oracle: conda install -c anaconda cx_oracle
import cx_Oracle #documentacao https://cx-oracle.readthedocs.io/en/latest/user_guide/introduction.html
import pandas as pd
import sqlalchemy

* O `cx_Oracle` é um módulo de extensão Python que permite acesso ao banco de dados Oracle; o cx_Oracle está, parcialmente, em conformidade com a especificação da `DB-API 2.0` para banco de dados por meio de Python;
* A python DB-API é uma especificação para uma interface comum para bancos de dados relacionais definida no documento `Python Enhancement Proposal 249` (https://www.python.org/dev/peps/pep-0249/);
* `SQLAlchemy` é um módulo Python com dois objetivos:
    * Kit de ferramentas Python-SQL: oferece funcionalidades flexíveis para uso simplificado e abstrato de SQL;
    * Mapeador Relacional de Objetos: uma técnica de mapeamento objeto relacional que permite fazer uma relação dos objetos programados com os dados relacionais que os mesmos representams.

In [2]:
#Howto: https://www.oracle.com/database/technologies/appdev/python/quickstartpythononprem.html#copy
#executar uma vez por sessão - lib dir é o diretório onde se encontra o arquivo libclntsh.so
try:
    cx_Oracle.init_oracle_client(lib_dir="/usr/lib/oracle/21/client64/lib/")
except Exception as e:
    print(str(e))

In [3]:
dsnStr = cx_Oracle.makedsn("grad.icmc.usp.br", "15215", "orcl")
conexao = cx_Oracle.connect(user="M28804725800", password="M28804725800", dsn=dsnStr)
print(conexao.version)

#No caso de PostgreSQL, usa-se a biblioteca psycopg2
#import psycopg2
#conn = psycopg2.connect(host=host_address, database=name_of_database, user=user_name, password=user_password)

11.2.0.1.0


## Cursor
* Uma conexão fornece objetos do tipo `cursor`; os quais permitem operações DDL e DML no banco conectado.

In [5]:
cursorDDL = conexao.cursor()
cursorDML = conexao.cursor()

## Criação de um esquema a partir de um script sql

    1) Aqui fazemos a leitura completa de um arquivo;
    2) Quebra do texto usando a operação de split, considerando o ";" como separador;
    3) Para cada comando encontrado, executamos o SQL por meio de um cursor;
    4) Tratamos eventuais erros usando uma construção try-except.

In [17]:
#Primeiro o esquema
pathEsquemaFutebol = '/MBA_AI_e_ML/01CursoLinguagens e Ferramentas_Frameworks/Semana4-Python_e_Frameworks/00Dados_de_apoio-Aula07/'
fd = open(pathEsquemaFutebol+'Esquema_Futebol.sql', 'r')
sqlFile = fd.read()

fd.close()

sqlCommands = sqlFile.split(';')
for command in sqlCommands:
    if(command == ''): break;
    try:
        print(command)
        #print()
        cursorDDL.execute(command)
    except Exception as msg:
        print("Erro de SQL: "+ str(msg) + ": "+ command)
print('Script finalizado.')

Drop table time cascade constraints

Drop table joga cascade constraints

Drop table partida cascade constraints

Drop table jogador cascade constraints

Drop table posicao_jogador cascade constraints

Drop table diretor cascade constraints

Drop table uniforme cascade constraints


CREATE TABLE time (
      nome       VARCHAR2(40) NOT NULL,
      estado     CHAR(2),
      tipo       VARCHAR2(15),
      saldo_gols INTEGER,
      
      CONSTRAINT pk_time PRIMARY KEY (nome),
      CONSTRAINT ck_tipo CHECK (tipo in ('amador', 'profissional'))
)


CREATE TABLE joga (
      time1      VARCHAR2(40) NOT NULL,
      time2      VARCHAR2(40) NOT NULL,
      classico   CHAR(1),

      CONSTRAINT pk_joga PRIMARY KEY (time1, time2),
      CONSTRAINT fk_joga1 FOREIGN KEY (time1) REFERENCES time(NOME) ON DELETE CASCADE,
      CONSTRAINT fk_joga2 FOREIGN KEY (time2) REFERENCES time(NOME) ON DELETE CASCADE,
      CONSTRAINT ck_classico CHECK (classico in ('S', 'N'))
)


CREATE TABLE partida (
      ti

In [18]:
#Agora as relações instanciadas
fd = open(pathEsquemaFutebol+'Dados_Futebol.sql', 'r')
sqlFile = fd.read()
fd.close()

sqlCommands = sqlFile.split(';')
for command in sqlCommands:
    try:
        print(command)
        #print()
        cursorDDL.execute(command)        
    except Exception as msg:
        print("Erro de SQL: "+ str(msg) + ": |"+ command+'|')
print('Script finalizado.')

insert into time values ('Sao Paulo',     'SP', 'profissional', 8)

insert into time values ('Palmeiras',     'SP', 'profissional', 5)

insert into time values ('Santos',        'SP', 'profissional', 0)

insert into time values ('Corinthians',   'SP', 'profissional', 6)

insert into time values ('Paulistinha',   'SP', 'amador',       1)

insert into time values ('Ibate',         'SP', 'amador',       0)

insert into time values ('Cruzeiro',      'MG', 'profissional', 2)

insert into time values ('Atletico',      'MG', 'profissional', 3)

insert into time values ('Frutal',        'MG', 'amador',       1)


insert into joga values ('Sao Paulo',   'Palmeiras',   'S')

insert into joga values ('Ibate',       'Paulistinha', 'N')

insert into joga values ('Ibate',       'Frutal',      'S')

insert into joga values ('Santos',      'Corinthians', 'N')

insert into joga values ('Corinthians', 'Palmeiras',   'S')

insert into joga values ('Santos',      'Sao Paulo',    'N')

insert into joga val

# Consulta à estrutura do esquema por meio do Dicionário de Dados

* O dicionário de dados é o banco de dados que guarda informações sobre os esquemas presentes no banco de dados.
Ele é usado para a realização de meta operações sobre a base de dados; por exemplo, a interface gráfica de um cliente SQL, ou os campos que constituirão um formulário de dados.

In [19]:
cursorDML.execute('select * from user_tables')
print('Lista das tabelas no esquema do usuário que estabeleceu a conexão:')
for tupla in cursorDML:     
    print(tupla[0])

Lista das tabelas no esquema do usuário que estabeleceu a conexão:
DIRETOR
JOGA
JOGADOR
PARTIDA
POSICAO_JOGADOR
TIME
UNIFORME


* Aqui, define-se uma função para recuperar todas as chaves estrangeiras de uma dada tabela.

In [20]:
def printForeignKeys(sATableName = ''):
    SQL=('SELECT a.table_name, a.column_name, a.constraint_name, c.owner, c.r_owner, '
         'c_pk.table_name r_table_name, c_pk.constraint_name r_pk FROM user_cons_columns a '
         'JOIN user_constraints c ON a.owner = c.owner '
         'AND a.constraint_name = c.constraint_name '
         'JOIN user_constraints c_pk ON c.r_owner = c_pk.owner '
         'AND c.r_constraint_name = c_pk.constraint_name '
         'WHERE c.constraint_type = \'R\' ')
    if(sATableName != ''):
        SQL += 'AND a.table_name = \'' + sATableName.upper() +'\' '
    SQL += 'ORDER BY a.table_name, constraint_name'
    cursorTemp = conexao.cursor()
    cursorTemp.execute(SQL)
    for tupla in cursorTemp:
        print(tupla)
    cursorTemp.close()

In [22]:
printForeignKeys('')

('DIRETOR', 'TIMEDIR', 'FK_DIRETOR', 'M28804725800', 'M28804725800', 'TIME', 'PK_TIME')
('JOGA', 'TIME1', 'FK_JOGA1', 'M28804725800', 'M28804725800', 'TIME', 'PK_TIME')
('JOGA', 'TIME2', 'FK_JOGA2', 'M28804725800', 'M28804725800', 'TIME', 'PK_TIME')
('JOGADOR', 'TIME_ATUA', 'FK_JOGADOR', 'M28804725800', 'M28804725800', 'TIME', 'PK_TIME')
('PARTIDA', 'TIME1', 'FK_PARTIDA', 'M28804725800', 'M28804725800', 'JOGA', 'PK_JOGA')
('PARTIDA', 'TIME2', 'FK_PARTIDA', 'M28804725800', 'M28804725800', 'JOGA', 'PK_JOGA')
('POSICAO_JOGADOR', 'JOGADOR', 'FK_POSICAO_JOGADOR', 'M28804725800', 'M28804725800', 'JOGADOR', 'PK_JOGADOR')
('UNIFORME', 'TIMEUNIF', 'FK_UNIFORME', 'M28804725800', 'M28804725800', 'TIME', 'PK_TIME')


In [39]:
cursorDML.execute('select count(*) from uniforme')

for tupla in cursorDML:     
    print(tupla[0])

10


## Diagram da base de dados Futebol, usada neste notebook.

<img src="./EsquemaFutebol/EsquemaFutebol.png" width=350 height=350 />

* Inserindo alguns dados.

In [40]:
cursorDML.execute("insert into Time values ('Fortaleza', 'CE', 'profissional', 2)")
#usando binding variables
#reparar que a estrutura do SQL fica igual, porém logo após o () vem um arrei de mesma dimensão
cursorDML.execute("insert into Time values (:a,:b,:c,:d)",['Juventude', 'RS', 'profissional', 3])
cursorDML.execute("insert into Joga values (:a,:b,:c)",['Juventude', 'Fortaleza', 'N'])

* Se foi tudo ok, executar `commit()`; do contrário, executar `rollback()` para desfazer as atualizações

In [42]:
conexao.commit()

# Recuperando dados com cursores
* Pode-se requisitar os dados da base um por vez, o que consome mais recursos de rede e processamento no servidor; ou múltiplos por vez (batch), o que é mais eficiente. Se houver memória (ou poucos dados) pode-se recuperar todos os dados de uma vez.

* Um por vez

In [43]:
cursorDML.execute("select * from Joga where classico = \'N\'")
while True:
    row = cursorDML.fetchone()  #Um por vez
    if row is None: break
    print(row)

('Ibate', 'Paulistinha', 'N')
('Santos', 'Corinthians', 'N')
('Santos', 'Sao Paulo', 'N')
('Santos', 'Palmeiras', 'N')
('Juventude', 'Fortaleza', 'N')


* **Batch**: menos requisições de rede/transferência de dados; menos processamento no servidor

In [44]:
cursorDML.execute("select * from Time")
num_rows = 11
while True:
    rows = cursorDML.fetchmany(num_rows) #Muitos por vez
    if not rows: break
    for row in rows:
        print(row)
        
print()
print("Foram recuperadas "+str(cursorDML.rowcount) +" tuplas.")


('Sao Paulo', 'SP', 'profissional', 8)
('Palmeiras', 'SP', 'profissional', 5)
('Santos', 'SP', 'profissional', 0)
('Corinthians', 'SP', 'profissional', 6)
('Paulistinha', 'SP', 'amador', 1)
('Ibate', 'SP', 'amador', 0)
('Cruzeiro', 'MG', 'profissional', 2)
('Atletico', 'MG', 'profissional', 3)
('Frutal', 'MG', 'amador', 1)
('Fortaleza', 'CE', 'profissional', 2)
('Juventude', 'RS', 'profissional', 3)

Foram recuperadas 11 tuplas.


* É possível ainda recuperar meta-informações sobre as colunas retornadas por meio do cursor

In [45]:
for metadata in cursorDML.description:
    print(metadata)

('NOME', <cx_Oracle.DbType DB_TYPE_VARCHAR>, 40, 40, None, None, 0)
('ESTADO', <cx_Oracle.DbType DB_TYPE_CHAR>, 2, 2, None, None, 1)
('TIPO', <cx_Oracle.DbType DB_TYPE_VARCHAR>, 15, 15, None, None, 1)
('SALDO_GOLS', <cx_Oracle.DbType DB_TYPE_NUMBER>, 39, None, 38, 0, 1)


# Usando rowfactories

* Com rowfactories podemos processar os dados recuperados antes de retorná-los.

In [46]:
cursorDML.execute("select * from jogador")
columns = [col[0] for col in cursorDML.description] # array com os nomes dos atributos

cursorDML.rowfactory = lambda *args: zip(columns, args) #aplica lambda para cada tupla


In [47]:
#a operação de fetch garante a execução do lambda
data = cursorDML.fetchone()
#o resultado é o produto cartesiano {nomes do atributos} x {valores dos atributos}
print(list(data))

[('RG', '111111111'), ('NOME', 'Pele'), ('DATA_NASCIMENTO', datetime.datetime(1955, 5, 15, 0, 0)), ('NATURALIDADE', 'Santos'), ('TIME_ATUA', 'Santos')]


# Lendo dados do banco diretamente para um Pandas Dataframe

* Uma consulta SQL.

In [48]:
meuDataFrame = pd.read_sql_query('SELECT * FROM Partida', con=conexao)

In [49]:
meuDataFrame

Unnamed: 0,TIME1,TIME2,DATA,LOCAL,PLACAR
0,Sao Paulo,Palmeiras,2007-05-15,Morumbi,2x0
1,Santos,Sao Paulo,2007-05-20,Pacaembu,1x0
2,Santos,Palmeiras,2007-06-06,Vila Belmiro,0x1
3,Ibate,Paulistinha,2007-05-25,Luizao,0x0
4,Santos,Corinthians,2007-05-30,Pacaembu,0x0


* ATENCAO: um dataframe nada mais é do que `uma tabela (uma relação) em memória`. Muito provavelmente, ler uma tabela inteira em memória vai causar problemas de falta de memória e de processamento. Uma solução é ler pedaços (**chunks**) da tabela em um objeto do tipo **generator**;

* Pode-se fazer isso usando-se uma variação do `read_sql_query()` com o parâmetro `chunksize` fornecido. Neste caso, o resultado não será um DataFrame, mas sim um objeto do tipo **generator**, o qual fornecerá um DataFrame com chunksize elementos a cada iteração;

* **Generator**: uma abstração de dados que permite a geração de resultados/produtos sequencialmente, os quais são produzidos mediante requisição. Após um generator ser consumido, ele precisa ser regerado.

In [50]:
dataFrameGenerator = pd.read_sql_query('SELECT * FROM Time', con=conexao, chunksize=3)
type(dataFrameGenerator)

generator

In [51]:
for i, dataFrameChunk in enumerate(dataFrameGenerator):
    print('-'*10)
    print("Chunk "+str(i))
    print(dataFrameChunk)


----------
Chunk 0
        NOME ESTADO          TIPO  SALDO_GOLS
0  Sao Paulo     SP  profissional           8
1  Palmeiras     SP  profissional           5
2     Santos     SP  profissional           0
----------
Chunk 1
          NOME ESTADO          TIPO  SALDO_GOLS
0  Corinthians     SP  profissional           6
1  Paulistinha     SP        amador           1
2        Ibate     SP        amador           0
----------
Chunk 2
       NOME ESTADO          TIPO  SALDO_GOLS
0  Cruzeiro     MG  profissional           2
1  Atletico     MG  profissional           3
2    Frutal     MG        amador           1
----------
Chunk 3
        NOME ESTADO          TIPO  SALDO_GOLS
0  Fortaleza     CE  profissional           2
1  Juventude     RS  profissional           3


* Com o processamento em chunks é possível executar processamento pedaço por pedaço do dataset.

In [52]:
total_de_gols = 0
dataFrameGenerator = pd.read_sql_query('SELECT * FROM time', con=conexao, chunksize=3)

In [53]:
for i, dataFrameChunk in enumerate(dataFrameGenerator):
    total_chunk = dataFrameChunk['SALDO_GOLS'].sum()
    print('Total de gols do ' + str(i) + '-esimo chunk: '+str(total_chunk))
    total_de_gols += total_chunk
print("Total de gols da tabela inteira: " + str(total_de_gols))

Total de gols do 0-esimo chunk: 13
Total de gols do 1-esimo chunk: 7
Total de gols do 2-esimo chunk: 6
Total de gols do 3-esimo chunk: 5
Total de gols da tabela inteira: 31


* ATENCAO: o processamento de agregações é muito eficiente em SGBDs relacionais. Trazer dados do database para a memória, computar, e depois totalizar só é recomendado para tarefas que não podem, ou que são muito complexas, em um SGBD. Para o exemplo da totalização, basta um simples SQL:

In [54]:
for row in cursorDML.execute("Select SUM(saldo_gols) from time"): print(row[0])

31


# SQLAlchemy - SQL Expression Language

* O **SQLAlchemy** oferece um conjunto de ferramentas, o **SQL Expression Language**, que permitem executar SQL sem a necessidade de escrever código SQL;
* Documentação completa em https://docs.sqlalchemy.org/en/14/genindex.html#T

* Primeiro, cria-se um **engine** usando-se a string de conexão.

In [55]:
dsnStr

'(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=grad.icmc.usp.br)(PORT=15215))(CONNECT_DATA=(SID=orcl)))'

In [57]:
ora_engine = sqlalchemy.create_engine(f'oracle+cx_oracle://M28804725800:M28804725800@'+dsnStr, echo=True)
ora_engine.connect()

2021-07-15 02:20:56,636 INFO sqlalchemy.engine.Engine select sys_context( 'userenv', 'current_schema' ) from dual
2021-07-15 02:20:56,638 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-07-15 02:20:56,714 INFO sqlalchemy.engine.Engine select value from nls_session_parameters where parameter = 'NLS_NUMERIC_CHARACTERS'
2021-07-15 02:20:56,714 INFO sqlalchemy.engine.Engine [raw sql] {}


<sqlalchemy.engine.base.Connection at 0x7fd849be2d30>

* Em seguida, cria-se um **metaobjeto** com as informações da tabela onde se deseja realizar operações.
* No exemplo a seguir, o objeto jogador guardará em memória todas as informações que descrevem o esquema da relação Jogador, assim como suas restrições de integridade. O SQLAlchemy exibe o acesso feito ao dicionário de dados.

In [None]:
metadata = sqlalchemy.MetaData()
jogador = sqlalchemy.Table('jogador', metadata, autoload_with=ora_engine)

In [None]:
print(jogador.columns)

['jogador.rg', 'jogador.nome', 'jogador.data_nascimento', 'jogador.naturalidade', 'jogador.time_atua']


In [None]:
for constr in jogador.constraints:
    print(constr)
    print()

PrimaryKeyConstraint(Column('rg', VARCHAR(length=15), table=<jogador>, primary_key=True, nullable=False))

ForeignKeyConstraint(<sqlalchemy.sql.base.ColumnCollection object at 0x7f46145eeeb8>, None, name='fk_jogador', ondelete='SET NULL', link_to_name=True, table=Table('jogador', MetaData(bind=None), Column('rg', VARCHAR(length=15), table=<jogador>, primary_key=True, nullable=False), Column('nome', VARCHAR(length=40), table=<jogador>, nullable=False), Column('data_nascimento', DATE(), table=<jogador>), Column('naturalidade', VARCHAR(length=40), table=<jogador>), Column('time_atua', VARCHAR(length=40), ForeignKey('time.nome'), table=<jogador>), schema=None))



In [None]:
for constr in jogador.foreign_keys:
    print(constr)
    print()

ForeignKey('time.nome')



* A partir do meta objeto, é possível fazer requisições que montam SQL

In [None]:
insert_jogador = jogador.insert()

In [None]:
print(insert_jogador)

INSERT INTO jogador (rg, nome, data_nascimento, naturalidade, time_atua) VALUES (:rg, :nome, :data_nascimento, :naturalidade, :time_atua)


In [None]:
update_jogador = jogador.update()

In [None]:
print(update_jogador)

UPDATE jogador SET rg=:rg, nome=:nome, data_nascimento=:data_nascimento, naturalidade=:naturalidade, time_atua=:time_atua


In [None]:
delete_jogador = jogador.delete().where(jogador.c.time_atua=='Palmeiras')

In [None]:
delete_jogador.compile().params

{'time_atua_1': 'Palmeiras'}

In [None]:
result = ora_engine.execute(delete_jogador)

2021-06-29 18:15:36,908 INFO sqlalchemy.engine.base.Engine DELETE FROM jogador WHERE jogador.time_atua = :time_atua_1
2021-06-29 18:15:36,910 INFO sqlalchemy.engine.base.Engine {'time_atua_1': 'Palmeiras'}
2021-06-29 18:15:36,944 INFO sqlalchemy.engine.base.Engine COMMIT


* Para fazer a inserção, é necessário fornecer os valores que serão inseridos.

In [None]:
import datetime

In [None]:
insert_jogador = jogador.insert().values(rg='67891', nome='Edilson', data_nascimento=datetime.date(1980,9,20),
                                         naturalidade='Peruano',time_atua='Palmeiras')

In [None]:
print(insert_jogador)

INSERT INTO jogador (rg, nome, data_nascimento, naturalidade, time_atua) VALUES (:rg, :nome, :data_nascimento, :naturalidade, :time_atua)


* Os quais são processados com o comando `compile()`.

In [None]:
insert_jogador.compile().params

{'rg': '67891',
 'nome': 'Edilson',
 'data_nascimento': datetime.date(1980, 9, 20),
 'naturalidade': 'Peruano',
 'time_atua': 'Palmeiras'}

* E, finalmente, a execução.

In [None]:
result = ora_engine.execute(insert_jogador)

2021-06-29 18:17:41,686 INFO sqlalchemy.engine.base.Engine INSERT INTO jogador (rg, nome, data_nascimento, naturalidade, time_atua) VALUES (:rg, :nome, :data_nascimento, :naturalidade, :time_atua)
2021-06-29 18:17:41,689 INFO sqlalchemy.engine.base.Engine {'rg': '67891', 'nome': 'Edilson', 'data_nascimento': datetime.date(1980, 9, 20), 'naturalidade': 'Peruano', 'time_atua': 'Palmeiras'}
2021-06-29 18:17:41,719 INFO sqlalchemy.engine.base.Engine COMMIT


* O resultado mostra meta informações a respeito da operação.

In [None]:
result.inserted_primary_key

['67891']

In [59]:
conexao.commit()

InterfaceError: not connected

In [58]:
conexao.close()