# EXTRACT-TRANSFORM-LOAD (ETL)

<img src = "https://i.ibb.co/h7XVhvy/image-2.png" style="width:700px;height:500px">

-------------------------------------------------------------------------------------------------------------------------------------------------- 

## EXTRACT

<img src = "https://i.ibb.co/SB3sm9L/image.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/mSZtQsq/image-2.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/pj7zcfL/image-3.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/6Zk9pbp/image-4.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/1Rx9wPn/image-5.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/SB3sm9L/image.png" style="width:700px;height:500px">

### Summary of Extract

In [None]:
import os
import csv
import tempfile
from zipfile import ZipFile

import requests

# Settings
# Project root: two directory levels above this module file
# (os.path.abspath normalizes the trailing "/../../").
# NOTE(review): relies on __file__, so this only works when the code is saved
# and run as a module (e.g. extract.py), not inside an interactive notebook.
base_path = os.path.abspath(__file__ + "/../../")

# START - Paths for new February 2021 data available

# External website file url (zip archive containing the CSV snapshot)
source_url = "https://assets.datacamp.com/production/repositories/5899/datasets/66691278303f789ca4acd3c6406baa5fc6adaf28/PPR-ALL.zip"

# Source path where we want to save the .zip file downloaded from the website
source_path = f"{base_path}/data/source/downloaded_at=2021-02-01/PPR-ALL.zip"

# Raw path where we want to extract the new .csv data
raw_path = f"{base_path}/data/raw/downloaded_at=2021-02-01/ppr-all.csv"

# END - Paths for new February 2021 data available


def create_folder_if_not_exists(path):
    """
    Make sure the directory that will contain ``path`` exists.

    ``path`` is a file path: only its parent directory is created
    (recursively, including intermediate folders). Nothing happens if the
    directory is already there.
    """
    parent_dir = os.path.dirname(path)
    os.makedirs(parent_dir, exist_ok=True)


def download_snapshot():
    """
    Download the new dataset from ``source_url`` and save it to ``source_path``.

    The HTTP request is made and checked before the destination file is
    opened. A failed download therefore raises, instead of leaving an empty
    or partial (or HTML error page) .zip behind for the next step to choke on.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on connection problems or timeout.
    """
    create_folder_if_not_exists(source_path)
    # TLS certificates are verified (the requests default) and the request
    # cannot hang forever; the previous verify=False accepted any certificate.
    response = requests.get(source_url, timeout=60)
    response.raise_for_status()
    with open(source_path, "wb") as source_ppr:
        source_ppr.write(response.content)


def save_new_raw_data():
    """
    Extract the CSV from the downloaded .zip and save it to ``raw_path``.

    Steps:
    - Unpack the first member of the archive at ``source_path`` into a
      temporary directory.
    - Read it as Windows-1252 text.
    - Re-write every row to ``raw_path`` (same encoding), with the header
      renamed to the snake_case column names used by the transform step.

    The first data row is printed as an example and is also written to the
    output file.
    """
    create_folder_if_not_exists(raw_path)

    # Original CSV header -> column name used by the rest of the pipeline.
    column_renames = {
        "Date of Sale (dd/mm/yyyy)": "date_of_sale",
        "Address": "address",
        "Postal Code": "postal_code",
        "County": "county",
        "Price (€)": "price",
        "Description of Property": "description",
    }

    with tempfile.TemporaryDirectory() as dirpath:
        with ZipFile(source_path, "r") as archive:
            # Assumes the CSV is the first (presumably only) member of the archive.
            csv_member = archive.namelist()[0]
            csv_file_path = archive.extract(csv_member, path=dirpath)

        # newline="" is what the csv module requires, so that line breaks
        # inside quoted fields are read and written correctly.
        with open(csv_file_path, mode="r", encoding="windows-1252", newline="") as src_file:
            reader = csv.DictReader(src_file)

            first_row = next(reader, None)  # None when the CSV has no data rows
            if first_row is not None:
                print("[Extract] First row example:", first_row)

            with open(raw_path, mode="w", encoding="windows-1252", newline="") as dst_file:
                # Rows are keyed by the ORIGINAL header names; the new names
                # only appear in the header line written below.
                writer = csv.DictWriter(dst_file, fieldnames=list(column_renames))
                # Writing the mapping itself as a row puts each column's new
                # name under it, i.e. a header line with the renamed columns.
                writer.writerow(column_renames)
                if first_row is not None:
                    # This row was already consumed from the reader above,
                    # so it must be written explicitly or it would be lost.
                    writer.writerow(first_row)
                writer.writerows(reader)

# Main function called inside the execute.py script
def main():
    """
    Run the extract step: download the snapshot, then save it as raw CSV.

    The script entry point (``if __name__ == "__main__"``) lives in
    execute.py, not here.
    """
    print("[Extract] Start")
    print("[Extract] Downloading snapshot")
    download_snapshot()
    print(f"[Extract] Saving data from '{source_path}' to '{raw_path}'")
    save_new_raw_data()
    print("[Extract] End")

------------------------------------------------------------------------------------------------------------------------------

## TRANSFORM

<img src = "https://i.ibb.co/qWdC7V2/image.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/0952BXK/image-1.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/7YmNb6B/image-2.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/bQmM6ZY/image-3.png" style="width:700px;height:500px">

In [None]:
from datetime import datetime

def transform_case(input_string):
    """
    Return ``input_string`` converted to lower case.
    """
    lowered = input_string.lower()
    return lowered
  
def update_date_of_sale(date_input):
    """
    Convert a DD/MM/YYYY date string into ISO YYYY-MM-DD form.

    Raises ValueError when ``date_input`` does not match DD/MM/YYYY.
    """
    return datetime.strptime(date_input, "%d/%m/%Y").strftime("%Y-%m-%d")

def update_price(price_input):
    """
    Parse a price string such as "€100,000.00" into an int (100000).

    Steps:
    - Strip the euro sign and the thousands separators.
    - Parse the remainder as a float.
    - Truncate it toward zero with ``int``.
    """
    cleaned = price_input.replace("€", "").replace(",", "")
    return int(float(cleaned))
  
def update_description(description_input):
    """
    Collapse a property description into a simple category.

    The description is lower-cased first. Then the function returns:
    - "new" if it contains the substring "new";
    - otherwise "second-hand" if it contains "second-hand";
    - otherwise the lower-cased description itself.
    """
    lowered = transform_case(description_input)
    # Checked in order: "new" wins over "second-hand".
    for category in ("new", "second-hand"):
        if category in lowered:
            return category
    return lowered

<img src = "https://i.ibb.co/7jq9bTc/image-4.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/HPYZHzD/image-5.png" style="width:700px;height:500px">

### Summary of Transform

In [None]:
import os
import csv
from datetime import datetime

from common.tables import PprRawAll
from common.base import session
from sqlalchemy import text

# Settings
# Project root: two directory levels above this module file
# (os.path.abspath normalizes the trailing "/../../").
# NOTE(review): relies on __file__, so this only works when run as a saved module.
base_path = os.path.abspath(__file__ + "/../../")

# START - Paths for new February 2021 data available

# Raw path of the CSV snapshot produced by the extract step (input of this step)
raw_path = f"{base_path}/data/raw/downloaded_at=2021-02-01/ppr-all.csv"

# END - Paths for new February 2021 data available


def transform_case(input_string):
    """
    Return a lower-cased copy of ``input_string``.
    """
    result = input_string.lower()
    return result


def update_date_of_sale(date_input):
    """
    Reformat a sale date from DD/MM/YYYY to YYYY-MM-DD.

    Raises ValueError if ``date_input`` is not a valid DD/MM/YYYY date.
    """
    parsed = datetime.strptime(date_input, "%d/%m/%Y")
    iso_text = parsed.strftime("%Y-%m-%d")
    return iso_text


def update_description(description_input):
    """
    Simplify the description field for potential future analysis.

    The input is lower-cased, then the function returns:
    - "new" if it contains the substring "new";
    - "second-hand" if it contains the substring "second-hand";
    - the lower-cased description unchanged otherwise.
    """
    text_lower = transform_case(description_input)
    if "new" in text_lower:
        return "new"
    return "second-hand" if "second-hand" in text_lower else text_lower


def update_price(price_input):
    """
    Return the price as an integer.

    The "€" symbol and "," separators are removed first (e.g. "€100,000.00"
    becomes "100000.00"), then the value is parsed as a float and truncated
    to an int.
    """
    for symbol in ("€", ","):
        price_input = price_input.replace(symbol, "")
    return int(float(price_input))


def truncate_table():
    """
    Empty the "ppr_raw_all" table and restart its primary key (id) from 1.

    Run before every transformation, so the table only ever holds the rows
    of the snapshot being processed.
    """
    # One statement instead of two ";"-separated ones: RESTART IDENTITY resets
    # the sequence owned by the id column. This avoids hard-coding the sequence
    # name and does not rely on the DB driver accepting several statements in
    # a single execute() call.
    session.execute(text("TRUNCATE TABLE ppr_raw_all RESTART IDENTITY;"))
    session.commit()


def transform_new_data():
    """
    Apply all transformations to each row of the raw CSV at ``raw_path``
    and save the results into the database as ``PprRawAll`` objects.

    Transformations applied to each row:
    - address, postal_code and county are lower-cased;
    - date_of_sale becomes YYYY-MM-DD;
    - price becomes an integer;
    - description becomes a simplified category.

    All objects are saved with a single ``bulk_save_objects`` call and then
    committed.
    """
    # newline="" is what the csv module requires, so that line breaks inside
    # quoted fields are parsed correctly.
    with open(raw_path, mode="r", encoding="windows-1252", newline="") as csv_file:
        reader = csv.DictReader(csv_file)
        ppr_raw_objects = [
            PprRawAll(
                date_of_sale=update_date_of_sale(row["date_of_sale"]),
                address=transform_case(row["address"]),
                postal_code=transform_case(row["postal_code"]),
                county=transform_case(row["county"]),
                price=update_price(row["price"]),
                description=update_description(row["description"]),
            )
            for row in reader
        ]

    # All rows are in memory now; the file can be closed before hitting the DB.
    session.bulk_save_objects(ppr_raw_objects)
    session.commit()


def main():
    """
    Run the transform step: clear ppr_raw_all, then fill it with the
    transformed rows of the raw CSV snapshot.
    """
    print("[Transform] Start")
    steps = (
        ("[Transform] Remove any old data from ppr_raw_all table", truncate_table),
        ("[Transform] Transform new data available in ppr_raw_all table", transform_new_data),
    )
    for message, step in steps:
        print(message)
        step()
    print("[Transform] End")

In [None]:
# Import the extract and transform step modules
import extract
import transform

# When run as a script, execute the extract step and then the transform step
if __name__ == "__main__":
    extract.main()
    transform.main() 

------------------------------------------------------------------------------------------------------------------------------

## LOAD

<img src = "https://i.ibb.co/gvMH6BL/image.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/D8JZFyh/image-1.png" style="width:700px;height:500px">

In [None]:
from sqlalchemy import cast, Column, Integer, String, Date
# Import the function required
from sqlalchemy.orm import column_property

from base import Base

class PprRawAll(Base):
    """
    ORM model for the "ppr_raw_all" table.

    Every data column is a string column. The transform step fills this table
    with the transformed rows of the current CSV snapshot.
    """

    __tablename__ = "ppr_raw_all"

    id = Column(Integer, primary_key=True)
    date_of_sale = Column(String(55))
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(String(55))
    description = Column(String(255))
    # Transaction id: date_of_sale, address, county and price joined with "_".
    # It is a SQL expression loaded with each query (column_property), not a
    # stored column.
    # NOTE(review): it is not guaranteed unique. Identical sales collide, and
    # postal_code/description are not part of it. If any part is NULL, the
    # whole id is NULL (PostgreSQL "||" semantics).
    transaction_id = column_property(
        date_of_sale + "_" + address + "_" + county + "_" + price
    )

class PprCleanAll(Base):
    """
    ORM model for the "ppr_clean_all" table.

    Same data columns as PprRawAll, except that date_of_sale is a Date and
    price is an Integer. The load step inserts new raw transactions here and
    deletes those no longer present in ppr_raw_all.
    """

    __tablename__ = "ppr_clean_all"

    id = Column(Integer, primary_key=True)
    date_of_sale = Column(Date)
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(Integer)
    description = Column(String(255))
    # Transaction id built like PprRawAll.transaction_id, so the two can be
    # compared: the non-string columns (date_of_sale, price) are cast to
    # string before being joined with "_".
    # NOTE(review): not guaranteed unique. The date's string form must match
    # the raw "YYYY-MM-DD" text for the ids to line up; confirm for the DB in use.
    transaction_id = column_property(
        cast(date_of_sale, String) + "_" + address + "_" + county + "_" + cast(price, String)
    )

<img src = "https://i.ibb.co/sgLP4Jj/image-2.png" style="width:700px;height:400px">

<img src = "https://i.ibb.co/c2rZtgh/image-3.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/z5CrqXh/image-4.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/qxsdwPC/image-5.png" style="width:700px;height:500px">

In [None]:
from sqlalchemy import cast
from sqlalchemy.dialects.postgresql import insert

# Select the transaction ids already present in the clean table
clean_transaction_ids = session.query(PprCleanAll.transaction_id)

# Select the raw columns, casting date_of_sale and price from strings to the
# clean table's Date and Integer types
transactions_to_insert = session.query(
    cast(PprRawAll.date_of_sale, Date),
    PprRawAll.address,
    PprRawAll.postal_code,
    PprRawAll.county,
    cast(PprRawAll.price, Integer),
    PprRawAll.description,
    # Keep only raw rows whose transaction id is not yet in the clean table
).filter(~PprRawAll.transaction_id.in_(clean_transaction_ids))

# Print total number of transactions to insert
# (the exercise expects 3154 on a first run, and 0 once all transactions
# have been inserted)
print("Transactions to insert:", transactions_to_insert.count())

# Build an INSERT ... SELECT into ppr_clean_all. The column names are matched
# positionally to the columns selected above, so keep the same order.
columns = ["date_of_sale", "address", "postal_code",
          "county", "price","description"]
stm = insert(PprCleanAll).from_select(columns, transactions_to_insert)

# Execute and commit the statement to make changes in the database.
session.execute(stm)
session.commit()

In [None]:
# Import the delete module
from sqlalchemy import delete

# Get all the ppr_raw_all transaction ids (the current snapshot)
raw_transaction_ids = session.query(PprRawAll.transaction_id)

# Filter all the ppr_clean_all table transactions that are not present in the ppr_raw_all table
transactions_to_delete = session.query(PprCleanAll).filter(~PprCleanAll.transaction_id.in_(raw_transaction_ids))

# Print transactions to delete
print("Transactions to delete:", transactions_to_delete.count())

# Bulk-delete the selected transactions in a single DELETE statement.
# synchronize_session=False tells SQLAlchemy not to update objects already
# loaded in this session. This is faster, but such objects may be stale
# until the session reloads them.
transactions_to_delete.delete(synchronize_session=False)

# Commit the session to make the changes in the database
session.commit()

### Summary of Load

In [None]:
from common.base import session
from common.tables import PprRawAll, PprCleanAll

from sqlalchemy import cast, Integer, Date
from sqlalchemy.dialects.postgresql import insert


def insert_transactions():
    """
    Insert operation: add new data.

    Every ppr_raw_all row whose transaction_id is not already present in
    ppr_clean_all is copied into ppr_clean_all. date_of_sale and price are
    cast from the raw table's strings to the clean table's Date and Integer
    types along the way.
    """
    already_loaded_ids = session.query(PprCleanAll.transaction_id)

    new_rows_query = session.query(
        cast(PprRawAll.date_of_sale, Date),
        PprRawAll.address,
        PprRawAll.postal_code,
        PprRawAll.county,
        cast(PprRawAll.price, Integer),
        PprRawAll.description,
    )
    new_rows_query = new_rows_query.filter(
        ~PprRawAll.transaction_id.in_(already_loaded_ids)
    )

    print("Transactions to insert:", new_rows_query.count())

    # Target columns, positionally matched to the selected columns above.
    target_columns = [
        "date_of_sale",
        "address",
        "postal_code",
        "county",
        "price",
        "description",
    ]
    insert_stmt = insert(PprCleanAll).from_select(target_columns, new_rows_query)

    session.execute(insert_stmt)
    session.commit()


def delete_transactions():
    """
    Delete operation: delete any row not present in the last snapshot.

    Every ppr_clean_all row whose transaction_id no longer appears in
    ppr_raw_all is removed, and the change is committed.
    """
    current_ids = session.query(PprRawAll.transaction_id)
    stale_rows = session.query(PprCleanAll).filter(
        ~PprCleanAll.transaction_id.in_(current_ids)
    )

    print("Transactions to delete:", stale_rows.count())

    # Bulk DELETE without synchronizing objects already loaded in the session.
    stale_rows.delete(synchronize_session=False)
    session.commit()

def main():
    """
    Run the load step: insert new transactions into ppr_clean_all, then
    delete the ones missing from the new transformed data.
    """
    print("[Load] Start")
    steps = (
        ("[Load] Inserting new rows", insert_transactions),
        ("[Load] Deleting rows not available in the new transformed data", delete_transactions),
    )
    for message, step in steps:
        print(message)
        step()
    print("[Load] End")

In [None]:
import extract
import transform
import load

# When executed as a script, run the full pipeline in order:
# extract -> transform -> load
if __name__ == "__main__":
    extract.main()
    transform.main()
    load.main()

------------------------------------------------------------------------------------------------------------------------------

# OPERATORS

In [None]:
# Import the operator you need
from sqlalchemy import or_

# Query the clean table for all transactions in the Dublin or Cork counties.
# County names are compared in lower case, as written by the transform step.
# NOTE(review): .all() returns the matching rows, not their count, and
# result[0] raises IndexError when nothing matches.
result = session.query(PprCleanAll) \
                .filter(or_(PprCleanAll.county == "dublin", PprCleanAll.county == "cork")) \
                .all()

print("First row address:", result[0].address)

In [None]:
# Import the and function needed
from sqlalchemy import and_

# Retrieve all sales transactions for January 2021 (both bounds inclusive)
# NOTE(review): result[0] raises IndexError when no sale falls in that range.
result = session.query(PprCleanAll).filter(and_(PprCleanAll.date_of_sale >= "2021-01-01", PprCleanAll.date_of_sale <= "2021-01-31")).all()

print("First row address:", result[0].address)

In [None]:
# `func` is needed here, but it is only imported by a later cell, so import it
# explicitly to avoid a NameError when the cells run in order.
from sqlalchemy import func

# Get the maximum, minimum and average values for each product category
# NOTE(review): `Products` is not defined anywhere in this notebook; it is
# presumably a model from a separate exercise. Confirm before running.
result = session.query(Products.category,
                       func.max(Products.price),
                       func.min(Products.price),
                       func.avg(Products.price)) \
                .group_by(Products.category).all()

print("Result:", result)

In [None]:
from common.base import session

from sqlalchemy import text

# Create (or replace) the "insights" view: one row per county with the number
# of sales and the total, max, min and average sale price
query = """
CREATE OR REPLACE VIEW insights AS
SELECT county,
       COUNT(*) AS sales_count,
       SUM(CAST(price AS int)) AS sales_total,
       MAX(CAST(price AS int)) AS sales_max,
       MIN(CAST(price AS int)) AS sales_min,
       AVG(CAST(price AS int))::numeric(10,2) AS sales_avg
FROM ppr_clean_all
GROUP BY county
"""

# Execute and commit. Raw SQL must be wrapped in text(): SQLAlchemy 1.4
# deprecates passing a plain string to Session.execute(), and 2.0 rejects it.
session.execute(text(query))
session.commit()

In [None]:
# Import the submodule required
from sqlalchemy import func

# Get the maximum, minimum and average values for each product category,
# grouped by category (one result tuple per category)
# NOTE(review): `Products` is not defined anywhere in this notebook; it is
# presumably a model from a separate exercise. Confirm before running.
result = session.query(Products.category,
                       func.max(Products.price),
                       func.min(Products.price),
                       func.avg(Products.price)) \
                .group_by(Products.category).all()

print("Result:", result)

In [None]:
from sqlalchemy import text

# Count the rows (one per county) in the "insights" view. Raw SQL is wrapped
# in text(), as required by SQLAlchemy 2.0.
session.execute(text('SELECT COUNT(*) FROM insights')).all()

<img src = "https://i.ibb.co/pvgNgqz/image.png" style="width:700px;height:500px">

<img src = "https://i.ibb.co/jLJ9z8W/image-1.png" style="width:700px;height:500px">

In [None]:
from datetime import datetime
import os

from common.base import session
from common.tables import PprCleanAll
from sqlalchemy import text
import xlsxwriter


# Settings
base_path = os.path.abspath(__file__ + "/../../")
ref_month = datetime.today().strftime("%Y-%m")

if __name__ == "__main__":
    # One row per county: county, count, total, max, min, avg (see the view).
    data = session.execute(text("SELECT * FROM insights")).all()

    # NOTE(review): the file name is fixed to February 2021 and does not use
    # ref_month. Confirm whether it should be derived from it.
    export_path = f"{base_path}/insights_export/InsightsExport_202102.xlsx"
    # xlsxwriter writes the file on close(), which fails if the folder is missing.
    os.makedirs(os.path.dirname(export_path), exist_ok=True)

    columns = [
        {"header": "County"},
        {"header": "Number of Sales 3 month"},
        {"header": "Tot sales 3 months"},
        {"header": "Max sales 3 months"},
        {"header": "Min sales 3 months"},
        {"header": "Avg sales 3 months"},
    ]

    # Create the workbook and a new worksheet
    workbook = xlsxwriter.Workbook(export_path)
    worksheet = workbook.add_worksheet()
    worksheet.set_column("B:G", 12)

    # Size the table from the data instead of the fixed "B3:E20" range.
    # That range spanned only 4 of the 6 columns and 17 data rows, so
    # xlsxwriter left out the min/avg columns and any further counties.
    first_row, first_col = 2, 1  # cell B3, zero-indexed
    # Header row plus one row per result; xlsxwriter needs >= 1 data row.
    last_row = first_row + max(len(data), 1)
    last_col = first_col + len(columns) - 1
    worksheet.add_table(
        first_row,
        first_col,
        last_row,
        last_col,
        {"data": data, "columns": columns},
    )
    workbook.close()

    print("Data exported:", export_path)