# ETL Process

This application fetches real estate listings from a webpage, tidies up the data, and stores it in an Amazon S3 bucket. 

It uses Python libraries like requests and BeautifulSoup for web scraping, and Boto3 for interacting with S3. 

Additionally, it's orchestrated by Apache Airflow, which schedules and manages the entire ETL process.

This setup automates the extraction, transformation, and loading of real estate data, making it easy to keep the listings up-to-date and accessible.

## Summary of the ETL Process Application:

### Data Retrieval:
- Utilizes requests library to send HTTP requests and BeautifulSoup library to parse HTML content of a webpage.
- Scrapes specific data from the webpage and saves it to a pandas dataframe.

### Data Preprocessing and Cleaning:
- Defines functions to extract specific details from the dataframe and clean the data.
- Extracts details like area, number of bedrooms, suites, parking spaces, etc.
- Cleans and preprocesses price data.

### Loading Data to Amazon S3:
- Loads AWS configuration from a JSON file.
- Uploads a CSV string to an S3 bucket using Boto3 S3 client.

### DAG and Task Dependencies:
- Sets up a Directed Acyclic Graph (DAG) named "automated_etl_process" using Airflow.
- DAG is scheduled to run daily.
- Defines three tasks for data extraction, transformation, and loading, represented by PythonOperator.
- Establishes task dependencies where the extraction task precedes transformation, and transformation precedes loading.


## 1. Data Retrieval

The following script is used to retrieve data from a webpage. It utilizes the requests library to send HTTP requests and the BeautifulSoup library to parse the HTML content of the webpage. Only the data of interest was saved to a pandas dataframe.

In [None]:
import requests
from bs4 import BeautifulSoup
import pandas as pd

In [None]:
import requests
from bs4 import BeautifulSoup

def extract_data(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }

    page = 1
    data = []

    while True:
        full_url = f'{url}?pagina={page}'
        response = requests.get(full_url, headers=headers)
        
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            cards = soup.find_all('a', class_='new-card')

            if not cards:
                print("No more pages found or no cards on the page.")
                break

            for card in cards:
                title = card.find('h2', class_='new-title phrase').get_text(strip=True) if card.find('h2', class_='new-title phrase') else None
                description = card.find('h3', class_='new-desc phrase').get_text(strip=True) if card.find('h3', class_='new-desc phrase') else None
                simple_description = card.find('h3', class_='new-simple phrase').get_text(strip=True) if card.find('h3', class_='new-simple phrase') else None
                price = card.find('div', class_='new-price').get_text(strip=True) if card.find('div', class_='new-price') else None
                details = [li.get_text(strip=True) for li in card.find_all('li')] if card.find_all('li') else []
                additional_info = card.find('div', class_='new-text phrase').get_text(strip=True) if card.find('div', class_='new-text phrase') else None
                agency = card.find('img', alt=True)['alt'] if card.find('img', alt=True) else None
                creci = card.find('div', class_='creci').get_text(strip=True) if card.find('div', class_='creci') else None

                data.append([title, description, simple_description, price, details, additional_info, agency, creci])

            page += 1
        else:
            print(f"Failed to retrieve the webpage. Status Code: {response.status_code}")
            break

    return data



## 2. Data Preprocessing and Cleaning

This code defines the transform_data function, on which we have the functions extract_details(), extract_price() and extract_numbers.  to extract specific details and do some data cleaning.

In [None]:
import re

In [None]:
def transform_data(df):
    def extract_details(details):
        area = None
        quartos = None
        suites = None
        vagas = None
        outros = None

        for detail in details:
            if 'm²' in detail:
                area = detail
            elif 'quarto' in detail.lower():
                quartos = detail
            elif 'suíte' in detail.lower():
                suites = detail
            elif 'vaga' in detail.lower():
                vagas = detail
            else:
                outros = detail

        return area, quartos, suites, vagas, outros

    df['area'], df['quartos'], df['suites'], df['vagas'], df['outros'] = zip(*df['Details'].apply(extract_details))

    df.fillna('na', inplace=True)

    df.drop(columns=['Details'], inplace=True)

    def extract_price(price_text):
        price = 'na'
        price_per_m2 = 'na'

        # Check if 'R$' appears twice in the text
        if price_text.count('R$') == 2:
            prices = re.findall(r'R\$(.*?)R\$(.*?)$', price_text)
            price = 'R$' + prices[0][0].strip()
            price_per_m2 = 'R$' + prices[0][1].strip()
        else:
            price = price_text

        return price, price_per_m2

    df['price'], df['price_per_m2'] = zip(*df['Price'].apply(extract_price))

    df.drop(columns=['Price'], inplace=True)

    def extract_numbers(text):
        numbers = re.findall(r'\d+\.?\d*', text)
        return ''.join(numbers)

    df['price'] = df['price'].apply(lambda x: extract_numbers(x.replace('R$', '').replace('Valor', '').replace('m²', '')))
    df['price_per_m2'] = df['price_per_m2'].apply(lambda x: extract_numbers(x.replace('R$', '').replace('Valor', '').replace('m²', '')))

    df['price'] = df['price'].replace('', 'na')
    df['price_per_m2'] = df['price_per_m2'].replace('', 'na')

    return df


## 3. Loading Data to Amazon S3 

Now we load AWS configuration from a JSON file named `config.json`, then uses it to upload a CSV string to an S3 bucket.

In [None]:
import boto3
import json
import pandas as pd

def load_data():
    #AWS configuration
    with open('config.json') as f:
        config = json.load(f)

    #Boto3 S3 client
    s3_client = boto3.client('s3', 
                             aws_access_key_id=config['aws_access_key_id'],
                             aws_secret_access_key=config['aws_secret_access_key'],
                             region_name=config['region_name'])

    # Define the target S3 bucket and key (file name)
    bucket_name = config['bucket_name']
    key = 'target_destination.csv'

    #convert DataFrame 
    csv_buffer = df.to_csv(index=False)

    #Upload
    s3_client.put_object(Bucket=bucket_name, Key=key, Body=csv_buffer)


## 3.DAG and Task Dependencies

This code sets up a DAG named "automated_etl_process" with default arguments specifying the owner, start date, retries, and retry delay. It schedules the DAG to run daily. Three tasks are defined for data extraction, transformation, and loading, each represented by a PythonOperator. Task dependencies are established where the data extraction task precedes the transformation task, and the transformation task precedes the loading task.




In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2022, 3, 7),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

#Define the DAG
dag = DAG(
    'automated_etl_process',
    default_args=default_args,
    description='Automated ETL Process',
    schedule_interval='@daily',  # Run the DAG daily
)

#Define Tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

#Define task dependencies
extract_task >> transform_task >> load_task
