In [1]:
# Import the required libraries
import requests
from zipfile import ZipFile
import csv
from pprint import pprint
import codecs


In [2]:
path = "https://assets.datacamp.com/production/repositories/5899/datasets/19d6cf619d6a771314f0eb489262a31f89c424c2/ppr-all.zip"
# Get the zip file. requests applies no timeout by default, so pass one
# explicitly to keep the cell from hanging if the server never answers.
response = requests.get(path, timeout=60)
# Print the status code
print(response.status_code)
# Stop here on a 4xx/5xx answer instead of letting a later cell save an
# error page to disk under a .zip name.
response.raise_for_status()

200


In [66]:
# Persist the downloaded archive to disk.
# response.content is bytes, hence the binary write mode.
local_path = "/Users/davidhoupapa/Code/etl-python-example/PPR-ALL.zip"
with open(local_path, mode="wb") as zip_out:
    zip_out.write(response.content)

In [34]:
# Open the saved archive read-only, list its members and unpack the first one.
with ZipFile(local_path, mode="r") as archive:
    file_names = archive.namelist()
    print(file_names)
    # No target directory is given, so the member is extracted into the
    # current working directory; extract() returns the resulting path.
    csv_file_path = archive.extract(file_names[0])
    print(csv_file_path)

['ppr-all.csv']
/Users/davidhoupapa/Code/etl-python-example/ppr-all.csv


In [65]:
# Peek at the first record of the extracted CSV.
# newline="" hands line-ending handling to the csv module, as its
# documentation requires; otherwise newlines embedded in quoted fields
# may be misread.
with open(csv_file_path, newline="") as csv_file:
    reader = csv.DictReader(csv_file)
    # Print the first row (a dict keyed by the CSV header names)
    row = next(reader)
    print(type(row))
    pprint(row)

<class 'dict'>
{'Address': '16 BURLEIGH COURT, BURLINGTON ROAD, DUBLIN 4',
 'County': 'Dublin',
 'Date of Sale (dd/mm/yyyy)': '03/01/2021',
 'Description of Property': 'Second-Hand Dwelling house /Apartment',
 'Not Full Market Price': 'No',
 'Postal Code': 'Dublin 4',
 'Price': '450,000.00',
 'Property Size Description': '',
 'VAT Exclusive': 'No'}


In [69]:
# Maps each original CSV header to its snake_case replacement.
# Insertion order is kept as (original header, new header) pairs in the
# same sequence as before, so anything iterating the mapping sees the
# same column order.
fieldnames = dict(
    [
        ("Date of Sale (dd/mm/yyyy)", "date_of_sale"),
        ("Address", "address"),
        ("Postal Code", "postal_code"),
        ("County", "county"),
        ("Price", "price"),
        ("Description of Property", "description"),
        ("Property Size Description", "size_description"),
        ("Not Full Market Price", "not_full_market_price"),
        ("VAT Exclusive", "vat_exclusive"),
    ]
)

In [None]:
# Rebind local_path to a PPR-ALL.zip location inside the project's tmp/ folder.
local_path = "/Users/davidhoupapa/Code/etl-python-example/tmp/PPR-ALL.zip"

In [71]:
# Copy the extracted CSV into a new file whose header line uses the renamed
# columns from `fieldnames` (original header -> new header).
# newline="" on both files hands line-ending handling to the csv module, as
# its documentation requires.
with open(csv_file_path, mode="r", newline="") as reader_csv_file:
    reader = csv.DictReader(reader_csv_file)
    # The new file is called "PPR-2021-Dublin-new-headers.csv"
    # and will be saved inside the "tmp" folder
    with open("/Users/davidhoupapa/Code/etl-python-example/tmp/PPR-2021-Dublin-new-headers.csv", mode="w", newline="") as writer_csv_file:
        # The writer's columns are the ORIGINAL header names (the mapping's
        # keys), because that is how the rows coming from the reader are keyed.
        writer = csv.DictWriter(writer_csv_file, fieldnames=list(fieldnames))
        # Write the header as the first line. Passing the mapping as a row
        # emits its values, i.e. the new header names; writeheader() would
        # have written the original names again.
        writer.writerow(fieldnames)
        # Write all rows in file
        writer.writerows(reader)

In [None]:
# Import the objects needed
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer

# Initialize the base and set inheritance
# (mapped classes below subclass the object returned by declarative_base())
Base = declarative_base()

class PprRawAll(Base):
    """Declarative ORM model mapped to the "ppr_raw_all" table.

    Only the integer primary key ``id`` is declared in this cell.

    NOTE(review): later cells in this notebook read ``transaction_id``,
    ``date_of_sale``, ``address``, ``postal_code``, ``county``, ``price`` and
    ``description`` from ``PprRawAll``; none of them is defined here.
    Presumably a fuller model is defined or imported elsewhere - confirm
    before running those cells.
    """
    # Set the table name
    __tablename__ = "ppr_raw_all"
    # Create a primary key integer column id
    id = Column(Integer, primary_key=True)

In [None]:
# Date and Integer are the target types of the casts below. They are imported
# here so the cell does not fail with a NameError on Date or depend on
# another cell having imported Integer.
from sqlalchemy import Date, Integer, cast
from sqlalchemy.dialects.postgresql import insert

# Select the transaction ids
clean_transaction_ids = session.query(PprCleanAll.transaction_id)

# Select the columns and cast the appropriate types if needed,
# keeping only the raw rows whose transaction id is not yet in the clean table
transactions_to_insert = session.query(
    cast(PprRawAll.date_of_sale, Date),
    PprRawAll.address,
    PprRawAll.postal_code,
    PprRawAll.county,
    cast(PprRawAll.price, Integer),
    PprRawAll.description,
).filter(~PprRawAll.transaction_id.in_(clean_transaction_ids))

# Print total number of transactions to insert
# it should be 3154 if the transactions need to be inserted
# 0, if all transactions have been inserted
print("Transactions to insert:", transactions_to_insert.count())

# Insert the rows from the previously selected transactions.
# The names listed here must line up, position by position, with the
# columns selected above.
columns = ["date_of_sale", "address", "postal_code",
          "county", "price","description"]
stm = insert(PprCleanAll).from_select(columns, transactions_to_insert)

# Execute and commit the statement to make changes in the database.
session.execute(stm)
session.commit()


In [None]:
# Import the delete construct
from sqlalchemy import delete

# Query yielding every transaction id currently in ppr_raw_all
raw_transaction_ids = session.query(PprRawAll.transaction_id)

# ppr_clean_all rows whose transaction id no longer appears in ppr_raw_all
transactions_to_delete = (
    session.query(PprCleanAll)
    .filter(~PprCleanAll.transaction_id.in_(raw_transaction_ids))
)

# Report how many rows are about to be removed
print("Transactions to delete:", transactions_to_delete.count())

# Bulk-delete the selected rows.
# synchronize_session=False tells SQLAlchemy not to try to update objects
# already loaded in the session to reflect the delete.
transactions_to_delete.delete(synchronize_session=False)

# Commit so the deletion is persisted in the database
session.commit()

In [None]:
# Import the and function needed
from sqlalchemy import and_

# Retrieve all sales transactions for January 2021.
# Both date bounds sit inside and_(); previously its closing parenthesis came
# after the first bound, so the query only worked because filter() also ANDs
# its own arguments.
result = (
    session.query(PprCleanAll)
    .filter(
        and_(
            PprCleanAll.date_of_sale >= "2021-01-01",
            PprCleanAll.date_of_sale <= "2021-01-31",
        )
    )
    .all()
)

# .all() returns a list, which is empty when no row matches; guard the
# index so that case prints a message instead of raising IndexError.
if result:
    print("First row address:", result[0].address)
else:
    print("No sales transactions found for January 2021")