## **Lab4: Caminhando do .CSV até o Apache Spark**

![image.png](attachment:image.png)

O docker permite a criação de máquinas virtuais leves, os dockers. Posso criar uma máquina local ou um ambiente em nuvem. 

**Containers x Images**

A imagem é o "template", diz o que vai estar dentro do container. Ao criar a imagem posso dizer o SO que quero, qual a aplicação, de onde vou buscá-la, a linguagem, etc. 
A partir da imagem,  posso criar containers.

![image.png](attachment:image.png)

**Volumes** 

São áreas em que podemos armazenar um grande volume de dados que aponta para um disco externo.

![image.png](attachment:image-2.png)

Se eu quiser rodar um SGBD (PostgreSQL) pelo Docker, eu posso usar uma imagem que já foi criada dele, disponível no DockerHub. 

É só rodar docker pull postgres no prompt de comando.

A minha máquina física não tem o Postgres, mas ela vai ficar "Ouvindo" a porta 5432. Quando chegar algo, ela vai mandar pro docker.

**Inicializa um container docker chamado "dsa" com SGBD PostgreSQL, user "postgres" e senha "senha123".**
- docker run --name dsa -e POSTGRES_PASSWORD=senha123 -p 5432:5432 -d postgres

![image.png](attachment:image-3.png)
![image.png](attachment:image-4.png)
![image.png](attachment:image-5.png)


### **Lab4: Manipulação de dados**

In [107]:
# Carregando os pacotes
import findspark
findspark.init()

In [108]:
# Instalamos novas bibs e vamos carregá-las aqui
import psycopg2                                             # conjunto de bibliotecas que faz a comunicação entre python e postgre. é o driver de conexão
import pandasql                                             # usado pra extrair dados do postgre 
import sqlalchemy                                           # facilita a conexão com o postgre, permite executar queries
import pandas as pd
from pandasql import sqldf                              
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT  # define o nivel de isolamente, o commit. toda query executada vai ter o commit automatico. o commit é salvar o que foi eito no banco de dados 
from sqlalchemy import create_engine                        # cria o motor de conexão
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

Vamos começar:

#### Carregando os dados com pandas

In [109]:
df1 = pd.read_csv('C:\DS\DSA-FCD\BigData-Python-Spark\Cap12\Lab4\dados\dataset1.csv', index_col=False)

In [110]:
df1.shape

(17588, 2)

In [111]:
df1.dtypes

Name    object
url     object
dtype: object

In [112]:
df1.head()

Unnamed: 0,Name,url
0,Cristiano Ronaldo,/player/20801/cristiano-ronaldo/
1,Lionel Messi,/player/158023/lionel-messi/
2,Neymar,/player/190871/neymar/
3,Luis Suárez,/player/176580/luis-su%C3%A1rez/
4,Manuel Neuer,/player/167495/manuel-neuer/


In [113]:
df2 = pd.read_csv('C:\DS\DSA-FCD\BigData-Python-Spark\Cap12\Lab4\dados\dataset2.csv', index_col=False)

In [114]:
df3 = pd.read_csv('C:\DS\DSA-FCD\BigData-Python-Spark\Cap12\Lab4\dados\dataset3.csv', index_col=False)

In [115]:
df4 = pd.read_csv('C:\DS\DSA-FCD\BigData-Python-Spark\Cap12\Lab4\dados\dataset4.csv', index_col=False)

#### Conectando ao SGBD PostgreSQL no Docker

O Postgre é apenas o container do software. Eu não tenho o banco de dados em si, mas o Postgre me permite gerenciar um banco. Por isso, vamos criar a conexão primeiro.

*Para criar a imagem, rodei o comando docker run --name dsa -e POSTGRES_PASSWORD=dsa123 -p 5432:5432 -d postgres*

In [158]:
# Criando a conexão:
# O pacote psycopg2 com o método .connect() permitem criar a conexão.
pgconn = psycopg2.connect(host = 'localhost',   # > Indica a máquina em que está o SGBD. Como o nosso é local, use 'localhost'
                          user = 'postgres',    # > esse user e senha vieram do comando que rodamos acima.
                          password = 'senha123')

In [159]:
# Criando o cursor
# O cursor é uma ferramenta que percorre bancos de dados
pgcursor = pgconn.cursor()

In [160]:
pgcursor

<cursor object at 0x000001A8247CF4C0; closed: 0>

In [119]:
# Define o nível de isolamento para executar queries SQL no SGBD
# Estou dizendo ao SGBD que qualquer coisa que executar deve ser "auto-salva". 
# Qualquer comando será salvo, isso significa que a tabela não será lockada.
pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

In [120]:
#pgconn.reset()

In [121]:
# Vamos chamar o cursor, executando um drop do banco de dados (caso ele exista)
pgcursor.execute('DROP DATABASE IF EXISTS db')

ObjectInUse: database "dbdsa" is being accessed by other users
DETAIL:  There is 1 other session using the database.


In [None]:
# Cria o banco de dados
pgcursor.execute('CREATE DATABASE db')

In [122]:
# Fecha a conexão
pgconn.close()

Agora, o banco foi criado. Vamos conectar a ele a seguir.

#### **Conectando ao Banco de Dados SGBD PostgreSQL no Docker**

In [147]:
# cria a conexão
# Igual acima, mas agora eu tenho um database e vou conectar a ele.
pgconn = psycopg2.connect(host = 'localhost',   # > Indica a máquina em que está o SGBD. Como o nosso é local, use 'localhost'
                          database = 'db',    # > O db que criamos
                          user = 'postgres',    # > esse user e senha vieram do comando que rodamos acima.
                          password = 'senha123')

#### **Criando engine SQLAlchemy de Conexão ao PostgreSQL no Docker**

O SQLAlchemy permite ganhar muita eficiência e simplicidade no processo de manipulação do banco de dados. Ele facilita muito a nossa vida.

In [148]:
# Cria o engine
engine = create_engine('postgresql+psycopg2://postgres:dsa123@localhost/db')

# psycopg2 é o driver que usamos anteriormente....

#### **Carregando Dados de Dataframes do Pandas no Banco de Dados PostgreSQL**

Fonte: https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2

Com a engine criada, posso começar a manipular os dados. Abaixo, vou salvar o primeiro dataframe que carregamos na tabela do PostgreSQL.

Se a tabela já existir, será sobrescrita.

A facilidade pra isso é o que torna o SQLAlchemy tão visado.

Nessa linha de código abaixo, ele executa a função de create/inserção da tabela sem que precisássemos colocar as instruções SQL.

In [125]:
df1.to_sql('tabela_df1', engine, if_exists= 'replace', index=False)

588

In [126]:
# Salvando o dataframe na tabela do PostgreSQL 
# Se a tabela já existir será sobrescrita
df2.to_sql('tabela_df2', engine, if_exists = 'replace', index = False)

633

In [127]:
# Salvando o dataframe na tabela do PostgreSQL 
# Se a tabela já existir será sobrescrita
df3.to_sql('tabela_df3', engine, if_exists = 'replace', index = False)

47

In [128]:
# Salvando o dataframe na tabela do PostgreSQL 
# Se a tabela já existir será sobrescrita
df4.to_sql('tabela_df4', engine, if_exists = 'replace', index = False)

340

Vamos acessar direto o SGBD agora, para verificar se as tabelas estão mesmo lá usando o pgAdmin, uma ferramenta que permite acesso direto ao banco de dados.

Observação: Problemas com o PGAdmin4.

Tive sérios problemas com a ferramenta. Por muito tempo, a única coisa que apareceu pra mim foi o ambiente "SERVER" na tela inicial, sem nenhum dado dentro dele.

![image.png](attachment:image.png)

Foi preciso configurar o banco de dados manualmente. A configuração pelo Docker e Python não funcionou.

![image.png](attachment:image-2.png)

- Nome: local
- Hostname/Address: localhost
- Port: 5432 (essa é a que o docker fica "ouvindo")
- User: postgres
- Password: senha123

![image.png](attachment:image-3.png)

continuando....

#### **Extraindo dados do PostgreSQL com o PandasSQL no Python**

Mais um problema, agora com o SQL Alchemy:

|| Na 1ªtentativa de rodar a query do bloco abaixo, recebi o erro: 
*AttributeError: 'OptionEngine' object has no attribute 'execute'*

Descobri que isso pode ter sido causado pelo SQL Alchemy ter perdido a função execute após a versão 2.0. A minha era 2.0.10 quando recebi esse erro:

![image.png](attachment:image.png)

A solução foi desinstalar o pacote e reinstalar a versão anterior à 2.0. Instalei a 1.4.47.

![image.png](attachment:image-2.png)

E aí, mais um erro:

    UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy

Resolvi assim:

- 1o. Cria um objeto em texto com a query a ser usada
- 2o. Usa o método .connect() pra abrir a conexão na engine
- 3o. na função read_sql do pandas, passa a query e a conexão da engine

In [167]:
# create an SQL expression using the text() function
stmt = text("SELECT * FROM tabela_df1")

# connect() method of the engine to open a new connection.
conn = engine.connect()

In [170]:
# Query de consulta aos metadados para obter detalhes de uma tabela
pd.read_sql_query(text('''select ordinal_position, column_name, data_type  
                     from information_schema.columns 
                     where table_name = 'tabela_df4'
                     '''), 
                  con=conn).head(10)

Unnamed: 0,ordinal_position,column_name,data_type
0,53,GK_Reflexes,bigint
1,41,Heading,bigint
2,42,Shot_Power,bigint
3,43,Finishing,bigint
4,44,Long_Shots,bigint
5,45,Curve,bigint
6,46,Freekick_Accuracy,bigint
7,47,Penalties,bigint
8,48,Volleys,bigint
9,49,GK_Positioning,bigint


#### **Carregando dados do PostgreSQL num database do Pandas**

In [173]:
df_tabela_df4 = pd.read_sql_query(text('SELECT * FROM tabela_df4'), con=conn)

In [178]:
print(df_tabela_df4.shape)

(17588, 53)


In [185]:
select_df = pd.read_sql('tabela_df4', 
                        con=conn, 
                        columns = ['Name', 'Age' ,'Speed', 'Height', 'Weight'])

#### **Consultas SQL em dataframes do Pandas**

O Pandas também permite o uso da linguagem SQL em seus dataframes através do pacote sqldf

Aqui, optamos pela criação de uma função lambda denominada pysqldf para passarmos as queries:

In [206]:
from pandasql import sqldf

In [199]:
pysqldf = lambda x: sqldf(x, globals())
type(pysqldf)

function

In [200]:
# Query
query = "SELECT * FROM select_df"

Encontrei um erro que não consegui resolver quando executei a query:

`executing.executing.NotOneValueFound: Expected one value, found 0`

Tentei resolvê-lo e não consegui. Parti pro método mais simples do pandas, o df.query()

In [208]:
#pysqldf(query)

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "C:\Users\andre\AppData\Roaming\Python\Python310\site-packages\IPython\core\interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\andre\AppData\Local\Temp\ipykernel_6740\646810089.py", line 1, in <module>
    pysqldf(query)
  File "C:\Users\andre\AppData\Local\Temp\ipykernel_6740\2558927797.py", line 1, in <lambda>
    pysqldf = lambda x: sqldf(x, globals())
  File "c:\Users\andre\AppData\Local\Programs\Python\Python310\lib\site-packages\pandasql\sqldf.py", line 156, in sqldf
    return PandaSQL(db_uri)(query, env)
  File "c:\Users\andre\AppData\Local\Programs\Python\Python310\lib\site-packages\pandasql\sqldf.py", line 26, in __init__
    self.engine = create_engine(db_uri, poolclass=NullPool)
  File "<string>", line 2, in create_engine
  File "c:\Users\andre\AppData\Local\Programs\Python\Python310\lib\site-packages\sqlalchemy\util\deprecations.py", line 283, in warned
    )
  File "

In [205]:
# print the result
select_df.query('Age > 20')

Unnamed: 0,Name,Age,Speed,Height,Weight
0,Cristiano Ronaldo,32,92,185 cm,80 kg
1,Lionel Messi,29,87,170 cm,72 kg
2,Neymar,25,90,174 cm,68 kg
3,Luis Suárez,30,77,182 cm,85 kg
4,Manuel Neuer,31,61,193 cm,92 kg
...,...,...,...,...,...
17571,Gavin Dowling,38,28,185 cm,80 kg
17572,Tommy Käßemodel,28,22,173 cm,75 kg
17576,Sam Hornby,22,46,188 cm,80 kg
17586,Mark Foden,21,40,180 cm,80 kg


#### **Carregando dados do PostgreSQL em dataframse do Apache Spark**

O Spark permite o uso de um cluster de computadores. Isso é útil especialmente quando o volume de dados é gigante, e o pandas não o comporta. Por isso, vamos experimentá-lo agora.

Primeiro, vamos criar a sessão do spark.

In [209]:
# Cria a sessão
spark = SparkSession.builder.master('local').appName('Lab4').getOrCreate()

Na sequência, vamos executar uma query SQL:

In [216]:
conn = engine.connect()

In [217]:
df = pd.read_sql_query(text('SELECT * FROM tabela_df4'), con=conn)

In [218]:
df.head()

Unnamed: 0,Name,Nationality,National_Position,National_Kit,Club,Club_Position,Club_Kit,Club_Joining,Contract_Expiry,Rating,...,Long_Shots,Curve,Freekick_Accuracy,Penalties,Volleys,GK_Positioning,GK_Diving,GK_Kicking,GK_Handling,GK_Reflexes
0,Cristiano Ronaldo,Portugal,LS,7.0,Real Madrid,LW,7.0,07/01/2009,2021.0,94,...,90,81,76,85,88,14,7,15,11,11
1,Lionel Messi,Argentina,RW,10.0,FC Barcelona,RW,10.0,07/01/2004,2018.0,93,...,88,89,90,74,85,14,6,15,11,8
2,Neymar,Brazil,LW,10.0,FC Barcelona,LW,11.0,07/01/2013,2021.0,92,...,77,79,84,81,83,15,9,15,9,11
3,Luis Suárez,Uruguay,LS,9.0,FC Barcelona,ST,9.0,07/11/2014,2021.0,92,...,86,86,84,85,88,33,27,31,25,37
4,Manuel Neuer,Germany,GK,1.0,FC Bayern,GK,1.0,07/01/2011,2021.0,92,...,16,14,11,47,11,91,89,95,90,89


Primeiro eu precisei salvar o objeto que veio do pd.read_sql_query em um dataframe do pandas, pra depois poder passar isso pra um objeto Spark.

In [219]:
df_spark = spark.createDataFrame(df)

In [220]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- National_Position: string (nullable = true)
 |-- National_Kit: double (nullable = true)
 |-- Club: string (nullable = true)
 |-- Club_Position: string (nullable = true)
 |-- Club_Kit: double (nullable = true)
 |-- Club_Joining: string (nullable = true)
 |-- Contract_Expiry: double (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Preffered_Foot: string (nullable = true)
 |-- Birth_Date: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Preffered_Position: string (nullable = true)
 |-- Work_Rate: string (nullable = true)
 |-- Weak_foot: long (nullable = true)
 |-- Skill_Moves: long (nullable = true)
 |-- Ball_Control: long (nullable = true)
 |-- Dribbling: long (nullable = true)
 |-- Marking: long (nullable = true)
 |-- Sliding_Tackle: long (nullable = true)
 |-- Standing_Tackle: long (nullable = true)


In [221]:
df_spark.show()

+------------------+-----------+-----------------+------------+---------------+-------------+--------+------------+---------------+------+------+------+--------------+----------+---+------------------+---------------+---------+-----------+------------+---------+-------+--------------+---------------+----------+---------+------------------+-------------+------+---------+--------+----------+---------+------------+-----+-------+--------+-------+-------+-------+-------+----------+---------+----------+-----+-----------------+---------+-------+--------------+---------+----------+-----------+-----------+
|              Name|Nationality|National_Position|National_Kit|           Club|Club_Position|Club_Kit|Club_Joining|Contract_Expiry|Rating|Height|Weight|Preffered_Foot|Birth_Date|Age|Preffered_Position|      Work_Rate|Weak_foot|Skill_Moves|Ball_Control|Dribbling|Marking|Sliding_Tackle|Standing_Tackle|Aggression|Reactions|Attacking_Position|Interceptions|Vision|Composure|Crossing|Short_Pass|Lon

No Spark eu não preciso necessariamente aplicar a sintaxe SQL pura. Há funções que substituem isso, como a .select()

In [222]:
df_select = df_spark.select('Name',
                              'Nationality',
                              'Club_Joining',
                              'Height',
                              'Weight',
                              'Age',
                              'Speed',
                              'Reactions')

Ou filtros

In [223]:
df_select.where((df_spark.Age > 25) & (df_spark.Speed > 80)).show()

+--------------------+-----------+------------+------+------+---+-----+---------+
|                Name|Nationality|Club_Joining|Height|Weight|Age|Speed|Reactions|
+--------------------+-----------+------------+------+------+---+-----+---------+
|   Cristiano Ronaldo|   Portugal|  07/01/2009|185 cm| 80 kg| 32|   92|       96|
|        Lionel Messi|  Argentina|  07/01/2004|170 cm| 72 kg| 29|   87|       95|
|  Robert Lewandowski|     Poland|  07/01/2014|185 cm| 79 kg| 28|   82|       88|
|         Gareth Bale|      Wales|  09/02/2013|183 cm| 74 kg| 27|   95|       87|
|      Jérôme Boateng|    Germany|  07/14/2011|192 cm| 90 kg| 28|   81|       84|
|         Eden Hazard|    Belgium|  07/01/2012|173 cm| 74 kg| 26|   87|       85|
|     Gonzalo Higuaín|  Argentina|  07/26/2016|184 cm| 92 kg| 29|   81|       86|
|       Sergio Agüero|  Argentina|  07/28/2011|173 cm| 70 kg| 28|   86|       88|
|   Antoine Griezmann|     France|  07/29/2014|176 cm| 67 kg| 26|   86|       90|
|          Marco

#### **User-Defined Functions (UDF)**

A maioria dos bancos  de  dados  tradicionais  oferece  suporte  a  Funções  Definidas  pelo  Usuário  e  as  UDFs  do Spark têm o mesmo conceito.

UDFs  são  usadas  para  estender  as  funções  do  framework  e  reutilizar  esta  função  em vários  DataFrames.  Por  exemplo,  se  você  quiser  converter  todas  as  primeiras  letras  de  uma palavra  de  uma  frase  em  maiúsculas,  os  recursos  internos  do  Spark  não  têm  essa  função, portanto, você pode criá-la como UDF e reutilizá-la conforme necessário em muitos Data Frames. As UDFs, uma vez criadas, podem ser reutilizadas em vários DataFrame e expressões SQL.

Por exemplo, o uso de um CASE WHEN para tranformar uma variável em categórica pode ser uma UDF. E é isso que vamos fazer agora:

In [224]:
# Função para criar faixa etária:
def cria_faixa_etaria(idade):
    if idade < 20:
        return '10-20'
    elif idade < 30:
        return '20-30'
    elif idade < 40:
        return '30-40'
    else:
        return '40+'

In [225]:
# Cria a UDF
udf_agegroup = udf(lambda z: cria_faixa_etaria(z))

In [226]:
# Aplica a UDF. Criei a coluna 'AgeGroup' e os resultados dela são criados a partir da udf_agegroup;
userDFAgeGroup = df_select.withColumn('AgeGroup', udf_agegroup('age'))

In [228]:
# Mostra o dataframe
userDFAgeGroup.show()

+------------------+-----------+------------+------+------+---+-----+---------+--------+
|              Name|Nationality|Club_Joining|Height|Weight|Age|Speed|Reactions|AgeGroup|
+------------------+-----------+------------+------+------+---+-----+---------+--------+
| Cristiano Ronaldo|   Portugal|  07/01/2009|185 cm| 80 kg| 32|   92|       96|   30-40|
|      Lionel Messi|  Argentina|  07/01/2004|170 cm| 72 kg| 29|   87|       95|   20-30|
|            Neymar|     Brazil|  07/01/2013|174 cm| 68 kg| 25|   90|       88|   20-30|
|       Luis Suárez|    Uruguay|  07/11/2014|182 cm| 85 kg| 30|   77|       93|   30-40|
|      Manuel Neuer|    Germany|  07/01/2011|193 cm| 92 kg| 31|   61|       85|   30-40|
|            De Gea|      Spain|  07/01/2011|193 cm| 82 kg| 26|   56|       88|   20-30|
|Robert Lewandowski|     Poland|  07/01/2014|185 cm| 79 kg| 28|   82|       88|   20-30|
|       Gareth Bale|      Wales|  09/02/2013|183 cm| 74 kg| 27|   95|       87|   20-30|
|Zlatan Ibrahimović| 

In [229]:
# Função para criar faixa de velocidade
def cria_faixa_veloc(speed):
    if speed < 50:
        return 'Baixa Velocidade'
    elif speed < 80:
        return 'Velocidade Média'
    else:
        return 'Alta Velocidade'

In [230]:
# Cria a UDF
udf_speedgroup = udf(lambda z: cria_faixa_veloc(z))

In [231]:
userDFAgeGroup.show()

+------------------+-----------+------------+------+------+---+-----+---------+--------+
|              Name|Nationality|Club_Joining|Height|Weight|Age|Speed|Reactions|AgeGroup|
+------------------+-----------+------------+------+------+---+-----+---------+--------+
| Cristiano Ronaldo|   Portugal|  07/01/2009|185 cm| 80 kg| 32|   92|       96|   30-40|
|      Lionel Messi|  Argentina|  07/01/2004|170 cm| 72 kg| 29|   87|       95|   20-30|
|            Neymar|     Brazil|  07/01/2013|174 cm| 68 kg| 25|   90|       88|   20-30|
|       Luis Suárez|    Uruguay|  07/11/2014|182 cm| 85 kg| 30|   77|       93|   30-40|
|      Manuel Neuer|    Germany|  07/01/2011|193 cm| 92 kg| 31|   61|       85|   30-40|
|            De Gea|      Spain|  07/01/2011|193 cm| 82 kg| 26|   56|       88|   20-30|
|Robert Lewandowski|     Poland|  07/01/2014|185 cm| 79 kg| 28|   82|       88|   20-30|
|       Gareth Bale|      Wales|  09/02/2013|183 cm| 74 kg| 27|   95|       87|   20-30|
|Zlatan Ibrahimović| 

In [None]:
# Encerra as conexões
pgconn.close()
engine.dispose()