In [1]:
import requests
import json
import os

# mettre son appID et sa key puis supprimer.
appid = "a5b42571"
key = os.environ.get("api_adzuna")
url = "http://api.adzuna.com/v1/api/jobs/fr/categories?app_id={0}&app_key={1}&&content-type=application/json".format(appid, key)
req1 = requests.get(url).json()

categories = req1["results"]


tags = [] # j'initie cette liste qui récupèrera les tags qui correspondent aux catégories

for cat in categories:
    tags.append(cat["tag"]) # Pour chaque catégorie je récupère son tag

print(tags)

['accounting-finance-jobs', 'it-jobs', 'sales-jobs', 'customer-services-jobs', 'engineering-jobs', 'hr-jobs', 'healthcare-nursing-jobs', 'hospitality-catering-jobs', 'pr-advertising-marketing-jobs', 'logistics-warehouse-jobs', 'teaching-jobs', 'trade-construction-jobs', 'admin-jobs', 'legal-jobs', 'creative-design-jobs', 'graduate-jobs', 'retail-jobs', 'consultancy-jobs', 'manufacturing-jobs', 'scientific-qa-jobs', 'social-work-jobs', 'travel-jobs', 'energy-oil-gas-jobs', 'property-jobs', 'charity-voluntary-jobs', 'domestic-help-cleaning-jobs', 'maintenance-jobs', 'part-time-jobs', 'other-general-jobs', 'unknown']


Connexion aux bases de données

In [2]:
from sqlalchemy import create_engine, MetaData, text
from pymongo import MongoClient

### MYSQL
username = "ff"
password = os.environ.get("mysql_ff_jobmarket")
hostname = os.environ.get("ip_jobmarket")
port = "3306"
database_name = "jobmarket"

db_url = f"mysql+mysqlconnector://{username}:{password}@{hostname}:{port}/{database_name}"
metadata = MetaData()
engine = create_engine(db_url)

### MONGODB
client = MongoClient(
    host= os.environ.get("ip_jobmarket"),
    port = 27017,
    username = "ff",
    password = os.environ.get("mysql_ff_jobmarket")
    )

Création de la base de données sur SQL

In [3]:
with engine.connect() as connection:
    connection.execute(text("CREATE TABLE POSTES (id varchar(255) PRIMARY KEY,nom_poste varchar(255),date_creation datetime,ville varchar(255),pays varchar(255),url varchar(255),nom_entreprise varchar(255),salaire int,type_contrat varchar(255),categorie varchar(255),latitude float,longitude float);"))
    

Création de la base de données sur MONGODB

In [4]:
job = client["job"] #création de la base de données
descriptions = job.create_collection(name="descriptions") # Création de la collection
print(client["job"].list_collection_names())

['descriptions']


Journalisation et JSON

In [5]:
from datetime import datetime

def append_log_file(log_message, file_name):
        
    # Define the log directory and file name
    log_dir = "Logs"
    log_file_name = file_name
    log_file_path = os.path.join(log_dir, log_file_name)
    
    # Create the log directory if it doesn't exist
    os.makedirs(log_dir, exist_ok=True)
    
    # Append the log message to the file
    with open(log_file_path, 'a') as log_file:
        log_file.write(log_message + '\n')

def results_to_json(results, source):
        
    current_date = datetime.now().strftime("%Y-%m-%d")
    # Define the log directory and file name
    log_dir = "JSON"
    log_file_name = source + str(current_date) + ".json"
    log_file_path = os.path.join(log_dir, log_file_name)
    
    # Create the log directory if it doesn't exist
    os.makedirs(log_dir, exist_ok=True)
    
    # Append the log message to the file
    with open(log_file_path, 'w') as f:
        json.dump(results, f)

Récupération des données

In [6]:
def adzuna_to_json(countries, first_page, last_page):
    # This function take a list of different countries and the first and last page we wan to extract.
    results, success, failed = [], [], [] # Initialize 3 lists, one for extraction, two others for log.
    for country in countries:
        for page in range(first_page, last_page):
            url = "https://api.adzuna.com/v1/api/jobs/{3}/search/{0}?app_id={1}&app_key={2}".format(page, appid, key, country)
            req = requests.get(url)
            if req.status_code == 200: # If we get a positive answer we store data to result
                try:
                    data = req.json()
                    results.append(data["results"])
                except:
                    failed.append("{} : {}".format(country, page))
                    print(req.status_code, " pour la page ", page, "du pays ", country)
                else:
                    success.append("{} : {}".format(country, page))

            else :
                print(req.status_code, " pour la page ", page, "du pays ", country)

    # Get the current date and write into log file
    current_date = datetime.now().strftime("%Y-%m-%d")
    append_log_file(f"{current_date};{len(success)};{len(failed)}","adzuna_to_json.csv")
    print(f"ADZUNA TO JSON \nSuccess : {len(success)}\nFailed : {len(failed)}")

    # write results to JSON for audit
    results_to_json(results, "adzuna")
    return results



Importation des données V2 Flat

In [7]:
from flatten_json import flatten

def json_to_database(json_data):
    success_insert, duplicate_insert, failed_insert = 0, 0, 0
    columns = ["id", "title", "created", "location_display_name", "location_area_0", "redirect_url", "company_display_name", "salary_is_predicted", "contract_type", "category_label", "latitude", "longitude"]

    job = client["job"] #Connexion to MongoDB
    with engine.connect() as connection: # Connexion to SQL
        for page in json_data:
            for annonce in page:
                annonce_flatten = flatten(annonce)
                
            
                date_creation = str(annonce['created']).replace("T", " ").replace("Z", "")


                text_columns = "("
                text_values = "("
                for column in columns:
                    if column in annonce_flatten.keys():
                        text_columns = f"{text_columns}{column}, "
                        if column == "created": #Si c'est une date je formate
                            text_values = f'{text_values}"{annonce_flatten[column].replace("T", " ").replace("Z", "")}", '
                        elif column == "salary_is_predicted" or column == "latitude" or column == "longitude": # Si c'est un chiffre je récupère seulement
                            text_values = f'{text_values}{annonce_flatten[column]}, '
                        else: # Si c'est du texte, j'ajoute des double quotes
                            text_values = f'{text_values}"{annonce_flatten[column]}", '

                text_columns = text_columns[:-2] + ")"
                text_values = text_values[:-2] + ")"

                replacement = {"title" : "nom_poste", "created" : "date_creation", "location_display_name" : "ville", "location_area_0" : "pays", "redirect_url" : "url", "company_display_name" : "nom_entreprise", "salary_is_predicted" : "salaire", "contract_type" : "type_contrat", "category_label" : "categorie"}
                for key, value in replacement.items():
                    text_columns = text_columns.replace(key, value)

                request_text = f"INSERT INTO POSTES {text_columns} VALUES {text_values}"

                clean_description = annonce['description'].replace('"', " ")
                mongodb_value = r'{"id":' + str(annonce['id']) + ', "description":"' + clean_description + '"}'
                mongodb_json = json.loads(mongodb_value)
                


                # On ajoute la ligne à la base de données
                try:
                    connection.execute(text(request_text))
                except:
                    duplicate_insert += 1
                else:
                    success_insert += 1
                    job.descriptions.insert_one(mongodb_json) # Si ce n'est pas un doublon, alors on importe l'id et la description dans MONGODB

            
        connection.commit()
        # Get the current date and write into log file
        current_date = datetime.now().strftime("%Y-%m-%d")
        append_log_file(f"{current_date};{success_insert};{duplicate_insert};{failed_insert}","json_to_database.csv")     
        print(f"\nJSON TO DATABASE \nSuccess : {success_insert}\nDuplucate : {duplicate_insert}\nFailed : {failed_insert}")


Fonction main pour l'ETL

In [8]:
json_to_database(adzuna_to_json(['fr'], 1, 72))

ADZUNA TO JSON 
Success : 71
Failed : 0

JSON TO DATABASE 
Success : 710
Duplucate : 0
Failed : 0
