In [1]:
import requests
import numpy as np
import pandas as pd

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id="etl_pipeline",

schedule_interval="0 0 * * *")

etl_task = PythonOperator(task_id="etl_task",

python_callable=etl,

dag=dag)

etl_task.set_upstream(wait_for_this_task)

In [2]:
df_vagas = pd.read_csv("../stone2/data/jobs.csv", sep=";") 
df_inscritos = pd.read_csv("../stone2/data/applications.csv", sep=";")

In [3]:
getCity = lambda lista: lista[-1].strip()
getJob = lambda lista: lista[-2].strip()

In [4]:
splitted_job = df_vagas.job_name.str.split("-")

df_vagas['cidade'] = [getCity(row) for row in splitted_job]
df_vagas['cargo'] = [getJob(row) if len(row) > 1 else None for row in splitted_job]

In [5]:

df_vagas.cidade.value_counts()

Salvador              2
Cascavel              2
São Bento do Sul      2
Porto Alegre          2
Campo Grande          2
                     ..
Cuiabá                1
Florianópolis         1
Primavera do Leste    1
Petrópolis            1
Campo Mourão          1
Name: cidade, Length: 271, dtype: int64

In [6]:

df_vagas.cargo.value_counts()

Consultor Comercial Externo     99
Consultor Comercial             68
Agente Stone                    66
                                11
BA                               4
S                                3
BA 2                             2
SEAL                             2
E                                2
2                                2
PE                               2
PB                               2
CE                               1
PMS                              1
ES                               1
C E                              1
Ji                               1
L                                1
Consultor Comercial  Externo     1
PBRN                             1
Name: cargo, dtype: int64

In [7]:
print(len(df_vagas))
print(len(df_vagas.job_id.unique()))

276
276


In [8]:
def cleanJob(col):
    reps = {
    'valores_validos': ['Consultor Comercial Externo',
                        'Consultor Comercial',
                        'Agente Stone'],
    'valor_reposicao': np.nan
    }
    col = col.replace('Consultor Comercial  Externo', 'Consultor Comercial Externo')
    col = col.where(col.isin(reps['valores_validos']), reps['valor_reposicao'])
    return col
    

In [9]:
df_vagas['cargo'] = cleanJob(df_vagas['cargo'])

In [10]:
df_vagas

Unnamed: 0,job_id,job_name,cidade,cargo
0,1679689,- Feira de Santana,Feira de Santana,
1,1210619,- Itabaiana,Itabaiana,
2,1623539,- Mossoró,Mossoró,
3,1216644,- Parnamirim,Parnamirim,
4,1723540,- Salvador,Salvador,
...,...,...,...,...
271,1728998,- Recife,Recife,
272,1192982,- Vale do São Francisco,Vale do São Francisco,
273,1170258,- Sobral,Sobral,
274,1245731,Teste Unico - Agente Stone - Consultor Comerci...,Porto Alegre,Consultor Comercial


In [11]:
df_inscritos.head()

Unnamed: 0,application_id,job_id,application_stage
0,2900991,1127750,interview 1
1,2447626,1359157,interview 1
2,2838222,1448786,offer
3,2281269,1084916,interview 1
4,2533105,1931363,applied


In [50]:
df_inscritosVagas = df_inscritos.merge(df_vagas, left_on='job_id', right_on='job_id').drop(columns=['job_name'])

In [51]:
def generateId(row):
    pkRow = f'{row["application_id"]}_{"".join(row["application_stage"].split(" "))}'
    return pkRow

df_inscritosVagas['id'] = df_inscritosVagas.apply(generateId, axis=1)

In [52]:
df_inscritosVagas.cargo.value_counts()

Consultor Comercial Externo    17264
Consultor Comercial             6952
Agente Stone                     926
Name: cargo, dtype: int64

In [53]:
df_inscritosVagas.head()

Unnamed: 0,application_id,job_id,application_stage,cidade,cargo
0,2900991,1127750,interview 1,Brasília,Agente Stone
1,2544859,1127750,interview 1,Brasília,Agente Stone
2,2401778,1127750,interview 1,Brasília,Agente Stone
3,2290793,1127750,interview 1,Brasília,Agente Stone
4,2011363,1127750,interview 1,Brasília,Agente Stone


In [54]:
df_inscritosVagas.shape

(28288, 5)

In [55]:
len(df_inscritosVagas['application_id'].unique())

27893

In [56]:
from configparser import ConfigParser
import psycopg2
import psycopg2.extras as psql_extras
import pandas as pd
from typing import Dict

host=localhost
database=houses
user=postgres
password=postgres

In [57]:
import sqlalchemy
from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker

In [58]:
db = 'postgresql+psycopg2'
user = 'airflow'
pwd = 'airflow'
host = '0.0.0.0'
port = '5432'
db_name = 'airflow'

db_engine = create_engine(f"{db}://{user}:{pwd}@{host}:{port}/{db_name}")

In [59]:
db_engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fe8c3df88b0>

In [60]:
connection = db_engine.raw_connection()
cursor = connection.cursor()

In [61]:
df_inscritosVagas

Unnamed: 0,application_id,job_id,application_stage,cidade,cargo
0,2900991,1127750,interview 1,Brasília,Agente Stone
1,2544859,1127750,interview 1,Brasília,Agente Stone
2,2401778,1127750,interview 1,Brasília,Agente Stone
3,2290793,1127750,interview 1,Brasília,Agente Stone
4,2011363,1127750,interview 1,Brasília,Agente Stone
...,...,...,...,...,...
28283,2257231,1391033,interview 2,Primavera do Leste,Agente Stone
28284,2247124,1674192,interview 1,Concórdia,Consultor Comercial Externo
28285,2424231,1674192,interview 2,Concórdia,Consultor Comercial Externo
28286,2814160,1881895,offer,Itumbiara,Agente Stone


In [62]:
df_inscritosVagas.columns

Index(['application_id', 'job_id', 'application_stage', 'cidade', 'cargo'], dtype='object')

In [68]:
df_inscritosVagas.to_sql("stone", db_engine, if_exists='replace')

In [69]:
result_set = db_engine.execute("SELECT * FROM stone")  
for r in result_set:  
    print(r)

ltor Comercial Externo')
(28011, 2200531, 1061121, 'interview 1', 'São Bento do Sul', 'Consultor Comercial Externo')
(28012, 2137633, 1061121, 'interview 1', 'São Bento do Sul', 'Consultor Comercial Externo')
(28013, 2123463, 1061121, 'offer', 'São Bento do Sul', 'Consultor Comercial Externo')
(28014, 2678139, 1061121, 'applied', 'São Bento do Sul', 'Consultor Comercial Externo')
(28015, 2570856, 1061121, 'applied', 'São Bento do Sul', 'Consultor Comercial Externo')
(28016, 2758884, 1061121, 'applied', 'São Bento do Sul', 'Consultor Comercial Externo')
(28017, 2268216, 1061121, 'interview 2', 'São Bento do Sul', 'Consultor Comercial Externo')
(28018, 2687499, 1671174, 'applied', 'Marabá', 'Agente Stone')
(28019, 2502584, 1671174, 'applied', 'Marabá', 'Agente Stone')
(28020, 2322484, 1671174, 'offer', 'Marabá', 'Agente Stone')
(28021, 2915439, 1671174, 'applied', 'Marabá', 'Agente Stone')
(28022, 2205336, 1671174, 'interview 2', 'Marabá', 'Agente Stone')
(28023, 2136093, 1671174, 'appli

In [66]:
#db_engine.execute("CREATE TABLE IF NOT EXISTS stone (application_id text, job_id text, application_stage text, cidade text, cargo text)")  
db_engine.execute("DROP TABLE IF EXISTS stone")
db_engine.execute("CREATE TABLE stone (application_id text, job_id text, application_stage text, cidade text, cargo text)")

db_engine.execute("INSERT INTO stone (application_id, job_id, application_stage, cidade, cargo) VALUES ('xxxxxxx', 'yyyyyyy', 'interview n', 'Carangola', 'Agente Stone')")

# Read
result_set = db_engine.execute("SELECT * FROM stone")  
for r in result_set:  
    print(r)

# Update
db_engine.execute("UPDATE stone SET application_id='zzzzzzzz' WHERE cargo='Agente Stone'")

# Delete
#db_engine.execute("DELETE FROM films WHERE year='2016'")  


('xxxxxxx', 'yyyyyyy', 'interview n', 'Carangola', 'Agente Stone')


<sqlalchemy.engine.result.ResultProxy at 0x7fe8c3aa5e80>

In [37]:
db

AttributeError: 'Engine' object has no attribute 'tables'

In [26]:
from sqlalchemy import create_engine

# Create 
db_engine.execute("CREATE TABLE IF NOT EXISTS films (title text, director text, year text)")  
db_engine.execute("INSERT INTO films (title, director, year) VALUES ('Doctor Strange', 'Scott Derrickson', '2016')")

# Read
result_set = db.execute("SELECT * FROM films")  
for r in result_set:  
    print(r)

# Update
db.execute("UPDATE films SET title='Some2016Film' WHERE year='2016'")

# Delete
db.execute("DELETE FROM films WHERE year='2016'") 

AttributeError: 'str' object has no attribute 'execute'

In [None]:
statement = films.insert().values(title="Doctor Strange", director="Scott Derrickson", year="2016")  
conn.execute(statement)  