# **ETL IN PYTHON**

# **Downloading a ZIP file**
You now have a good understanding of the steps that make up our ETL pipeline. Now, let's get started implementing it.

First things first: you need to download the .zip file containing the new dataset to be processed by the pipeline.

The URL of the zipped file is saved in a variable called path; you can see it printed in the IPython shell.

In [1]:
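# Import the required libraries
import os
import requests

# Get the zip file (path holds the URL printed in the shell)
response = requests.get(path)

# Print the status code (200 means the request succeeded)
print(response.status_code)
# Stop with an error if the download failed
response.raise_for_status()

# Save the file locally (more about open() in the next lesson)
local_path = "tmp/data/source/downloaded_at=2021-02-01/PPR-ALL.zip"
# Make sure the target directory exists before writing to it
os.makedirs(os.path.dirname(local_path), exist_ok=True)
with open(local_path, "wb") as f:
    f.write(response.content)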

# **Exploring a ZIP file**
You just used requests to download a zipped file, which is now stored on your system. But what we actually need is the file it contains: a CSV storing data about real estate transactions and their characteristics.

The ZIP file path on your system is saved in a variable called path, which you can once again see printed in the IPython shell.

In [None]:
# Import the required class
from zipfile import ZipFile

with ZipFile(path, mode="r") as f:
    # Get the list of files and print it
    file_names = f.namelist()
    print(file_names)

In [None]:
# Import the required class
from zipfile import ZipFile

with ZipFile(path, "r") as f:
    # Get the list of files
    file_names = f.namelist()
    print(file_names)
    # Extract the CSV file (the first entry) into the current working directory;
    # extract() returns the path of the extracted file
    csv_file_path = f.extract(file_names[0])
    print(csv_file_path)
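Outside the exercise environment, `path` is not predefined. The sketch below builds a small ZIP archive in memory (the member names and contents are made up for illustration), lists its members, and extracts only the CSV into a temporary directory. Picking the member by extension, rather than by position, and passing `path=` to `extract()` are choices of this sketch, not of the course code.

```python
import io
import os
import tempfile
from zipfile import ZipFile

# Build a tiny ZIP archive in memory (hypothetical contents, for illustration only)
buffer = io.BytesIO()
with ZipFile(buffer, mode="w") as archive:
    archive.writestr("PPR-ALL.csv", "Address,County\n14 bow street,Dublin\n")
    archive.writestr("README.txt", "Not a CSV")

with tempfile.TemporaryDirectory() as tmp_dir:
    with ZipFile(buffer, mode="r") as archive:
        file_names = archive.namelist()
        # Pick the CSV members instead of relying on the first entry's position
        csv_names = [name for name in file_names if name.lower().endswith(".csv")]
        # extract() returns the full path of the extracted file inside tmp_dir
        csv_file_path = archive.extract(csv_names[0], path=tmp_dir)
        print(file_names)
        print(os.path.basename(csv_file_path))
        with open(csv_file_path, encoding="utf-8") as csv_file:
            first_line = csv_file.readline().strip()
```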

# **Reading from a CSV file**
You're now going to explore a CSV file, PPR-2021-Dublin.csv, containing a subset of the data to be processed in the ETL pipeline.

The full path to the file is saved in a variable called path which is printed in the IPython shell on the bottom right.

You're going to open it in read mode and get familiar with its header and rows. You'll discover that each row is a dict object.

You may have noticed the from pprint import pprint statement at the top of your script. pprint() is a function from Python's standard-library pprint module (not a built-in) that "pretty-prints" data structures: when a dictionary is too long to fit on one line, it prints each key-value pair on its own line. It simply makes the output more readable.

In [None]:
import csv
from pprint import pprint

# Open the csv file in read mode (the csv module docs recommend newline="")
with open(path, mode="r", encoding="windows-1252", newline="") as csv_file:
    # Wrap csv_file in a DictReader so that each row is a dictionary
    reader = csv.DictReader(csv_file)
    
    # Print the first row
    row = next(reader)
    print(type(row))
    pprint(row)
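To see what DictReader does without the course file, here is a small sketch that reads an in-memory CSV (made-up sample data in the PPR column layout, with `io.StringIO` standing in for the open file). The header row supplies the keys, `reader.fieldnames` exposes them, and each following row comes back as a dict; note that the quoted address keeps its commas.

```python
import csv
import io
from pprint import pprint

# Made-up sample mimicking the PPR header; not real data
sample = io.StringIO(
    "Date of Sale (dd/mm/yyyy),Address,County,Price (€)\n"
    '12/02/2021,"123 walkinstown park, walkinstown, dublin 12",Dublin,"€297,000.00"\n'
)

reader = csv.DictReader(sample)
print(reader.fieldnames)  # the header row, read on first access

row = next(reader)  # first data row, as a dict keyed by the header
print(type(row))
pprint(row)
```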

# **Writing to CSV**
You just read a CSV file and printed the first row. You're now ready to edit the file header and save the rows in a new .csv file. Shorter column names without spaces will make the columns easier to manage in the long run.

Then, you will write the new header and all the rows into a new CSV file named PPR-2021-Dublin-new-headers.csv.

In [None]:
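import csv

# Old column name -> new column name
# (assumed to match the mapping used later in extract.py)
new_column_names = {
    "Date of Sale (dd/mm/yyyy)": "date_of_sale",
    "Address": "address",
    "Postal Code": "postal_code",
    "County": "county",
    "Price (€)": "price",
    "Description of Property": "description",
}

with open(path, mode="r", encoding="windows-1252", newline="") as reader_csv_file:
    reader = csv.DictReader(reader_csv_file)
    # The new file is called "PPR-2021-Dublin-new-headers.csv"
    # and will be saved inside the "/tmp" folder
    with open(
        "/tmp/PPR-2021-Dublin-new-headers.csv",
        mode="w",
        encoding="windows-1252",
        newline="",
    ) as writer_csv_file:
        # The fieldnames are the *old* column names (the dict's keys)
        writer = csv.DictWriter(writer_csv_file, fieldnames=new_column_names)
        # Write the new header as the first line: writerow() outputs
        # the dict's values, i.e. the new column names
        writer.writerow(new_column_names)
        for row in reader:
            # Write all rows in the file
            writer.writerow(row)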
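An alternative to writing the mapping dict as the first row is to give DictWriter the new names as `fieldnames`, call `writeheader()`, and re-key each row on the way through. The sketch below assumes the same old-to-new mapping and uses in-memory buffers instead of files; the helper name `rename_columns` and the sample row are hypothetical.

```python
import csv
import io

# Old header -> new header (same mapping as in the extract step)
NEW_COLUMN_NAMES = {
    "Date of Sale (dd/mm/yyyy)": "date_of_sale",
    "Address": "address",
    "Postal Code": "postal_code",
    "County": "county",
    "Price (€)": "price",
    "Description of Property": "description",
}


def rename_columns(source, destination, mapping):
    """Copy CSV rows from source to destination, renaming the header (hypothetical helper)."""
    reader = csv.DictReader(source)
    writer = csv.DictWriter(destination, fieldnames=list(mapping.values()))
    writer.writeheader()  # writes the *new* column names
    count = 0
    for row in reader:
        # Re-key each row; source columns not listed in the mapping are dropped
        writer.writerow({new: row[old] for old, new in mapping.items()})
        count += 1
    return count


source = io.StringIO(
    "Date of Sale (dd/mm/yyyy),Address,Postal Code,County,Price (€),Description of Property\n"
    '01/01/2021,14 Bow Street,Dublin 7,Dublin,"€350,000.00",Second-Hand Dwelling house /Apartment\n'
)
destination = io.StringIO()
rows_written = rename_columns(source, destination, NEW_COLUMN_NAMES)
print(destination.getvalue())
```

With this approach the header and the data rows share one set of names, so later code never has to know the original column labels.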

# **Downloading the new dataset file from web**
So far, you've downloaded a file, unzipped it, and read from and written to a CSV file, each in an independent, self-contained script. Now you're going to wrap it all into functions to design the Extract step of your ETL pipeline.

You now have access to an IDE (Integrated Development Environment). Nothing to fear: instead of a single script, you now have a whole directory you can navigate to manage your scripts and data. This directory will grow throughout the course as you write more scripts for the ETL pipeline.

Let's start at the beginning. You're going to define a first function, create_directory_if_not_exists(), to create the local directory where the data should be saved.

You will then define a second function, download_snapshot(), to download the zipped file containing the house transaction information. You will get the file from an external URL saved in a variable called source_url. You will save the zipped file locally at the location specified by the variable source_path, leveraging create_directory_if_not_exists().

You can see the source_url and source_path variable definitions in the script. You're also provided with a base_path variable, which refers to the current working directory (/home/repl/workspace).

In [None]:
import os
import requests

# Paths
base_path = "/home/repl/workspace"
source_url = "https://assets.datacamp.com/production/repositories/5899/datasets/66691278303f789ca4acd3c6406baa5fc6adaf28/PPR-ALL.zip"
source_path =  f"{base_path}/data/source/downloaded_at=2021-01-01/ppr-all.zip"

# Create the parent directory of the file `path` passed as an argument
def create_directory_if_not_exists(path):
    """
    Create a new directory if it doesn't exist
    """
    # os.path.dirname() returns the directory part of the path.
    # In this case it is: f"{base_path}/data/source/downloaded_at=2021-01-01"
    # ("ppr-all.zip" is excluded)
    os.makedirs(os.path.dirname(path), exist_ok=True)

# Write the downloaded file to the specified directory
def download_snapshot():
    """
    Download the new dataset from the source
    """
    create_directory_if_not_exists(source_path)
    # 'verify=False' skips verification of the SSL certificate
    # (convenient in the course environment, but insecure in general)
    response = requests.get(source_url, verify=False)
    # Fail loudly instead of saving an error page as a .zip file
    response.raise_for_status()
    # Open the .zip file in binary write mode and save the content
    with open(source_path, mode="wb") as source_ppr:
        source_ppr.write(response.content)

# Download the new dataset
download_snapshot()
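The key detail in create_directory_if_not_exists() is that it receives a *file* path and creates only that file's parent directories. The sketch below checks this in a throwaway temporary directory (the nested path mirrors source_path but is otherwise arbitrary) and shows that `exist_ok=True` makes a second call harmless.

```python
import os
import tempfile


def create_directory_if_not_exists(path):
    """Create the parent directories of `path` if they don't exist."""
    os.makedirs(os.path.dirname(path), exist_ok=True)


with tempfile.TemporaryDirectory() as base_path:
    source_path = os.path.join(
        base_path, "data", "source", "downloaded_at=2021-01-01", "ppr-all.zip"
    )
    create_directory_if_not_exists(source_path)
    create_directory_if_not_exists(source_path)  # no error thanks to exist_ok=True
    parent_created = os.path.isdir(os.path.dirname(source_path))
    file_created = os.path.exists(source_path)  # only directories are created
    print(parent_created, file_created)
```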

# **Extract 'em all!**
You can now create a new directory and save the updated ZIP file, downloaded from an external source, inside it. Great job!

You're close to completing the extract process. All that's left is getting the CSV file.

The January 2021 file has already been downloaded (see source/downloaded_at=2021-01-01/PPR-RAW.zip and raw/downloaded_at=2021-01-01/ppr-raw.csv). A new dataset for February 2021 is now available: you need to download it.

Note that extract.py already contains the functions responsible for:

- downloading the ZIP file and saving it in the source folder (download_snapshot())
- extracting the CSV file and saving it in the raw folder (save_new_raw_data())

In extract.py, you need to call these functions from the main() function. Then, you will start automating the pipeline by editing execute.py.

Notice how the file structure is consistent in every step of the process. We move the file:

- from source/downloaded_at=<YYYY-MM-DD>/PPR-ALL.zip
- to raw/downloaded_at=<YYYY-MM-DD>/ppr-all.csv
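Because only the date changes between snapshots, both paths can be derived from a single date. The helper below is a hypothetical sketch (the name `snapshot_paths` is not part of the course code), assuming the file names used in extract.py and the f-string path style used throughout the scripts.

```python
from datetime import date


def snapshot_paths(base_path, downloaded_at):
    """Return (source_path, raw_path) for a snapshot date (hypothetical helper)."""
    # Partition folder named after the download date, e.g. downloaded_at=2021-02-01
    partition = f"downloaded_at={downloaded_at.isoformat()}"
    source_path = f"{base_path}/data/source/{partition}/PPR-ALL.zip"
    raw_path = f"{base_path}/data/raw/{partition}/ppr-all.csv"
    return source_path, raw_path


source_path, raw_path = snapshot_paths("/home/repl/workspace", date(2021, 2, 1))
print(source_path)
print(raw_path)
```

Deriving the paths this way keeps the source and raw folders for a given snapshot in lockstep.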

# **Instructions**
- Open extract.py and complete the main() function responsible for downloading the ZIP file and extracting the CSV file.
- Open execute.py. Import the extract script (in Python, scripts are imported without the .py extension in the import statement). Then, call its main() function.

In [None]:
import os
import csv
import tempfile
from zipfile import ZipFile

import requests

# Settings
base_path = os.path.abspath(__file__ + "/../../")

# START - Paths for new February 2021 data available

# External website file url
source_url = "https://assets.datacamp.com/production/repositories/5899/datasets/66691278303f789ca4acd3c6406baa5fc6adaf28/PPR-ALL.zip"

# Source path where we want to save the .zip file downloaded from the website
source_path = f"{base_path}/data/source/downloaded_at=2021-02-01/PPR-ALL.zip"

# Raw path where we want to extract the new .csv data
raw_path = f"{base_path}/data/raw/downloaded_at=2021-02-01/ppr-all.csv"

# END - Paths for new February 2021 data available


def create_folder_if_not_exists(path):
    """
    Create a new folder if it doesn't exist
    """
    os.makedirs(os.path.dirname(path), exist_ok=True)


def download_snapshot():
    """
    Download the new dataset from the source
    """
    create_folder_if_not_exists(source_path)
    # Download first, so a failed request doesn't leave an empty file behind
    response = requests.get(source_url, verify=False)
    response.raise_for_status()
    with open(source_path, "wb") as source_ppr:
        source_ppr.write(response.content)


def save_new_raw_data():
    """
    Save new raw data from the source
    """

    create_folder_if_not_exists(raw_path)
    with tempfile.TemporaryDirectory() as dirpath:
        with ZipFile(
            source_path,
            "r",
        ) as zipfile:
            names_list = zipfile.namelist()
            csv_file_path = zipfile.extract(names_list[0], path=dirpath)
            # Open the CSV file in read mode
            with open(csv_file_path, mode="r", encoding="windows-1252", newline="") as csv_file:
                reader = csv.DictReader(csv_file)

                first_row = next(reader)  # Get first row from reader
                print("[Extract] First row example:", first_row)

                # Open the raw CSV file in write mode
                with open(
                    raw_path,
                    mode="w",
                    encoding="windows-1252",
                    newline="",
                ) as raw_csv_file:
                    # Rename field names so they're ready for the next step
                    fieldnames = {
                        "Date of Sale (dd/mm/yyyy)": "date_of_sale",
                        "Address": "address",
                        "Postal Code": "postal_code",
                        "County": "county",
                        "Price (€)": "price",
                        "Description of Property": "description",
                    }
                    writer = csv.DictWriter(raw_csv_file, fieldnames=fieldnames)
                    # Write the new headers (the dict's values) as first line
                    writer.writerow(fieldnames)
                    # next() already consumed the first row: write it explicitly
                    writer.writerow(first_row)
                    for row in reader:
                        # Write all remaining rows in file
                        writer.writerow(row)

# Main function called inside the execute.py script
def main():
    print("[Extract] Start")
    print("[Extract] Downloading snapshot")
    download_snapshot()
    print(f"[Extract] Saving data from '{source_path}' to '{raw_path}'")
    save_new_raw_data()
    print("[Extract] End")

In [None]:
import extract

if __name__ == "__main__":
    extract.main()

# **SQLAlchemy core components**
You now know about SQLAlchemy engines and sessions, the two core components needed to work and communicate with SQL databases in Python.

_**An engine is used to manage SQL dialects and connectors: it lets you interact with a database. A session establishes all conversations with the database and represents a "holding area" for changes before they are committed to the database.**_

# **Engines and sessions**
As you know by now, engines and sessions are key components enabling SQLAlchemy to interact with a database.

So let's create an engine and bind it to a session.

Remember:

- postgresql is the dialect
- psycopg2 is the connector
- you're working on a local server, localhost

In [None]:
# Import the function needed
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Create the engine
engine = create_engine("postgresql+psycopg2://dcstudent:S3cretPassw0rd@localhost:5432/campdata-prod")

# Create the session
session = Session(engine)
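The connection string packs the dialect, connector, credentials, host, port, and database into one URL. As a sketch, SQLAlchemy 1.4+ can build the same URL from its parts with `URL.create()`, which also escapes special characters in passwords. Reading the password from an environment variable (the name `PPR_DB_PASSWORD` is hypothetical) keeps it out of the script; no database connection is opened here.

```python
import os

from sqlalchemy.engine import URL

url = URL.create(
    drivername="postgresql+psycopg2",  # dialect+connector
    username="dcstudent",
    # Hypothetical environment variable; falls back to the course password
    password=os.environ.get("PPR_DB_PASSWORD", "S3cretPassw0rd"),
    host="localhost",
    port=5432,
    database="campdata-prod",
)

print(url.get_backend_name())  # the dialect part
print(url.get_driver_name())  # the connector part
```

The resulting `url` object can then be passed to `create_engine()` in place of the string.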

# **Table class definition**
You already know that you can use engines and sessions to connect and edit a database. Now, you're going to define a Python class to create a SQL table.

But first, you need to map the Python class to the database table called ppr_raw_all.

In [None]:
# Import the objects needed
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer

# Initialize the base and set inheritance
Base = declarative_base()

class PprRawAll(Base):
    # Set the table name
    __tablename__ = "ppr_raw_all"
    # Create a primary key integer column id
    id = Column(Integer, primary_key=True)

# **Columns definition**
You just mapped a Python class to a PostgreSQL table, declaring a table name and a primary key.

You can now declare the rest of the columns, according to this definition of the ppr_raw_all table:

| Column name | Type |
|---|---|
| id | integer |
| date_of_sale | varchar(55) |
| address | varchar(255) |
| postal_code | varchar(55) |
| county | varchar(55) |
| price | varchar(55) |
| description | varchar(55) |

In [None]:
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"
    
    id = Column(Integer, primary_key=True)
    date_of_sale = Column(String(55))
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(String(55))
    description = Column(String(55))

# **Lower string and date format**
You now need to prepare and transform the data.

In particular, you're going to start by defining two functions:

- transform_case(), to lowercase strings, taking a generic string as an argument
- update_date_of_sale(), to change the date format, taking a date string in DD/MM/YYYY format (e.g. 12/02/2021) as an argument and returning it in YYYY-MM-DD format

In [None]:
# Import the submodule required
from datetime import datetime

def transform_case(input_string):
    """
    Lowercase string fields
    """
    # Return the string lowercase
    return input_string.lower()
  
def update_date_of_sale(date_input):
    """
    Update date format from DD/MM/YYYY to YYYY-MM-DD
    """
    # Create a datetime object
    current_format = datetime.strptime(date_input, "%d/%m/%Y")
    # Convert to the expected date format
    new_format = current_format.strftime("%Y-%m-%d")
    return new_format

# **Price and description**
Strings can now be lowered and dates can be converted to the expected format. Let's now clean the price and description columns.

You're going to develop two functions:

- update_price(), which takes a string like €200,000.00 as input and returns an integer
- update_description(), which takes a description ("second-hand dwelling house /apartment" or "new dwelling house /apartment") as input and returns either "new" or "second-hand"

In [None]:
from datetime import datetime

def transform_case(input_string):
    """
    Lowercase string fields
    """
    return input_string.lower()
  
def update_date_of_sale(date_input):
    """
    Updates date format from DD/MM/YYYY to YYYY-MM-DD
    """
    current_format = datetime.strptime(date_input, "%d/%m/%Y")
    new_format = current_format.strftime("%Y-%m-%d")
    return new_format

def update_price(price_input):
    """
    Returns price as an integer by removing:
    - "€" and "," symbol
    - Converting to float first then to integer
    """
    # Replace € with an empty string
    price_input = price_input.replace("€", "")
    # Replace comma with an empty string
    price_input = price_input.replace(",", "")
    # Convert to float
    price_input = float(price_input)
    # Return price_input as integer
    return int(price_input)
  
def update_description(description_input):
    """
    Simplifies the description field for future analysis. Returns:
    - "new" if string contains "new" substring
    - "second-hand" if string contains "second-hand" substring
    """
    description_input = transform_case(description_input)
    # Check description and return "new" or "second-hand"
    if "new" in description_input:
        return "new"
    elif "second-hand" in description_input:
        return "second-hand"
    return description_input
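Applying the four helpers to a single raw row shows their combined effect. The sample row below is made up, modelled on the example transaction used later in this course; the functions are condensed copies of the ones above (same logic, with the `replace()` calls chained) so the snippet runs on its own.

```python
from datetime import datetime


def transform_case(input_string):
    return input_string.lower()


def update_date_of_sale(date_input):
    # DD/MM/YYYY -> YYYY-MM-DD
    return datetime.strptime(date_input, "%d/%m/%Y").strftime("%Y-%m-%d")


def update_price(price_input):
    # "€297,000.00" -> "297000.00" -> 297000.0 -> 297000
    return int(float(price_input.replace("€", "").replace(",", "")))


def update_description(description_input):
    description_input = transform_case(description_input)
    if "new" in description_input:
        return "new"
    elif "second-hand" in description_input:
        return "second-hand"
    return description_input


# Made-up raw row for illustration
raw_row = {
    "date_of_sale": "12/02/2021",
    "address": "123 Walkinstown Park, Walkinstown, Dublin 12",
    "county": "Dublin",
    "price": "€297,000.00",
    "description": "Second-Hand Dwelling house /Apartment",
}

clean_row = {
    "date_of_sale": update_date_of_sale(raw_row["date_of_sale"]),
    "address": transform_case(raw_row["address"]),
    "county": transform_case(raw_row["county"]),
    "price": update_price(raw_row["price"]),
    "description": update_description(raw_row["description"]),
}
print(clean_row)
```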

# **Setup base script**
You know how to connect to a database, apply changes to a CSV, and create tables. Now you're going to bring all of this together.

Your first action item is to create a base.py file. It will contain the engine, session, and base that other scripts rely on.

# **Instruction**
- Import the modules you need to create engines, set up sessions, and declare bases.
- Create the engine, using the following URI: postgresql+psycopg2://dcstudent:S3cretPassw0rd@localhost:5432/campdata-prod.
- Initialize the session object.
- Initialize the declarative base.

In [None]:
# Import the modules required
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, Session

# Create the engine
engine = create_engine("postgresql+psycopg2://dcstudent:S3cretPassw0rd@localhost:5432/campdata-prod")

# Initialize the session
session = Session(engine)

# Initialize the declarative base
Base = declarative_base()

# **Create tables**
Having set up your engine, session and base in base.py, you're ready to create your first table and commit changes directly to the PostgreSQL database. You will do so in create_tables.py.

Remember that the base object has metadata about the schema and tables. It can also call methods to create missing tables.

# **Instruction**
- Import the PprRawAll class corresponding to the table you want to create in the SQL database. The table class is defined in common/tables.py.
- Use the metadata object from Base to create the table. Remember that you need to establish a connection to the database in order to push changes.


In [None]:
from base import Base, engine
# Import the PprRawAll table
from tables import PprRawAll

# Create the table in the database
if __name__ == "__main__":
    Base.metadata.create_all(engine)
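`create_all()` checks which tables already exist (`checkfirst=True` by default) and issues `CREATE TABLE` only for the missing ones, so running create_tables.py twice is safe. A minimal sketch, with an in-memory SQLite database swapped in for PostgreSQL so it runs anywhere, that also inspects the resulting columns:

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"

    id = Column(Integer, primary_key=True)
    date_of_sale = Column(String(55))
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(String(55))
    description = Column(String(255))


# In-memory SQLite stands in for the course's PostgreSQL server
engine = create_engine("sqlite://")

Base.metadata.create_all(engine)
Base.metadata.create_all(engine)  # second call: the table exists, nothing is recreated

inspector = inspect(engine)
table_names = inspector.get_table_names()
column_names = [column["name"] for column in inspector.get_columns("ppr_raw_all")]
print(table_names)
print(column_names)
```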

# **Transform 'em all!**
At this point, you have set up utilities to:

- change the date format, manipulate strings, update values, and more
- connect to a PostgreSQL database using Python
- map Python classes to PostgreSQL tables

You have an existing empty table: now it's time to wrap it all up and save the transformed data in the previously created PprRawAll table.

You have to work on the transform.py script and, in particular, the main() function.

Some information to keep in mind:

- We're moving the data from a CSV file to a table called PprRawAll.
- This makes it easy to transform the data using Python and SQL.
- The transform_new_data() function is responsible for applying all transformations and saving the updated rows into the PprRawAll table.
- The end result for the ppr_raw_all table in our database will look something like the following:

_(Figure: PprRawAll table content)_

You might also notice the function truncate_table(), which you're seeing for the first time. It empties the ppr_raw_all table and restarts its id sequence every time the script is called (for example, to avoid an inconsistent state if a previous job failed).
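Emptying the table first makes the transform step repeatable: however many times it runs, the raw table ends up holding exactly one copy of the latest snapshot. The sketch below illustrates the idea on in-memory SQLite, which has no `TRUNCATE` statement, so a bulk `query(...).delete()` stands in for it. The trimmed model, the function names `clear_table` and `load_rows`, and the sample rows are all hypothetical.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"
    id = Column(Integer, primary_key=True)
    address = Column(String(255))
    price = Column(String(55))


engine = create_engine("sqlite://")  # stand-in for PostgreSQL
Base.metadata.create_all(engine)
session = Session(engine)

# Hypothetical snapshot rows
SNAPSHOT = [
    {"address": "14 bow street", "price": "350000"},
    {"address": "123 walkinstown park", "price": "297000"},
]


def clear_table():
    """Empty the table (SQLite has no TRUNCATE, so use a bulk DELETE)."""
    session.query(PprRawAll).delete()
    session.commit()


def load_rows(rows):
    """Insert one PprRawAll object per row dict."""
    session.add_all([PprRawAll(**row) for row in rows])
    session.commit()


row_counts = []
for run in range(3):  # simulate running the transform step three times
    clear_table()
    load_rows(SNAPSHOT)
    row_counts.append(session.query(PprRawAll).count())

print(row_counts)
```

Without the `clear_table()` call, each run would append another copy of the snapshot and the counts would grow.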

# Instruction
- Just as you did at the end of Chapter 1, complete the main() function. First, you need to truncate the table and then run all the transformations.
- Edit execute.py to import the transform script and call its main function, enabling you to automate the process.

# transform.py

In [None]:
import os
import csv
from datetime import datetime

from common.tables import PprRawAll
from common.base import session
from sqlalchemy import text

# Settings
base_path = os.path.abspath(__file__ + "/../../")

# START - Paths for new February 2021 data available

# Raw path where we want to extract the new CSV data
raw_path = f"{base_path}/data/raw/downloaded_at=2021-02-01/ppr-all.csv"

# END - Paths for new February 2021 data available


def transform_case(input_string):
    """
    Lowercase string fields
    """
    return input_string.lower()


def update_date_of_sale(date_input):
    """
    Update date format from DD/MM/YYYY to YYYY-MM-DD
    """
    current_format = datetime.strptime(date_input, "%d/%m/%Y")
    new_format = current_format.strftime("%Y-%m-%d")
    return new_format


def update_description(description_input):
    """
    Simplify the description field for potential future analysis; just return:
    - "new" if string contains "new" substring
    - "second-hand" if string contains "second-hand" substring
    """
    description_input = transform_case(description_input)
    if "new" in description_input:
        return "new"
    elif "second-hand" in description_input:
        return "second-hand"
    return description_input


def update_price(price_input):
    """
    Return price as integer by removing:
    - "€" symbol
    - "," to convert the number into float first (e.g. from "€100,000.00" to "100000.00")
    """
    price_input = price_input.replace("€", "")
    price_input = float(price_input.replace(",", ""))
    return int(price_input)


def truncate_table():
    """
    Ensure that "ppr_raw_all" table is always in empty state before running any transformations.
    And primary key (id) restarts from 1.
    """
    session.execute(
        text("TRUNCATE TABLE ppr_raw_all;ALTER SEQUENCE ppr_raw_all_id_seq RESTART;")
    )
    session.commit()


def transform_new_data():
    """
    Apply all transformations for each row in the .csv file before saving it into database
    """
    with open(raw_path, mode="r", encoding="windows-1252") as csv_file:
        # Read the new CSV snapshot ready to be processed
        reader = csv.DictReader(csv_file)
        # Initialize an empty list for our PprRawAll objects
        ppr_raw_objects = []
        for row in reader:
            # Apply transformations and save as PprRawAll object
            ppr_raw_objects.append(
                PprRawAll(
                    date_of_sale=update_date_of_sale(row["date_of_sale"]),
                    address=transform_case(row["address"]),
                    postal_code=transform_case(row["postal_code"]),
                    county=transform_case(row["county"]),
                    price=update_price(row["price"]),
                    description=update_description(row["description"]),
                )
            )
        # Save all new processed objects and commit
        session.bulk_save_objects(ppr_raw_objects)
        session.commit()


def main():
    print("[Transform] Start")
    print("[Transform] Remove any old data from ppr_raw_all table")
    truncate_table()
    print("[Transform] Transform new data available in ppr_raw_all table")
    transform_new_data()
    print("[Transform] End")

# execute.py

In [None]:
# Import the extract and transform scripts
import extract
import transform

# Call their main functions in order
if __name__ == "__main__":
    extract.main()
    transform.main()

# **Date data type definition**
You've been introduced to two new concepts: the date data type, and the column uniqueness constraint. You've also learned about the existence of another table, the clean table.

Just as the raw table refers to ppr_raw_all, the clean table refers to ppr_clean_all, which stores the complete and updated dataset used by analysts.

Let's start by defining a new column data type for the table ppr_clean_all.

You're going to work in the tables.py and create_tables.py files.

# create_tables.py

In [None]:
from base import Base, engine
# Import the class corresponding to the clean table
from tables import PprRawAll, PprCleanAll

if __name__ == "__main__":
    Base.metadata.create_all(engine)

# tables.py

In [None]:
# Import the class you need
from sqlalchemy import Column, Integer, String, Date

from base import Base


class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"

    id = Column(Integer, primary_key=True)
    date_of_sale = Column(String(55))
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(String(55))
    description = Column(String(255))

class PprCleanAll(Base):
    __tablename__ = "ppr_clean_all"

    id = Column(Integer, primary_key=True)
    # Create a new column of type Date
    date_of_sale = Column(Date)
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(Integer)
    description = Column(String(255))

# Unique key definition
You just added a Date column. Now it's time to create the unique column needed for insert and delete operations between the raw table and the clean one.

The unique key should be a string called transaction_id. It will be a concatenation of four columns (date_of_sale, address, county and price) separated by an underscore (_).

You're going to define transaction_id as a column_property for both PprRawAll and PprCleanAll.

Let's take this row as an example:

| date_of_sale | address | postal_code | county | price | description |
|---|---|---|---|---|---|
| 2021-02-12 | 123 walkinstown park, walkinstown, dublin 12 | dublin 12 | dublin | 297000 | second-hand |

Its transaction_id value will be "2021-02-12_123 walkinstown park, walkinstown, dublin 12_dublin_297000".

You're going to work in tables.py.

# Instruction
- Import the function you need to create a unique key.
- Define the transaction_id on PprRawAll as a column property with the needed columns.
- Define the transaction_id on PprCleanAll as a column property with the needed columns. Some of the columns will need to be converted to string format.

In [None]:
from sqlalchemy import cast, Column, Integer, String, Date
# Import the function required
from sqlalchemy.orm import column_property

from base import Base

class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"

    id = Column(Integer, primary_key=True)
    date_of_sale = Column(String(55))
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(String(55))
    description = Column(String(255))
    # Create a unique transaction id
    transaction_id = column_property(
        date_of_sale + "_" + address + "_" + county + "_" + price
    )

class PprCleanAll(Base):
    __tablename__ = "ppr_clean_all"

    id = Column(Integer, primary_key=True)
    date_of_sale = Column(Date)
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(Integer)
    description = Column(String(255))
    # Create a unique transaction id
    # All non-string columns are cast to strings
    transaction_id = column_property(
        cast(date_of_sale, String) + "_" + address + "_" + county + "_" + cast(price, String)
    )
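To see the value this expression produces, the sketch below maps a trimmed-down PprCleanAll (only the key columns plus id) onto in-memory SQLite, swapped in for PostgreSQL, inserts the example row, and reads transaction_id back. String `+` is rendered as the SQL `||` concatenation operator on both databases. Keep in mind that a column_property is a read-only expression evaluated in queries; on its own it does not add a UNIQUE constraint to the database table.

```python
from datetime import date

from sqlalchemy import Column, Date, Integer, String, cast, create_engine
from sqlalchemy.orm import Session, column_property, declarative_base

Base = declarative_base()


class PprCleanAll(Base):
    __tablename__ = "ppr_clean_all"

    id = Column(Integer, primary_key=True)
    date_of_sale = Column(Date)
    address = Column(String(255))
    county = Column(String(55))
    price = Column(Integer)
    # Non-string columns are cast to strings before concatenation
    transaction_id = column_property(
        cast(date_of_sale, String) + "_" + address + "_" + county + "_" + cast(price, String)
    )


engine = create_engine("sqlite://")  # stand-in for PostgreSQL
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(
        PprCleanAll(
            date_of_sale=date(2021, 2, 12),
            address="123 walkinstown park, walkinstown, dublin 12",
            county="dublin",
            price=297000,
        )
    )
    session.commit()
    # The expression is computed by the database in the SELECT
    transaction_id = session.query(PprCleanAll.transaction_id).scalar()

print(transaction_id)
```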

# Querying
You've previously defined a PprRawAll class corresponding to the raw table. Now you're going to query it.

You need to retrieve the row with id equal to 2 and print the corresponding address value.

The session object and the PprRawAll class are already initialized and ready to use.

# Instruction
- Query the PprRawAll table, and filter for the row whose id column value is equal to 2.
- Print the address from the first row.

In [None]:
# Query the session to get the row with id equal to 2
result = session.query(PprRawAll).filter(PprRawAll.id == 2).first()

# Get the corresponding address (first() returns None if no row matches)
print("Address:", result.address)

# **Insert and delete**
Let's now focus on our load phase. It relies heavily on two fundamental operations:

- inserting rows in a table
- deleting rows from a table

The session object and the PprCleanAll class are already initialized and ready to use.

# Instruction
- Import the function you need to insert rows.
- Insert the predefined values in the PprCleanAll table.
- Execute the statement and commit.

In [None]:
# Import the function required
from sqlalchemy.dialects.postgresql import insert
  
values = [{"date_of_sale": "2021-01-01",
           "address": "14 bow street",
           "postal_code": "dublin 7",
           "county": "dublin",
           "price": 350000,
           "description":"second-hand"}]

# Insert values in PprCleanAll
stmt = insert(PprCleanAll).values(values)

# Execute and commit
session.execute(stmt)
session.commit()

# Instruction
- Import the function you need to delete rows.
- Delete all the rows that don't have a description.
- Execute the statement and commit.

In [None]:
# Import the functions required
from sqlalchemy import delete, or_

# Delete rows lacking a description value (empty string or NULL)
stmt = delete(PprCleanAll).where(
    or_(PprCleanAll.description == "", PprCleanAll.description.is_(None))
)

# Execute and commit
session.execute(stmt)
session.commit()
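When an insert or delete doesn't behave as expected, it can help to look at the SQL that SQLAlchemy generates before executing anything. The sketch below compiles both kinds of statement for the PostgreSQL dialect without any database connection; the trimmed PprCleanAll model is an assumption standing in for the one in tables.py.

```python
from sqlalchemy import Column, Integer, String, delete
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class PprCleanAll(Base):
    __tablename__ = "ppr_clean_all"
    id = Column(Integer, primary_key=True)
    address = Column(String(255))
    description = Column(String(255))


insert_stmt = insert(PprCleanAll).values(
    [{"address": "14 bow street", "description": "second-hand"}]
)
delete_stmt = delete(PprCleanAll).where(PprCleanAll.description == "")

# Compile for PostgreSQL; values appear as bound-parameter placeholders
insert_sql = str(insert_stmt.compile(dialect=postgresql.dialect()))
delete_sql = str(delete_stmt.compile(dialect=postgresql.dialect()))
print(insert_sql)
print(delete_sql)
```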

# Insert operation
Time to insert some data in the clean table! You're going to build upon the previous exercises and add all the new rows that are in the raw table, but not yet in the clean one.

The session object and the PprRawAll and PprCleanAll classes are already initialized and ready to use.

# Instructions
- Select the transaction ids from the clean table.
- Select the columns that need to be inserted: date of sale, address, postal code, county, price and description. The date of sale should be a date and the price an integer.
- Filter for the new rows (rows that are in the raw table, but not in the clean one).
- Use the list of new rows to complete the insert statement.

In [None]:
from sqlalchemy import cast, Date, Integer
from sqlalchemy.dialects.postgresql import insert

# Select the transaction ids
clean_transaction_ids = session.query(PprCleanAll.transaction_id)

# Select the columns and cast the appropriate types if needed
transactions_to_insert = session.query(
    cast(PprRawAll.date_of_sale, Date),
    PprRawAll.address,
    PprRawAll.postal_code,
    PprRawAll.county,
    cast(PprRawAll.price, Integer),
    PprRawAll.description,
).filter(
    # Filter for the new rows: raw transactions not yet in the clean table
    ~PprRawAll.transaction_id.in_(clean_transaction_ids)
)

# Print total number of transactions to insert
# it should be 3154 if the transactions need to be inserted
# 0, if all transactions have been inserted
print("Transactions to insert:", transactions_to_insert.count())

# Insert the rows from the previously selected transactions
columns = ["date_of_sale", "address", "postal_code",
          "county", "price","description"]
stm = insert(PprCleanAll).from_select(columns, transactions_to_insert)

# Execute and commit the statement to make changes in the database.
session.execute(stm)
session.commit()

# Delete operation
After inserting new rows from the raw table to the clean table, you need to remove rows that are in the clean table but no longer in the raw one.

The session object and the PprRawAll and PprCleanAll classes are already initialized and ready to use.

# Instruction
- Get the list of all transactions from the ppr_raw_all table.
- Query the clean table, and filter for the rows that are in the clean table but not in the list of raw transactions you just created.
- Delete these rows.

In [None]:

# Get all the ppr_raw_all transaction ids
raw_transaction_ids = session.query(PprRawAll.transaction_id)

# Filter all the ppr_clean_all table transactions that are not present in the ppr_raw_all table
transactions_to_delete = session.query(PprCleanAll).filter(~PprCleanAll.transaction_id.in_(raw_transaction_ids))

# Print transactions to delete
print("Transactions to delete:", transactions_to_delete.count())

# Delete the selected transactions
# (synchronize_session=False tells SQLAlchemy not to update objects already
# loaded in the session; the default strategy may be unable to evaluate
# an IN (subquery) filter in Python)
transactions_to_delete.delete(synchronize_session=False)

# Commit the session to make the changes in the database
session.commit()
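Taken together, the insert and delete steps synchronise the clean table with the latest raw snapshot: an anti-join (`NOT IN`) on transaction_id adds what is new and removes what has disappeared. The sketch below runs both steps on in-memory SQLite with trimmed-down models (only address and price, and a transaction_id built from them). It uses the generic `sqlalchemy.insert` rather than the PostgreSQL-specific one, and the sample rows are made up.

```python
from sqlalchemy import Column, Integer, String, cast, create_engine, insert
from sqlalchemy.orm import Session, column_property, declarative_base

Base = declarative_base()


class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"
    id = Column(Integer, primary_key=True)
    address = Column(String(255))
    price = Column(String(55))
    transaction_id = column_property(address + "_" + price)


class PprCleanAll(Base):
    __tablename__ = "ppr_clean_all"
    id = Column(Integer, primary_key=True)
    address = Column(String(255))
    price = Column(Integer)
    transaction_id = column_property(address + "_" + cast(price, String))


engine = create_engine("sqlite://")  # stand-in for PostgreSQL
Base.metadata.create_all(engine)
session = Session(engine)

# Latest raw snapshot: "a" is unchanged, "c" is new, "b" has disappeared
session.add_all([PprRawAll(address="a", price="100"), PprRawAll(address="c", price="300")])
session.add_all([PprCleanAll(address="a", price=100), PprCleanAll(address="b", price=200)])
session.commit()

# Insert: raw rows whose transaction_id is not yet in the clean table
clean_ids = session.query(PprCleanAll.transaction_id)
new_rows = session.query(PprRawAll.address, cast(PprRawAll.price, Integer)).filter(
    ~PprRawAll.transaction_id.in_(clean_ids)
)
session.execute(insert(PprCleanAll).from_select(["address", "price"], new_rows))

# Delete: clean rows whose transaction_id is no longer in the raw table
raw_ids = session.query(PprRawAll.transaction_id)
session.query(PprCleanAll).filter(~PprCleanAll.transaction_id.in_(raw_ids)).delete(
    synchronize_session=False
)
session.commit()

final_addresses = sorted(address for (address,) in session.query(PprCleanAll.address))
print(final_addresses)
```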

# Load 'em all!
You're now ready to complete the last step of your ETL pipeline, and bring everything you learned in this chapter together.

Notice that the insert and delete operations you just wrote have been wrapped into the corresponding functions insert_transactions() and delete_transactions().

You now need to make sure the load operation can be automated, like you did for the extract and transform operations at the end of Chapters 1 and 2.

# Instruction
- Just as you did at the end of Chapter 1, complete the load.py main() function.
- Then, edit execute.py to import the load script and call its main function, enabling you to automate the process.

# load.py

In [None]:
from common.base import session
from common.tables import PprRawAll, PprCleanAll

from sqlalchemy import cast, Integer, Date
from sqlalchemy.dialects.postgresql import insert


def insert_transactions():
    """
    Insert operation: add new data
    """
    # Retrieve all the transaction ids from the clean table
    clean_transaction_ids = session.query(PprCleanAll.transaction_id)

    # date_of_sale and price need to be cast because their target
    # datatypes in the clean table are Date and Integer, respectively, not strings
    transactions_to_insert = session.query(
        cast(PprRawAll.date_of_sale, Date),
        PprRawAll.address,
        PprRawAll.postal_code,
        PprRawAll.county,
        cast(PprRawAll.price, Integer),
        PprRawAll.description,
    ).filter(~PprRawAll.transaction_id.in_(clean_transaction_ids))

    # Print total number of transactions to insert
    print("Transactions to insert:", transactions_to_insert.count())

    # Insert the rows from the previously selected transactions
    stm = insert(PprCleanAll).from_select(
        ["date_of_sale", "address", "postal_code", "county", "price", "description"],
        transactions_to_insert,
    )

    # Execute and commit the statement to make changes in the database.
    session.execute(stm)
    session.commit()


def delete_transactions():
    """
    Delete operation: delete any row not present in the last snapshot
    """
    # Get all ppr_raw_all transaction ids
    raw_transaction_ids = session.query(PprRawAll.transaction_id)

    # Select all the ppr_clean_all table transactions that are not present
    # in the ppr_raw_all table, so they can be deleted below
    # (synchronize_session is passed to the delete method).
    transactions_to_delete = session.query(PprCleanAll).filter(
        ~PprCleanAll.transaction_id.in_(raw_transaction_ids)
    )
    
    # Print transactions to delete
    print("Transactions to delete:", transactions_to_delete.count())

    # Delete transactions
    transactions_to_delete.delete(synchronize_session=False)

    # Commit the session to make the changes in the database
    session.commit()


def main():
    print("[Load] Start")
    print("[Load] Inserting new rows")
    insert_transactions()
    print("[Load] Deleting rows not available in the new transformed data")
    # Call the function (the parentheses were missing, so nothing was deleted)
    delete_transactions()
    print("[Load] End")
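insert(PprCleanAll).from_select(...) compiles to an INSERT ... SELECT statement. The sketch below runs the same shape of query on an in-memory SQLite database with hypothetical, trimmed-down tables. SQLite has no native date type, so only price is cast here; in PostgreSQL the date column would be cast too.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Hypothetical raw table: every column stored as text
    CREATE TABLE ppr_raw_all (transaction_id TEXT, date_of_sale TEXT, price TEXT);
    -- Hypothetical clean table: typed price column
    CREATE TABLE ppr_clean_all (transaction_id TEXT, date_of_sale DATE, price INTEGER);
    INSERT INTO ppr_raw_all VALUES ('a', '2021-01-05', '250000'), ('b', '2021-01-20', '310000');
    INSERT INTO ppr_clean_all VALUES ('a', '2021-01-05', 250000);
    """
)

# Copy only raw rows whose id is not yet in the clean table, casting price to an integer
cur = conn.execute(
    """
    INSERT INTO ppr_clean_all (transaction_id, date_of_sale, price)
    SELECT transaction_id, date_of_sale, CAST(price AS INTEGER)
    FROM ppr_raw_all
    WHERE transaction_id NOT IN (SELECT transaction_id FROM ppr_clean_all)
    """
)
conn.commit()

print("Transactions inserted:", cur.rowcount)
rows = conn.execute(
    "SELECT transaction_id, price, typeof(price) FROM ppr_clean_all ORDER BY transaction_id"
).fetchall()
print(rows)
```

Running the insert a second time would add nothing, which is what makes the load step safe to repeat on every pipeline run.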

# execute.py

In [None]:
import extract
import transform
import load

if __name__ == "__main__":
    extract.main()
    transform.main()
    load.main()

# Sales for Dublin and Cork
One of the DCG Capital shareholders wants to get all the sales transactions for properties located in the Dublin or Cork counties.

The session and PprCleanAll class are already initialized and ready to use.

**Instructions**

- Import the operator you need to get sales transactions in the Dublin or Cork counties.
- Query all the requested transactions.
- Filter to get only Dublin and Cork.

In [None]:
# Import the operator you need
from sqlalchemy import or_

# Query the clean table to retrieve all the
# transactions for the Dublin or Cork counties
result = session.query(PprCleanAll) \
                .filter(or_(PprCleanAll.county == "dublin", PprCleanAll.county == "cork")) \
                .all()
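or_() compiles to SQL OR conditions. When every condition compares the same column to a value, an IN list selects the same rows; in SQLAlchemy that would be PprCleanAll.county.in_(["dublin", "cork"]). The sketch below checks the equivalence on an in-memory SQLite table with made-up sample rows.

```python
import sqlite3

# Hypothetical sample rows standing in for ppr_clean_all
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ppr_clean_all (address TEXT, county TEXT)")
conn.executemany(
    "INSERT INTO ppr_clean_all VALUES (?, ?)",
    [("1 Main St", "dublin"), ("2 Quay Rd", "cork"), ("3 Shop St", "galway")],
)

# Chained OR conditions, as produced by or_()
with_or = conn.execute(
    "SELECT address FROM ppr_clean_all"
    " WHERE county = 'dublin' OR county = 'cork' ORDER BY address"
).fetchall()

# A single IN list with bound parameters
with_in = conn.execute(
    "SELECT address FROM ppr_clean_all WHERE county IN (?, ?) ORDER BY address",
    ("dublin", "cork"),
).fetchall()

print(with_or == with_in)  # True
```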

print("First row address:", result[0].address)

# First month 2021 sales
Pleased by your previous report, the same shareholder immediately asks you to list all the sales transactions that happened in January 2021.

The session and PprCleanAll class are already initialized and ready to use.

**Instructions**

- Import the operator needed.
- Query all the requested transactions.
- Filter for January sales only: from 2021-01-01 to 2021-01-31.



In [None]:
# Import the and_ function needed
from sqlalchemy import and_

# Retrieve all sales transactions for January 2021 (both bounds inclusive)
result = session.query(PprCleanAll) \
                .filter(and_(PprCleanAll.date_of_sale >= "2021-01-01",
                             PprCleanAll.date_of_sale <= "2021-01-31")) \
                .all()

print("First row address:", result[0].address)
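Both comparisons in the and_() filter are inclusive, so it matches SQL BETWEEN, which SQLAlchemy exposes as PprCleanAll.date_of_sale.between("2021-01-01", "2021-01-31"). The sketch below checks this on an in-memory SQLite table with hypothetical rows; ISO date strings compare correctly as text. If the column held timestamps rather than dates, a half-open range (< '2021-02-01') would avoid missing sales later on 31 January.

```python
import sqlite3

# Hypothetical rows around the January 2021 boundaries
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ppr_clean_all (address TEXT, date_of_sale TEXT)")
conn.executemany(
    "INSERT INTO ppr_clean_all VALUES (?, ?)",
    [("A", "2020-12-31"), ("B", "2021-01-01"), ("C", "2021-01-31"), ("D", "2021-02-01")],
)

# Two inclusive comparisons joined with AND, as in and_(...)
with_and = conn.execute(
    "SELECT address FROM ppr_clean_all"
    " WHERE date_of_sale >= '2021-01-01' AND date_of_sale <= '2021-01-31'"
    " ORDER BY address"
).fetchall()

# BETWEEN is inclusive on both ends, so it selects the same rows
with_between = conn.execute(
    "SELECT address FROM ppr_clean_all"
    " WHERE date_of_sale BETWEEN '2021-01-01' AND '2021-01-31'"
    " ORDER BY address"
).fetchall()

print(with_and)  # [('B',), ('C',)]
```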

# Average, max and min functions
Let's consider the following product table:

| id | category | name | price |
|----|----------|------|-------|
| 1 | books | Sapiens | 12 |
| 2 | electronics | iPhone 12 | 900 |
| 3 | books | Measure What Matters | 10 |
| 4 | books | Greenlights | 14 |
| 5 | electronics | Macbook Pro 13 | 1500 |

Your goal is to calculate the maximum, minimum and average values of these products' prices for each category.

The session and the Products class are already defined.

**Instructions**

- Import func, the SQLAlchemy object used to generate SQL functions such as aggregates.
- Calculate the maximum, minimum and average values of the price column.
- Make sure you get these aggregates for each category of product.

In [None]:
# Import func to build the SQL aggregate functions
from sqlalchemy import func

# Get the maximum, minimum, and average values for each product category
result = session.query(Products.category,
                       func.max(Products.price),
                       func.min(Products.price),
                       func.avg(Products.price)) \
                .group_by(Products.category).all()

print("Result:", result)
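The query above translates to a GROUP BY over category with MAX, MIN and AVG. The sketch below runs that SQL on the product table from this exercise, loaded into an in-memory SQLite database (the table name products is an assumption), so the expected figures can be checked by hand.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER, category TEXT, name TEXT, price INTEGER)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?, ?)",
    [
        (1, "books", "Sapiens", 12),
        (2, "electronics", "iPhone 12", 900),
        (3, "books", "Measure What Matters", 10),
        (4, "books", "Greenlights", 14),
        (5, "electronics", "Macbook Pro 13", 1500),
    ],
)

# One output row per category with its max, min and average price
rows = conn.execute(
    "SELECT category, MAX(price), MIN(price), AVG(price)"
    " FROM products GROUP BY category ORDER BY category"
).fetchall()
print(rows)  # [('books', 14, 10, 12.0), ('electronics', 1500, 900, 1200.0)]
```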

# Creating the insights view
You've been tasked with creating a view called insights that contains the metrics shareholders regularly ask for.

**Instructions**

- Create the new view insights or replace it if it already exists.
- For each county, the view needs to report the number of sales, the sales total, the maximum sale, the minimum sale and the average sale.
- Execute the query to create the view, then commit the changes to the database.

In [None]:
from sqlalchemy import text

from common.base import session

# Create the view with the appropriate metrics
query = """
CREATE OR REPLACE VIEW insights AS
SELECT county,
       COUNT(*) AS sales_count,
       SUM(CAST(price AS int)) AS sales_total,
       MAX(CAST(price AS int)) AS sales_max,
       MIN(CAST(price AS int)) AS sales_min,
       AVG(CAST(price AS int))::numeric(10,2) AS sales_avg
FROM ppr_clean_all
GROUP BY county
"""

# Wrap the raw SQL string in text(), then execute and commit
session.execute(text(query))
session.commit()
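To try the view logic without a PostgreSQL server, the sketch below rebuilds it on an in-memory SQLite database with hypothetical sample sales. SQLite has no CREATE OR REPLACE VIEW, so the view is dropped first, and the PostgreSQL ::numeric(10,2) cast is swapped for ROUND(..., 2).

```python
import sqlite3

# Hypothetical sample sales; price stored as text, as in the raw data
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ppr_clean_all (county TEXT, price TEXT)")
conn.executemany(
    "INSERT INTO ppr_clean_all VALUES (?, ?)",
    [("dublin", "300000"), ("dublin", "450000"), ("cork", "200000")],
)

# Drop any previous version, then create the per-county metrics view
conn.executescript(
    """
    DROP VIEW IF EXISTS insights;
    CREATE VIEW insights AS
    SELECT county,
           COUNT(*) AS sales_count,
           SUM(CAST(price AS INTEGER)) AS sales_total,
           MAX(CAST(price AS INTEGER)) AS sales_max,
           MIN(CAST(price AS INTEGER)) AS sales_min,
           ROUND(AVG(CAST(price AS INTEGER)), 2) AS sales_avg
    FROM ppr_clean_all
    GROUP BY county;
    """
)

rows = conn.execute("SELECT * FROM insights ORDER BY county").fetchall()
print(rows)
# [('cork', 1, 200000, 200000, 200000, 200000.0),
#  ('dublin', 2, 750000, 450000, 300000, 375000.0)]
```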

# How many counties?
The shareholder is visiting the office today and heard you were done creating the view she requested. She stops by your desk to ask if you could tell her how many counties are present in the insights view.

Remember:

- The insights view is grouped by county.
- You can use raw SQL to query the insights view.
- You need to build a query like SELECT ____ FROM insights, pass it to session.execute(), and call .all() on the result.
- ____ should be replaced by the SQL function that counts rows, which here means counting the counties.
- The session is already available and ready to use.

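In [None]:
from sqlalchemy import text

# The view has one row per county, so counting distinct counties gives the answer
sql_query = "SELECT COUNT(DISTINCT county) FROM insights"
session.execute(text(sql_query)).all()
# Returns [(26,)]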
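Because the insights view is grouped by county, counting its rows, counting its distinct counties and counting distinct counties in the base table give the same number. The sketch below checks this on an in-memory SQLite database with hypothetical rows. One caveat: GROUP BY keeps NULL counties as their own group while COUNT(DISTINCT ...) ignores NULLs, so COUNT(*) on the view would differ if any county were missing.

```python
import sqlite3

# Hypothetical sales and a stripped-down insights view
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ppr_clean_all (county TEXT, price INTEGER);
    INSERT INTO ppr_clean_all VALUES ('dublin', 300000), ('dublin', 450000), ('cork', 200000);
    CREATE VIEW insights AS
        SELECT county, COUNT(*) AS sales_count FROM ppr_clean_all GROUP BY county;
    """
)

# With one row per county and no NULL counties, all three counts agree
n_view_rows = conn.execute("SELECT COUNT(*) FROM insights").fetchone()[0]
n_view_distinct = conn.execute("SELECT COUNT(DISTINCT county) FROM insights").fetchone()[0]
n_base_distinct = conn.execute("SELECT COUNT(DISTINCT county) FROM ppr_clean_all").fetchone()[0]
print(n_view_rows, n_view_distinct, n_base_distinct)  # 2 2 2
```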

# Create a simple Excel file
Here is an Excel table. Let's create it using Python.

![image](image.png)

**Instructions**

- Import the library you need to work with Excel files.
- Create a new workbook.
- Create a new worksheet from the current workbook.
- Add Hello in cell (A1) and Datacamp in cell (B1).

In [None]:
# Import the library needed
import xlsxwriter

# Create a new Excel file
workbook = xlsxwriter.Workbook("insights.xlsx")

# Initialize a worksheet
worksheet = workbook.add_worksheet()

# Write the data in the current sheet: (row, col) indices are zero-based,
# so (0, 0) is cell A1 and (0, 1) is cell B1
data = ["Hello", "Datacamp"]
worksheet.write(0, 0, data[0])
worksheet.write(0, 1, data[1])

# Close the file
workbook.close()
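worksheet.write() takes zero-based (row, col) indices, which is easy to confuse with Excel's A1 labels. The hypothetical helper to_a1 below sketches the conversion in plain Python. xlsxwriter ships a similar function, xl_rowcol_to_cell in xlsxwriter.utility, and write() also accepts A1 strings such as "B1" directly.

```python
def to_a1(row: int, col: int) -> str:
    """Convert zero-based (row, col) indices to an Excel A1-style cell reference."""
    letters = ""
    col_number = col + 1  # Excel columns are 1-based: A=1, ..., Z=26, AA=27
    while col_number > 0:
        col_number, remainder = divmod(col_number - 1, 26)
        letters = chr(ord("A") + remainder) + letters
    return f"{letters}{row + 1}"


print(to_a1(0, 0))   # A1
print(to_a1(0, 1))   # B1
print(to_a1(1, 1))   # B2
print(to_a1(0, 26))  # AA1
```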

# Add a table to an Excel file
You're now tasked with creating an Excel file containing an entire table. The end result should look like this:

![image-2](image-2.png)

In [None]:
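import xlsxwriter

# Assumed: `data` is the product table from the earlier aggregate exercise,
# one list per row (the exercise environment normally provides it)
data = [
    [1, "books", "Sapiens", 12],
    [2, "electronics", "iPhone 12", 900],
    [3, "books", "Measure What Matters", 10],
    [4, "books", "Greenlights", 14],
    [5, "electronics", "Macbook Pro 13", 1500],
]

workbook = xlsxwriter.Workbook("Products.xlsx")
worksheet = workbook.add_worksheet()

# Create a table in the current sheet: header in row 3, five data rows in rows 4-8
worksheet.add_table(
    "B3:E8",
    {
        "data": data,
        "columns": [
            # Use the appropriate names for the columns
            {"header": "id"},
            {"header": "category"},
            {"header": "name"},
            {"header": "price"},
        ],
    },
)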

# Close the current file
workbook.close()
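The range passed to add_table() has to cover the header row plus every data row, and exactly as many columns as there are headers. The hypothetical helper table_range below computes that range from the data's dimensions instead of hard-coding it. Note that add_table() also accepts (first_row, first_col, last_row, last_col) integers in place of an A1 string.

```python
def table_range(first_row: int, first_col: int, n_data_rows: int, n_cols: int) -> str:
    """A1 range for a header row plus n_data_rows rows of n_cols columns.

    first_row and first_col are zero-based, as in xlsxwriter's (row, col) notation.
    """

    def col_letters(col: int) -> str:
        # Zero-based column index to Excel letters (0 -> A, 25 -> Z, 26 -> AA)
        letters = ""
        n = col + 1
        while n > 0:
            n, rem = divmod(n - 1, 26)
            letters = chr(ord("A") + rem) + letters
        return letters

    last_row = first_row + n_data_rows  # the header occupies first_row
    last_col = first_col + n_cols - 1
    return f"{col_letters(first_col)}{first_row + 1}:{col_letters(last_col)}{last_row + 1}"


print(table_range(2, 1, 5, 4))  # B3:E8, the range used for the product table
```

For example, 26 county rows with six columns starting at B3 would need table_range(2, 1, 26, 6), which gives B3:G29.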

# Export monthly insights
You're now ready to create a monthly report to be sent out to the shareholders. As a Business Analyst, you need to build the right query to retrieve the data and export it to an Excel file. Let's put everything together in a single script called insights_export.py, and generate monthly insights! The exported Excel file will be saved in the insights_export folder.

Remember that, for the previous month and for each county, the shareholders want to know:

- the number of sales
- the total sales
- the highest price a property sold for
- the lowest price a property sold for
- the average sale price

The results are saved in a file called InsightsExport_YYYYMM.xlsx.

In [None]:
from datetime import datetime
import os

from sqlalchemy import text
import xlsxwriter

from common.base import session


# Settings
base_path = os.path.abspath(__file__ + "/../../")
# Month stamp for the file name, formatted as YYYYMM
ref_month = datetime.today().strftime("%Y%m")

if __name__ == "__main__":
    # Fetch one row per county from the insights view
    data = [list(row) for row in session.execute(text("SELECT * FROM insights")).all()]
    export_path = f"{base_path}/insights_export/InsightsExport_{ref_month}.xlsx"

    # Create the workbook
    workbook = xlsxwriter.Workbook(export_path)

    # Add a new worksheet and widen columns B to G
    worksheet = workbook.add_worksheet()
    worksheet.set_column("B:G", 12)

    # Add the table with all results: header in row 3, then one row per county.
    # The range spans the six columns B to G and grows with the number of rows.
    worksheet.add_table(
        f"B3:G{3 + len(data)}",
        {
            "data": data,
            "columns": [
                {"header": "County"},
                {"header": "Number of Sales 3 months"},
                {"header": "Tot sales 3 months"},
                {"header": "Max sales 3 months"},
                {"header": "Min sales 3 months"},
                {"header": "Avg sales 3 months"},
            ],
        },
    )
    workbook.close()

    print("Data exported:", export_path)
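The report is meant to cover the previous month and to be named InsightsExport_YYYYMM.xlsx. The hypothetical helper previous_month_bounds below sketches how both the YYYYMM label and the first and last days of the previous month could be derived from a given date; those bounds could then be used to filter date_of_sale, which the script above does not do on its own.

```python
from datetime import date, timedelta
from typing import Tuple


def previous_month_bounds(today: date) -> Tuple[str, date, date]:
    """Return (YYYYMM label, first day, last day) of the month before `today`."""
    first_of_this_month = today.replace(day=1)
    # Stepping back one day from the 1st always lands in the previous month
    last_of_prev = first_of_this_month - timedelta(days=1)
    first_of_prev = last_of_prev.replace(day=1)
    return last_of_prev.strftime("%Y%m"), first_of_prev, last_of_prev


label, start, end = previous_month_bounds(date(2021, 3, 15))
print(label, start, end)  # 202102 2021-02-01 2021-02-28
```

Stepping back from the first of the current month handles year boundaries and month lengths without any special cases.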